- Implementation of specialist execution api, including SSE protocol
- eveai_chat becomes deprecated and should be replaced with SSE
- Adaptation of the STANDARD_RAG specialist
- Base class definition allowing specialists to be realised with the crewai framework
- Implementation of SPIN_SPECIALIST
- Implementation of a test app for testing specialists (test_specialist_client); also serves as an example for future SSE-based clients (a minimal client sketch follows below)
- Improvements to startup scripts to better handle and scale multiple connections
- Small improvements to the interaction forms and views
- Caching implementation improved and augmented with additional caches
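For context on the SSE-based client mentioned above, the following is a minimal sketch of how a client might consume the execution progress stream. The endpoint path used here is an assumption for illustration only; the real example client is test_specialist_client in this commit. The terminal processing_type values ('Task Complete', 'Task Error') are the ones the tracker below treats as end-of-stream.

    # Hypothetical SSE client sketch; the stream URL is an assumption, not the
    # actual route added in this commit.
    import json
    import requests

    def stream_execution(base_url: str, ctask_id: str) -> None:
        url = f"{base_url}/specialists/executions/{ctask_id}/stream"  # illustrative path
        with requests.get(url, stream=True, headers={'Accept': 'text/event-stream'}) as resp:
            resp.raise_for_status()
            event = None
            for raw in resp.iter_lines(decode_unicode=True):
                if not raw:                       # blank line terminates one SSE event
                    event = None
                    continue
                if raw.startswith(':'):           # server keepalive comment
                    continue
                if raw.startswith('event:'):
                    event = raw[len('event:'):].strip()
                elif raw.startswith('data:'):
                    payload = json.loads(raw[len('data:'):].strip())
                    print(event or payload.get('processing_type'), payload.get('data'))
                    if payload.get('processing_type') in ('Task Complete', 'Task Error'):
                        return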
common/utils/execution_progress.py (new file, 112 lines)
@@ -0,0 +1,112 @@
# common/utils/execution_progress.py
from datetime import datetime as dt, timezone as tz
from typing import Generator
from redis import Redis, RedisError
import json
from flask import current_app


class ExecutionProgressTracker:
    """Tracks progress of specialist executions using Redis"""

    def __init__(self):
        try:
            redis_url = current_app.config['SPECIALIST_EXEC_PUBSUB']

            self.redis = Redis.from_url(redis_url, socket_timeout=5)
            # Test the connection
            self.redis.ping()

            self.expiry = 3600  # 1 hour expiry
        except RedisError as e:
            current_app.logger.error(f"Failed to connect to Redis: {str(e)}")
            raise
        except Exception as e:
            current_app.logger.error(f"Unexpected error during Redis initialization: {str(e)}")
            raise

    def _get_key(self, execution_id: str) -> str:
        return f"specialist_execution:{execution_id}"

    def send_update(self, ctask_id: str, processing_type: str, data: dict):
        """Send an update about execution progress"""
        try:
            key = self._get_key(ctask_id)

            # First verify Redis is still connected
            try:
                self.redis.ping()
            except RedisError:
                current_app.logger.error("Lost Redis connection. Attempting to reconnect...")
                self.__init__()  # Reinitialize connection

            update = {
                'processing_type': processing_type,
                'data': data,
                'timestamp': dt.now(tz=tz.utc)
            }

            # Log initial state
            try:
                orig_len = self.redis.llen(key)

                # Try to serialize the update and check the result
                try:
                    serialized_update = json.dumps(update, default=str)  # default handler for datetime
                except TypeError as e:
                    current_app.logger.error(f"Failed to serialize update: {str(e)}")
                    raise

                # Store update in list with pipeline for atomicity
                with self.redis.pipeline() as pipe:
                    pipe.rpush(key, serialized_update)
                    pipe.publish(key, serialized_update)
                    pipe.expire(key, self.expiry)
                    results = pipe.execute()

                new_len = self.redis.llen(key)

                if new_len <= orig_len:
                    current_app.logger.error(
                        f"List length did not increase as expected. Original: {orig_len}, New: {new_len}")

            except RedisError as e:
                current_app.logger.error(f"Redis operation failed: {str(e)}")
                raise

        except Exception as e:
            current_app.logger.error(f"Unexpected error in send_update: {str(e)}, type: {type(e)}")
            raise

    def get_updates(self, ctask_id: str) -> Generator[str, None, None]:
        key = self._get_key(ctask_id)
        pubsub = self.redis.pubsub()
        pubsub.subscribe(key)

        try:
            # First yield any existing updates
            length = self.redis.llen(key)
            if length > 0:
                updates = self.redis.lrange(key, 0, -1)
                for update in updates:
                    update_data = json.loads(update.decode('utf-8'))
                    # Use processing_type for the event
                    yield f"event: {update_data['processing_type']}\n"
                    yield f"data: {json.dumps(update_data)}\n\n"

            # Then listen for new updates
            while True:
                message = pubsub.get_message(timeout=30)  # message['type'] is the Redis pub/sub type
                if message is None:
                    yield ": keepalive\n\n"
                    continue

                if message['type'] == 'message':  # Redis pub/sub message, not a subscribe confirmation
                    update_data = json.loads(message['data'].decode('utf-8'))
                    yield f"data: {message['data'].decode('utf-8')}\n\n"

                    # Check processing_type for completion
                    if update_data['processing_type'] in ['Task Complete', 'Task Error']:
                        break
        finally:
            pubsub.unsubscribe()
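To show how the two halves of the tracker fit together, here is a hedged sketch of the wiring: a specialist task calling send_update on the producer side, and a Flask view exposing get_updates as an SSE response on the consumer side. The blueprint name, URL rule, and the 'Task Progress' processing_type are assumptions for illustration; the actual routes of the specialist execution API are defined elsewhere in this commit.

    # Hypothetical wiring sketch; blueprint name and route are assumptions.
    from flask import Blueprint, Response

    from common.utils.execution_progress import ExecutionProgressTracker

    specialist_bp = Blueprint('specialist_exec_demo', __name__)

    @specialist_bp.route('/executions/<ctask_id>/stream')
    def stream_execution_progress(ctask_id: str):
        tracker = ExecutionProgressTracker()
        # text/event-stream marks the response as SSE; X-Accel-Buffering: no keeps
        # nginx from buffering the stream so updates reach the client immediately.
        return Response(
            tracker.get_updates(ctask_id),
            mimetype='text/event-stream',
            headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
        )

    def report_progress(ctask_id: str) -> None:
        # Producer side: a specialist task pushes intermediate and terminal updates.
        # 'Task Progress' is an illustrative type; 'Task Complete' is one of the two
        # values get_updates treats as end-of-stream.
        tracker = ExecutionProgressTracker()
        tracker.send_update(ctask_id, 'Task Progress', {'step': 'retrieval'})
        tracker.send_update(ctask_id, 'Task Complete', {'result': 'done'})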