diff --git a/common/celery_config.py b/common/utils/celery_utils.py
similarity index 50%
rename from common/celery_config.py
rename to common/utils/celery_utils.py
index 7e97c2c..bd3e9dc 100644
--- a/common/celery_config.py
+++ b/common/utils/celery_utils.py
@@ -1,21 +1,30 @@
 from celery import Celery
 from kombu import Queue
+from werkzeug.local import LocalProxy
+
+celery_app = Celery()


 def init_celery(celery, app):
+    celery_app.main = app.name
     celery_config = {
+        'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
+        'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
         'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
         'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
         'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
         'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
         'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
+        'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings',
+                                                                 'routing_key': 'embeddings.create_embeddings'}},
     }
-    celery.conf.update(**celery_config)
+    celery_app.conf.update(**celery_config)

     # Setting up Celery task queues
-    celery.conf.task_queues = (
-        Queue('embeddings', routing_key='embeddings.key', queue_arguments={'x-max-priority': 10}),
-        Queue('llm_interactions', routing_key='llm_interactions.key', queue_arguments={'x-max-priority': 5}),
+    celery_app.conf.task_queues = (
+        Queue('default', routing_key='task.#'),
+        Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
+        Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
     )

     # Ensuring tasks execute with Flask application context
@@ -28,5 +37,11 @@ def init_celery(celery, app):


 def make_celery(app_name, config):
-    celery = Celery(app_name, broker=config['CELERY_BROKER_URL'], backend=config['CELERY_RESULT_BACKEND'])
-    return celery
+    return celery_app
+
+
+def _get_current_celery():
+    return celery_app
+
+
+current_celery = LocalProxy(_get_current_celery)
diff --git a/config/logging_config.py b/config/logging_config.py
index 4ace9c8..5cf2fb5 100644
--- a/config/logging_config.py
+++ b/config/logging_config.py
@@ -2,10 +2,18 @@ LOGGING = {
     'version': 1,
     'disable_existing_loggers': False,
     'handlers': {
-        'file': {
+        'file_app': {
             'level': 'DEBUG',
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': 'app.log',
+            'filename': 'logs/eveai_app.log',
+            'maxBytes': 1024*1024*5,  # 5MB
+            'backupCount': 10,
+            'formatter': 'standard',
+        },
+        'file_workers': {
+            'level': 'DEBUG',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'logs/eveai_workers.log',
             'maxBytes': 1024*1024*5,  # 5MB
             'backupCount': 10,
             'formatter': 'standard',
@@ -22,10 +30,20 @@ LOGGING = {
         },
     },
     'loggers': {
-        '': {  # root logger
-            'handlers': ['file', 'console'],
+        'eveai_app': {  # logger for the eveai_app
+            'handlers': ['file_app', 'console'],
             'level': 'DEBUG',
-            'propagate': True
+            'propagate': False
+        },
+        'eveai_workers': {  # logger for the eveai_workers
+            'handlers': ['file_workers', 'console'],
+            'level': 'DEBUG',
+            'propagate': False
+        },
+        '': {  # root logger
+            'handlers': ['console'],
+            'level': 'WARNING',  # Set higher level for root to minimize noise
+            'propagate': False
         }
     }
 }
\ No newline at end of file
diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py
index 4b7149c..7b1520a 100644
--- a/eveai_app/__init__.py
+++ b/eveai_app/__init__.py
@@ -11,7 +11,7 @@ from common.models.user import User, Role
 from config.logging_config import LOGGING
 from common.utils.security import set_tenant_session_data
 from .errors import register_error_handlers
-from common.celery_config import make_celery, init_celery
+from common.utils.celery_utils import make_celery, init_celery


 def create_app(config_file=None):
@@ -29,6 +29,13 @@
         pass

     logging.config.dictConfig(LOGGING)
+    print(__name__)
+    logger = logging.getLogger(__name__)
+
+    logger.info("eveai_app starting up")
+
+    # Register extensions
+    register_extensions(app)

     app.celery = make_celery(app.name, app.config)
diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py
index 0b2ecfc..a047d8a 100644
--- a/eveai_app/views/document_views.py
+++ b/eveai_app/views/document_views.py
@@ -10,6 +10,7 @@ from common.models.document import Document, DocumentLanguage, DocumentVersion
 from common.extensions import db
 from .document_forms import AddDocumentForm
 from common.utils.middleware import mw_before_request
+from common.utils.celery_utils import current_celery

 document_bp = Blueprint('document_bp', __name__, url_prefix='/document')

@@ -68,8 +69,10 @@ def add_document():
     # Save the file and process the document
     if error is None:
         flash('Document added successfully.', 'success')
+        current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
+                                f'Document Version {new_doc.id}')
         upload_file_for_version(new_doc_vers, file, extension)
-        task = current_app.celery.send_task('tasks.create_embeddings', args=[
+        task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
             session['tenant']['id'],
             new_doc_vers.id,
             session['default_embedding_model'],
diff --git a/eveai_workers/__init__.py b/eveai_workers/__init__.py
new file mode 100644
index 0000000..677feb4
--- /dev/null
+++ b/eveai_workers/__init__.py
@@ -0,0 +1,33 @@
+import logging
+import logging.config
+from flask import Flask
+
+from common.utils.celery_utils import make_celery, init_celery
+from common.extensions import db
+from config.logging_config import LOGGING
+
+
+def create_app(config_file=None):
+    app = Flask(__name__)
+
+    if config_file is None:
+        app.config.from_object('config.config.DevConfig')
+    else:
+        app.config.from_object(config_file)
+
+    logging.config.dictConfig(LOGGING)
+    register_extensions(app)
+
+    celery = make_celery(app.name, app.config)
+    init_celery(celery, app)
+
+    from . import tasks
+
+    return app, celery
+
+
+def register_extensions(app):
+    db.init_app(app)
+
+
+app, celery = create_app()
diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py
index 98c7a73..58c235b 100644
--- a/eveai_workers/tasks.py
+++ b/eveai_workers/tasks.py
@@ -1,16 +1,18 @@
 from datetime import datetime as dt, timezone as tz
-from flask import current_app
 from langchain_community.document_loaders.unstructured import UnstructuredAPIFileLoader
+from flask import current_app
 import os
-from celery import shared_task

 from common.utils.database import Database
 from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
-from eveai_app import db
+from common.extensions import db
+from common.utils.celery_utils import current_celery


-@shared_task(name='create_embeddings', queue='embeddings')
+@current_celery.task(name='create_embeddings', queue='embeddings')
 def create_embeddings(tenant_id, document_version_id, default_embedding_model):
+    import pydevd_pycharm
+    pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True)
     current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} '
                             f'with model {default_embedding_model}')

@@ -45,7 +47,7 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model):
     api_key = current_app.config.get('UNSTRUCTURED_API_KEY')
     file_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
                              document_version.file_location,
-                             document_version.file_path)
+                             document_version.file_name)
     with open(file_path, 'rb') as f:
         loader = UnstructuredAPIFileLoader(f,
                                            url=url,
@@ -61,7 +63,7 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model):
     print(documents)


