Files
eveAI/common/utils/celery_utils.py
2025-09-04 15:22:45 +02:00

112 lines
4.3 KiB
Python

import ssl
from celery import Celery
from kombu import Queue
from werkzeug.local import LocalProxy
from redbeat import RedBeatScheduler
# Single Celery application object shared by this module. It is created
# without any configuration here; init_celery() applies the settings later.
celery_app = Celery()
def init_celery(celery, app, is_beat=False):
    """Configure the shared Celery application from a Flask app's config.

    Settings are read from ``app.config`` (with local-Redis / JSON / UTC
    fallbacks) and applied to the module-level ``celery_app``. TLS options are
    added when ``REDIS_CA_CERT_PATH`` is set, RedBeat scheduler settings are
    added when ``is_beat`` is true, and task queues/routes are installed when
    it is false. Finally the ``Task`` base class of ``celery`` is replaced so
    that task bodies run inside ``app.app_context()``.

    Args:
        celery: Celery instance whose ``Task`` attribute is replaced.
        app: Application object providing ``name``, ``config``, ``logger``
            and ``app_context()``.
        is_beat: When true, configure for the beat scheduler and skip the
            worker queue/route setup.

    NOTE(review): configuration is written to the module-level ``celery_app``
    while the task base class is set on the ``celery`` argument. These only
    coincide if callers pass ``celery_app`` -- confirm against call sites.
    """
    celery_app.main = app.name

    # (Celery setting, app.config key, fallback), resolved in this order.
    option_table = (
        ('broker_url', 'CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        ('result_backend', 'CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
        ('task_serializer', 'CELERY_TASK_SERIALIZER', 'json'),
        ('result_serializer', 'CELERY_RESULT_SERIALIZER', 'json'),
        ('accept_content', 'CELERY_ACCEPT_CONTENT', ['json']),
        ('timezone', 'CELERY_TIMEZONE', 'UTC'),
        ('enable_utc', 'CELERY_ENABLE_UTC', True),
    )
    settings = {
        setting: app.config.get(config_key, fallback)
        for setting, config_key, fallback in option_table
    }

    # Connection-pool and transport tuning is currently disabled. The values
    # below are kept for reference only and are NOT applied:
    #
    #   broker_pool_limit: app.config.get('CELERY_BROKER_POOL_LIMIT', 10)
    #
    #   broker_transport_options = {
    #       'master_name': None,  # only relevant for Sentinel
    #       'max_connections': 20,
    #       'retry_on_timeout': True,
    #       'socket_connect_timeout': 5,
    #       'socket_timeout': 5,
    #   }
    #
    #   result_backend_transport_options = {
    #       'retry_on_timeout': True,
    #       'socket_connect_timeout': 5,
    #       'socket_timeout': 5,
    #       'max_connections': 20,
    #   }

    # TLS options are only added when a CA certificate path is configured.
    ca_bundle = app.config.get('REDIS_CA_CERT_PATH')
    if ca_bundle:
        verify_hostname = app.config.get('REDIS_SSL_CHECK_HOSTNAME', True)
        tls_options = {
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
            'ssl_ca_certs': ca_bundle,
            'ssl_check_hostname': verify_hostname,
        }
        app.logger.info(
            "SSL configured for Celery Redis connection (CA: %s, hostname-check: %s)",
            ca_bundle,
            'enabled' if verify_hostname else 'disabled (IP)',
        )
        # The same options dict is used for the broker and the result backend.
        settings['broker_use_ssl'] = tls_options
        settings['redis_backend_use_ssl'] = tls_options

    # Beat process: schedule through RedBeat.
    if is_beat:
        settings.update(
            beat_scheduler='redbeat.RedBeatScheduler',
            redbeat_lock_key='redbeat::lock',
            beat_max_loop_interval=10,
        )

    celery_app.conf.update(**settings)

    # Worker-side queues and routes. Per the original author's note, the Redis
    # transport does not honour routing_key / priority arguments the way
    # RabbitMQ does.
    if not is_beat:
        # (queue name, x-max-priority) for every queue besides 'default'.
        prioritised_queues = (
            ('embeddings', 10),
            ('llm_interactions', 5),
            ('entitlements', 10),
        )
        celery_app.conf.task_queues = (
            Queue('default', routing_key='task.#'),
            *(
                Queue(
                    queue_name,
                    routing_key=f'{queue_name}.#',
                    queue_arguments={'x-max-priority': level},
                )
                for queue_name, level in prioritised_queues
            ),
        )

        # Task-name pattern -> destination queue; each pattern matches every
        # task whose name starts with that module prefix.
        module_to_queue = (
            ('eveai_workers.*', 'embeddings'),
            ('eveai_chat_workers.*', 'llm_interactions'),
            ('eveai_entitlements.*', 'entitlements'),
        )
        celery_app.conf.task_routes = {
            pattern: {'queue': queue_name, 'routing_key': f'{queue_name}.#'}
            for pattern, queue_name in module_to_queue
        }

    # Task base class that wraps each task invocation in the application
    # context of `app`.
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
def make_celery(app_name, config):
    """Return the module-level ``celery_app`` instance.

    ``app_name`` and ``config`` are accepted but not used: the signature is
    kept for existing callers, and no new Celery object is created.
    """
    return celery_app
def _get_current_celery():
    """Return the module-level ``celery_app``; used as a ``LocalProxy`` target."""
    return celery_app
# werkzeug LocalProxy built on _get_current_celery: operations on
# `current_celery` are forwarded to the object that function returns.
current_celery = LocalProxy(_get_current_celery)