140 lines
5.1 KiB
Python
140 lines
5.1 KiB
Python
import atexit
|
||
import os
|
||
import ssl
|
||
import tempfile
|
||
|
||
from celery import Celery
|
||
from kombu import Queue
|
||
from werkzeug.local import LocalProxy
|
||
from redbeat import RedBeatScheduler
|
||
|
||
celery_app = Celery()
|
||
_tmp_paths = []
|
||
|
||
def _create_ssl_cert_file(cert_data: str) -> str:
|
||
"""Create temporary certificate file for Celery SSL"""
|
||
if not cert_data:
|
||
return None
|
||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as cert_file:
|
||
cert_file.write(cert_data)
|
||
path = cert_file.name
|
||
_tmp_paths.append(path) # track for cleanup
|
||
return path
|
||
|
||
def _cleanup_tmp():
|
||
for p in _tmp_paths:
|
||
try:
|
||
os.remove(p)
|
||
except Exception:
|
||
pass
|
||
|
||
atexit.register(_cleanup_tmp)
|
||
|
||
|
||
def init_celery(celery, app, is_beat=False):
|
||
celery_app.main = app.name
|
||
|
||
celery_config = {
|
||
'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
|
||
'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
|
||
'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
|
||
'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
|
||
'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
|
||
'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
|
||
'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
|
||
# connection pools
|
||
# 'broker_pool_limit': app.config.get('CELERY_BROKER_POOL_LIMIT', 10),
|
||
}
|
||
|
||
# Transport options (timeouts, max_connections for Redis transport)
|
||
# broker_transport_options = {
|
||
# 'master_name': None, # only relevant for Sentinel; otherwise harmless
|
||
# 'max_connections': 20,
|
||
# 'retry_on_timeout': True,
|
||
# 'socket_connect_timeout': 5,
|
||
# 'socket_timeout': 5,
|
||
# }
|
||
# celery_config['broker_transport_options'] = broker_transport_options
|
||
#
|
||
# # Backend transport options (Redis backend accepts similar timeouts)
|
||
# result_backend_transport_options = {
|
||
# 'retry_on_timeout': True,
|
||
# 'socket_connect_timeout': 5,
|
||
# 'socket_timeout': 5,
|
||
# # max_connections may be supported on newer Celery/redis backends; harmless if ignored
|
||
# 'max_connections': 20,
|
||
# }
|
||
# celery_config['result_backend_transport_options'] = result_backend_transport_options
|
||
|
||
# TLS (only when cert is provided or your URLs are rediss://)
|
||
cert_data = app.config.get('REDIS_CERT_DATA')
|
||
ssl_opts = None
|
||
if cert_data:
|
||
try:
|
||
ca_path = _create_ssl_cert_file(cert_data)
|
||
if ca_path:
|
||
ssl_opts = {
|
||
'ssl_cert_reqs': ssl.CERT_REQUIRED, # <— constant, not string
|
||
'ssl_ca_certs': ca_path,
|
||
# 'ssl_check_hostname': True, # kombu/redis doesn’t consistently honor this; CERT_REQUIRED is the key
|
||
}
|
||
app.logger.info("SSL configured for Celery Redis connection (CA provided)")
|
||
except Exception as e:
|
||
app.logger.error(f"Failed to configure SSL for Celery: {e}")
|
||
|
||
if ssl_opts is None:
|
||
ssl_opts = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
|
||
celery_config['broker_use_ssl'] = ssl_opts
|
||
# Redis result backend needs its own key:
|
||
celery_config['redis_backend_use_ssl'] = ssl_opts
|
||
|
||
# Beat/RedBeat
|
||
if is_beat:
|
||
celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler'
|
||
celery_config['redbeat_lock_key'] = 'redbeat::lock'
|
||
celery_config['beat_max_loop_interval'] = 10
|
||
|
||
celery_app.conf.update(**celery_config)
|
||
|
||
# Queues for workers (note: Redis ignores routing_key and priority features like RabbitMQ)
|
||
if not is_beat:
|
||
celery_app.conf.task_queues = (
|
||
Queue('default', routing_key='task.#'),
|
||
Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
|
||
Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
|
||
Queue('entitlements', routing_key='entitlements.#', queue_arguments={'x-max-priority': 10}),
|
||
)
|
||
celery_app.conf.task_routes = {
|
||
'eveai_workers.*': { # All tasks from eveai_workers module
|
||
'queue': 'embeddings',
|
||
'routing_key': 'embeddings.#',
|
||
},
|
||
'eveai_chat_workers.*': { # All tasks from eveai_chat_workers module
|
||
'queue': 'llm_interactions',
|
||
'routing_key': 'llm_interactions.#',
|
||
},
|
||
'eveai_entitlements.*': { # All tasks from eveai_entitlements module
|
||
'queue': 'entitlements',
|
||
'routing_key': 'entitlements.#',
|
||
}
|
||
}
|
||
|
||
# Ensure tasks execute with Flask context
|
||
class ContextTask(celery.Task):
|
||
def __call__(self, *args, **kwargs):
|
||
with app.app_context():
|
||
return self.run(*args, **kwargs)
|
||
|
||
celery.Task = ContextTask
|
||
|
||
|
||
def make_celery(app_name, config):
|
||
# keep API but return the single instance
|
||
return celery_app
|
||
|
||
|
||
def _get_current_celery():
|
||
return celery_app
|
||
|
||
|
||
current_celery = LocalProxy(_get_current_celery) |