- Debugging of redis setup issues

- Debugging of celery startup
- Moved flower to a standard image instead of a custom build
This commit is contained in:
Josako
2025-09-02 10:25:17 +02:00
parent 593dd438aa
commit 84afc0b2ee
7 changed files with 99 additions and 48 deletions

View File

@@ -19,6 +19,7 @@ class SpecialistServices:
     @staticmethod
     def execute_specialist(tenant_id, specialist_id, specialist_arguments, session_id, user_timezone) -> Dict[str, Any]:
+        current_app.logger.debug(f"Before sending task for {specialist_id} with arguments {specialist_arguments}")
         task = current_celery.send_task(
             'execute_specialist',
             args=[tenant_id,
@@ -29,6 +30,7 @@ class SpecialistServices:
             ],
             queue='llm_interactions'
         )
+        current_app.logger.debug(f"Task sent for {specialist_id}, task ID: {task.id}")
         return {
             'task_id': task.id,
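The two debug lines bracket the dispatch: if "Task sent" never appears the problem is on the API side, and if no worker ever logs pickup the queue is the suspect. A minimal sketch of the same check in code (the wait_for_pickup helper is illustrative, not part of the commit):

    import time
    from celery.result import AsyncResult

    def wait_for_pickup(celery_app, task_id, timeout=10.0):
        # Poll the enqueued task; leaving the PENDING state means a worker on
        # 'llm_interactions' received it (or already finished it).
        result = AsyncResult(task_id, app=celery_app)
        deadline = time.time() + timeout
        while time.time() < deadline:
            if result.state != 'PENDING':
                return result.state
            time.sleep(0.5)
        return result.state  # still PENDING: nothing is consuming the queue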

View File

@@ -17,7 +17,7 @@ def get_redis_config(app):
     config = {
         'host': redis_uri.hostname,
         'port': int(redis_uri.port or 6379),
-        'db': 4,  # Keep this for later use
+        'db': 0,
         'redis_expiration_time': 3600,
         'distributed_lock': True,
         'thread_local_lock': False,
@@ -75,7 +75,7 @@ def create_cache_regions(app):
     # Region for model-related caching (ModelVariables etc)
     model_region = make_region(name='eveai_model').configure(
         'dogpile.cache.redis',
-        arguments={**redis_config, 'db': 6},
+        arguments=redis_config,
         replace_existing_backend=True
     )
     regions['eveai_model'] = model_region
@@ -83,7 +83,7 @@ def create_cache_regions(app):
     # Region for eveai_chat_workers components (Specialists, Retrievers, ...)
     eveai_chat_workers_region = make_region(name='eveai_chat_workers').configure(
         'dogpile.cache.redis',
-        arguments=redis_config,  # arguments={**redis_config, 'db': 4},  # Different DB
+        arguments=redis_config,
         replace_existing_backend=True
     )
     regions['eveai_chat_workers'] = eveai_chat_workers_region
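With the per-region db overrides gone, every region now shares Redis db 0 and therefore one keyspace. A sketch of one way to keep regions distinguishable there, using dogpile.cache's key_mangler hook (the prefixed helper is illustrative, not from the commit):

    from dogpile.cache import make_region

    def prefixed(region_name):
        # Prefix every cache key with its region name so regions stay
        # separated inside the shared db 0.
        return lambda key: f"{region_name}:{key}"

    model_region = make_region(
        name='eveai_model',
        key_mangler=prefixed('eveai_model'),
    ).configure(
        'dogpile.cache.redis',
        arguments=redis_config,  # as returned by get_redis_config(app)
        replace_existing_backend=True,
    )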

View File

@@ -1,3 +1,6 @@
+import atexit
+import os
+import ssl
 import tempfile
 from celery import Celery
@@ -6,66 +9,94 @@ from werkzeug.local import LocalProxy
 from redbeat import RedBeatScheduler

 celery_app = Celery()
+_tmp_paths = []

 def _create_ssl_cert_file(cert_data: str) -> str:
     """Create temporary certificate file for Celery SSL"""
     if not cert_data:
         return None
     with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as cert_file:
         cert_file.write(cert_data)
-        return cert_file.name
+        path = cert_file.name
+    _tmp_paths.append(path)  # track for cleanup
+    return path
+
+def _cleanup_tmp():
+    for p in _tmp_paths:
+        try:
+            os.remove(p)
+        except Exception:
+            pass
+
+atexit.register(_cleanup_tmp)

 def init_celery(celery, app, is_beat=False):
     celery_app.main = app.name
     celery_config = {
         'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
         'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
         'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
         'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
         'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
         'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
         'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
+        # connection pools
+        # 'broker_pool_limit': app.config.get('CELERY_BROKER_POOL_LIMIT', 10),
     }

-    # Add broker transport options for SSL and connection pooling
-    broker_transport_options = {
-        'master_name': None,
-        'max_connections': 20,
-        'retry_on_timeout': True,
-        'socket_connect_timeout': 5,
-        'socket_timeout': 5,
-    }
+    # Transport options (timeouts, max_connections for the Redis transport)
+    # broker_transport_options = {
+    #     'master_name': None,  # only relevant for Sentinel; otherwise harmless
+    #     'max_connections': 20,
+    #     'retry_on_timeout': True,
+    #     'socket_connect_timeout': 5,
+    #     'socket_timeout': 5,
+    # }
+    # celery_config['broker_transport_options'] = broker_transport_options
+
+    # Backend transport options (the Redis backend accepts similar timeouts)
+    # result_backend_transport_options = {
+    #     'retry_on_timeout': True,
+    #     'socket_connect_timeout': 5,
+    #     'socket_timeout': 5,
+    #     # max_connections may be supported on newer Celery/redis backends; harmless if ignored
+    #     'max_connections': 20,
+    # }
+    # celery_config['result_backend_transport_options'] = result_backend_transport_options

+    # TLS (only when a cert is provided or the URLs are rediss://)
     cert_data = app.config.get('REDIS_CERT_DATA')
+    ssl_opts = None
     if cert_data:
         try:
-            ssl_cert_file = _create_ssl_cert_file(cert_data)
-            if ssl_cert_file:
-                broker_transport_options.update({
-                    'ssl_cert_reqs': 'required',
-                    'ssl_ca_certs': ssl_cert_file,
-                    'ssl_check_hostname': True,
-                })
-                app.logger.info("SSL configured for Celery Redis connection")
+            ca_path = _create_ssl_cert_file(cert_data)
+            if ca_path:
+                ssl_opts = {
+                    'ssl_cert_reqs': ssl.CERT_REQUIRED,  # the ssl constant, not the string 'required'
+                    'ssl_ca_certs': ca_path,
+                    # 'ssl_check_hostname': True,  # kombu/redis doesn't consistently honor this; CERT_REQUIRED is the key
+                }
+                app.logger.info("SSL configured for Celery Redis connection (CA provided)")
         except Exception as e:
             app.logger.error(f"Failed to configure SSL for Celery: {e}")

-    celery_config['broker_transport_options'] = broker_transport_options
-    celery_config['result_backend_transport_options'] = broker_transport_options
+    if ssl_opts is None:
+        ssl_opts = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
+    celery_config['broker_use_ssl'] = ssl_opts
+    # The Redis result backend needs its own key:
+    celery_config['redis_backend_use_ssl'] = ssl_opts

+    # Beat/RedBeat
     if is_beat:
-        # Add configurations specific to Beat scheduler
         celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler'
         celery_config['redbeat_lock_key'] = 'redbeat::lock'
-        celery_config['beat_max_loop_interval'] = 10  # Adjust as needed
+        celery_config['beat_max_loop_interval'] = 10

     celery_app.conf.update(**celery_config)

-    # Task queues for workers only
+    # Queues for workers (note: the Redis transport ignores routing_key and priority features, unlike RabbitMQ)
     if not is_beat:
         celery_app.conf.task_queues = (
             Queue('default', routing_key='task.#'),
@@ -98,6 +129,7 @@ def init_celery(celery, app, is_beat=False):

 def make_celery(app_name, config):
+    # keep the API but return the single module-level instance
     return celery_app

@@ -105,4 +137,4 @@ def _get_current_celery():
     return celery_app

 current_celery = LocalProxy(_get_current_celery)
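The move from transport-option SSL keys to broker_use_ssl / redis_backend_use_ssl is the substance of the fix: Celery reads TLS settings for the Redis broker and the Redis result backend from those two separate settings. A standalone sketch for verifying the handshake, with an assumed rediss:// URL and CA path (adjust to your environment):

    import ssl
    from celery import Celery

    app = Celery(
        'ssl_check',
        broker='rediss://:password@redis-host:6380/0',   # assumed URL
        backend='rediss://:password@redis-host:6380/0',
    )
    app.conf.broker_use_ssl = {
        'ssl_cert_reqs': ssl.CERT_REQUIRED,
        'ssl_ca_certs': '/path/to/ca.pem',  # assumed path
    }
    app.conf.redis_backend_use_ssl = app.conf.broker_use_ssl

    # ping returns a (possibly empty) list of worker replies; a broken TLS
    # setup surfaces here as a connection error instead.
    print(app.control.ping(timeout=2))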

View File

@@ -209,7 +209,7 @@ if [ $# -eq 0 ]; then
     SERVICES=()
     while IFS= read -r line; do
         SERVICES+=("$line")
-    done < <(yq e '.services | keys | .[]' compose_dev.yaml | grep -E '^(nginx|eveai_|flower|prometheus|grafana)')
+    done < <(yq e '.services | keys | .[]' compose_dev.yaml | grep -E '^(nginx|eveai_|prometheus|grafana)')
 else
     SERVICES=("$@")
 fi
@@ -236,14 +236,14 @@ for SERVICE in "${SERVICES[@]}"; do
     if [[ "$SERVICE" == "nginx" ]]; then
         ./copy_specialist_svgs.sh ../config ../nginx/static/assets 2>/dev/null || echo "Warning: copy_specialist_svgs.sh not found or failed"
     fi

-    if [[ "$SERVICE" == "nginx" || "$SERVICE" == eveai_* || "$SERVICE" == "flower" || "$SERVICE" == "prometheus" || "$SERVICE" == "grafana" ]]; then
+    if [[ "$SERVICE" == "nginx" || "$SERVICE" == eveai_* || "$SERVICE" == "prometheus" || "$SERVICE" == "grafana" ]]; then
         if process_service "$SERVICE"; then
             echo "✅ Successfully processed $SERVICE"
         else
             echo "❌ Failed to process $SERVICE"
         fi
     else
-        echo "⏭️ Skipping $SERVICE as it's not nginx, flower, prometheus, grafana or doesn't start with eveai_"
+        echo "⏭️ Skipping $SERVICE as it's not nginx, prometheus, grafana or doesn't start with eveai_"
     fi
 done
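For illustration, the same service selection the script now performs with yq and grep, as a Python sketch (assumes compose_dev.yaml in the working directory and PyYAML installed):

    import re
    import yaml

    with open('compose_dev.yaml') as f:
        services = yaml.safe_load(f)['services']

    selected = [s for s in services
                if re.match(r'^(nginx|eveai_|prometheus|grafana)', s)]
    print(selected)  # 'flower' no longer matches, so it is skipped during builds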

View File

@@ -74,7 +74,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -114,9 +114,10 @@ services:
       COMPONENT_NAME: eveai_workers
       ROLE: worker
       CELERY_CONCURRENCY: 1  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: embeddings
     volumes:
       - ../eveai_workers:/app/eveai_workers
       - ../common:/app/common
@@ -151,7 +152,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -189,9 +190,10 @@ services:
       COMPONENT_NAME: eveai_chat_workers
       ROLE: worker
       CELERY_CONCURRENCY: 8  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: llm_interactions
     volumes:
       - ../eveai_chat_workers:/app/eveai_chat_workers
       - ../common:/app/common
@@ -224,7 +226,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -285,9 +287,10 @@ services:
       COMPONENT_NAME: eveai_entitlements
       ROLE: worker
       CELERY_CONCURRENCY: 1  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: entitlements
     volumes:
       - ../eveai_entitlements:/app/eveai_entitlements
       - ../common:/app/common
@@ -341,21 +344,33 @@ services:
     networks:
       - eveai-dev-network

+  # flower:
+  #   image: ${REGISTRY_PREFIX:-}josakola/flower:latest
+  #   build:
+  #     context: ..
+  #     dockerfile: ./docker/flower/Dockerfile
+  #   environment:
+  #     <<: *common-variables
+  #   volumes:
+  #     - ../scripts:/app/scripts
+  #   ports:
+  #     - "3007:5555"  # Dev Flower per port scheme
+  #   depends_on:
+  #     - redis
+  #   networks:
+  #     - eveai-dev-network
+
   flower:
-    image: ${REGISTRY_PREFIX:-}josakola/flower:latest
-    build:
-      context: ..
-      dockerfile: ./docker/flower/Dockerfile
+    image: mher/flower:latest
     environment:
-      <<: *common-variables
-    volumes:
-      - ../scripts:/app/scripts
+      - CELERY_BROKER_URL=redis://redis:6379/0
+      - FLOWER_BASIC_AUTH=Felucia:Jungles
+      - FLOWER_URL_PREFIX=/flower
+      - FLOWER_PORT=8080
     ports:
-      - "3007:5555"  # Dev Flower per port scheme
+      - "3007:8080"
     depends_on:
       - redis
     networks:
       - eveai-dev-network

   minio:
     image: minio/minio
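A quick smoke test for the replacement Flower container, using the dev port, URL prefix, and credentials from the compose file above (a sketch; assumes the requests library is available):

    import requests

    resp = requests.get(
        'http://localhost:3007/flower/',  # host port 3007 maps to container port 8080
        auth=('Felucia', 'Jungles'),      # FLOWER_BASIC_AUTH
        timeout=5,
    )
    print(resp.status_code)  # expect 200 once Flower can reach redis://redis:6379/0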

View File

@@ -233,6 +233,7 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict
         'interaction_id': int - Created interaction ID
     }
     """
+    current_app.logger.debug(f'execute_specialist: Processing request for tenant {tenant_id} using specialist {specialist_id}')
     task_id = self.request.id
     ept = ExecutionProgressTracker()
     ept.send_update(task_id, "EveAI Specialist Started", {})

View File

@@ -25,6 +25,7 @@ case "$ROLE" in
     echo "[start] role=worker component=$COMPONENT_NAME"
     CONCURRENCY="${CELERY_CONCURRENCY:-2}"
     exec celery -A scripts.run worker \
+        -Q ${CELERY_QUEUE_NAME} \
         --loglevel="${CELERY_LOGLEVEL:-INFO}" \
         --concurrency="${CONCURRENCY}" \
         --max-tasks-per-child="${CELERY_MAX_TASKS_PER_CHILD:-1000}" \
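To confirm each worker actually binds the queue named by CELERY_QUEUE_NAME, Celery's inspect API can list active queues per worker. A sketch, assuming scripts.run (the module the start script passes to -A) exposes the app as celery_app:

    from scripts.run import celery_app  # assumed attribute name

    # Maps each worker node to the queues it consumes, e.g.
    # {'celery@eveai_workers': [{'name': 'embeddings', ...}]}
    active = celery_app.control.inspect().active_queues() or {}
    for worker, queues in active.items():
        print(worker, [q['name'] for q in queues])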