Refactoring part 2

Necessary changes to ensure correct working of eveai_app
This commit is contained in:
Josako
2024-05-06 23:07:45 +02:00
parent 8e5ad5f312
commit 131c609e68
9 changed files with 51 additions and 29 deletions

32
common/celery_config.py Normal file
View File

@@ -0,0 +1,32 @@
from celery import Celery
from kombu import Queue
def init_celery(celery, app):
celery_config = {
'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
}
celery.conf.update(**celery_config)
# Setting up Celery task queues
celery.conf.task_queues = (
Queue('embeddings', routing_key='embeddings.key', queue_arguments={'x-max-priority': 10}),
Queue('llm_interactions', routing_key='llm_interactions.key', queue_arguments={'x-max-priority': 5}),
)
# Ensuring tasks execute with Flask application context
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
def make_celery(app_name, config):
celery = Celery(app_name, broker=config['CELERY_BROKER_URL'], backend=config['CELERY_RESULT_BACKEND'])
return celery

View File

@@ -1,5 +1,5 @@
from flask import session
from common.models import User, Tenant
from common.models.user import Tenant
# Definition of Trigger Handlers

View File

@@ -46,24 +46,11 @@ class Config(object):
# Celery settings
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
# Define multiple queues
CELERY_TASK_QUEUES = {
'embeddings': {
'exchange': 'embeddings',
'routing_key': 'embeddings.key',
'queue_arguments': {'x-max-priority': 10}
},
'llm_interactions': {
'exchange': 'llm_interactions',
'routing_key': 'llm_interactions.key',
'queue_arguments': {'x-max-priority': 5}
}
}
class DevConfig(Config):
DEVELOPMENT = True

View File

@@ -11,8 +11,7 @@ from common.models.user import User, Role
from config.logging_config import LOGGING
from common.utils.security import set_tenant_session_data
from .errors import register_error_handlers
from eveai_workers.celery_utils import init_celery
from common.celery_config import make_celery, init_celery
def create_app(config_file=None):
app = Flask(__name__)
@@ -31,8 +30,11 @@ def create_app(config_file=None):
logging.config.dictConfig(LOGGING)
register_extensions(app)
# Initialize celery
init_celery(app)
app.celery = make_celery(app.name, app.config)
init_celery(app.celery, app)
print(app.celery.conf.broker_url)
print(app.celery.conf.result_backend)
# Setup Flask-Security-Too
user_datastore = SQLAlchemyUserDatastore(db, User, Role)

View File

@@ -6,11 +6,10 @@ from sqlalchemy import desc
from sqlalchemy.orm import joinedload
from werkzeug.utils import secure_filename
from common.models import Document, DocumentLanguage, DocumentVersion
from common.models.document import Document, DocumentLanguage, DocumentVersion
from common.extensions import db
from .document_forms import AddDocumentForm
from common.utils.middleware import mw_before_request
from eveai_workers.tasks import create_embeddings
document_bp = Blueprint('document_bp', __name__, url_prefix='/document')
@@ -70,11 +69,14 @@ def add_document():
if error is None:
flash('Document added successfully.', 'success')
upload_file_for_version(new_doc_vers, file, extension)
create_embeddings.delay(tenant_id=session['tenant']['id'],
document_version_id=new_doc_vers.id,
default_embedding_model=session['default_embedding_model'])
task = current_app.celery.send_task('tasks.create_embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
session['default_embedding_model'],
])
current_app.logger.info(f'Document processing started for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}')
f'Document Version {new_doc_vers.id}, '
f'Task ID {task.id}')
print('Processing should start soon')
else:
flash('Error adding document.', 'error')

View File

@@ -3,7 +3,7 @@ from flask_wtf import FlaskForm
from wtforms import (StringField, PasswordField, BooleanField, SubmitField, EmailField, IntegerField, DateField,
SelectField, SelectMultipleField, FieldList, FormField)
from wtforms.validators import DataRequired, Length, Email, NumberRange, Optional
from common.models import Role
from common.models.user import Role
class TenantForm(FlaskForm):

View File

@@ -4,7 +4,7 @@ from datetime import datetime as dt, timezone as tz
from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app
from flask_security import hash_password, roles_required, roles_accepted
from common.models import User, Tenant, Role
from common.models.user import User, Tenant, Role
from common.extensions import db
from .user_forms import TenantForm, CreateUserForm, EditUserForm
from common.utils.database import Database

View File

@@ -5,7 +5,7 @@ import os
from celery import shared_task
from common.utils.database import Database
from common.models import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
from eveai_app import db

View File

@@ -3,7 +3,6 @@ from gevent.pywsgi import WSGIServer
app = create_app()
celery_app = app.extensions['celery']
if __name__ == '__main__':
print("Server starting on port 5000")