Merge branch 'bugfix/TRA-86_Sometimes_client_becomes_unresponsive' into develop
# Conflicts: # config/static-manifest/manifest.json
This commit is contained in:
@@ -10,6 +10,13 @@ import time
|
|||||||
class ExecutionProgressTracker:
|
class ExecutionProgressTracker:
|
||||||
"""Tracks progress of specialist executions using Redis"""
|
"""Tracks progress of specialist executions using Redis"""
|
||||||
|
|
||||||
|
# Normalized processing types and aliases
|
||||||
|
PT_COMPLETE = 'EVEAI_COMPLETE'
|
||||||
|
PT_ERROR = 'EVEAI_ERROR'
|
||||||
|
|
||||||
|
_COMPLETE_ALIASES = {'EveAI Specialist Complete', 'Task Complete', 'task complete'}
|
||||||
|
_ERROR_ALIASES = {'EveAI Specialist Error', 'Task Error', 'task error'}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
try:
|
try:
|
||||||
# Use shared pubsub pool (lazy connect; no eager ping)
|
# Use shared pubsub pool (lazy connect; no eager ping)
|
||||||
@@ -40,6 +47,16 @@ class ExecutionProgressTracker:
|
|||||||
# Exhausted retries
|
# Exhausted retries
|
||||||
raise last_exc
|
raise last_exc
|
||||||
|
|
||||||
|
def _normalize_processing_type(self, processing_type: str) -> str:
|
||||||
|
if not processing_type:
|
||||||
|
return processing_type
|
||||||
|
p = str(processing_type).strip()
|
||||||
|
if p in self._COMPLETE_ALIASES:
|
||||||
|
return self.PT_COMPLETE
|
||||||
|
if p in self._ERROR_ALIASES:
|
||||||
|
return self.PT_ERROR
|
||||||
|
return p
|
||||||
|
|
||||||
def send_update(self, ctask_id: str, processing_type: str, data: dict):
|
def send_update(self, ctask_id: str, processing_type: str, data: dict):
|
||||||
"""Send an update about execution progress"""
|
"""Send an update about execution progress"""
|
||||||
try:
|
try:
|
||||||
@@ -47,7 +64,7 @@ class ExecutionProgressTracker:
|
|||||||
f"{data}")
|
f"{data}")
|
||||||
key = self._get_key(ctask_id)
|
key = self._get_key(ctask_id)
|
||||||
|
|
||||||
|
processing_type = self._normalize_processing_type(processing_type)
|
||||||
update = {
|
update = {
|
||||||
'processing_type': processing_type,
|
'processing_type': processing_type,
|
||||||
'data': data,
|
'data': data,
|
||||||
@@ -96,14 +113,16 @@ class ExecutionProgressTracker:
|
|||||||
self._retry(lambda: pubsub.subscribe(key))
|
self._retry(lambda: pubsub.subscribe(key))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Hint client reconnect interval (optional but helpful)
|
||||||
|
yield "retry: 3000\n\n"
|
||||||
|
|
||||||
# First yield any existing updates
|
# First yield any existing updates
|
||||||
length = self._retry(lambda: self.redis.llen(key))
|
length = self._retry(lambda: self.redis.llen(key))
|
||||||
if length > 0:
|
if length > 0:
|
||||||
updates = self._retry(lambda: self.redis.lrange(key, 0, -1))
|
updates = self._retry(lambda: self.redis.lrange(key, 0, -1))
|
||||||
for update in updates:
|
for update in updates:
|
||||||
update_data = json.loads(update.decode('utf-8'))
|
update_data = json.loads(update.decode('utf-8'))
|
||||||
# Use processing_type for the event
|
update_data['processing_type'] = self._normalize_processing_type(update_data.get('processing_type'))
|
||||||
yield f"event: {update_data['processing_type']}\n"
|
|
||||||
yield f"data: {json.dumps(update_data)}\n\n"
|
yield f"data: {json.dumps(update_data)}\n\n"
|
||||||
|
|
||||||
# Then listen for new updates
|
# Then listen for new updates
|
||||||
@@ -121,13 +140,20 @@ class ExecutionProgressTracker:
|
|||||||
|
|
||||||
if message['type'] == 'message': # This is Redis pub/sub type
|
if message['type'] == 'message': # This is Redis pub/sub type
|
||||||
update_data = json.loads(message['data'].decode('utf-8'))
|
update_data = json.loads(message['data'].decode('utf-8'))
|
||||||
yield f"data: {message['data'].decode('utf-8')}\n\n"
|
update_data['processing_type'] = self._normalize_processing_type(update_data.get('processing_type'))
|
||||||
|
yield f"data: {json.dumps(update_data)}\n\n"
|
||||||
|
|
||||||
# Check processing_type for completion
|
# Unified completion check
|
||||||
if update_data['processing_type'] in ['Task Complete', 'Task Error', 'EveAI Specialist Complete']:
|
if update_data['processing_type'] in [self.PT_COMPLETE, self.PT_ERROR]:
|
||||||
|
# Give proxies/clients a chance to flush
|
||||||
|
yield ": closing\n\n"
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
pubsub.unsubscribe()
|
pubsub.unsubscribe()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
try:
|
||||||
|
pubsub.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|||||||
@@ -240,15 +240,15 @@ export default {
|
|||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
console.log('Progress update:', data);
|
console.log('Progress update:', data);
|
||||||
|
|
||||||
// Check voor processing_type om te bepalen welke handler te gebruiken
|
// Check voor processing_type om te bepalen welke handler te gebruiken (ondersteun legacy en genormaliseerde waarden)
|
||||||
if (data.processing_type === 'EveAI Specialist Complete') {
|
if (data.processing_type === 'EveAI Specialist Complete' || data.processing_type === 'EVEAI_COMPLETE') {
|
||||||
console.log('Detected specialist complete via processing_type');
|
console.log('Detected specialist complete via processing_type');
|
||||||
this.handleSpecialistComplete(event);
|
this.handleSpecialistComplete(event);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check voor andere completion statuses en errors
|
// Check voor andere completion statuses en errors
|
||||||
if (data.processing_type === 'EveAI Specialist Error')
|
if (data.processing_type === 'EveAI Specialist Error' || data.processing_type === 'EVEAI_ERROR')
|
||||||
{
|
{
|
||||||
console.log('Detected specialist error via processing_type or status');
|
console.log('Detected specialist error via processing_type or status');
|
||||||
this.handleSpecialistError(event);
|
this.handleSpecialistError(event);
|
||||||
|
|||||||
@@ -301,7 +301,8 @@ def task_progress_stream(task_id):
|
|||||||
mimetype='text/event-stream',
|
mimetype='text/event-stream',
|
||||||
headers={
|
headers={
|
||||||
'Cache-Control': 'no-cache',
|
'Cache-Control': 'no-cache',
|
||||||
'X-Accel-Buffering': 'no'
|
'X-Accel-Buffering': 'no',
|
||||||
|
'Connection': 'keep-alive'
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user