-@shared_task(name='ask_eve_ai', queue='llm_interactions')
+@current_celery.task(name='ask_eve_ai', queue='llm_interactions')
 def ask_eve_ai(query):
     # Interaction logic with LLMs like GPT (Langchain API calls, etc.)
     pass
diff --git a/scripts/run_celery.py b/scripts/run_celery.py
deleted file mode 100644
index 8a7a44c..0000000
--- a/scripts/run_celery.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from eveai_app import create_app
-
-flask_app = create_app()
-celery_app = flask_app.extensions['celery']
-print(flask_app.extensions)
diff --git a/scripts/run_flask_server.py b/scripts/run_eveai_app.py
similarity index 67%
rename from scripts/run_flask_server.py
rename to scripts/run_eveai_app.py
index bd74a4f..6bf0c3c 100644
--- a/scripts/run_flask_server.py
+++ b/scripts/run_eveai_app.py
@@ -5,7 +5,7 @@ from gevent.pywsgi import WSGIServer

 app = create_app()

 if __name__ == '__main__':
-    print("Server starting on port 5000")
-    http_server = WSGIServer(('0.0.0.0', 5000), app)  # Wrap up the Flask App using Gevent
+    print("Server starting on port 5001")
+    http_server = WSGIServer(('0.0.0.0', 5001), app)  # Wrap up the Flask App using Gevent
     http_server.serve_forever()  # Continuously listens for incoming requests
