From 131c609e68d1edac817788108e70050054350b2d Mon Sep 17 00:00:00 2001 From: Josako Date: Mon, 6 May 2024 23:07:45 +0200 Subject: [PATCH] Refactoring part 2 Necessary changes to ensure correct working of eveai_app --- common/celery_config.py | 32 +++++++++++++++++++++++++++++++ common/utils/security.py | 2 +- config/config.py | 15 +-------------- eveai_app/__init__.py | 10 ++++++---- eveai_app/views/document_views.py | 14 ++++++++------ eveai_app/views/user_forms.py | 2 +- eveai_app/views/user_views.py | 2 +- eveai_workers/tasks.py | 2 +- scripts/run_flask_server.py | 1 - 9 files changed, 51 insertions(+), 29 deletions(-) create mode 100644 common/celery_config.py diff --git a/common/celery_config.py b/common/celery_config.py new file mode 100644 index 0000000..7e97c2c --- /dev/null +++ b/common/celery_config.py @@ -0,0 +1,32 @@ +from celery import Celery +from kombu import Queue + + +def init_celery(celery, app): + celery_config = { + 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'), + 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'), + 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), + 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), + 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), + } + celery.conf.update(**celery_config) + + # Setting up Celery task queues + celery.conf.task_queues = ( + Queue('embeddings', routing_key='embeddings.key', queue_arguments={'x-max-priority': 10}), + Queue('llm_interactions', routing_key='llm_interactions.key', queue_arguments={'x-max-priority': 5}), + ) + + # Ensuring tasks execute with Flask application context + class ContextTask(celery.Task): + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + celery.Task = ContextTask + + +def make_celery(app_name, config): + celery = Celery(app_name, broker=config['CELERY_BROKER_URL'], backend=config['CELERY_RESULT_BACKEND']) + return celery diff --git a/common/utils/security.py b/common/utils/security.py index 823cd56..536f2bc 100644 --- a/common/utils/security.py +++ b/common/utils/security.py @@ -1,5 +1,5 @@ from flask import session -from common.models import User, Tenant +from common.models.user import Tenant # Definition of Trigger Handlers diff --git a/config/config.py b/config/config.py index c5613a1..b20f5cf 100644 --- a/config/config.py +++ b/config/config.py @@ -46,24 +46,11 @@ class Config(object): # Celery settings CELERY_TASK_SERIALIZER = 'json' + CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TIMEZONE = 'UTC' CELERY_ENABLE_UTC = True - # Define multiple queues - CELERY_TASK_QUEUES = { - 'embeddings': { - 'exchange': 'embeddings', - 'routing_key': 'embeddings.key', - 'queue_arguments': {'x-max-priority': 10} - }, - 'llm_interactions': { - 'exchange': 'llm_interactions', - 'routing_key': 'llm_interactions.key', - 'queue_arguments': {'x-max-priority': 5} - } - } - class DevConfig(Config): DEVELOPMENT = True diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index 72f19a6..d8d3796 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -11,8 +11,7 @@ from common.models.user import User, Role from config.logging_config import LOGGING from common.utils.security import set_tenant_session_data from .errors import register_error_handlers -from eveai_workers.celery_utils import init_celery - +from common.celery_config import make_celery, init_celery def create_app(config_file=None): app = Flask(__name__) @@ -31,8 +30,11 @@ def create_app(config_file=None): logging.config.dictConfig(LOGGING) register_extensions(app) - # Initialize celery - init_celery(app) + app.celery = make_celery(app.name, app.config) + init_celery(app.celery, app) + + print(app.celery.conf.broker_url) + print(app.celery.conf.result_backend) # Setup Flask-Security-Too user_datastore = SQLAlchemyUserDatastore(db, User, Role) diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index adccb5d..0b2ecfc 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -6,11 +6,10 @@ from sqlalchemy import desc from sqlalchemy.orm import joinedload from werkzeug.utils import secure_filename -from common.models import Document, DocumentLanguage, DocumentVersion +from common.models.document import Document, DocumentLanguage, DocumentVersion from common.extensions import db from .document_forms import AddDocumentForm from common.utils.middleware import mw_before_request -from eveai_workers.tasks import create_embeddings document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @@ -70,11 +69,14 @@ def add_document(): if error is None: flash('Document added successfully.', 'success') upload_file_for_version(new_doc_vers, file, extension) - create_embeddings.delay(tenant_id=session['tenant']['id'], - document_version_id=new_doc_vers.id, - default_embedding_model=session['default_embedding_model']) + task = current_app.celery.send_task('tasks.create_embeddings', args=[ + session['tenant']['id'], + new_doc_vers.id, + session['default_embedding_model'], + ]) current_app.logger.info(f'Document processing started for tenant {session["tenant"]["id"]}, ' - f'Document Version {new_doc_vers.id}') + f'Document Version {new_doc_vers.id}, ' + f'Task ID {task.id}') print('Processing should start soon') else: flash('Error adding document.', 'error') diff --git a/eveai_app/views/user_forms.py b/eveai_app/views/user_forms.py index 41dda86..302c557 100644 --- a/eveai_app/views/user_forms.py +++ b/eveai_app/views/user_forms.py @@ -3,7 +3,7 @@ from flask_wtf import FlaskForm from wtforms import (StringField, PasswordField, BooleanField, SubmitField, EmailField, IntegerField, DateField, SelectField, SelectMultipleField, FieldList, FormField) from wtforms.validators import DataRequired, Length, Email, NumberRange, Optional -from common.models import Role +from common.models.user import Role class TenantForm(FlaskForm): diff --git a/eveai_app/views/user_views.py b/eveai_app/views/user_views.py index 7e28888..715bce8 100644 --- a/eveai_app/views/user_views.py +++ b/eveai_app/views/user_views.py @@ -4,7 +4,7 @@ from datetime import datetime as dt, timezone as tz from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app from flask_security import hash_password, roles_required, roles_accepted -from common.models import User, Tenant, Role +from common.models.user import User, Tenant, Role from common.extensions import db from .user_forms import TenantForm, CreateUserForm, EditUserForm from common.utils.database import Database diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 1b99f88..98c7a73 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -5,7 +5,7 @@ import os from celery import shared_task from common.utils.database import Database -from common.models import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI +from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI from eveai_app import db diff --git a/scripts/run_flask_server.py b/scripts/run_flask_server.py index a3f8786..bd74a4f 100644 --- a/scripts/run_flask_server.py +++ b/scripts/run_flask_server.py @@ -3,7 +3,6 @@ from gevent.pywsgi import WSGIServer app = create_app() -celery_app = app.extensions['celery'] if __name__ == '__main__': print("Server starting on port 5000")