Refactoring finished :-)

eveai_workers is now running (still with some known errors)
Remote debugging now available
This commit is contained in:
Josako
2024-05-07 22:51:48 +02:00
parent 494d5508ae
commit cd5afa0408
14 changed files with 147 additions and 31 deletions

View File

@@ -1,21 +1,30 @@
from celery import Celery
from kombu import Queue
from werkzeug.local import LocalProxy
celery_app = Celery()
def init_celery(celery, app):
celery_app.main = app.name
celery_config = {
'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings',
'routing_key': 'embeddings.create_embeddings'}},
}
celery.conf.update(**celery_config)
celery_app.conf.update(**celery_config)
# Setting up Celery task queues
celery.conf.task_queues = (
Queue('embeddings', routing_key='embeddings.key', queue_arguments={'x-max-priority': 10}),
Queue('llm_interactions', routing_key='llm_interactions.key', queue_arguments={'x-max-priority': 5}),
celery_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
)
# Ensuring tasks execute with Flask application context
@@ -28,5 +37,11 @@ def init_celery(celery, app):
def make_celery(app_name, config):
celery = Celery(app_name, broker=config['CELERY_BROKER_URL'], backend=config['CELERY_RESULT_BACKEND'])
return celery
return celery_app
def _get_current_celery():
    """Return the module-level Celery application instance."""
    return celery_app
# Lazy proxy: resolves to ``celery_app`` at attribute-access time, so modules
# may import ``current_celery`` before the app is fully configured.
current_celery = LocalProxy(_get_current_celery)

View File

@@ -2,10 +2,18 @@ LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'file_app': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'app.log',
'filename': 'logs/eveai_app.log',
'maxBytes': 1024*1024*5, # 5MB
'backupCount': 10,
'formatter': 'standard',
},
'file_workers': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'logs/eveai_workers.log',
'maxBytes': 1024*1024*5, # 5MB
'backupCount': 10,
'formatter': 'standard',
@@ -22,10 +30,20 @@ LOGGING = {
},
},
'loggers': {
'': { # root logger
'handlers': ['file', 'console'],
'eveai_app': { # logger for the eveai_app
'handlers': ['file_app', 'console'],
'level': 'DEBUG',
'propagate': True
'propagate': False
},
'eveai_workers': { # logger for the eveai_workers
'handlers': ['file_workers', 'console'],
'level': 'DEBUG',
'propagate': False
},
'': { # root logger
'handlers': ['console'],
'level': 'WARNING', # Set higher level for root to minimize noise
'propagate': False
}
}
}

View File

@@ -11,7 +11,7 @@ from common.models.user import User, Role
from config.logging_config import LOGGING
from common.utils.security import set_tenant_session_data
from .errors import register_error_handlers
from common.celery_config import make_celery, init_celery
from common.utils.celery_utils import make_celery, init_celery
def create_app(config_file=None):
@@ -29,6 +29,13 @@ def create_app(config_file=None):
pass
logging.config.dictConfig(LOGGING)
print(__name__)
logger = logging.getLogger(__name__)
logger.info("eveai_app starting up")
# Register extensions
register_extensions(app)
app.celery = make_celery(app.name, app.config)

View File

