import ssl
from typing import Any, Dict

import redis
from flask import Flask

# Key under which the dedicated pubsub pool is stored in ``app.extensions``.
_POOL_EXTENSION_KEY = 'redis_pubsub_pool'


def _build_pubsub_redis_config(app: Flask) -> Dict[str, Any]:
    """Build Redis ConnectionPool config for the pubsub/EPT workload using app.config.

    Does not modify cache or session pools.

    Config keys read:
        REDIS_URL (required): passed to the pool as ``host``.
        REDIS_PORT (required): passed to the pool as ``port``.
        REDIS_SPECIALIST_EXEC_DB: database index, default ``'0'``.
        REDIS_PUBSUB_MAX_CONNECTIONS: default ``200``.
        REDIS_PUBSUB_SOCKET_TIMEOUT: seconds, default ``10.0``.
        REDIS_PUBSUB_CONNECT_TIMEOUT: seconds, default ``3.0``.
        REDIS_USER / REDIS_PASS: optional credentials.
        REDIS_SCHEME / REDIS_CA_CERT_PATH / REDIS_SSL_CHECK_HOSTNAME:
            optional TLS settings.

    Args:
        app: Application whose ``config`` mapping supplies the settings.

    Returns:
        Keyword arguments suitable for ``redis.ConnectionPool(**config)``.

    Raises:
        KeyError: If ``REDIS_URL`` or ``REDIS_PORT`` is missing.
        ValueError: If a numeric setting cannot be converted.
    """
    cfg = app.config
    config: Dict[str, Any] = {
        'host': cfg['REDIS_URL'],
        'port': cfg['REDIS_PORT'],
        'db': int(cfg.get('REDIS_SPECIALIST_EXEC_DB', '0')),
        'max_connections': int(cfg.get('REDIS_PUBSUB_MAX_CONNECTIONS', 200)),
        'retry_on_timeout': True,
        'socket_keepalive': True,
        'socket_keepalive_options': {},
        'socket_timeout': float(cfg.get('REDIS_PUBSUB_SOCKET_TIMEOUT', 10.0)),
        'socket_connect_timeout': float(cfg.get('REDIS_PUBSUB_CONNECT_TIMEOUT', 3.0)),
    }

    # Authentication if present. A password alone is enough (servers
    # protected by ``requirepass`` have no username); the username is only
    # sent together with a password.
    un = cfg.get('REDIS_USER')
    pw = cfg.get('REDIS_PASS')
    if pw:
        config['password'] = pw
        if un:
            config['username'] = un

    # TLS when configured.
    # NOTE(review): with REDIS_SCHEME == 'rediss' but no REDIS_CA_CERT_PATH the
    # pool is built WITHOUT TLS. Kept as-is for compatibility; confirm whether
    # falling back to the system trust store would be the intended behavior.
    cert_path = cfg.get('REDIS_CA_CERT_PATH')
    if cfg.get('REDIS_SCHEME') == 'rediss' and cert_path:
        config.update({
            'connection_class': redis.SSLConnection,
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
            'ssl_check_hostname': cfg.get('REDIS_SSL_CHECK_HOSTNAME', True),
            'ssl_ca_certs': cert_path,
        })

    return config


def create_pubsub_pool(app: Flask) -> redis.ConnectionPool:
    """Create and store the dedicated pubsub ConnectionPool in app.extensions.

    If a pool is already stored under ``'redis_pubsub_pool'`` it is returned
    unchanged; otherwise a new pool is built from ``app.config``, stored, and
    a non-sensitive summary of its effective settings is logged.

    Args:
        app: Application that owns the pool.

    Returns:
        The existing or newly created connection pool.
    """
    if not hasattr(app, 'extensions'):
        app.extensions = {}

    # Reuse existing if already created
    pool = app.extensions.get(_POOL_EXTENSION_KEY)
    if pool is not None:
        return pool

    config = _build_pubsub_redis_config(app)
    pool = redis.ConnectionPool(**config)
    app.extensions[_POOL_EXTENSION_KEY] = pool

    tls_enabled = 'connection_class' in config

    # Log a concise, non-sensitive summary. Values come from the effective
    # pool config so that applied defaults are shown instead of ``None``.
    summary = {
        'scheme': app.config.get('REDIS_SCHEME'),
        'host': config['host'],
        'port': config['port'],
        'db': config['db'],
        'tls': tls_enabled,
        'ssl_check_hostname': config.get('ssl_check_hostname'),
        'ca_present': bool(app.config.get('REDIS_CA_CERT_PATH')),
        'max_connections': config['max_connections'],
        'socket_timeout': config['socket_timeout'],
        'socket_connect_timeout': config['socket_connect_timeout'],
    }
    try:
        app.logger.info("Initialized Redis pubsub pool: %s", summary)
        if app.config.get('REDIS_SCHEME') == 'rediss' and not tls_enabled:
            app.logger.warning(
                "REDIS_SCHEME is 'rediss' but REDIS_CA_CERT_PATH is not set; "
                "Redis pubsub pool was created without TLS"
            )
    except Exception:
        # Deliberately best-effort: a logging failure must never prevent the
        # pool from being returned to the caller.
        pass

    return pool


def get_pubsub_client(app: Flask) -> redis.Redis:
    """Get a Redis client bound to the dedicated pubsub pool.

    The pool is created on first use; ``create_pubsub_pool`` returns the
    stored pool when one already exists, so the lookup lives in one place.

    Args:
        app: Application that owns (or will own) the pubsub pool.

    Returns:
        A ``redis.Redis`` client using the shared pubsub connection pool.
    """
    return redis.Redis(connection_pool=create_pubsub_pool(app))