from celery import Celery from kombu import Queue from werkzeug.local import LocalProxy from redbeat import RedBeatScheduler celery_app = Celery() def init_celery(celery, app, is_beat=False): celery_app.main = app.name app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}') app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}') celery_config = { 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'), 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'), 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), } if is_beat: # Add configurations specific to Beat scheduler celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler' celery_config['redbeat_lock_key'] = 'redbeat::lock' celery_config['beat_max_loop_interval'] = 10 # Adjust as needed celery_app.conf.update(**celery_config) # Task queues for workers only if not is_beat: celery_app.conf.task_queues = ( Queue('default', routing_key='task.#'), Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), Queue('entitlements', routing_key='entitlements.#', queue_arguments={'x-max-priority': 10}), ) celery_app.conf.task_routes = { 'eveai_workers.*': { # All tasks from eveai_workers module 'queue': 'embeddings', 'routing_key': 'embeddings.#', }, 'eveai_chat_workers.*': { # All tasks from eveai_chat_workers module 'queue': 'llm_interactions', 'routing_key': 'llm_interactions.#', }, 'eveai_entitlements.*': { # All tasks from eveai_entitlements module 'queue': 'entitlements', 'routing_key': 'entitlements.#', } } # Ensure tasks execute with Flask context class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask # Original init_celery before updating for beat # def init_celery(celery, app): # celery_app.main = app.name # app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}') # app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}') # celery_config = { # 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), # 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), # 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'), # 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'), # 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), # 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), # 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), # 'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings', # 'routing_key': 'embeddings.create_embeddings'}}, # } # celery_app.conf.update(**celery_config) # # # Setting up Celery task queues # celery_app.conf.task_queues = ( # Queue('default', routing_key='task.#'), # Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), # Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), # ) # # # Ensuring tasks execute with Flask application context # class ContextTask(celery.Task): # def __call__(self, *args, **kwargs): # with app.app_context(): # return self.run(*args, **kwargs) # # celery.Task = ContextTask def make_celery(app_name, config): return celery_app def _get_current_celery(): return celery_app current_celery = LocalProxy(_get_current_celery)