- eveai_chat_client updated to retrieve static files from the correct (bunny.net) location when a STATIC_URL is defined.
- Defined locations for crewai crew memory. This failed in k8s. - Redis connection for pub/sub in ExecutionProgressTracker adapted to conform to TLS-enabled connections
This commit is contained in:
@@ -4,6 +4,7 @@ from typing import Generator
|
||||
from redis import Redis, RedisError
|
||||
import json
|
||||
from flask import current_app
|
||||
import time
|
||||
|
||||
|
||||
class ExecutionProgressTracker:
|
||||
@@ -11,22 +12,33 @@ class ExecutionProgressTracker:
|
||||
|
||||
def __init__(self):
|
||||
try:
|
||||
redis_url = current_app.config['SPECIALIST_EXEC_PUBSUB']
|
||||
|
||||
self.redis = Redis.from_url(redis_url, socket_timeout=5)
|
||||
# Test the connection
|
||||
self.redis.ping()
|
||||
|
||||
# Use shared pubsub pool (lazy connect; no eager ping)
|
||||
from common.utils.redis_pubsub_pool import get_pubsub_client
|
||||
self.redis = get_pubsub_client(current_app)
|
||||
self.expiry = 3600 # 1 hour expiry
|
||||
except RedisError as e:
|
||||
current_app.logger.error(f"Failed to connect to Redis: {str(e)}")
|
||||
raise
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Unexpected error during Redis initialization: {str(e)}")
|
||||
current_app.logger.error(f"Error initializing ExecutionProgressTracker: {str(e)}")
|
||||
raise
|
||||
|
||||
def _get_key(self, execution_id: str) -> str:
|
||||
return f"specialist_execution:{execution_id}"
|
||||
prefix = current_app.config.get('REDIS_PREFIXES', {}).get('pubsub_execution', 'pubsub:execution:')
|
||||
return f"{prefix}{execution_id}"
|
||||
|
||||
def _retry(self, op, attempts: int = 3, base_delay: float = 0.1):
|
||||
"""Retry wrapper for Redis operations with exponential backoff."""
|
||||
last_exc = None
|
||||
for i in range(attempts):
|
||||
try:
|
||||
return op()
|
||||
except RedisError as e:
|
||||
last_exc = e
|
||||
if i == attempts - 1:
|
||||
break
|
||||
delay = base_delay * (3 ** i) # 0.1, 0.3, 0.9
|
||||
current_app.logger.warning(f"Redis operation failed (attempt {i+1}/{attempts}): {e}. Retrying in {delay}s")
|
||||
time.sleep(delay)
|
||||
# Exhausted retries
|
||||
raise last_exc
|
||||
|
||||
def send_update(self, ctask_id: str, processing_type: str, data: dict):
|
||||
"""Send an update about execution progress"""
|
||||
@@ -35,12 +47,6 @@ class ExecutionProgressTracker:
|
||||
f"{data}")
|
||||
key = self._get_key(ctask_id)
|
||||
|
||||
# First verify Redis is still connected
|
||||
try:
|
||||
self.redis.ping()
|
||||
except RedisError:
|
||||
current_app.logger.error("Lost Redis connection. Attempting to reconnect...")
|
||||
self.__init__() # Reinitialize connection
|
||||
|
||||
update = {
|
||||
'processing_type': processing_type,
|
||||
@@ -50,7 +56,7 @@ class ExecutionProgressTracker:
|
||||
|
||||
# Log initial state
|
||||
try:
|
||||
orig_len = self.redis.llen(key)
|
||||
orig_len = self._retry(lambda: self.redis.llen(key))
|
||||
|
||||
# Try to serialize the update and check the result
|
||||
try:
|
||||
@@ -60,13 +66,16 @@ class ExecutionProgressTracker:
|
||||
raise
|
||||
|
||||
# Store update in list with pipeline for atomicity
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.rpush(key, serialized_update)
|
||||
pipe.publish(key, serialized_update)
|
||||
pipe.expire(key, self.expiry)
|
||||
results = pipe.execute()
|
||||
def _pipeline_op():
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.rpush(key, serialized_update)
|
||||
pipe.publish(key, serialized_update)
|
||||
pipe.expire(key, self.expiry)
|
||||
return pipe.execute()
|
||||
|
||||
new_len = self.redis.llen(key)
|
||||
results = self._retry(_pipeline_op)
|
||||
|
||||
new_len = self._retry(lambda: self.redis.llen(key))
|
||||
|
||||
if new_len <= orig_len:
|
||||
current_app.logger.error(
|
||||
@@ -83,13 +92,14 @@ class ExecutionProgressTracker:
|
||||
def get_updates(self, ctask_id: str) -> Generator[str, None, None]:
|
||||
key = self._get_key(ctask_id)
|
||||
pubsub = self.redis.pubsub()
|
||||
pubsub.subscribe(key)
|
||||
# Subscribe with retry
|
||||
self._retry(lambda: pubsub.subscribe(key))
|
||||
|
||||
try:
|
||||
# First yield any existing updates
|
||||
length = self.redis.llen(key)
|
||||
length = self._retry(lambda: self.redis.llen(key))
|
||||
if length > 0:
|
||||
updates = self.redis.lrange(key, 0, -1)
|
||||
updates = self._retry(lambda: self.redis.lrange(key, 0, -1))
|
||||
for update in updates:
|
||||
update_data = json.loads(update.decode('utf-8'))
|
||||
# Use processing_type for the event
|
||||
@@ -98,7 +108,13 @@ class ExecutionProgressTracker:
|
||||
|
||||
# Then listen for new updates
|
||||
while True:
|
||||
message = pubsub.get_message(timeout=30) # message['type'] is Redis pub/sub type
|
||||
try:
|
||||
message = pubsub.get_message(timeout=30) # message['type'] is Redis pub/sub type
|
||||
except RedisError as e:
|
||||
current_app.logger.warning(f"Redis pubsub get_message error: {e}. Continuing...")
|
||||
time.sleep(0.3)
|
||||
continue
|
||||
|
||||
if message is None:
|
||||
yield ": keepalive\n\n"
|
||||
continue
|
||||
@@ -111,4 +127,7 @@ class ExecutionProgressTracker:
|
||||
if update_data['processing_type'] in ['Task Complete', 'Task Error']:
|
||||
break
|
||||
finally:
|
||||
pubsub.unsubscribe()
|
||||
try:
|
||||
pubsub.unsubscribe()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
84
common/utils/redis_pubsub_pool.py
Normal file
84
common/utils/redis_pubsub_pool.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import ssl
|
||||
from typing import Dict, Any
|
||||
|
||||
import redis
|
||||
from flask import Flask
|
||||
|
||||
|
||||
def _build_pubsub_redis_config(app: Flask) -> Dict[str, Any]:
|
||||
"""Build Redis ConnectionPool config for the pubsub/EPT workload using app.config.
|
||||
Does not modify cache or session pools.
|
||||
"""
|
||||
cfg = app.config
|
||||
|
||||
config: Dict[str, Any] = {
|
||||
'host': cfg['REDIS_URL'],
|
||||
'port': cfg['REDIS_PORT'],
|
||||
'db': int(cfg.get('REDIS_SPECIALIST_EXEC_DB', '0')),
|
||||
'max_connections': int(cfg.get('REDIS_PUBSUB_MAX_CONNECTIONS', 200)),
|
||||
'retry_on_timeout': True,
|
||||
'socket_keepalive': True,
|
||||
'socket_keepalive_options': {},
|
||||
'socket_timeout': float(cfg.get('REDIS_PUBSUB_SOCKET_TIMEOUT', 10.0)),
|
||||
'socket_connect_timeout': float(cfg.get('REDIS_PUBSUB_CONNECT_TIMEOUT', 3.0)),
|
||||
}
|
||||
|
||||
# Authentication if present
|
||||
un = cfg.get('REDIS_USER')
|
||||
pw = cfg.get('REDIS_PASS')
|
||||
if un and pw:
|
||||
config.update({'username': un, 'password': pw})
|
||||
|
||||
# TLS when configured
|
||||
cert_path = cfg.get('REDIS_CA_CERT_PATH')
|
||||
if cfg.get('REDIS_SCHEME') == 'rediss' and cert_path:
|
||||
config.update({
|
||||
'connection_class': redis.SSLConnection,
|
||||
'ssl_cert_reqs': ssl.CERT_REQUIRED,
|
||||
'ssl_check_hostname': cfg.get('REDIS_SSL_CHECK_HOSTNAME', True),
|
||||
'ssl_ca_certs': cert_path,
|
||||
})
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def create_pubsub_pool(app: Flask) -> redis.ConnectionPool:
|
||||
"""Create and store the dedicated pubsub ConnectionPool in app.extensions."""
|
||||
if not hasattr(app, 'extensions'):
|
||||
app.extensions = {}
|
||||
|
||||
# Reuse existing if already created
|
||||
pool = app.extensions.get('redis_pubsub_pool')
|
||||
if pool is not None:
|
||||
return pool
|
||||
|
||||
config = _build_pubsub_redis_config(app)
|
||||
pool = redis.ConnectionPool(**config)
|
||||
app.extensions['redis_pubsub_pool'] = pool
|
||||
|
||||
# Log a concise, non-sensitive summary
|
||||
try:
|
||||
summary = {
|
||||
'scheme': app.config.get('REDIS_SCHEME'),
|
||||
'host': app.config.get('REDIS_URL'),
|
||||
'port': app.config.get('REDIS_PORT'),
|
||||
'db': app.config.get('REDIS_SPECIALIST_EXEC_DB', '0'),
|
||||
'ssl_check_hostname': app.config.get('REDIS_SSL_CHECK_HOSTNAME'),
|
||||
'ca_present': bool(app.config.get('REDIS_CA_CERT_PATH')),
|
||||
'max_connections': app.config.get('REDIS_PUBSUB_MAX_CONNECTIONS'),
|
||||
'socket_timeout': app.config.get('REDIS_PUBSUB_SOCKET_TIMEOUT'),
|
||||
'socket_connect_timeout': app.config.get('REDIS_PUBSUB_CONNECT_TIMEOUT'),
|
||||
}
|
||||
app.logger.info(f"Initialized Redis pubsub pool: {summary}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return pool
|
||||
|
||||
|
||||
def get_pubsub_client(app: Flask) -> redis.Redis:
|
||||
"""Get a Redis client bound to the dedicated pubsub pool."""
|
||||
pool = app.extensions.get('redis_pubsub_pool')
|
||||
if pool is None:
|
||||
pool = create_pubsub_pool(app)
|
||||
return redis.Redis(connection_pool=pool)
|
||||
Reference in New Issue
Block a user