From 5465dae52f202bac989ce781bc153829eb95d652 Mon Sep 17 00:00:00 2001
From: Josako
Date: Fri, 3 Oct 2025 08:58:44 +0200
Subject: [PATCH] - Optimisation and streamlining of messages in
 ExecutionProgressTracker (ept)

- Adaptation of ProgressTracker to handle these optimised messages
- Hardening SSE-streaming in eveai_chat_client
---
 common/utils/execution_progress.py            | 38 ++++++++++++++++---
 config/static-manifest/manifest.json          |  4 +-
 docker/nginx/Dockerfile                       |  2 +-
 .../assets/vue-components/ProgressTracker.vue |  6 +--
 eveai_chat_client/views/chat_views.py         |  3 +-
 5 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/common/utils/execution_progress.py b/common/utils/execution_progress.py
index fdd1616..92eda38 100644
--- a/common/utils/execution_progress.py
+++ b/common/utils/execution_progress.py
@@ -10,6 +10,13 @@ import time
 class ExecutionProgressTracker:
     """Tracks progress of specialist executions using Redis"""
 
+    # Normalized processing types and aliases
+    PT_COMPLETE = 'EVEAI_COMPLETE'
+    PT_ERROR = 'EVEAI_ERROR'
+
+    _COMPLETE_ALIASES = {'EveAI Specialist Complete', 'Task Complete', 'task complete'}
+    _ERROR_ALIASES = {'EveAI Specialist Error', 'Task Error', 'task error'}
+
     def __init__(self):
         try:
             # Use shared pubsub pool (lazy connect; no eager ping)
@@ -40,6 +47,16 @@ class ExecutionProgressTracker:
         # Exhausted retries
         raise last_exc
 
+    def _normalize_processing_type(self, processing_type: str) -> str:
+        if not processing_type:
+            return processing_type
+        p = str(processing_type).strip()
+        if p in self._COMPLETE_ALIASES:
+            return self.PT_COMPLETE
+        if p in self._ERROR_ALIASES:
+            return self.PT_ERROR
+        return p
+
     def send_update(self, ctask_id: str, processing_type: str, data: dict):
         """Send an update about execution progress"""
         try:
@@ -47,7 +64,7 @@
                                      f"{data}")
 
             key = self._get_key(ctask_id)
-
+            processing_type = self._normalize_processing_type(processing_type)
             update = {
                 'processing_type': processing_type,
                 'data': data,
@@ -96,14 +113,16 @@
         self._retry(lambda: pubsub.subscribe(key))
 
         try:
+            # Hint client reconnect interval (optional but helpful)
+            yield "retry: 3000\n\n"
+
             # First yield any existing updates
             length = self._retry(lambda: self.redis.llen(key))
             if length > 0:
                 updates = self._retry(lambda: self.redis.lrange(key, 0, -1))
                 for update in updates:
                     update_data = json.loads(update.decode('utf-8'))
-                    # Use processing_type for the event
-                    yield f"event: {update_data['processing_type']}\n"
+                    update_data['processing_type'] = self._normalize_processing_type(update_data.get('processing_type'))
                     yield f"data: {json.dumps(update_data)}\n\n"
 
             # Then listen for new updates
@@ -121,13 +140,20 @@
                 if message['type'] == 'message':
                     # This is Redis pub/sub type
                     update_data = json.loads(message['data'].decode('utf-8'))
-                    yield f"data: {message['data'].decode('utf-8')}\n\n"
+                    update_data['processing_type'] = self._normalize_processing_type(update_data.get('processing_type'))
+                    yield f"data: {json.dumps(update_data)}\n\n"
 
-                    # Check processing_type for completion
-                    if update_data['processing_type'] in ['Task Complete', 'Task Error', 'EveAI Specialist Complete']:
+                    # Unified completion check
+                    if update_data['processing_type'] in [self.PT_COMPLETE, self.PT_ERROR]:
+                        # Give proxies/clients a chance to flush
+                        yield ": closing\n\n"
                         break
         finally:
             try:
                 pubsub.unsubscribe()
             except Exception:
                 pass
+            try:
+                pubsub.close()
+            except Exception:
+                pass
diff --git a/config/static-manifest/manifest.json b/config/static-manifest/manifest.json
index 5df93f6..9043959 100644
--- a/config/static-manifest/manifest.json
+++ b/config/static-manifest/manifest.json
@@ -1,6 +1,6 @@
 {
-  "dist/chat-client.js": "dist/chat-client.13481d75.js",
-  "dist/chat-client.css": "dist/chat-client.7d8832b6.css",
+  "dist/chat-client.js": "dist/chat-client.824c5b9d.js",
+  "dist/chat-client.css": "dist/chat-client.b7de7a18.css",
   "dist/main.js": "dist/main.f3dde0f6.js",
   "dist/main.css": "dist/main.c40e57ad.css"
 }
\ No newline at end of file
diff --git a/docker/nginx/Dockerfile b/docker/nginx/Dockerfile
index bacef45..cc291e2 100644
--- a/docker/nginx/Dockerfile
+++ b/docker/nginx/Dockerfile
@@ -16,7 +16,7 @@ RUN mkdir -p /etc/nginx/static /etc/nginx/public
 COPY ../../nginx/static /etc/nginx/static
 
 # Copy public files
-COPY ../../nginx/public /etc/nginx/public
+# COPY ../../nginx/public /etc/nginx/public
 
 # Copy site-specific configurations
 RUN mkdir -p /etc/nginx/sites-enabled
diff --git a/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue b/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
index 8ccefc1..875e948 100644
--- a/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
+++ b/eveai_chat_client/static/assets/vue-components/ProgressTracker.vue
@@ -240,15 +240,15 @@ export default {
       const data = JSON.parse(event.data);
       console.log('Progress update:', data);
 
-      // Check voor processing_type om te bepalen welke handler te gebruiken
-      if (data.processing_type === 'EveAI Specialist Complete') {
+      // Check voor processing_type om te bepalen welke handler te gebruiken (ondersteun legacy en genormaliseerde waarden)
+      if (data.processing_type === 'EveAI Specialist Complete' || data.processing_type === 'EVEAI_COMPLETE') {
         console.log('Detected specialist complete via processing_type');
         this.handleSpecialistComplete(event);
         return;
       }
 
       // Check voor andere completion statuses en errors
-      if (data.processing_type === 'EveAI Specialist Error')
+      if (data.processing_type === 'EveAI Specialist Error' || data.processing_type === 'EVEAI_ERROR')
       {
         console.log('Detected specialist error via processing_type or status');
         this.handleSpecialistError(event);
diff --git a/eveai_chat_client/views/chat_views.py b/eveai_chat_client/views/chat_views.py
index 1a8e440..314e76f 100644
--- a/eveai_chat_client/views/chat_views.py
+++ b/eveai_chat_client/views/chat_views.py
@@ -301,7 +301,8 @@ def task_progress_stream(task_id):
             mimetype='text/event-stream',
             headers={
                 'Cache-Control': 'no-cache',
-                'X-Accel-Buffering': 'no'
+                'X-Accel-Buffering': 'no',
+                'Connection': 'keep-alive'
             }
         )
     except Exception as e:
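
Note: the sketch below is illustrative only and is not part of the patch. It shows one way a
non-browser client could consume the hardened SSE stream and stop on the normalized terminal
events introduced above (EVEAI_COMPLETE / EVEAI_ERROR). It assumes the requests library and
takes the URL of the task_progress_stream endpoint as a parameter; the function name and URL
handling are placeholders.

import json
import requests

# Terminal processing types emitted by ExecutionProgressTracker after normalization.
TERMINAL = {'EVEAI_COMPLETE', 'EVEAI_ERROR'}

def follow_progress(url):
    """Yield progress updates from an SSE stream until a terminal event arrives."""
    with requests.get(url, stream=True, headers={'Accept': 'text/event-stream'}) as resp:
        resp.raise_for_status()
        for raw in resp.iter_lines(decode_unicode=True):
            # Skip keep-alive blanks, comments (": closing") and "retry:" hints.
            if not raw or not raw.startswith('data:'):
                continue
            update = json.loads(raw[len('data:'):].strip())
            yield update
            if update.get('processing_type') in TERMINAL:
                break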