diff --git a/scripts/run_eveai_workers.py b/scripts/run_eveai_workers.py
new file mode 100644
index 0000000..e941105
--- /dev/null
+++ b/scripts/run_eveai_workers.py
@@ -0,0 +1,4 @@
+from eveai_workers import celery
+
+if __name__ == '__main__':
+    celery.start()
diff --git a/scripts/start_embedding_queue.sh b/scripts/start_embedding_queue.sh
deleted file mode 100755
index d3e6e9b..0000000
--- a/scripts/start_embedding_queue.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/usr/bin/env bash
-source .venv/bin/activate
-celery -A app.celery_app worker --loglevel=info -Q embeddings
\ No newline at end of file
diff --git a/scripts/start_eveai_app.sh b/scripts/start_eveai_app.sh
new file mode 100755
index 0000000..3368437
--- /dev/null
+++ b/scripts/start_eveai_app.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/" || exit 1
+source "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/.venv/bin/activate"
+
+export PYTHONPATH="$PYTHONPATH:/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/"
+
+# Set Flask environment variables
+export FLASK_ENV=development  # Use 'production' as appropriate
+export FLASK_DEBUG=1  # Use 0 for production
+
+# Start the Flask app
+python scripts/run_eveai_app.py
+
+deactivate
\ No newline at end of file
diff --git a/scripts/start_eveai_workers.sh b/scripts/start_eveai_workers.sh
new file mode 100755
index 0000000..26bbba3
--- /dev/null
+++ b/scripts/start_eveai_workers.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/" || exit 1
+source "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/.venv/bin/activate"
+
+# Start an auto-scaling worker for the 'embeddings' queue
+celery -A eveai_workers.celery worker --loglevel=info -Q embeddings --autoscale=1,4 --hostname=embeddings_worker@%h &
+
+# Start an auto-scaling worker for the 'llm_interactions' queue
+celery -A eveai_workers.celery worker --loglevel=info -Q llm_interactions --autoscale=2,8 --hostname=interactions_worker@%h &
+
+# Wait for all background processes to finish
+wait
+
+deactivate
\ No newline at end of file
diff --git a/scripts/start_flower.sh b/scripts/start_flower.sh
index cae94e2..34cd793 100755
--- a/scripts/start_flower.sh
+++ b/scripts/start_flower.sh
@@ -1,3 +1,9 @@
 #!/usr/bin/env bash
-source .venv/bin/activate
-celery -A app.celery_app flower
+
+cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/" || exit 1
+source "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/.venv/bin/activate" + +# on development machine, no authentication required +export FLOWER_UNAUTHENTICATED_API=True +# Start a worker for the 'embeddings' queue with higher concurrency +celery -A eveai_workers.celery flower \ No newline at end of file diff --git a/scripts/start_logdy.sh b/scripts/start_logdy.sh new file mode 100755 index 0000000..2980060 --- /dev/null +++ b/scripts/start_logdy.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +cd "/Volumes/OWC4M2_1/Dropbox/Josako's Dev/Josako/EveAI/Development/eveAI/logs/" || exit 1 + +logdy follow --full-read eveai_app.log eveai_workers.log +