# common/utils/execution_progress.py
import json
from datetime import datetime as dt, timezone as tz
from typing import Generator

from flask import current_app
from redis import Redis, RedisError


class ExecutionProgressTracker:
    """Tracks progress of specialist executions using Redis.

    Each update is appended to a Redis list (so it can be replayed later) and
    published on a pub/sub channel of the same name (so live listeners receive
    it immediately). The list's expiry is refreshed on every update.
    """

    # processing_type values that mark the end of an execution's update stream.
    TERMINAL_PROCESSING_TYPES = ('Task Complete', 'Task Error')

    def __init__(self):
        """Connect to the Redis instance configured under SPECIALIST_EXEC_PUBSUB.

        Raises:
            RedisError: if the connection cannot be established or the ping fails.
            Exception: any other failure during initialization (for example a
                missing configuration key); it is logged and re-raised.
        """
        self.expiry = 3600  # 1 hour expiry
        self._connect()

    def _connect(self):
        """Create the Redis client from the app config and verify it with a ping."""
        try:
            redis_url = current_app.config['SPECIALIST_EXEC_PUBSUB']
            self.redis = Redis.from_url(redis_url, socket_timeout=5)
            # Test the connection
            self.redis.ping()
        except RedisError as e:
            current_app.logger.error(f"Failed to connect to Redis: {str(e)}")
            raise
        except Exception as e:
            current_app.logger.error(f"Unexpected error during Redis initialization: {str(e)}")
            raise

    def _get_key(self, execution_id: str) -> str:
        """Return the Redis key (used as list name and channel name) for an execution."""
        return f"specialist_execution:{execution_id}"

    def send_update(self, ctask_id: str, processing_type: str, data: dict):
        """Send an update about execution progress.

        The update (processing type, data and a UTC timestamp) is serialized to
        JSON, appended to the execution's list, published on its channel, and
        the list's expiry is reset to ``self.expiry`` seconds.

        Args:
            ctask_id: Identifier of the execution the update belongs to.
            processing_type: Kind of update; also used by ``get_updates`` to
                detect the end of the stream.
            data: Payload of the update. Values that are not JSON-serializable
                are converted with ``str``.

        Raises:
            RedisError: if reconnecting or any Redis operation fails.
            TypeError: if the update cannot be serialized.
        """
        try:
            current_app.logger.debug(f"Sending update for {ctask_id} with processing type {processing_type} and data:\n"
                                     f"{data}")
            key = self._get_key(ctask_id)

            # First verify Redis is still connected
            try:
                self.redis.ping()
            except RedisError:
                current_app.logger.error("Lost Redis connection. Attempting to reconnect...")
                self._connect()  # Re-create the client; logs and raises on failure

            update = {
                'processing_type': processing_type,
                'data': data,
                'timestamp': dt.now(tz=tz.utc)
            }

            # Serialize before touching Redis so a bad payload costs no round trip
            try:
                serialized_update = json.dumps(update, default=str)  # default=str handles the datetime
            except TypeError as e:
                current_app.logger.error(f"Failed to serialize update: {str(e)}")
                raise

            try:
                orig_len = self.redis.llen(key)

                # Store update in list with pipeline for atomicity
                with self.redis.pipeline() as pipe:
                    pipe.rpush(key, serialized_update)
                    pipe.publish(key, serialized_update)
                    pipe.expire(key, self.expiry)
                    results = pipe.execute()

                # RPUSH replies with the list length after the push, so no
                # extra LLEN is needed to verify the append.
                new_len = results[0]
                if new_len <= orig_len:
                    current_app.logger.error(
                        f"List length did not increase as expected. Original: {orig_len}, New: {new_len}")
            except RedisError as e:
                current_app.logger.error(f"Redis operation failed: {str(e)}")
                raise
        except RedisError:
            # Already logged with a specific message above; avoid a duplicate entry.
            raise
        except Exception as e:
            current_app.logger.error(f"Unexpected error in send_update: {str(e)}, type: {type(e)}")
            raise

    def get_updates(self, ctask_id: str) -> Generator[str, None, None]:
        """Yield the updates of an execution as server-sent-event text chunks.

        Stored updates are replayed first, each as an ``event:`` line followed
        by a ``data:`` line. If the replay already contains a terminal update
        the generator stops there. Otherwise it listens on the pub/sub channel,
        yielding a ``data:`` chunk per update and a keepalive comment whenever
        no message arrives within 30 seconds, until a terminal update is seen.

        Args:
            ctask_id: Identifier of the execution to follow.

        Yields:
            Text chunks in server-sent-event format.
        """
        key = self._get_key(ctask_id)
        pubsub = self.redis.pubsub()
        try:
            # Subscribe before reading the history so nothing published in
            # between is missed.
            pubsub.subscribe(key)

            # First yield any existing updates (LRANGE returns [] for a missing key)
            already_finished = False
            for stored in self.redis.lrange(key, 0, -1):
                update_data = json.loads(stored.decode('utf-8'))
                # Use processing_type for the event
                yield f"event: {update_data['processing_type']}\n"
                yield f"data: {json.dumps(update_data)}\n\n"
                if update_data['processing_type'] in self.TERMINAL_PROCESSING_TYPES:
                    already_finished = True

            # The execution ended before this listener connected: no further
            # updates will be published, so waiting would never terminate.
            if already_finished:
                return

            # Then listen for new updates
            while True:
                message = pubsub.get_message(timeout=30)
                if message is None:
                    yield ": keepalive\n\n"
                    continue

                # message['type'] is the Redis pub/sub type; skip everything
                # that is not an actual published message.
                if message['type'] != 'message':
                    continue

                payload = message['data'].decode('utf-8')
                update_data = json.loads(payload)
                # NOTE(review): live updates carry no "event:" line, unlike the
                # replayed ones above - confirm with the client before aligning.
                yield f"data: {payload}\n\n"

                # Check processing_type for completion
                if update_data['processing_type'] in self.TERMINAL_PROCESSING_TYPES:
                    break
        finally:
            # Always release the pub/sub connection, even if unsubscribing fails.
            try:
                pubsub.unsubscribe()
            finally:
                pubsub.close()