From 84afc0b2ee230b4980d37e5cdf54797ed4575822 Mon Sep 17 00:00:00 2001
From: Josako
Date: Tue, 2 Sep 2025 10:25:17 +0200
Subject: [PATCH] - Debugging of redis setup issues - Debugging of celery startup - Moved flower to a standard image instead of own build

---
 .../interaction/specialist_services.py |  2 +
 common/utils/cache/regions.py          |  6 +-
 common/utils/celery_utils.py           | 84 +++++++++++++------
 docker/build_and_push_eveai.sh         |  6 +-
 docker/compose_dev.yaml                | 47 +++++++----
 eveai_chat_workers/tasks.py            |  1 +
 scripts/start.sh                       |  1 +
 7 files changed, 99 insertions(+), 48 deletions(-)

diff --git a/common/services/interaction/specialist_services.py b/common/services/interaction/specialist_services.py
index fda1f30..b82640f 100644
--- a/common/services/interaction/specialist_services.py
+++ b/common/services/interaction/specialist_services.py
@@ -19,6 +19,7 @@ class SpecialistServices:
 
     @staticmethod
     def execute_specialist(tenant_id, specialist_id, specialist_arguments, session_id, user_timezone) -> Dict[str, Any]:
+        current_app.logger.debug(f"Before sending task for {specialist_id} with arguments {specialist_arguments}")
         task = current_celery.send_task(
             'execute_specialist',
             args=[tenant_id,
@@ -29,6 +30,7 @@ class SpecialistServices:
             ],
             queue='llm_interactions'
         )
+        current_app.logger.debug(f"Task sent for {specialist_id}, task ID: {task.id}")
 
         return {
             'task_id': task.id,
diff --git a/common/utils/cache/regions.py b/common/utils/cache/regions.py
index 52a05e1..e2a9724 100644
--- a/common/utils/cache/regions.py
+++ b/common/utils/cache/regions.py
@@ -17,7 +17,7 @@ def get_redis_config(app):
     config = {
         'host': redis_uri.hostname,
         'port': int(redis_uri.port or 6379),
-        'db': 4,  # Keep this for later use
+        'db': 0,
         'redis_expiration_time': 3600,
         'distributed_lock': True,
         'thread_local_lock': False,
@@ -75,7 +75,7 @@ def create_cache_regions(app):
     # Region for model-related caching (ModelVariables etc)
     model_region = make_region(name='eveai_model').configure(
         'dogpile.cache.redis',
-        arguments={**redis_config, 'db': 6},
+        arguments=redis_config,
         replace_existing_backend=True
     )
     regions['eveai_model'] = model_region
@@ -83,7 +83,7 @@
     # Region for eveai_chat_workers components (Specialists, Retrievers, ...)
     eveai_chat_workers_region = make_region(name='eveai_chat_workers').configure(
         'dogpile.cache.redis',
-        arguments=redis_config,    # arguments={**redis_config, 'db': 4},  # Different DB
+        arguments=redis_config,
         replace_existing_backend=True
     )
     regions['eveai_chat_workers'] = eveai_chat_workers_region
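
Note on the cache change above: with the per-region 'db' overrides dropped, every dogpile region now shares Redis DB 0, so keys from all regions land in one keyspace. A minimal sketch of one way to keep them separated by key prefix instead of by database number; the prefixed_region helper is hypothetical and not part of this patch:

    from dogpile.cache import make_region

    def prefixed_region(name, redis_config):
        # Namespace keys as "<region-name>:<key>" inside the shared DB 0.
        return make_region(
            name=name,
            key_mangler=lambda key: f"{name}:{key}",
        ).configure(
            'dogpile.cache.redis',
            arguments=redis_config,
            replace_existing_backend=True,
        )

    # usage, mirroring create_cache_regions():
    # regions['eveai_model'] = prefixed_region('eveai_model', redis_config)
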
diff --git a/common/utils/celery_utils.py b/common/utils/celery_utils.py
index d1bfa32..3a14fc2 100644
--- a/common/utils/celery_utils.py
+++ b/common/utils/celery_utils.py
@@ -1,3 +1,6 @@
+import atexit
+import os
+import ssl
 import tempfile
 
 from celery import Celery
@@ -6,66 +9,94 @@
 from werkzeug.local import LocalProxy
 from redbeat import RedBeatScheduler
 
 celery_app = Celery()
-
+_tmp_paths = []
 
 def _create_ssl_cert_file(cert_data: str) -> str:
     """Create temporary certificate file for Celery SSL"""
     if not cert_data:
         return None
-
     with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as cert_file:
         cert_file.write(cert_data)
-        return cert_file.name
+        path = cert_file.name
+        _tmp_paths.append(path)  # track for cleanup
+        return path
+
+
+def _cleanup_tmp():
+    for p in _tmp_paths:
+        try:
+            os.remove(p)
+        except Exception:
+            pass
+
+
+atexit.register(_cleanup_tmp)
 
 
 def init_celery(celery, app, is_beat=False):
     celery_app.main = app.name
 
     celery_config = {
-        'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
+        'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
         'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
         'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
         'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
         'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
         'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
         'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
+        # connection pools
+        # 'broker_pool_limit': app.config.get('CELERY_BROKER_POOL_LIMIT', 10),
     }
 
-    # Add broker transport options for SSL and connection pooling
-    broker_transport_options = {
-        'master_name': None,
-        'max_connections': 20,
-        'retry_on_timeout': True,
-        'socket_connect_timeout': 5,
-        'socket_timeout': 5,
-    }
+    # Transport options (timeouts, max_connections for Redis transport)
+    # broker_transport_options = {
+    #     'master_name': None,  # only relevant for Sentinel; otherwise harmless
+    #     'max_connections': 20,
+    #     'retry_on_timeout': True,
+    #     'socket_connect_timeout': 5,
+    #     'socket_timeout': 5,
+    # }
+    # celery_config['broker_transport_options'] = broker_transport_options
+    #
+    # # Backend transport options (Redis backend accepts similar timeouts)
+    # result_backend_transport_options = {
+    #     'retry_on_timeout': True,
+    #     'socket_connect_timeout': 5,
+    #     'socket_timeout': 5,
+    #     # max_connections may be supported on newer Celery/redis backends; harmless if ignored
+    #     'max_connections': 20,
+    # }
+    # celery_config['result_backend_transport_options'] = result_backend_transport_options
 
+    # TLS (only when cert is provided or your URLs are rediss://)
     cert_data = app.config.get('REDIS_CERT_DATA')
+    ssl_opts = None
     if cert_data:
         try:
-            ssl_cert_file = _create_ssl_cert_file(cert_data)
-            if ssl_cert_file:
-                broker_transport_options.update({
-                    'ssl_cert_reqs': 'required',
-                    'ssl_ca_certs': ssl_cert_file,
-                    'ssl_check_hostname': True,
-                })
-                app.logger.info("SSL configured for Celery Redis connection")
+            ca_path = _create_ssl_cert_file(cert_data)
+            if ca_path:
+                ssl_opts = {
+                    'ssl_cert_reqs': ssl.CERT_REQUIRED,  # constant, not string
+                    'ssl_ca_certs': ca_path,
+                    # 'ssl_check_hostname': True,  # kombu/redis doesn't consistently honor this; CERT_REQUIRED is the key
+                }
+                app.logger.info("SSL configured for Celery Redis connection (CA provided)")
         except Exception as e:
             app.logger.error(f"Failed to configure SSL for Celery: {e}")
 
-    celery_config['broker_transport_options'] = broker_transport_options
-    celery_config['result_backend_transport_options'] = broker_transport_options
+    if ssl_opts is None:
+        ssl_opts = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
+    celery_config['broker_use_ssl'] = ssl_opts
+    # Redis result backend needs its own key:
+    celery_config['redis_backend_use_ssl'] = ssl_opts
 
+    # Beat/RedBeat
     if is_beat:
-        # Add configurations specific to Beat scheduler
         celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler'
         celery_config['redbeat_lock_key'] = 'redbeat::lock'
-        celery_config['beat_max_loop_interval'] = 10  # Adjust as needed
+        celery_config['beat_max_loop_interval'] = 10
 
     celery_app.conf.update(**celery_config)
 
-    # Task queues for workers only
+    # Queues for workers (note: Redis ignores routing_key and priority features like RabbitMQ)
     if not is_beat:
         celery_app.conf.task_queues = (
             Queue('default', routing_key='task.#'),
@@ -98,6 +129,7 @@ def init_celery(celery, app, is_beat=False):
 
 
 def make_celery(app_name, config):
+    # keep API but return the single instance
     return celery_app
 
 
@@ -105,4 +137,4 @@ def _get_current_celery():
     return celery_app
 
 
-current_celery = LocalProxy(_get_current_celery)
+current_celery = LocalProxy(_get_current_celery)
\ No newline at end of file
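
The block above always ends up setting broker_use_ssl and redis_backend_use_ssl, falling back to {'ssl_cert_reqs': ssl.CERT_REQUIRED} even when no CA is configured. If the intent stated in the TLS comment is to enable SSL only for rediss:// URLs, one way to guard it is sketched below; this is a suggestion, not part of the patch, and redis_ssl_options is a hypothetical helper:

    import ssl
    from urllib.parse import urlparse

    def redis_ssl_options(app, ca_path=None):
        # Return Celery SSL options only when the broker URL really uses TLS (rediss://).
        broker_url = app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
        if urlparse(broker_url).scheme != 'rediss':
            return None  # plain redis:// (e.g. the dev compose broker): no TLS settings at all
        opts = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
        if ca_path:
            opts['ssl_ca_certs'] = ca_path
        return opts

    # inside init_celery (sketch):
    # ssl_opts = redis_ssl_options(app, ca_path)
    # if ssl_opts:
    #     celery_config['broker_use_ssl'] = ssl_opts
    #     celery_config['redis_backend_use_ssl'] = ssl_opts
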
diff --git a/docker/build_and_push_eveai.sh b/docker/build_and_push_eveai.sh
index 9cc4a15..e0c708c 100755
--- a/docker/build_and_push_eveai.sh
+++ b/docker/build_and_push_eveai.sh
@@ -209,7 +209,7 @@ if [ $# -eq 0 ]; then
     SERVICES=()
     while IFS= read -r line; do
         SERVICES+=("$line")
-    done < <(yq e '.services | keys | .[]' compose_dev.yaml | grep -E '^(nginx|eveai_|flower|prometheus|grafana)')
+    done < <(yq e '.services | keys | .[]' compose_dev.yaml | grep -E '^(nginx|eveai_|prometheus|grafana)')
 else
     SERVICES=("$@")
 fi
@@ -236,14 +236,14 @@
 for SERVICE in "${SERVICES[@]}"; do
     if [[ "$SERVICE" == "nginx" ]]; then
         ./copy_specialist_svgs.sh ../config ../nginx/static/assets 2>/dev/null || echo "Warning: copy_specialist_svgs.sh not found or failed"
     fi
-    if [[ "$SERVICE" == "nginx" || "$SERVICE" == eveai_* || "$SERVICE" == "flower" || "$SERVICE" == "prometheus" || "$SERVICE" == "grafana" ]]; then
+    if [[ "$SERVICE" == "nginx" || "$SERVICE" == eveai_* || "$SERVICE" == "prometheus" || "$SERVICE" == "grafana" ]]; then
         if process_service "$SERVICE"; then
             echo "✅ Successfully processed $SERVICE"
         else
             echo "❌ Failed to process $SERVICE"
         fi
     else
-        echo "⏭️ Skipping $SERVICE as it's not nginx, flower, prometheus, grafana or doesn't start with eveai_"
+        echo "⏭️ Skipping $SERVICE as it's not nginx, prometheus, grafana or doesn't start with eveai_"
     fi
 done
diff --git a/docker/compose_dev.yaml b/docker/compose_dev.yaml
index 69d4d94..13e1c9c 100644
--- a/docker/compose_dev.yaml
+++ b/docker/compose_dev.yaml
@@ -74,7 +74,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -114,9 +114,10 @@ services:
       COMPONENT_NAME: eveai_workers
       ROLE: worker
       CELERY_CONCURRENCY: 1  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: embeddings
     volumes:
       - ../eveai_workers:/app/eveai_workers
       - ../common:/app/common
@@ -151,7 +152,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -189,9 +190,10 @@ services:
      COMPONENT_NAME: eveai_chat_workers
       ROLE: worker
       CELERY_CONCURRENCY: 8  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: llm_interactions
     volumes:
       - ../eveai_chat_workers:/app/eveai_chat_workers
       - ../common:/app/common
@@ -224,7 +226,7 @@ services:
       WORKERS: 1  # Dev: lower concurrency
       WORKER_CLASS: gevent
       WORKER_CONN: 100
-      LOGLEVEL: info  # Lowercase for gunicorn
+      LOGLEVEL: debug  # Lowercase for gunicorn
       MAX_REQUESTS: 1000
       MAX_REQUESTS_JITTER: 100
     volumes:
@@ -285,9 +287,10 @@ services:
       COMPONENT_NAME: eveai_entitlements
       ROLE: worker
       CELERY_CONCURRENCY: 1  # Dev: lower concurrency
-      CELERY_LOGLEVEL: INFO  # Uppercase for celery
+      CELERY_LOGLEVEL: DEBUG  # Uppercase for celery
       CELERY_MAX_TASKS_PER_CHILD: 1000
       CELERY_PREFETCH: 1
+      CELERY_QUEUE_NAME: entitlements
     volumes:
       - ../eveai_entitlements:/app/eveai_entitlements
       - ../common:/app/common
@@ -341,21 +344,33 @@ services:
     networks:
       - eveai-dev-network
 
+#  flower:
+#    image: ${REGISTRY_PREFIX:-}josakola/flower:latest
+#    build:
+#      context: ..
+#      dockerfile: ./docker/flower/Dockerfile
+#    environment:
+#      <<: *common-variables
+#    volumes:
+#      - ../scripts:/app/scripts
+#    ports:
+#      - "3007:5555"  # Dev Flower according to the port scheme
+#    depends_on:
+#      - redis
+#    networks:
+#      - eveai-dev-network
+
   flower:
-    image: ${REGISTRY_PREFIX:-}josakola/flower:latest
-    build:
-      context: ..
-      dockerfile: ./docker/flower/Dockerfile
+    image: mher/flower:latest
     environment:
-      <<: *common-variables
-    volumes:
-      - ../scripts:/app/scripts
+      - CELERY_BROKER_URL=redis://redis:6379/0
+      - FLOWER_BASIC_AUTH=Felucia:Jungles
+      - FLOWER_URL_PREFIX=/flower
+      - FLOWER_PORT=8080
     ports:
-      - "3007:5555"  # Dev Flower according to the port scheme
+      - "3007:8080"
     depends_on:
       - redis
-    networks:
-      - eveai-dev-network
 
   minio:
     image: minio/minio
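
With Flower now running from the stock mher/flower image, the dashboard should answer on http://localhost:3007/flower (FLOWER_URL_PREFIX plus the 3007:8080 mapping), assuming the container can still reach redis on the compose network. A quick reachability check against Flower's JSON API, a sketch that uses the dev credentials from compose_dev.yaml and assumes the requests package is available:

    import requests  # assumption: requests is installed in the dev environment

    resp = requests.get(
        "http://localhost:3007/flower/api/workers",
        auth=("Felucia", "Jungles"),  # FLOWER_BASIC_AUTH from compose_dev.yaml (dev-only)
        timeout=5,
    )
    resp.raise_for_status()
    print("Workers visible to Flower:", list(resp.json().keys()))

If this returns an empty dict while the workers are up, the usual suspects are the broker URL and the -Q queue names covered after the last diff below.
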
diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py
index 0f41a90..380cee8 100644
--- a/eveai_chat_workers/tasks.py
+++ b/eveai_chat_workers/tasks.py
@@ -233,6 +233,7 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict
             'interaction_id': int - Created interaction ID
         }
     """
+    current_app.logger.debug(f'execute_specialist: Processing request for tenant {tenant_id} using specialist {specialist_id}')
     task_id = self.request.id
     ept = ExecutionProgressTracker()
     ept.send_update(task_id, "EveAI Specialist Started", {})
diff --git a/scripts/start.sh b/scripts/start.sh
index 3381f71..3299b54 100755
--- a/scripts/start.sh
+++ b/scripts/start.sh
@@ -25,6 +25,7 @@ case "$ROLE" in
     echo "[start] role=worker component=$COMPONENT_NAME"
     CONCURRENCY="${CELERY_CONCURRENCY:-2}"
     exec celery -A scripts.run worker \
+      -Q ${CELERY_QUEUE_NAME} \
      --loglevel="${CELERY_LOGLEVEL:-INFO}" \
      --concurrency="${CONCURRENCY}" \
      --max-tasks-per-child="${CELERY_MAX_TASKS_PER_CHILD:-1000}" \
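
Each worker is now pinned to the queue named in CELERY_QUEUE_NAME (embeddings, llm_interactions, entitlements), and specialist_services.py sends execute_specialist to llm_interactions. If those names drift apart, tasks sit unconsumed in Redis; the debug logging added on both the send side and the execute side helps narrow that down. A small sketch for confirming the binding at runtime from one of the Flask apps, assuming the inspect broadcast can reach the workers over the same broker:

    from common.utils.celery_utils import celery_app

    def queue_bindings():
        # Map each running worker to the queues it actually consumes.
        replies = celery_app.control.inspect(timeout=2).active_queues() or {}
        return {worker: [q['name'] for q in queues] for worker, queues in replies.items()}

    # Expected shape (worker names are illustrative):
    # {'celery@eveai_chat_workers': ['llm_interactions'], 'celery@eveai_workers': ['embeddings'], ...}
    # 'llm_interactions' has to appear here for send_task(..., queue='llm_interactions') to be picked up.
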