@@ -10,6 +10,7 @@ from common.models.document import Document, DocumentLanguage, DocumentVersion
from common.extensions import db
from .document_forms import AddDocumentForm
from common.utils.middleware import mw_before_request
from common.utils.celery_utils import current_celery
document_bp = Blueprint('document_bp', __name__, url_prefix='/document')
@@ -68,8 +69,10 @@ def add_document():
# Save the file and process the document
if error is None:
flash('Document added successfully.', 'success')
current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc.id}')
upload_file_for_version(new_doc_vers, file, extension)
task = current_app.celery.send_task('tasks.create_embeddings', args=[
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
session['default_embedding_model'],

33
eveai_workers/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
import logging.config
from flask import Flask
from common.utils.celery_utils import make_celery, init_celery
from common.extensions import db
from config.logging_config import LOGGING
def create_app(config_file=None):
    """Build the eveai_workers Flask app together with its Celery instance.

    :param config_file: dotted path of a config object for
        ``app.config.from_object``; defaults to the development config.
    :return: ``(app, celery)`` tuple.
    """
    app = Flask(__name__)
    # Fall back to the development configuration when none is given.
    config_source = 'config.config.DevConfig' if config_file is None else config_file
    app.config.from_object(config_source)
    logging.config.dictConfig(LOGGING)
    register_extensions(app)
    celery = make_celery(app.name, app.config)
    init_celery(celery, app)
    # Imported for its side effect: task functions register themselves
    # with the Celery app.
    from . import tasks
    return app, celery
def register_extensions(app):
    """Bind Flask extensions (currently only SQLAlchemy) to *app*."""
    db.init_app(app)
app, celery = create_app()

View File

@@ -1,16 +1,18 @@
from datetime import datetime as dt, timezone as tz
from flask import current_app
from langchain_community.document_loaders.unstructured import UnstructuredAPIFileLoader
from flask import current_app
import os
from celery import shared_task
from common.utils.database import Database
from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
from eveai_app import db
from common.extensions import db
from common.utils.celery_utils import current_celery
@shared_task(name='create_embeddings', queue='embeddings')
@current_celery.task(name='create_embeddings', queue='embeddings')
def create_embeddings(tenant_id, document_version_id, default_embedding_model):
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True)
current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} '
f'with model {default_embedding_model}')
@@ -45,7 +47,7 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model):
api_key = current_app.config.get('UNSTRUCTURED_API_KEY')
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
document_version.file_location,
document_version.file_path)
document_version.file_name)
with open(file_path, 'rb') as f:
loader = UnstructuredAPIFileLoader(f,
url=url,
@@ -61,7 +63,7 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model):
print(documents)
@shared_task(name='ask_eve_ai', queue='llm_interactions')
@current_celery.task(name='ask_eve_ai', queue='llm_interactions')
def ask_eve_ai(query):
# Interaction logic with LLMs like GPT (Langchain API calls, etc.)
pass

View File

@@ -1,5 +0,0 @@
from eveai_app import create_app
flask_app = create_app()
celery_app = flask_app.extensions['celery']
print(flask_app.extensions)

View File

@@ -5,7 +5,7 @@ from gevent.pywsgi import WSGIServer
app = create_app()
if __name__ == '__main__':
print("Server starting on port 5000")
http_server = WSGIServer(('0.0.0.0', 5000), app) # Wrap up the Flask App using Gevent
print("Server starting on port 5001")
http_server = WSGIServer(('0.0.0.0', 5001), app) # Wrap up the Flask App using Gevent
http_server.serve_forever() # Continuously listens for incoming requests

View File

@@ -0,0 +1,4 @@
# Direct entry point for the Celery worker process; normally the workers are
# launched via scripts/start_eveai_workers.sh instead.
from eveai_workers import celery
if __name__ == '__main__':
    celery.start()

View File

@@ -1,3 +0,0 @@
#!/usr/bin/env bash
source .venv/bin/activate
celery -A app.celery_app worker --loglevel=info -Q embeddings

15
scripts/start_eveai_app.sh Executable file
View File

@@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Launch the eveai Flask application in development mode.

PROJECT_ROOT="/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/"

cd "$PROJECT_ROOT" || exit 1
source "${PROJECT_ROOT}.venv/bin/activate"
export PYTHONPATH="$PYTHONPATH:$PROJECT_ROOT"

# Flask runtime configuration (switch these for production deployments).
export FLASK_ENV=development
export FLASK_DEBUG=1

# Run the app, then drop out of the virtualenv.
python scripts/run_eveai_app.py
deactivate

15
scripts/start_eveai_workers.sh Executable file
View File

@@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Start the Celery workers for the eveai application, one per queue.
cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/" || exit 1
source "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/.venv/bin/activate"
# Worker for the 'embeddings' queue.
# NOTE(review): celery's --autoscale takes "max,min" — "1,4" looks swapped; confirm intent.
celery -A eveai_workers.celery worker --loglevel=info -Q embeddings --autoscale=1,4 --hostname=embeddings_worker@%h &
# Worker for the 'llm_interactions' queue.
# Fixed "- Q" -> "-Q": the stray space made celery treat "Q" as a positional
# argument instead of selecting the llm_interactions queue.
celery -A eveai_workers.celery worker --loglevel=info -Q llm_interactions --autoscale=2,8 --hostname=interactions_worker@%h &
# Block until both background workers exit.
wait
deactivate

View File

@@ -1,3 +1,9 @@
#!/usr/bin/env bash
source .venv/bin/activate
celery -A app.celery_app flower
cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/" || exit 1
source "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/.venv/bin/activate"
# on development machine, no authentication required
export FLOWER_UNAUTHENTICATED_API=True
# Start a worker for the 'embeddings' queue with higher concurrency
celery -A eveai_workers.celery flower

6
scripts/start_logdy.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Stream the app and worker log files through logdy.
LOG_DIR="/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/logs/"
cd "$LOG_DIR" || exit 1
logdy follow --full-read eveai_app.log eveai_workers.log