From 42cb1de0fd4587a785fefa1bb806c8d1d4c38924 Mon Sep 17 00:00:00 2001
From: Josako
Date: Fri, 12 Sep 2025 10:18:43 +0200
Subject: [PATCH] - eveai_chat_client updated to retrieve static files from
 the correct (bunny.net) location when a STATIC_URL is defined.
 - Defined locations for crewai crew memory; the defaults previously failed
   in k8s.
 - Redis connection for pub/sub in ExecutionProgressTracker adapted to
   support TLS-enabled connections.

---
 common/utils/execution_progress.py            | 77 ++++++++++-------
 common/utils/redis_pubsub_pool.py             | 84 +++++++++++++++++++
 config/agents/globals/RAG_AGENT/1.1.0.yaml    |  2 +-
 config/config.py                              |  8 +-
 docker/eveai_workers/Dockerfile               | 10 +--
 docker/rebuild_chat_client.sh                 |  4 +-
 .../Production Setup/cluster-install.md       | 66 +++++++++++++++
 .../assets/vue-components/ChatMessage.vue     | 20 ++++-
 .../assets/vue-components/ProgressTracker.vue | 17 +++-
 eveai_chat_client/templates/chat.html         |  4 +-
 eveai_chat_client/templates/scripts.html      | 24 ++++++
 eveai_chat_client/views/chat_views.py         | 11 ++-
 eveai_chat_workers/__init__.py                |  7 ++
 .../specialists/crewai_base_classes.py        | 14 +++-
 .../eveai-chat-workers/deployment.yaml        |  8 ++
 15 files changed, 306 insertions(+), 50 deletions(-)
 create mode 100644 common/utils/redis_pubsub_pool.py

diff --git a/common/utils/execution_progress.py b/common/utils/execution_progress.py
index 434a6b8..646f6ca 100644
--- a/common/utils/execution_progress.py
+++ b/common/utils/execution_progress.py
@@ -4,6 +4,7 @@ from typing import Generator
 from redis import Redis, RedisError
 import json
 from flask import current_app
+import time
 
 
 class ExecutionProgressTracker:
@@ -11,22 +12,33 @@ class ExecutionProgressTracker:
 
     def __init__(self):
         try:
-            redis_url = current_app.config['SPECIALIST_EXEC_PUBSUB']
-
-            self.redis = Redis.from_url(redis_url, socket_timeout=5)
-            # Test the connection
-            self.redis.ping()
-
+            # Use shared pubsub pool (lazy connect; no eager ping)
+            from common.utils.redis_pubsub_pool import get_pubsub_client
+            self.redis = get_pubsub_client(current_app)
             self.expiry = 3600  # 1 hour expiry
-        except RedisError as e:
-            current_app.logger.error(f"Failed to connect to Redis: {str(e)}")
-            raise
         except Exception as e:
-            current_app.logger.error(f"Unexpected error during Redis initialization: {str(e)}")
+            current_app.logger.error(f"Error initializing ExecutionProgressTracker: {str(e)}")
             raise
 
     def _get_key(self, execution_id: str) -> str:
-        return f"specialist_execution:{execution_id}"
+        prefix = current_app.config.get('REDIS_PREFIXES', {}).get('pubsub_execution', 'pubsub:execution:')
+        return f"{prefix}{execution_id}"
+
+    def _retry(self, op, attempts: int = 3, base_delay: float = 0.1):
+        """Retry wrapper for Redis operations with exponential backoff."""
+        last_exc = None
+        for i in range(attempts):
+            try:
+                return op()
+            except RedisError as e:
+                last_exc = e
+                if i == attempts - 1:
+                    break
+                delay = base_delay * (3 ** i)  # 0.1, 0.3, 0.9
+                current_app.logger.warning(f"Redis operation failed (attempt {i+1}/{attempts}): {e}. Retrying in {delay}s")
+                time.sleep(delay)
+        # Exhausted retries
+        raise last_exc
 
     def send_update(self, ctask_id: str, processing_type: str, data: dict):
         """Send an update about execution progress"""
@@ -35,12 +47,6 @@ class ExecutionProgressTracker:
                                 f"{data}")
 
         key = self._get_key(ctask_id)
-        # First verify Redis is still connected
-        try:
-            self.redis.ping()
-        except RedisError:
-            current_app.logger.error("Lost Redis connection. Attempting to reconnect...")
-            self.__init__()  # Reinitialize connection
 
         update = {
             'processing_type': processing_type,
@@ -50,7 +56,7 @@
 
         # Log initial state
         try:
-            orig_len = self.redis.llen(key)
+            orig_len = self._retry(lambda: self.redis.llen(key))
 
             # Try to serialize the update and check the result
             try:
@@ -60,13 +66,16 @@
                 raise
 
             # Store update in list with pipeline for atomicity
-            with self.redis.pipeline() as pipe:
-                pipe.rpush(key, serialized_update)
-                pipe.publish(key, serialized_update)
-                pipe.expire(key, self.expiry)
-                results = pipe.execute()
+            def _pipeline_op():
+                with self.redis.pipeline() as pipe:
+                    pipe.rpush(key, serialized_update)
+                    pipe.publish(key, serialized_update)
+                    pipe.expire(key, self.expiry)
+                    return pipe.execute()
 
-            new_len = self.redis.llen(key)
+            results = self._retry(_pipeline_op)
+
+            new_len = self._retry(lambda: self.redis.llen(key))
 
             if new_len <= orig_len:
                 current_app.logger.error(
@@ -83,13 +92,14 @@
     def get_updates(self, ctask_id: str) -> Generator[str, None, None]:
         key = self._get_key(ctask_id)
         pubsub = self.redis.pubsub()
-        pubsub.subscribe(key)
+        # Subscribe with retry
+        self._retry(lambda: pubsub.subscribe(key))
 
         try:
             # First yield any existing updates
-            length = self.redis.llen(key)
+            length = self._retry(lambda: self.redis.llen(key))
             if length > 0:
-                updates = self.redis.lrange(key, 0, -1)
+                updates = self._retry(lambda: self.redis.lrange(key, 0, -1))
                 for update in updates:
                     update_data = json.loads(update.decode('utf-8'))
                     # Use processing_type for the event
@@ -98,7 +108,13 @@
 
             # Then listen for new updates
             while True:
-                message = pubsub.get_message(timeout=30)  # message['type'] is Redis pub/sub type
+                try:
+                    message = pubsub.get_message(timeout=30)  # message['type'] is Redis pub/sub type
+                except RedisError as e:
+                    current_app.logger.warning(f"Redis pubsub get_message error: {e}. Continuing...")
+                    time.sleep(0.3)
+                    continue
+
                 if message is None:
                     yield ": keepalive\n\n"
                     continue
@@ -111,4 +127,7 @@
                 if update_data['processing_type'] in ['Task Complete', 'Task Error']:
                     break
         finally:
-            pubsub.unsubscribe()
+            try:
+                pubsub.unsubscribe()
+            except Exception:
+                pass
diff --git a/common/utils/redis_pubsub_pool.py b/common/utils/redis_pubsub_pool.py
new file mode 100644
index 0000000..5074c23
--- /dev/null
+++ b/common/utils/redis_pubsub_pool.py
@@ -0,0 +1,84 @@
+import ssl
+from typing import Dict, Any
+
+import redis
+from flask import Flask
+
+
+def _build_pubsub_redis_config(app: Flask) -> Dict[str, Any]:
+    """Build Redis ConnectionPool config for the pubsub/EPT workload using app.config.
+    Does not modify cache or session pools.
+ """ + cfg = app.config + + config: Dict[str, Any] = { + 'host': cfg['REDIS_URL'], + 'port': cfg['REDIS_PORT'], + 'db': int(cfg.get('REDIS_SPECIALIST_EXEC_DB', '0')), + 'max_connections': int(cfg.get('REDIS_PUBSUB_MAX_CONNECTIONS', 200)), + 'retry_on_timeout': True, + 'socket_keepalive': True, + 'socket_keepalive_options': {}, + 'socket_timeout': float(cfg.get('REDIS_PUBSUB_SOCKET_TIMEOUT', 10.0)), + 'socket_connect_timeout': float(cfg.get('REDIS_PUBSUB_CONNECT_TIMEOUT', 3.0)), + } + + # Authentication if present + un = cfg.get('REDIS_USER') + pw = cfg.get('REDIS_PASS') + if un and pw: + config.update({'username': un, 'password': pw}) + + # TLS when configured + cert_path = cfg.get('REDIS_CA_CERT_PATH') + if cfg.get('REDIS_SCHEME') == 'rediss' and cert_path: + config.update({ + 'connection_class': redis.SSLConnection, + 'ssl_cert_reqs': ssl.CERT_REQUIRED, + 'ssl_check_hostname': cfg.get('REDIS_SSL_CHECK_HOSTNAME', True), + 'ssl_ca_certs': cert_path, + }) + + return config + + +def create_pubsub_pool(app: Flask) -> redis.ConnectionPool: + """Create and store the dedicated pubsub ConnectionPool in app.extensions.""" + if not hasattr(app, 'extensions'): + app.extensions = {} + + # Reuse existing if already created + pool = app.extensions.get('redis_pubsub_pool') + if pool is not None: + return pool + + config = _build_pubsub_redis_config(app) + pool = redis.ConnectionPool(**config) + app.extensions['redis_pubsub_pool'] = pool + + # Log a concise, non-sensitive summary + try: + summary = { + 'scheme': app.config.get('REDIS_SCHEME'), + 'host': app.config.get('REDIS_URL'), + 'port': app.config.get('REDIS_PORT'), + 'db': app.config.get('REDIS_SPECIALIST_EXEC_DB', '0'), + 'ssl_check_hostname': app.config.get('REDIS_SSL_CHECK_HOSTNAME'), + 'ca_present': bool(app.config.get('REDIS_CA_CERT_PATH')), + 'max_connections': app.config.get('REDIS_PUBSUB_MAX_CONNECTIONS'), + 'socket_timeout': app.config.get('REDIS_PUBSUB_SOCKET_TIMEOUT'), + 'socket_connect_timeout': app.config.get('REDIS_PUBSUB_CONNECT_TIMEOUT'), + } + app.logger.info(f"Initialized Redis pubsub pool: {summary}") + except Exception: + pass + + return pool + + +def get_pubsub_client(app: Flask) -> redis.Redis: + """Get a Redis client bound to the dedicated pubsub pool.""" + pool = app.extensions.get('redis_pubsub_pool') + if pool is None: + pool = create_pubsub_pool(app) + return redis.Redis(connection_pool=pool) diff --git a/config/agents/globals/RAG_AGENT/1.1.0.yaml b/config/agents/globals/RAG_AGENT/1.1.0.yaml index 1676cd0..d6cb7c1 100644 --- a/config/agents/globals/RAG_AGENT/1.1.0.yaml +++ b/config/agents/globals/RAG_AGENT/1.1.0.yaml @@ -15,7 +15,7 @@ backstory: > include a salutation or closing greeting in your answer. 
 {custom_backstory}
 full_model_name: "mistral.mistral-medium-latest"
-temperature: 0.5
+temperature: 0.4
 metadata:
   author: "Josako"
   date_added: "2025-01-08"
diff --git a/config/config.py b/config/config.py
index ea646c4..5aabbe6 100644
--- a/config/config.py
+++ b/config/config.py
@@ -99,6 +99,12 @@ class Config(object):
         'ssl_check_hostname': REDIS_SSL_CHECK_HOSTNAME,
     }
 
+    # PubSub/EPT specific configuration (dedicated pool)
+    REDIS_SPECIALIST_EXEC_DB = environ.get('REDIS_SPECIALIST_EXEC_DB', '0')
+    REDIS_PUBSUB_MAX_CONNECTIONS = int(environ.get('REDIS_PUBSUB_MAX_CONNECTIONS', '200'))
+    REDIS_PUBSUB_SOCKET_TIMEOUT = float(environ.get('REDIS_PUBSUB_SOCKET_TIMEOUT', '10'))
+    REDIS_PUBSUB_CONNECT_TIMEOUT = float(environ.get('REDIS_PUBSUB_CONNECT_TIMEOUT', '3'))
+
     REDIS_PREFIXES = {
         'celery_app': 'celery:app:',
         'celery_chat': 'celery:chat:',
@@ -115,7 +121,7 @@ class Config(object):
     CELERY_RESULT_BACKEND_CHAT = f'{REDIS_BASE_URI}/0'
 
     # SSE PubSub settings
-    SPECIALIST_EXEC_PUBSUB = f'{REDIS_BASE_URI}/0'
+    SPECIALIST_EXEC_PUBSUB = f"{REDIS_BASE_URI}/{REDIS_SPECIALIST_EXEC_DB}"
 
     # eveai_model cache Redis setting
     MODEL_CACHE_URL = f'{REDIS_BASE_URI}/0'
diff --git a/docker/eveai_workers/Dockerfile b/docker/eveai_workers/Dockerfile
index 0512ee7..9ddbfd0 100644
--- a/docker/eveai_workers/Dockerfile
+++ b/docker/eveai_workers/Dockerfile
@@ -1,10 +1,10 @@
 FROM registry.ask-eve-ai-local.com/josakola/eveai-base:latest
 
-# Service-specific packages (ffmpeg only needed for this service)
-USER root
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    ffmpeg \
-    && rm -rf /var/lib/apt/lists/*
+# Service-specific packages: ffmpeg only needed for this service - but superfluous at the moment.
+#USER root
+#RUN apt-get update && apt-get install -y --no-install-recommends \
+#    ffmpeg \
+#    && rm -rf /var/lib/apt/lists/*
 USER appuser
 
 # Copy the service-specific source code into the container.
diff --git a/docker/rebuild_chat_client.sh b/docker/rebuild_chat_client.sh
index f93ccf6..e58654a 100755
--- a/docker/rebuild_chat_client.sh
+++ b/docker/rebuild_chat_client.sh
@@ -7,7 +7,7 @@
 echo "Copying client images"
 cp -fv ../eveai_chat_client/static/assets/img/* ../nginx/static/assets/img
 
-dcdown eveai_chat_client nginx
+pcdown
 
 cd ../nginx
 
@@ -17,4 +17,4 @@
 npm run build
 cd ../docker
 ./build_and_push_eveai.sh -b nginx
-dcup eveai_chat_client nginx
+pcup
diff --git a/documentation/Production Setup/cluster-install.md b/documentation/Production Setup/cluster-install.md
index ead182d..94c7f55 100644
--- a/documentation/Production Setup/cluster-install.md
+++ b/documentation/Production Setup/cluster-install.md
@@ -612,6 +612,12 @@ kubectl -n eveai-staging get jobs
 kubectl -n eveai-staging logs job/<job-name>
 ```
 
+#### Creating volume for eveai_chat_worker's crewai storage
+
+```bash
+kubectl apply -n eveai-staging -f scaleway/manifests/base/applications/backend/eveai-chat-workers/pvc.yaml
+```
+
 #### Application Services Deployment
 Use the staging overlay to deploy apps with registry rewrite and imagePullSecrets:
 ```bash
@@ -861,3 +867,63 @@ curl https://evie-staging.askeveai.com/verify/
+
+
+## EveAI Chat Workers: Persistent logs storage and Celery process behavior
+
+This addendum describes how to enable persistent storage for CrewAI tuning runs under /app/logs for the eveai-chat-workers Deployment, and clarifies Celery process behavior relevant to environment variables.
+
+### Celery prefork behavior and env variables
+- Pool: prefork (default). Each worker process (child) handles multiple tasks sequentially.
+- Implication: any environment variable changed inside a child process persists for subsequent tasks handled by that same child, until it is changed again or the process is recycled.
+- Our practice: set required env vars (e.g., CREWAI_STORAGE_DIR/CREWAI_STORAGE_PATH) immediately before initializing CrewAI and restore them immediately after (see the sketch at the end of this addendum). This prevents leakage to the next task in the same process.
+- CELERY_MAX_TASKS_PER_CHILD: the number of tasks a child will process before being recycled. Suggested starting range for heavy LLM/RAG workloads: 200–500; 1000 is acceptable if memory growth is stable. Monitor RSS and adjust.
+
+### Create and mount a PersistentVolumeClaim for /app/logs
+We persist tuning outputs under /app/logs by mounting a PVC in the worker pod.
+
+Manifests added/updated (namespace: eveai-staging):
+- scaleway/manifests/base/applications/backend/eveai-chat-workers/pvc.yaml
+- scaleway/manifests/base/applications/backend/eveai-chat-workers/deployment.yaml (volume mount added)
+
+Apply with kubectl (no Kustomize required):
+
+```bash
+# Create or update the PVC for logs
+kubectl apply -n eveai-staging -f scaleway/manifests/base/applications/backend/eveai-chat-workers/pvc.yaml
+
+# Update the Deployment to mount the PVC at /app/logs
+kubectl apply -n eveai-staging -f scaleway/manifests/base/applications/backend/eveai-chat-workers/deployment.yaml
+```
+
+Verify that the PVC is bound and the pod mounts the volume:
+
+```bash
+# Check PVC status
+kubectl get pvc -n eveai-staging eveai-chat-workers-logs -o wide
+
+# Inspect the pod to confirm the volume mount
+kubectl get pods -n eveai-staging -l app=eveai-chat-workers -o name
+kubectl describe pod -n eveai-staging <pod-name>
+
+# (Optional) Exec into the pod to check permissions and path
+kubectl exec -n eveai-staging -it <pod-name> -- sh -lc 'id; ls -ld /app/logs'
+```
+
+Permissions and securityContext notes:
+- The container runs as a non-root user (appuser) per Dockerfile.base. Some storage classes mount volumes owned by root. If you encounter permission issues (EACCES) writing to /app/logs:
+  - Option A: set a pod-level fsGroup so the mounted volume is group-writable by the container user.
+  - Option B: use an initContainer to chown/chmod /app/logs on the mounted volume.
+- Keep monitoring PVC usage and set alerts to avoid running out of space.
+
+Retention / cleanup recommendation:
+- For a 14-day retention, create a CronJob that runs daily to remove files older than 14 days and then delete empty directories, mounting the same PVC at /app/logs. Example command:
+
+```bash
+find /app/logs -type f -mtime +14 -print -delete; find /app/logs -type d -empty -mtime +14 -print -delete
+```
+
+Operational checks after deployment:
+1) Trigger a CrewAI tuning run; verify files appear under /app/logs and remain after pod restarts.
+2) Trigger a non-tuning run; verify temporary directories are created and cleaned up automatically.
+3) Monitor memory while varying CELERY_CONCURRENCY and CELERY_MAX_TASKS_PER_CHILD.
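+
+### Sketch: scoping env vars around CrewAI initialization
+The set-and-restore practice above can be captured in a small context manager. The sketch below is illustrative only (the `scoped_env` helper and the `build_crew()` call are assumed names, not code from this repository); it shows how CREWAI_STORAGE_DIR can be scoped to a single task so nothing leaks to the next task handled by the same prefork child:
+
+```python
+import os
+from contextlib import contextmanager
+
+
+@contextmanager
+def scoped_env(**overrides):
+    """Temporarily set environment variables, restoring prior values on exit."""
+    saved = {name: os.environ.get(name) for name in overrides}
+    try:
+        os.environ.update({name: str(value) for name, value in overrides.items()})
+        yield
+    finally:
+        for name, old in saved.items():
+            if old is None:
+                os.environ.pop(name, None)  # was unset before the task; unset it again
+            else:
+                os.environ[name] = old
+
+
+# Usage inside a Celery task body (build_crew is an assumed factory name):
+# with scoped_env(CREWAI_STORAGE_DIR='/app/logs/tuning/run-123'):
+#     crew = build_crew()  # CrewAI reads CREWAI_STORAGE_DIR during initialization
+```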
diff --git a/eveai_chat_client/static/assets/vue-components/ChatMessage.vue b/eveai_chat_client/static/assets/vue-components/ChatMessage.vue
index 98266ae..0f76421 100644
--- a/eveai_chat_client/static/assets/vue-components/ChatMessage.vue
+++ b/eveai_chat_client/static/assets/vue-components/ChatMessage.vue
@@ -6,7 +6,7 @@
 [template hunk: markup lost in extraction; likely switches a hard-coded /static/ asset reference to the staticUrl() helper]
@@ -105,7 +105,7 @@
 [template hunk: markup lost in extraction; likely switches a hard-coded /static/ asset reference to the staticUrl() helper]
@@ -124,7 +124,7 @@
 [template hunk: markup lost in extraction; likely switches a hard-coded /static/ asset reference to the staticUrl() helper]
@@ -166,11 +166,23 @@ export default {
       originalTexts
     );
 
+    // Helper to build environment-aware static asset URLs
+    const staticUrl = (p) => {
+      try {
+        if (typeof window !== 'undefined' && typeof window.staticUrl === 'function') {
+          return window.staticUrl(p);
+        }
+      } catch (e) {}
+      const base = '/static/';
+      return base + String(p || '').replace(/^\/+/, '');
+    };
+
     return {
       messageTexts: translations,
       translationLoading: isLoading,
      translationError: error,
-      currentLanguage
+      currentLanguage,
+      staticUrl
     };
   },
   props: {
diff --git a/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue b/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
index d0d09f5..0f9b021 100644
--- a/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
+++ b/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
@@ -14,7 +14,7 @@
 [template hunk: markup lost in extraction; likely switches a static asset reference to the staticUrl() helper]
           Bezig met verwerken...
@@ -55,7 +55,7 @@
 [template hunk: markup lost in extraction; likely switches a static asset reference to the staticUrl() helper]
-          Bezig met verwerken...
@@ -72,6 +72,16 @@ import { useComponentTranslations } from '../js/services/LanguageProvider.js';
 export default {
   name: 'ProgressTracker',
   setup() {
+    // Helper to build environment-aware static asset URLs
+    const staticUrl = (p) => {
+      try {
+        if (typeof window !== 'undefined' && typeof window.staticUrl === 'function') {
+          return window.staticUrl(p);
+        }
+      } catch (e) {}
+      const base = '/static/';
+      return base + String(p || '').replace(/^\/+/, '');
+    };
     // Define original English texts (base language for developers)
     const originalTexts = {
       error: 'Error while processing',
@@ -92,7 +102,8 @@
       statusTexts: translations,
       translationLoading: isLoading,
       translationError: error,
-      currentLanguage
+      currentLanguage,
+      staticUrl
     };
   },
   props: {
diff --git a/eveai_chat_client/templates/chat.html b/eveai_chat_client/templates/chat.html
index 58b256c..d37d098 100644
--- a/eveai_chat_client/templates/chat.html
+++ b/eveai_chat_client/templates/chat.html
@@ -28,7 +28,9 @@
             tenantMake: {
                 name: "{{ tenant_make.name|default('EveAI') }}",
                 logo_url: "{{ tenant_make.logo_url|default('') }}"
-            }
+            },
+            // Environment-aware static base provided by Flask's overridden url_for
+            staticBase: '{{ static_url }}'
         };
 
         // Debug info to check whether chatConfig is set up correctly
diff --git a/eveai_chat_client/templates/scripts.html b/eveai_chat_client/templates/scripts.html
index af29df7..2984c9c 100644
--- a/eveai_chat_client/templates/scripts.html
+++ b/eveai_chat_client/templates/scripts.html
@@ -1,6 +1,30 @@
+[script block lost in extraction: the 24 added lines presumably define window.staticUrl from chatConfig.staticBase so client-side assets resolve against STATIC_URL]
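
The chat_views.py, eveai_chat_workers/__init__.py, crewai_base_classes.py, and deployment.yaml diffs listed in the diffstat are not included in this excerpt, and the scripts.html script block above was lost in extraction. As a reading aid, here is a minimal sketch, assuming a Flask context processor, of how a static_url value pointing at a bunny.net pull zone could be injected into templates when STATIC_URL is defined; the configuration key follows the commit message, but the code and the CDN URL are illustrative, not the repository's actual implementation:

```python
from flask import Flask

app = Flask(__name__)
# Illustrative bunny.net pull-zone base; in production this would come from the environment
app.config['STATIC_URL'] = 'https://example-pullzone.b-cdn.net/static/'


@app.context_processor
def inject_static_url():
    """Expose `static_url` to templates: the CDN base when STATIC_URL is
    defined, otherwise Flask's local static route."""
    base = app.config.get('STATIC_URL') or (app.static_url_path or '/static')
    return dict(static_url=base.rstrip('/') + '/')
```

chat.html embeds this value as chatConfig.staticBase; the staticUrl helpers added to ChatMessage.vue and ProgressTracker.vue then call window.staticUrl when the scripts.html block defines it, and fall back to '/static/' otherwise.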