From 9f5f090f0cdcdb8c0b9a71a306fa09f5c9b30e44 Mon Sep 17 00:00:00 2001 From: Josako Date: Fri, 11 Oct 2024 16:33:36 +0200 Subject: [PATCH] - License Usage Calculation realised - View License Usages - Celery Beat container added - First schedule in Celery Beat for calculating usage (hourly) - repopack can now split for different components - Various fixes as consequence of changing file_location / file_name ==> bucket_name / object_name - Celery Routing / Queuing updated --- .gitignore | 1 + .repopackignore => .repopackignore_base | 5 +- .repopackignore_components | 12 +++ .repopackignore_docker | 12 +++ .repopackignore_eveai_api | 11 +++ .repopackignore_eveai_app | 11 +++ .repopackignore_eveai_beat | 11 +++ .repopackignore_eveai_chat | 11 +++ .repopackignore_eveai_chat_workers | 11 +++ .repopackignore_eveai_entitlements | 11 +++ .repopackignore_eveai_workers | 11 +++ .repopackignore_full | 4 + .repopackignore_integrations | 13 +++ .repopackignore_nginx | 11 +++ CHANGELOG.md | 15 +++ common/models/document.py | 6 -- common/models/entitlements.py | 4 +- common/utils/celery_utils.py | 76 ++++++++++++-- common/utils/document_utils.py | 19 ++-- common/utils/minio_utils.py | 4 +- common/utils/nginx_utils.py | 1 - config/logging_config.py | 26 +++++ docker/compose_dev.yaml | 53 ++++++++++ docker/compose_stackhero.yaml | 22 +++++ docker/eveai_beat/Dockerfile | 65 ++++++++++++ docker/eveai_entitlements/Dockerfile | 69 +++++++++++++ eveai_api/views/healthz_views.py | 2 +- eveai_app/__init__.py | 2 + eveai_app/temp | 1 - .../administration/trigger_actions.html | 22 +++++ .../templates/document/document_versions.html | 2 +- .../templates/entitlements/view_usages.html | 28 ++++++ eveai_app/templates/navbar.html | 2 + eveai_app/views/administration_forms.py | 7 ++ eveai_app/views/administration_views.py | 39 ++++++++ eveai_app/views/document_views.py | 11 +-- eveai_app/views/entitlements_views.py | 32 ++++-- eveai_beat/__init__.py | 44 +++++++++ eveai_beat/schedule.py | 17 ++++ eveai_chat/socket_handlers/chat_handler.py | 18 ++-- eveai_entitlements/tasks.py | 99 ++++++++++++------- eveai_workers/Processors/audio_processor.py | 6 +- eveai_workers/Processors/html_processor.py | 6 +- eveai_workers/Processors/pdf_processor.py | 6 +- eveai_workers/Processors/srt_processor.py | 6 +- eveai_workers/__init__.py | 1 + eveai_workers/tasks.py | 59 +++++------ ..._set_storage_dirty_flag_for_all_tenants.py | 24 +++++ ...3_licenseusage_correct_mb_fields_to_be_.py | 46 +++++++++ migrations/tenant/env.py | 45 +++++---- ..._documentversion_update_to_bucket_name_.py | 2 +- requirements.txt | 1 + scripts/entrypoint_no_db.sh | 10 ++ scripts/repopack_eveai.sh | 42 +++++--- scripts/start_eveai_beat.sh | 17 ++++ scripts/start_eveai_entitlements.sh | 17 ++++ scripts/start_flower.sh | 0 57 files changed, 935 insertions(+), 174 deletions(-) rename .repopackignore => .repopackignore_base (91%) create mode 100644 .repopackignore_components create mode 100644 .repopackignore_docker create mode 100644 .repopackignore_eveai_api create mode 100644 .repopackignore_eveai_app create mode 100644 .repopackignore_eveai_beat create mode 100644 .repopackignore_eveai_chat create mode 100644 .repopackignore_eveai_chat_workers create mode 100644 .repopackignore_eveai_entitlements create mode 100644 .repopackignore_eveai_workers create mode 100644 .repopackignore_full create mode 100644 .repopackignore_integrations create mode 100644 .repopackignore_nginx create mode 100644 docker/eveai_beat/Dockerfile create mode 100644 
docker/eveai_entitlements/Dockerfile delete mode 100644 eveai_app/temp create mode 100644 eveai_app/templates/administration/trigger_actions.html create mode 100644 eveai_app/templates/entitlements/view_usages.html create mode 100644 eveai_app/views/administration_forms.py create mode 100644 eveai_app/views/administration_views.py create mode 100644 eveai_beat/__init__.py create mode 100644 eveai_beat/schedule.py create mode 100644 migrations/public/versions/02debd224316_set_storage_dirty_flag_for_all_tenants.py create mode 100644 migrations/public/versions/a678c84d5633_licenseusage_correct_mb_fields_to_be_.py create mode 100755 scripts/entrypoint_no_db.sh create mode 100755 scripts/start_eveai_beat.sh create mode 100755 scripts/start_eveai_entitlements.sh mode change 100644 => 100755 scripts/start_flower.sh diff --git a/.gitignore b/.gitignore index 7dd3f01..7e7f193 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,4 @@ migrations/public/.DS_Store scripts/.DS_Store scripts/__pycache__/run_eveai_app.cpython-312.pyc /eveai_repo.txt +*repo.txt diff --git a/.repopackignore b/.repopackignore_base similarity index 91% rename from .repopackignore rename to .repopackignore_base index a2a3938..01e4b4e 100644 --- a/.repopackignore +++ b/.repopackignore_base @@ -15,7 +15,6 @@ migrations/ nginx/mime.types *.gitignore* .python-version -.repopackignore +.repopackignore* repopack.config.json - - +*repo.txt diff --git a/.repopackignore_components b/.repopackignore_components new file mode 100644 index 0000000..309c6ad --- /dev/null +++ b/.repopackignore_components @@ -0,0 +1,12 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_docker b/.repopackignore_docker new file mode 100644 index 0000000..909a1d6 --- /dev/null +++ b/.repopackignore_docker @@ -0,0 +1,12 @@ +common/ +config/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ diff --git a/.repopackignore_eveai_api b/.repopackignore_eveai_api new file mode 100644 index 0000000..900294e --- /dev/null +++ b/.repopackignore_eveai_api @@ -0,0 +1,11 @@ +docker/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_eveai_app b/.repopackignore_eveai_app new file mode 100644 index 0000000..79f8305 --- /dev/null +++ b/.repopackignore_eveai_app @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_eveai_beat b/.repopackignore_eveai_beat new file mode 100644 index 0000000..c12c4a2 --- /dev/null +++ b/.repopackignore_eveai_beat @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_eveai_chat b/.repopackignore_eveai_chat new file mode 100644 index 0000000..4ed2168 --- /dev/null +++ b/.repopackignore_eveai_chat @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git 
a/.repopackignore_eveai_chat_workers b/.repopackignore_eveai_chat_workers new file mode 100644 index 0000000..58d11e4 --- /dev/null +++ b/.repopackignore_eveai_chat_workers @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_eveai_entitlements b/.repopackignore_eveai_entitlements new file mode 100644 index 0000000..40c17f6 --- /dev/null +++ b/.repopackignore_eveai_entitlements @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_workers/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_eveai_workers b/.repopackignore_eveai_workers new file mode 100644 index 0000000..c8e225d --- /dev/null +++ b/.repopackignore_eveai_workers @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +instance/ +integrations/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_full b/.repopackignore_full new file mode 100644 index 0000000..20dc5a1 --- /dev/null +++ b/.repopackignore_full @@ -0,0 +1,4 @@ +docker +integrations +nginx +scripts \ No newline at end of file diff --git a/.repopackignore_integrations b/.repopackignore_integrations new file mode 100644 index 0000000..c3edcbf --- /dev/null +++ b/.repopackignore_integrations @@ -0,0 +1,13 @@ +common/ +config/ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +nginx/ +scripts/ \ No newline at end of file diff --git a/.repopackignore_nginx b/.repopackignore_nginx new file mode 100644 index 0000000..874ab13 --- /dev/null +++ b/.repopackignore_nginx @@ -0,0 +1,11 @@ +docker/ +eveai_api/ +eveai_app/ +eveai_beat/ +eveai_chat/ +eveai_chat_workers/ +eveai_entitlements/ +eveai_workers/ +instance/ +integrations/ +scripts/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e03d88..a72bbd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security - In case of vulnerabilities. 
+## [1.0.11-alfa] + +### Added +- License Usage Calculation realised +- View License Usages +- Celery Beat container added +- First schedule in Celery Beat for calculating usage (hourly) + +### Changed +- repopack can now split for different components + +### Fixed +- Various fixes as consequence of changing file_location / file_name ==> bucket_name / object_name +- Celery Routing / Queuing updated + ## [1.0.10-alfa] ### Added diff --git a/common/models/document.py index 259ede7..1eb9243 100644 --- a/common/models/document.py +++ b/common/models/document.py @@ -56,12 +56,6 @@ class DocumentVersion(db.Model): def __repr__(self): return f".{self.id}>" - def calc_file_location(self): - return f"{self.document.tenant_id}/{self.document.id}/{self.language}" - - def calc_file_name(self): - return f"{self.id}.{self.file_type}" - class Embedding(db.Model): __tablename__ = 'embeddings' diff --git a/common/models/entitlements.py b/common/models/entitlements.py index d87d63b..cf3dc26 100644 --- a/common/models/entitlements.py +++ b/common/models/entitlements.py @@ -94,8 +94,8 @@ class LicenseUsage(db.Model): id = db.Column(db.Integer, primary_key=True) license_id = db.Column(db.Integer, db.ForeignKey('public.license.id'), nullable=False) tenant_id = db.Column(db.Integer, db.ForeignKey('public.tenant.id'), nullable=False) - storage_mb_used = db.Column(db.Integer, default=0) - embedding_mb_used = db.Column(db.Integer, default=0) + storage_mb_used = db.Column(db.Float, default=0) + embedding_mb_used = db.Column(db.Float, default=0) embedding_prompt_tokens_used = db.Column(db.Integer, default=0) embedding_completion_tokens_used = db.Column(db.Integer, default=0) embedding_total_tokens_used = db.Column(db.Integer, default=0) diff --git a/common/utils/celery_utils.py b/common/utils/celery_utils.py index 8224fbb..6b2ef0b 100644 --- a/common/utils/celery_utils.py +++ b/common/utils/celery_utils.py @@ -1,14 +1,16 @@ from celery import Celery from kombu import Queue from werkzeug.local import LocalProxy +from redbeat import RedBeatScheduler celery_app = Celery() -def init_celery(celery, app): +def init_celery(celery, app, is_beat=False): celery_app.main = app.name app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}') app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}') + celery_config = { 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), @@ -17,19 +19,40 @@ def init_celery(celery, app): 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), - 'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings', - 'routing_key': 'embeddings.create_embeddings'}}, } + + if is_beat: + # Add configurations specific to Beat scheduler + celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler' + celery_config['redbeat_lock_key'] = 'redbeat::lock' + celery_config['beat_max_loop_interval'] = 10 # Adjust as needed + celery_app.conf.update(**celery_config) - # Setting up Celery task queues - celery_app.conf.task_queues = ( - Queue('default', routing_key='task.#'), - Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), - Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), - ) + # Task queues for workers only + if not is_beat: + celery_app.conf.task_queues = ( + Queue('default', routing_key='task.#'), + Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), + Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), + Queue('entitlements', routing_key='entitlements.#', queue_arguments={'x-max-priority': 10}), + ) + celery_app.conf.task_routes = { + 'eveai_workers.*': { # All tasks from eveai_workers module + 'queue': 'embeddings', + 'routing_key': 'embeddings.#', + }, + 'eveai_chat_workers.*': { # All tasks from eveai_chat_workers module + 'queue': 'llm_interactions', + 'routing_key': 'llm_interactions.#', + }, + 'eveai_entitlements.*': { # All tasks from eveai_entitlements module + 'queue': 'entitlements', + 'routing_key': 'entitlements.#', + } + } - # Ensuring tasks execute with Flask application context + # Ensure tasks execute with Flask context class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): @@ -37,6 +60,39 @@ def init_celery(celery, app): celery.Task = ContextTask +# Original init_celery before updating for beat +# def init_celery(celery, app): +# celery_app.main = app.name +# app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}') +# app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}') +# celery_config = { +# 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), +# 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), +# 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'), +# 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'), +# 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), +# 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), +# 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), +# 'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings', +# 'routing_key': 'embeddings.create_embeddings'}}, +# } +# celery_app.conf.update(**celery_config) +# +# # Setting up Celery task queues +# celery_app.conf.task_queues = ( +# Queue('default', routing_key='task.#'), +# Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), +# Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), +# ) +# +# # Ensuring tasks execute with Flask application context +# class ContextTask(celery.Task): +# def __call__(self, *args, **kwargs): +# with app.app_context(): +# return self.run(*args, **kwargs) +# +# celery.Task = ContextTask + def make_celery(app_name, config): return celery_app
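With the is_beat flag above, worker processes declare queues and module-based task routes, while the beat process only configures the RedBeat scheduler. A minimal sketch of how a component is expected to wire this up, following the make_celery/init_celery pattern this patch applies in eveai_beat/__init__.py (the worker-side component name is illustrative, and config loading is omitted for brevity):

    from flask import Flask
    from common.utils.celery_utils import make_celery, init_celery

    def create_worker_app():
        # Worker side: queues (default/embeddings/llm_interactions/entitlements)
        # and task routes are declared because is_beat defaults to False.
        app = Flask('eveai_workers')  # illustrative component name
        celery = make_celery(app.name, app.config)
        init_celery(celery, app)
        return app, celery

    def create_beat_app():
        # Beat side: RedBeat scheduler settings only; no queue declarations.
        app = Flask('eveai_beat')
        celery = make_celery(app.name, app.config)
        init_celery(celery, app, is_beat=True)
        return app, celery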
diff --git a/common/utils/document_utils.py b/common/utils/document_utils.py index 17783ae..f73fb7b 100644 --- a/common/utils/document_utils.py +++ b/common/utils/document_utils.py @@ -99,12 +99,12 @@ def upload_file_for_version(doc_vers, file, extension, tenant_id): doc_vers.doc_id, doc_vers.language, doc_vers.id, - doc_vers.file_name, + f"{doc_vers.id}.{extension}", file ) doc_vers.bucket_name = bn doc_vers.object_name = on - doc_vers.file_size_mb = size / 1048576 # Convert bytes to MB + doc_vers.file_size = size / 1048576 # Convert bytes to MB db.session.commit() current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for ' @@ -222,10 +222,9 @@ def process_multiple_urls(urls, tenant_id, api_input): def start_embedding_task(tenant_id, doc_vers_id): - task = current_celery.send_task('create_embeddings', queue='embeddings', 
args=[ - tenant_id, - doc_vers_id, - ]) + task = current_celery.send_task('create_embeddings', + args=[tenant_id, doc_vers_id,], + queue='embeddings') current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, ' f'Document Version {doc_vers_id}. ' f'Embedding creation task: {task.id}') @@ -321,16 +320,16 @@ def refresh_document_with_info(doc_id, api_input): upload_file_for_version(new_doc_vers, file_content, extension, doc.tenant_id) - task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ - doc.tenant_id, - new_doc_vers.id, - ]) + task = current_celery.send_task('create_embeddings', args=[doc.tenant_id, new_doc_vers.id,], queue='embeddings') + current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} ' + f'with task id: {task.id}.') return new_doc_vers, task.id # Update the existing refresh_document function to use the new refresh_document_with_info def refresh_document(doc_id): + current_app.logger.info(f'Refreshing document {doc_id}') doc = Document.query.get_or_404(doc_id) old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first() diff --git a/common/utils/minio_utils.py b/common/utils/minio_utils.py index caec259..8c750cf 100644 --- a/common/utils/minio_utils.py +++ b/common/utils/minio_utils.py @@ -54,9 +54,7 @@ class MinioClient: except S3Error as err: raise Exception(f"Error occurred while uploading file: {err}") - def download_document_file(self, tenant_id, document_id, language, version_id, filename): - bucket_name = self.generate_bucket_name(tenant_id) - object_name = self.generate_object_name(document_id, language, version_id, filename) + def download_document_file(self, tenant_id, bucket_name, object_name): try: response = self.client.get_object(bucket_name, object_name) return response.read() diff --git a/common/utils/nginx_utils.py b/common/utils/nginx_utils.py index dab1ecb..c20d586 100644 --- a/common/utils/nginx_utils.py +++ b/common/utils/nginx_utils.py @@ -6,7 +6,6 @@ def prefixed_url_for(endpoint, **values): prefix = request.headers.get('X-Forwarded-Prefix', '') scheme = request.headers.get('X-Forwarded-Proto', request.scheme) host = request.headers.get('Host', request.host) - current_app.logger.debug(f'prefix: {prefix}, scheme: {scheme}, host: {host}') external = values.pop('_external', False) generated_url = url_for(endpoint, **values) diff --git a/config/logging_config.py b/config/logging_config.py index 7bffdeb..0378ee7 100644 --- a/config/logging_config.py +++ b/config/logging_config.py @@ -73,6 +73,22 @@ LOGGING = { 'backupCount': 10, 'formatter': 'standard', }, + 'file_beat': { + 'level': 'DEBUG', + 'class': 'logging.handlers.RotatingFileHandler', + 'filename': 'logs/eveai_beat.log', + 'maxBytes': 1024 * 1024 * 5, # 5MB + 'backupCount': 10, + 'formatter': 'standard', + }, + 'file_entitlements': { + 'level': 'DEBUG', + 'class': 'logging.handlers.RotatingFileHandler', + 'filename': 'logs/eveai_entitlements.log', + 'maxBytes': 1024 * 1024 * 5, # 5MB + 'backupCount': 10, + 'formatter': 'standard', + }, 'file_sqlalchemy': { 'level': 'DEBUG', 'class': 'logging.handlers.RotatingFileHandler', @@ -172,6 +188,16 @@ LOGGING = { 'level': 'DEBUG', 'propagate': False }, + 'eveai_beat': { # logger for the eveai_beat + 'handlers': ['file_beat', 'graylog', ] if env == 'production' else ['file_beat', ], + 'level': 'DEBUG', + 'propagate': False + }, + 'eveai_entitlements': { # logger for the eveai_entitlements + 'handlers': ['file_entitlements', 
'graylog', ] if env == 'production' else ['file_entitlements', ], + 'level': 'DEBUG', + 'propagate': False + }, 'sqlalchemy.engine': { # logger for the sqlalchemy 'handlers': ['file_sqlalchemy', 'graylog', ] if env == 'production' else ['file_sqlalchemy', ], 'level': 'DEBUG', diff --git a/docker/compose_dev.yaml b/docker/compose_dev.yaml index d0d3c64..18fc5c8 100644 --- a/docker/compose_dev.yaml +++ b/docker/compose_dev.yaml @@ -231,6 +231,59 @@ services: networks: - eveai-network + eveai_beat: + image: josakola/eveai_beat:latest + build: + context: .. + dockerfile: ./docker/eveai_beat/Dockerfile + platforms: + - linux/amd64 + - linux/arm64 + environment: + <<: *common-variables + COMPONENT_NAME: eveai_beat + volumes: + - ../eveai_beat:/app/eveai_beat + - ../common:/app/common + - ../config:/app/config + - ../scripts:/app/scripts + - ../patched_packages:/app/patched_packages + - eveai_logs:/app/logs + depends_on: + redis: + condition: service_healthy + networks: + - eveai-network + + eveai_entitlements: + image: josakola/eveai_entitlements:latest + build: + context: .. + dockerfile: ./docker/eveai_entitlements/Dockerfile + platforms: + - linux/amd64 + - linux/arm64 + environment: + <<: *common-variables + COMPONENT_NAME: eveai_entitlements + volumes: + - ../eveai_entitlements:/app/eveai_entitlements + - ../common:/app/common + - ../config:/app/config + - ../scripts:/app/scripts + - ../patched_packages:/app/patched_packages + - eveai_logs:/app/logs + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + minio: + condition: service_healthy + networks: + - eveai-network + + db: hostname: db image: ankane/pgvector diff --git a/docker/compose_stackhero.yaml b/docker/compose_stackhero.yaml index e72ecea..d1c47da 100644 --- a/docker/compose_stackhero.yaml +++ b/docker/compose_stackhero.yaml @@ -145,6 +145,28 @@ services: networks: - eveai-network + eveai_beat: + platform: linux/amd64 + image: josakola/eveai_beat:latest + environment: + <<: *common-variables + COMPONENT_NAME: eveai_beat + volumes: + - eveai_logs:/app/logs + networks: + - eveai-network + + eveai_entitlements: + platform: linux/amd64 + image: josakola/eveai_entitlements:latest + environment: + <<: *common-variables + COMPONENT_NAME: eveai_entitlements + volumes: + - eveai_logs:/app/logs + networks: + - eveai-network + flower: image: josakola/flower:latest environment: diff --git a/docker/eveai_beat/Dockerfile b/docker/eveai_beat/Dockerfile new file mode 100644 index 0000000..d55810b --- /dev/null +++ b/docker/eveai_beat/Dockerfile @@ -0,0 +1,65 @@ +ARG PYTHON_VERSION=3.12.3 +FROM python:${PYTHON_VERSION}-slim as base + +# Prevents Python from writing pyc files. +ENV PYTHONDONTWRITEBYTECODE=1 + +# Keeps Python from buffering stdout and stderr to avoid situations where +# the application crashes without emitting any logs due to buffering. +ENV PYTHONUNBUFFERED=1 + +# Create directory for patched packages and set permissions +RUN mkdir -p /app/patched_packages && \ + chmod 777 /app/patched_packages + +# Ensure patches are applied to the application. +ENV PYTHONPATH=/app/patched_packages:$PYTHONPATH + +WORKDIR /app + +# Create a non-privileged user that the app will run under. 
+# See https://docs.docker.com/go/dockerfile-user-best-practices/ +ARG UID=10001 +RUN adduser \ + --disabled-password \ + --gecos "" \ + --home "/nonexistent" \ + --shell "/bin/bash" \ + --no-create-home \ + --uid "${UID}" \ + appuser + +# Install necessary packages and build tools +#RUN apt-get update && apt-get install -y \ +# build-essential \ +# gcc \ +# && apt-get clean \ +# && rm -rf /var/lib/apt/lists/* + +# Create logs directory and set permissions +RUN mkdir -p /app/logs && chown -R appuser:appuser /app/logs + +# Install Python dependencies. + +# Download dependencies as a separate step to take advantage of Docker's caching. +# Leverage a cache mount to /root/.cache/pip to speed up subsequent builds. +# Leverage a bind mount to requirements.txt to avoid having to copy them into +# this layer. + +COPY requirements.txt /app/ +RUN python -m pip install -r /app/requirements.txt + +# Copy the source code into the container. +COPY eveai_beat /app/eveai_beat +COPY common /app/common +COPY config /app/config +COPY scripts /app/scripts +COPY patched_packages /app/patched_packages +COPY --chown=root:root scripts/entrypoint_no_db.sh /app/scripts/ + +# Set ownership of the application directory to the non-privileged user +RUN chown -R appuser:appuser /app + +# Set entrypoint and command +ENTRYPOINT ["/app/scripts/entrypoint_no_db.sh"] +CMD ["/app/scripts/start_eveai_beat.sh"] diff --git a/docker/eveai_entitlements/Dockerfile b/docker/eveai_entitlements/Dockerfile new file mode 100644 index 0000000..eee5437 --- /dev/null +++ b/docker/eveai_entitlements/Dockerfile @@ -0,0 +1,69 @@ +ARG PYTHON_VERSION=3.12.3 +FROM python:${PYTHON_VERSION}-slim as base + +# Prevents Python from writing pyc files. +ENV PYTHONDONTWRITEBYTECODE=1 + +# Keeps Python from buffering stdout and stderr to avoid situations where +# the application crashes without emitting any logs due to buffering. +ENV PYTHONUNBUFFERED=1 + +# Create directory for patched packages and set permissions +RUN mkdir -p /app/patched_packages && \ + chmod 777 /app/patched_packages + +# Ensure patches are applied to the application. +ENV PYTHONPATH=/app/patched_packages:$PYTHONPATH + +WORKDIR /app + +# Create a non-privileged user that the app will run under. +# See https://docs.docker.com/go/dockerfile-user-best-practices/ +ARG UID=10001 +RUN adduser \ + --disabled-password \ + --gecos "" \ + --home "/nonexistent" \ + --shell "/bin/bash" \ + --no-create-home \ + --uid "${UID}" \ + appuser + +# Install necessary packages and build tools +RUN apt-get update && apt-get install -y \ + build-essential \ + gcc \ + postgresql-client \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Create logs directory and set permissions +RUN mkdir -p /app/logs && chown -R appuser:appuser /app/logs + +# Install Python dependencies. + +# Download dependencies as a separate step to take advantage of Docker's caching. +# Leverage a cache mount to /root/.cache/pip to speed up subsequent builds. +# Leverage a bind mount to requirements.txt to avoid having to copy them into +# this layer. + +COPY requirements.txt /app/ +RUN python -m pip install -r /app/requirements.txt + +# Copy the source code into the container. 
+COPY eveai_entitlements /app/eveai_entitlements +COPY common /app/common +COPY config /app/config +COPY scripts /app/scripts +COPY patched_packages /app/patched_packages +COPY --chown=root:root scripts/entrypoint.sh /app/scripts/ + +# Set permissions for entrypoint script +RUN chmod 777 /app/scripts/entrypoint.sh + +# Set ownership of the application directory to the non-privileged user +RUN chown -R appuser:appuser /app + +# Set entrypoint and command +ENTRYPOINT ["/app/scripts/entrypoint.sh"] +CMD ["/app/scripts/start_eveai_entitlements.sh"] diff --git a/eveai_api/views/healthz_views.py b/eveai_api/views/healthz_views.py index b34c3f9..9a79a21 100644 --- a/eveai_api/views/healthz_views.py +++ b/eveai_api/views/healthz_views.py @@ -46,7 +46,7 @@ def check_database(): def check_celery(): try: # Send a simple task to Celery - result = current_celery.send_task('tasks.ping', queue='embeddings') + result = current_celery.send_task('ping', queue='eveai_workers.ping') response = result.get(timeout=10) # Wait for up to 10 seconds for a response return response == 'pong' except CeleryTimeoutError: diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index 1025ff4..3783538 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -136,6 +136,8 @@ def register_blueprints(app): app.register_blueprint(interaction_bp) from .views.entitlements_views import entitlements_bp app.register_blueprint(entitlements_bp) + from .views.administration_views import administration_bp + app.register_blueprint(administration_bp) from .views.healthz_views import healthz_bp, init_healtz app.register_blueprint(healthz_bp) init_healtz(app) diff --git a/eveai_app/temp b/eveai_app/temp deleted file mode 100644 index 3d3f011..0000000 --- a/eveai_app/temp +++ /dev/null @@ -1 +0,0 @@ -Settings(Settings({'deprecated_settings': set()}, {}, {'accept_content': ('json',), 'result_accept_content': None, 'enable_utc': True, 'imports': (), 'include': (), 'timezone': None, 'beat_max_loop_interval': 0, 'beat_schedule': {}, 'beat_scheduler': 'celery.beat:PersistentScheduler', 'beat_schedule_filename': 'celerybeat-schedule', 'beat_sync_every': 0, 'beat_cron_starting_deadline': None, 'broker_url': None, 'broker_read_url': None, 'broker_write_url': None, 'broker_transport': None, 'broker_transport_options': {}, 'broker_connection_timeout': 4, 'broker_connection_retry': True, 'broker_connection_retry_on_startup': None, 'broker_connection_max_retries': 100, 'broker_channel_error_retry': False, 'broker_failover_strategy': None, 'broker_heartbeat': 120, 'broker_heartbeat_checkrate': 3.0, 'broker_login_method': None, 'broker_pool_limit': 10, 'broker_use_ssl': False, 'broker_host': None, 'broker_port': None, 'broker_user': None, 'broker_password': None, 'broker_vhost': None, 'cache_backend': None, 'cache_backend_options': {}, 'cassandra_entry_ttl': None, 'cassandra_keyspace': None, 'cassandra_port': None, 'cassandra_read_consistency': None, 'cassandra_servers': None, 'cassandra_bundle_path': None, 'cassandra_table': None, 'cassandra_write_consistency': None, 'cassandra_auth_provider': None, 'cassandra_auth_kwargs': None, 'cassandra_options': {}, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_bucket': None, 's3_base_path': None, 's3_endpoint_url': None, 's3_region': None, 'azureblockblob_container_name': 'celery', 'azureblockblob_retry_initial_backoff_sec': 2, 'azureblockblob_retry_increment_base': 2, 'azureblockblob_retry_max_attempts': 3, 'azureblockblob_base_path': '', 'azureblockblob_connection_timeout': 20, 
'azureblockblob_read_timeout': 120, 'gcs_bucket': None, 'gcs_project': None, 'gcs_base_path': '', 'gcs_ttl': 0, 'control_queue_ttl': 300.0, 'control_queue_expires': 10.0, 'control_exchange': 'celery', 'couchbase_backend_settings': None, 'arangodb_backend_settings': None, 'mongodb_backend_settings': None, 'cosmosdbsql_database_name': 'celerydb', 'cosmosdbsql_collection_name': 'celerycol', 'cosmosdbsql_consistency_level': 'Session', 'cosmosdbsql_max_retry_attempts': 9, 'cosmosdbsql_max_retry_wait_time': 30, 'event_queue_expires': 60.0, 'event_queue_ttl': 5.0, 'event_queue_prefix': 'celeryev', 'event_serializer': 'json', 'event_exchange': 'celeryev', 'redis_backend_use_ssl': None, 'redis_db': None, 'redis_host': None, 'redis_max_connections': None, 'redis_username': None, 'redis_password': None, 'redis_port': None, 'redis_socket_timeout': 120.0, 'redis_socket_connect_timeout': None, 'redis_retry_on_timeout': False, 'redis_socket_keepalive': False, 'result_backend': None, 'result_cache_max': -1, 'result_compression': None, 'result_exchange': 'celeryresults', 'result_exchange_type': 'direct', 'result_expires': datetime.timedelta(days=1), 'result_persistent': None, 'result_extended': False, 'result_serializer': 'json', 'result_backend_transport_options': {}, 'result_chord_retry_interval': 1.0, 'result_chord_join_timeout': 3.0, 'result_backend_max_sleep_between_retries_ms': 10000, 'result_backend_max_retries': inf, 'result_backend_base_sleep_between_retries_ms': 10, 'result_backend_always_retry': False, 'elasticsearch_retry_on_timeout': None, 'elasticsearch_max_retries': None, 'elasticsearch_timeout': None, 'elasticsearch_save_meta_as_text': True, 'security_certificate': None, 'security_cert_store': None, 'security_key': None, 'security_key_password': None, 'security_digest': 'sha256', 'database_url': None, 'database_engine_options': None, 'database_short_lived_sessions': False, 'database_table_schemas': None, 'database_table_names': None, 'task_acks_late': False, 'task_acks_on_failure_or_timeout': True, 'task_always_eager': False, 'task_annotations': None, 'task_compression': None, 'task_create_missing_queues': True, 'task_inherit_parent_priority': False, 'task_default_delivery_mode': 2, 'task_default_queue': 'celery', 'task_default_exchange': None, 'task_default_exchange_type': 'direct', 'task_default_routing_key': None, 'task_default_rate_limit': None, 'task_default_priority': None, 'task_eager_propagates': False, 'task_ignore_result': False, 'task_store_eager_result': False, 'task_protocol': 2, 'task_publish_retry': True, 'task_publish_retry_policy': {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, 'task_queues': None, 'task_queue_max_priority': None, 'task_reject_on_worker_lost': None, 'task_remote_tracebacks': False, 'task_routes': None, 'task_send_sent_event': False, 'task_serializer': 'json', 'task_soft_time_limit': None, 'task_time_limit': None, 'task_store_errors_even_if_ignored': False, 'task_track_started': False, 'task_allow_error_cb_on_chord_header': False, 'worker_agent': None, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'worker_cancel_long_running_tasks_on_connection_loss': False, 'worker_concurrency': None, 'worker_consumer': 'celery.worker.consumer:Consumer', 'worker_direct': False, 'worker_disable_rate_limits': False, 'worker_deduplicate_successful_tasks': False, 'worker_enable_remote_control': True, 'worker_hijack_root_logger': True, 'worker_log_color': None, 'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] 
%(message)s', 'worker_lost_wait': 10.0, 'worker_max_memory_per_child': None, 'worker_max_tasks_per_child': None, 'worker_pool': 'prefork', 'worker_pool_putlocks': True, 'worker_pool_restarts': False, 'worker_proc_alive_timeout': 4.0, 'worker_prefetch_multiplier': 4, 'worker_enable_prefetch_count_reduction': True, 'worker_redirect_stdouts': True, 'worker_redirect_stdouts_level': 'WARNING', 'worker_send_task_events': False, 'worker_state_db': None, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'worker_timer': None, 'worker_timer_precision': 1.0, 'deprecated_settings': None})) diff --git a/eveai_app/templates/administration/trigger_actions.html b/eveai_app/templates/administration/trigger_actions.html new file mode 100644 index 0000000..8e41619 --- /dev/null +++ b/eveai_app/templates/administration/trigger_actions.html @@ -0,0 +1,22 @@ +{% extends 'base.html' %} +{% from "macros.html" import render_selectable_table, render_pagination, render_field %} +{% block title %}Trigger Actions{% endblock %} +{% block content_title %}Trigger Actions{% endblock %} +{% block content_description %}Manually trigger batch actions{% endblock %} +{% block content %} + + +
+
+ +
+
+ +{% endblock %} + +{% block content_footer %} +{% endblock %} + +{% block scripts %} +{% endblock %} + diff --git a/eveai_app/templates/document/document_versions.html b/eveai_app/templates/document/document_versions.html index 15adbf7..b32c6f9 100644 --- a/eveai_app/templates/document/document_versions.html +++ b/eveai_app/templates/document/document_versions.html @@ -10,7 +10,7 @@ {% block content %}
- {{ render_selectable_table(headers=["ID", "URL", "File Loc.", "File Name", "File Type", "Process.", "Proces. Start", "Proces. Finish", "Proces. Error"], rows=rows, selectable=True, id="versionsTable") }} + {{ render_selectable_table(headers=["ID", "URL", "Object Name", "File Type", "Process.", "Proces. Start", "Proces. Finish", "Proces. Error"], rows=rows, selectable=True, id="versionsTable") }}
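The header change above follows the storage refactor from the computed file_location/file_name pair to the persisted bucket_name/object_name columns. A minimal sketch of the read path after this change, using the new download_document_file signature from common/utils/minio_utils.py (doc_vers stands for an already-loaded DocumentVersion row):

    from common.extensions import minio_client

    # bucket_name and object_name are persisted by upload_file_for_version()
    file_data = minio_client.download_document_file(
        tenant_id,
        doc_vers.bucket_name,
        doc_vers.object_name,
    )
    content = file_data.decode('utf-8')  # for text formats; see the HTML/SRT processors below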
diff --git a/eveai_app/templates/entitlements/view_usages.html b/eveai_app/templates/entitlements/view_usages.html new file mode 100644 index 0000000..1bfb432 --- /dev/null +++ b/eveai_app/templates/entitlements/view_usages.html @@ -0,0 +1,28 @@ +{% extends 'base.html' %} +{% from "macros.html" import render_selectable_table, render_pagination %} + +{% block title %}View License Usage{% endblock %} + +{% block content_title %}View License Usage{% endblock %} +{% block content_description %}View License Usage{% endblock %} + +{% block content %} + + {{ render_selectable_table(headers=["Usage ID", "Start Date", "End Date", "Storage (MiB)", "Embedding (MiB)", "Interaction (tokens)"], rows=rows, selectable=False, id="usagesTable") }} + + + + + + + + +{% endblock %} + +{% block content_footer %} + {{ render_pagination(pagination, 'user_bp.select_tenant') }} +{% endblock %} + +{% block scripts %} + +{% endblock %} \ No newline at end of file diff --git a/eveai_app/templates/navbar.html b/eveai_app/templates/navbar.html index 6bd40a2..a2e5443 100644 --- a/eveai_app/templates/navbar.html +++ b/eveai_app/templates/navbar.html @@ -98,6 +98,8 @@ {{ dropdown('Administration', 'settings', [ {'name': 'License Tier Registration', 'url': '/entitlements/license_tier', 'roles': ['Super User']}, {'name': 'All License Tiers', 'url': '/entitlements/view_license_tiers', 'roles': ['Super User']}, + {'name': 'Trigger Actions', 'url': '/administration/trigger_actions', 'roles': ['Super User']}, + {'name': 'Usage', 'url': '/entitlements/view_usages', 'roles': ['Super User', 'Tenant Admin']}, ]) }} {% endif %} {% if current_user.is_authenticated %} diff --git a/eveai_app/views/administration_forms.py b/eveai_app/views/administration_forms.py new file mode 100644 index 0000000..bf58bc8 --- /dev/null +++ b/eveai_app/views/administration_forms.py @@ -0,0 +1,7 @@ +from flask import current_app +from flask_wtf import FlaskForm +from wtforms.fields.simple import SubmitField + + +class TriggerActionForm(FlaskForm): + submit = SubmitField('Submit') diff --git a/eveai_app/views/administration_views.py b/eveai_app/views/administration_views.py new file mode 100644 index 0000000..add2960 --- /dev/null +++ b/eveai_app/views/administration_views.py @@ -0,0 +1,39 @@ +import uuid +from datetime import datetime as dt, timezone as tz +from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify +from flask_security import hash_password, roles_required, roles_accepted, current_user +from itsdangerous import URLSafeTimedSerializer +from sqlalchemy.exc import SQLAlchemyError + +from common.utils.celery_utils import current_celery +from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed +from common.utils.nginx_utils import prefixed_url_for +from .administration_forms import TriggerActionForm + +administration_bp = Blueprint('administration_bp', __name__, url_prefix='/administration') + + +@administration_bp.route('/trigger_actions', methods=['GET']) +@roles_accepted('Super User') +def trigger_actions(): + form = TriggerActionForm() + return render_template('administration/trigger_actions.html', form=form) + + +@administration_bp.route('/handle_trigger_action', methods=['POST']) +@roles_accepted('Super User') +def handle_trigger_action(): + action = request.form['action'] + match action: + case 'update_usages': + try: + # Use send_task to trigger the task since it's part of another component (eveai_entitlements) + task = current_celery.send_task('update_usages', 
queue='entitlements') + + current_app.logger.info(f"Usage update task triggered: {task.id}") + flash('Usage update task has been triggered successfully!', 'success') + except Exception as e: + current_app.logger.error(f"Failed to trigger usage update task: {str(e)}") + flash(f'Failed to trigger usage update: {str(e)}', 'danger') + + return redirect(prefixed_url_for('administration_bp.trigger_actions')) diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index 8029c10..f31d511 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -268,8 +268,8 @@ def document_versions(document_id): pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items - rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), ('file_location', ''), - ('file_name', ''), ('file_type', ''), + rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), + ('object_name', ''), ('file_type', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) @@ -349,10 +349,9 @@ def re_embed_latest_versions(): def process_version(version_id): - task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ - session['tenant']['id'], - version_id, - ]) + task = current_celery.send_task('create_embeddings', + args=[session['tenant']['id'], version_id,], + queue='embeddings') current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} ' f'for tenant {session["tenant"]["id"]}, ' f'Document Version {version_id}. ' diff --git a/eveai_app/views/entitlements_views.py b/eveai_app/views/entitlements_views.py index 039cec6..962a6c9 100644 --- a/eveai_app/views/entitlements_views.py +++ b/eveai_app/views/entitlements_views.py @@ -2,18 +2,14 @@ import uuid from datetime import datetime as dt, timezone as tz from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify from flask_security import hash_password, roles_required, roles_accepted, current_user -from itsdangerous import URLSafeTimedSerializer from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy import or_ +from sqlalchemy import or_, desc import ast from common.models.entitlements import License, LicenseTier, LicenseUsage, BusinessEventLog from common.extensions import db, security, minio_client, simple_encryption -from common.utils.security_utils import send_confirmation_email, send_reset_email from .entitlements_forms import LicenseTierForm, LicenseForm -from common.utils.database import Database from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed -from common.utils.simple_encryption import generate_api_key from common.utils.nginx_utils import prefixed_url_for entitlements_bp = Blueprint('entitlements_bp', __name__, url_prefix='/entitlements') @@ -174,14 +170,14 @@ def create_license(license_tier_id): db.session.add(new_license) db.session.commit() flash('License created successfully', 'success') - return redirect(prefixed_url_for('entitlements_bp/edit_license', license_id=new_license.id)) + return redirect(prefixed_url_for('entitlements_bp.edit_license', license_id=new_license.id)) except Exception as e: db.session.rollback() flash(f'Error creating license: {str(e)}', 'error') else: form_validation_failed(request, form) - return render_template('entitlements/license.html', form=form) + return render_template('entitlements/license.html', form=form, 
ext_disabled_fields=[]) @entitlements_bp.route('/license/', methods=['GET', 'POST']) @@ -215,3 +211,25 @@ def edit_license(license_id): return render_template('entitlements/license.html', form=form, license_tier_id=license_tier.id, ext_disabled_fields=disabled_fields) + + +@entitlements_bp.route('/view_usages') +@roles_accepted('Super User', 'Tenant Admin') +def view_usages(): + page = request.args.get('page', 1, type=int) + per_page = request.args.get('per_page', 10, type=int) + + tenant_id = session.get('tenant').get('id') + query = LicenseUsage.query.filter_by(tenant_id=tenant_id).order_by(desc(LicenseUsage.id)) + + pagination = query.paginate(page=page, per_page=per_page) + lus = pagination.items + + # prepare table data + + rows = prepare_table_for_macro(lus, [('id', ''), ('period_start_date', ''), ('period_end_date', ''), + ('storage_mb_used', ''), ('embedding_mb_used', ''), + ('interaction_total_tokens_used', '')]) + + # Render the users in a template + return render_template('entitlements/view_usages.html', rows=rows, pagination=pagination) diff --git a/eveai_beat/__init__.py b/eveai_beat/__init__.py new file mode 100644 index 0000000..9e5a932 --- /dev/null +++ b/eveai_beat/__init__.py @@ -0,0 +1,44 @@ +import logging +import logging.config +from flask import Flask +import os + +from common.utils.celery_utils import make_celery, init_celery +from config.logging_config import LOGGING +from config.config import get_config + + +def create_app(config_file=None): + app = Flask(__name__) + + environment = os.getenv('FLASK_ENV', 'development') + + match environment: + case 'development': + app.config.from_object(get_config('dev')) + case 'production': + app.config.from_object(get_config('prod')) + case _: + app.config.from_object(get_config('dev')) + + logging.config.dictConfig(LOGGING) + + register_extensions(app) + + celery = make_celery(app.name, app.config) + init_celery(celery, app, is_beat=True) + + from . 
import schedule + celery.conf.beat_schedule = schedule.beat_schedule + + app.logger.info("EveAI Beat Scheduler Started Successfully") + app.logger.info("-------------------------------------------------------------------------------------------------") + + return app, celery + + +def register_extensions(app): + pass + + +app, celery = create_app() diff --git a/eveai_beat/schedule.py b/eveai_beat/schedule.py new file mode 100644 index 0000000..6b79c5d --- /dev/null +++ b/eveai_beat/schedule.py @@ -0,0 +1,17 @@ +from celery.schedules import crontab + +# Define the Celery beat schedule here +beat_schedule = { + 'update-tenant-usages-every-hour': { + 'task': 'update_usages', + 'schedule': crontab(minute='0'), # Runs every hour + 'args': (), + 'options': {'queue': 'entitlements'} + }, + # 'send-invoices-every-month': { + # 'task': 'send_invoices', + # 'schedule': crontab(day_of_month=1, hour=0, minute=0), # Runs on the 1st of every month + # 'args': () + # }, + # Add more schedules as needed +} \ No newline at end of file
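schedule.py is meant to be the single registration point for periodic jobs. Following the commented-out monthly entry above, a new job would be added along these lines (send_invoices is hypothetical until a task with that name is registered in a worker component; routing through options mirrors the hourly update_usages entry):

    from celery.schedules import crontab

    beat_schedule['send-invoices-every-month'] = {
        'task': 'send_invoices',  # hypothetical task name
        'schedule': crontab(day_of_month=1, hour=0, minute=0),  # 1st of each month, 00:00 UTC
        'args': (),
        'options': {'queue': 'entitlements'},  # route like update_usages
    }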
diff --git a/eveai_chat/socket_handlers/chat_handler.py b/eveai_chat/socket_handlers/chat_handler.py index b4200ec..b367063 100644 --- a/eveai_chat/socket_handlers/chat_handler.py +++ b/eveai_chat/socket_handlers/chat_handler.py @@ -109,14 +109,16 @@ def handle_message(data): room = session.get('room') # Offload actual processing of question - task = current_celery.send_task('ask_question', queue='llm_interactions', args=[ - current_tenant_id, - data['message'], - data['language'], - session['session_id'], - data['timezone'], - room - ]) + task = current_celery.send_task('ask_question', + queue='llm_interactions', + args=[ + current_tenant_id, + data['message'], + data['language'], + session['session_id'], + data['timezone'], + room + ]) current_app.logger.debug(f'SocketIO: Message offloading for tenant {current_tenant_id}, ' f'Question: {task.id}') response = { diff --git a/eveai_entitlements/tasks.py b/eveai_entitlements/tasks.py index f0a1c73..67de8b7 100644 --- a/eveai_entitlements/tasks.py +++ b/eveai_entitlements/tasks.py @@ -5,13 +5,14 @@ from datetime import datetime as dt, timezone as tz, datetime from celery import states from dateutil.relativedelta import relativedelta from flask import current_app -from sqlalchemy import or_, and_ +from sqlalchemy import or_, and_, text from sqlalchemy.exc import SQLAlchemyError from common.extensions import db from common.models.user import Tenant from common.models.entitlements import BusinessEventLog, LicenseUsage, License from common.utils.celery_utils import current_celery -from common.utils.eveai_exceptions import EveAINoLicenseForTenant +from common.utils.eveai_exceptions import EveAINoLicenseForTenant, EveAIException +from common.utils.database import Database # Healthcheck task @@ -24,32 +25,54 @@ def ping(): def update_usages(): current_timestamp = dt.now(tz.utc) tenant_ids = get_all_tenant_ids() + + # List to collect all errors + error_list = [] + for tenant_id in tenant_ids: - tenant = Tenant.query.get(tenant_id) - if tenant.storage_dirty: - recalculate_storage_for_tenant(tenant) - check_and_create_license_usage_for_tenant(tenant_id) - logs = get_logs_for_processing(tenant_id, current_timestamp) - if not logs: - continue # If no logs to be processed, continu to the next tenant + try: + Database(tenant_id).switch_schema() + check_and_create_license_usage_for_tenant(tenant_id) + tenant = Tenant.query.get(tenant_id) + if tenant.storage_dirty: + recalculate_storage_for_tenant(tenant) + logs = get_logs_for_processing(tenant_id, current_timestamp) + if not logs: + continue # If no logs to be processed, continue to the next tenant - # Get the min and max timestamp from the logs - min_timestamp = min(log.timestamp for log in logs) - max_timestamp = max(log.timestamp for log in logs) + # Get the min and max timestamp from the logs + min_timestamp = min(log.timestamp for log in logs) + max_timestamp = max(log.timestamp for log in logs) - # Retrieve relevant LicenseUsage records - license_usages = get_relevant_license_usages(db.session, tenant_id, min_timestamp, max_timestamp) + # Retrieve relevant LicenseUsage records + current_app.logger.debug(f"Searching relevant usages for tenant {tenant_id}") + license_usages = get_relevant_license_usages(db.session, tenant_id, min_timestamp, max_timestamp) + current_app.logger.debug(f"Found {license_usages}; finished searching relevant usages for tenant {tenant_id}") - # Split logs based on LicenseUsage periods - logs_by_usage = split_logs_by_license_usage(logs, license_usages) + # Split logs based on LicenseUsage periods + current_app.logger.debug(f"Splitting usages for tenant {tenant_id}") + logs_by_usage = split_logs_by_license_usage(logs, license_usages) + current_app.logger.debug(f"Found {logs_by_usage}; finished splitting logs for tenant {tenant_id}") - # Now you can process logs for each LicenseUsage - for license_usage_id, logs in logs_by_usage.items(): - process_logs_for_license_usage(tenant_id, license_usage_id, logs) + # Now you can process logs for each LicenseUsage + for license_usage_id, logs in logs_by_usage.items(): + current_app.logger.debug(f"Processing logs for usage id {license_usage_id} for tenant {tenant_id}") + process_logs_for_license_usage(tenant_id, license_usage_id, logs) + current_app.logger.debug(f"Finished processing logs for tenant {tenant_id}") + except Exception as e: + error = f"Usage Calculation error for Tenant {tenant_id}: {e}" + error_list.append(error) + current_app.logger.error(error) + continue + + if error_list: + raise Exception('\n'.join(error_list)) + + return "Update Usages task completed successfully" def get_all_tenant_ids(): - tenant_ids = db.session.query(Tenant.tenant_id).all() + tenant_ids = db.session.query(Tenant.id).all() return [tenant_id[0] for tenant_id in tenant_ids] # Extract tenant_id from tuples @@ -57,21 +80,21 @@ def check_and_create_license_usage_for_tenant(tenant_id): current_date = dt.now(tz.utc).date() license_usages = (db.session.query(LicenseUsage) .filter_by(tenant_id=tenant_id) - .filter_by(and_(LicenseUsage.period_start_date <= current_date, - LicenseUsage.period_end_date >= current_date)) + .filter(and_(LicenseUsage.period_start_date <= current_date, + LicenseUsage.period_end_date >= current_date)) .all()) if not license_usages: active_license = (db.session.query(License).filter_by(tenant_id=tenant_id) - .filter_by(and_(License.start_date <= current_date, - License.end_date >= current_date)) - .one()) + .filter(and_(License.start_date <= current_date, + License.end_date >= current_date)) + .one_or_none()) if not active_license: current_app.logger.error(f"No License defined for {tenant_id}. " f"Impossible to calculate license usage.") raise EveAINoLicenseForTenant(message=f"No License defined for {tenant_id}. 
" f"Impossible to calculate license usage.") - start_date, end_date = calculate_valid_period(current_date, active_license.period_start_date) + start_date, end_date = calculate_valid_period(current_date, active_license.start_date) new_license_usage = LicenseUsage(period_start_date=start_date, period_end_date=end_date, license_id=active_license.id, @@ -124,8 +147,8 @@ def get_relevant_license_usages(session, tenant_id, min_timestamp, max_timestamp # Fetch LicenseUsage records where the log timestamps fall between period_start_date and period_end_date return session.query(LicenseUsage).filter( LicenseUsage.tenant_id == tenant_id, - LicenseUsage.period_start_date <= max_timestamp, - LicenseUsage.period_end_date >= min_timestamp + LicenseUsage.period_start_date <= max_timestamp.date(), + LicenseUsage.period_end_date >= min_timestamp.date() ).order_by(LicenseUsage.period_start_date).all() @@ -136,7 +159,7 @@ def split_logs_by_license_usage(logs, license_usages): for log in logs: # Find the corresponding LicenseUsage for each log based on the timestamp for license_usage in license_usages: - if license_usage.period_start_date <= log.timestamp <= license_usage.period_end_date: + if license_usage.period_start_date <= log.timestamp.date() <= license_usage.period_end_date: logs_by_usage[license_usage.id].append(log) break @@ -181,7 +204,7 @@ def process_logs_for_license_usage(tenant_id, license_usage_id, logs): log.license_usage_id = license_usage_id # Update the LicenseUsage record with the accumulated values - license_usage.embedding_mb += embedding_mb_used + license_usage.embedding_mb_used += embedding_mb_used license_usage.embedding_prompt_tokens_used += embedding_prompt_tokens_used license_usage.embedding_completion_tokens_used += embedding_completion_tokens_used license_usage.embedding_total_tokens_used += embedding_total_tokens_used @@ -189,27 +212,31 @@ def process_logs_for_license_usage(tenant_id, license_usage_id, logs): license_usage.interaction_completion_tokens_used += interaction_completion_tokens_used license_usage.interaction_total_tokens_used += interaction_total_tokens_used + current_app.logger.debug(f"Processed logs for license usage {license_usage.id}:\n{license_usage}") + # Commit the updates to the LicenseUsage and log records try: db.session.add(license_usage) - db.session.add(logs) + for log in logs: + db.session.add(log) db.session.commit() except SQLAlchemyError as e: db.session.rollback() - current_app.logger.error(f"Error trying to update license usage and logs for tenant {tenant_id}. 
") + current_app.logger.error(f"Error trying to update license usage and logs for tenant {tenant_id}: {e}") raise e def recalculate_storage_for_tenant(tenant): # Perform a SUM operation to get the total file size from document_versions - total_storage = db.session.execute(f""" + total_storage = db.session.execute(text(f""" SELECT SUM(file_size) - FROM {tenant.id}.document_versions - """).scalar() + FROM document_version + """)).scalar() + current_app.logger.debug(f"Recalculating storage for tenant {tenant} - Total storage: {total_storage}") # Update the LicenseUsage with the recalculated storage license_usage = db.session.query(LicenseUsage).filter_by(tenant_id=tenant.id).first() - license_usage.storage_mb = total_storage / (1024 * 1024) # Convert bytes to MB + license_usage.storage_mb_used = total_storage # Reset the dirty flag after recalculating tenant.storage_dirty = False diff --git a/eveai_workers/Processors/audio_processor.py b/eveai_workers/Processors/audio_processor.py index 915f477..7ef3bd2 100644 --- a/eveai_workers/Processors/audio_processor.py +++ b/eveai_workers/Processors/audio_processor.py @@ -27,10 +27,8 @@ class AudioProcessor(TranscriptionProcessor): def _get_transcription(self): file_data = minio_client.download_document_file( self.tenant.id, - self.document_version.doc_id, - self.document_version.language, - self.document_version.id, - self.document_version.file_name + self.document_version.bucket_name, + self.document_version.object_name, ) with current_event.create_span("Audio Compression"): diff --git a/eveai_workers/Processors/html_processor.py b/eveai_workers/Processors/html_processor.py index 9538cb1..bd5f119 100644 --- a/eveai_workers/Processors/html_processor.py +++ b/eveai_workers/Processors/html_processor.py @@ -24,10 +24,8 @@ class HTMLProcessor(Processor): try: file_data = minio_client.download_document_file( self.tenant.id, - self.document_version.doc_id, - self.document_version.language, - self.document_version.id, - self.document_version.file_name + self.document_version.bucket_name, + self.document_version.object_name, ) html_content = file_data.decode('utf-8') diff --git a/eveai_workers/Processors/pdf_processor.py b/eveai_workers/Processors/pdf_processor.py index b8826e6..afa772e 100644 --- a/eveai_workers/Processors/pdf_processor.py +++ b/eveai_workers/Processors/pdf_processor.py @@ -27,10 +27,8 @@ class PDFProcessor(Processor): try: file_data = minio_client.download_document_file( self.tenant.id, - self.document_version.doc_id, - self.document_version.language, - self.document_version.id, - self.document_version.file_name + self.document_version.bucket_name, + self.document_version.object_name, ) with current_event.create_span("PDF Extraction"): diff --git a/eveai_workers/Processors/srt_processor.py b/eveai_workers/Processors/srt_processor.py index ccf2c6e..eb7c8fc 100644 --- a/eveai_workers/Processors/srt_processor.py +++ b/eveai_workers/Processors/srt_processor.py @@ -7,10 +7,8 @@ class SRTProcessor(TranscriptionProcessor): def _get_transcription(self): file_data = minio_client.download_document_file( self.tenant.id, - self.document_version.doc_id, - self.document_version.language, - self.document_version.id, - self.document_version.file_name + self.document_version.bucket_name, + self.document_version.object_name, ) srt_content = file_data.decode('utf-8') return self._clean_srt(srt_content) diff --git a/eveai_workers/__init__.py b/eveai_workers/__init__.py index ea13015..73efa2c 100644 --- a/eveai_workers/__init__.py +++ 
diff --git a/eveai_workers/__init__.py b/eveai_workers/__init__.py
index ea13015..73efa2c 100644
--- a/eveai_workers/__init__.py
+++ b/eveai_workers/__init__.py
@@ -44,3 +44,4 @@ def register_extensions(app):


 app, celery = create_app()
+
diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py
index 5266554..c6736b6 100644
--- a/eveai_workers/tasks.py
+++ b/eveai_workers/tasks.py
@@ -36,34 +36,36 @@ def ping():

 @current_celery.task(name='create_embeddings', queue='embeddings')
 def create_embeddings(tenant_id, document_version_id):
-    # Retrieve document version to process
-    document_version = DocumentVersion.query.get(document_version_id)
-    if document_version is None:
-        raise Exception(f'Document version {document_version_id} not found')
+    try:
+        # Retrieve Tenant for which we are processing
+        tenant = Tenant.query.get(tenant_id)
+        if tenant is None:
+            raise Exception(f'Tenant {tenant_id} not found')
+
+        # Ensure we are working in the correct database schema
+        Database(tenant_id).switch_schema()
+
+        # Retrieve document version to process
+        document_version = DocumentVersion.query.get(document_version_id)
+        if document_version is None:
+            raise Exception(f'Document version {document_version_id} not found')
+
+        # Select variables to work with depending on tenant and model
+        model_variables = select_model_variables(tenant)
+        current_app.logger.debug(f'Model variables: {model_variables}')
+
+    except Exception as e:
+        current_app.logger.error(f'Create Embeddings setup failed '
+                                 f'for document version {document_version_id} '
+                                 f'of tenant {tenant_id}, '
+                                 f'error: {e}')
+        raise
+
     # BusinessEvent creates a context, which is why we need to use it with a with block
     with BusinessEvent('Create Embeddings', tenant_id, document_version_id=document_version_id,
                        document_version_file_size=document_version.file_size):
         current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}')

-        try:
-            # Retrieve Tenant for which we are processing
-            tenant = Tenant.query.get(tenant_id)
-            if tenant is None:
-                raise Exception(f'Tenant {tenant_id} not found')
-
-            # Ensure we are working in the correct database schema
-            Database(tenant_id).switch_schema()
-
-            # Select variables to work with depending on tenant and model
-            model_variables = select_model_variables(tenant)
-            current_app.logger.debug(f'Model variables: {model_variables}')
-
-        except Exception as e:
-            current_app.logger.error(f'Create Embeddings request received '
-                                     f'for non existing document version {document_version_id} '
-                                     f'for tenant {tenant_id}, '
-                                     f'error: {e}')
-            raise

         try:
             db.session.add(document_version)
@@ -204,7 +206,7 @@ def enrich_chunks(tenant, model_variables, document_version, title, chunks):
     if len(chunks) > 1:
         summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])

-    chunk_total_context = (f'Filename: {document_version.file_name}\n'
+    chunk_total_context = (f'Filename: {document_version.object_name}\n'
                            f'User Context:\n{document_version.user_context}\n\n'
                            f'User Metadata:\n{document_version.user_metadata}\n\n'
                            f'Title: {title}\n'
@@ -213,7 +215,7 @@
                            f'System Metadata:\n{document_version.system_metadata}\n\n'
                            )
     enriched_chunks = []
-    initial_chunk = (f'Filename: {document_version.file_name}\n'
+    initial_chunk = (f'Filename: {document_version.object_name}\n'
                      f'User Context:\n{document_version.user_context}\n\n'
                      f'User Metadata:\n{document_version.user_metadata}\n\n'
                      f'Title: {title}\n'
@@ -304,13 +306,12 @@ def log_parsing_info(tenant, tags, included_elements, excluded_elements, exclude
 def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
     try:
         current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
+        markdown_on = document_version.object_name.rsplit('.', 1)[0] + '.md'

         # Download the markdown file from MinIO
         markdown_data = minio_client.download_document_file(tenant_id,
-                                                            document_version.doc_id,
-                                                            document_version.language,
-                                                            document_version.id,
-                                                            input_file
+                                                            document_version.bucket_name,
+                                                            markdown_on,
                                                             )
         markdown = markdown_data.decode('utf-8')
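The refactor moves tenant lookup, schema switching and document-version retrieval ahead of the `BusinessEvent` block because `document_version.file_size` is evaluated when the context manager is constructed, so those lookups must already have succeeded. In `create_potential_chunks_for_markdown`, the markdown object name is derived with `rsplit`, which splits on the last dot anywhere in the name (the object name below is hypothetical):

```python
# Hypothetical object name, for illustration only.
object_name = "tenant-7/doc-12/en/v3/report.pdf"
markdown_on = object_name.rsplit(".", 1)[0] + ".md"
assert markdown_on == "tenant-7/doc-12/en/v3/report.md"

# A name without any dot passes through unchanged before ".md" is appended;
# a dot in a "directory" prefix would be split on instead.
assert "notes".rsplit(".", 1)[0] + ".md" == "notes.md"
assert "v1.0/notes".rsplit(".", 1)[0] + ".md" == "v1.md"
```

Note that with this change the `input_file` parameter of the function is effectively unused.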
diff --git a/migrations/public/versions/02debd224316_set_storage_dirty_flag_for_all_tenants.py b/migrations/public/versions/02debd224316_set_storage_dirty_flag_for_all_tenants.py
new file mode 100644
index 0000000..8a70930
--- /dev/null
+++ b/migrations/public/versions/02debd224316_set_storage_dirty_flag_for_all_tenants.py
@@ -0,0 +1,24 @@
+"""Set storage_dirty flag for all tenants
+
+Revision ID: 02debd224316
+Revises: 8fdd7f2965c1
+Create Date: 2024-10-08 06:53:17.261709
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '02debd224316'
+down_revision = '8fdd7f2965c1'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.execute('UPDATE tenant SET storage_dirty = TRUE')
+
+
+def downgrade():
+    pass
diff --git a/migrations/public/versions/a678c84d5633_licenseusage_correct_mb_fields_to_be_.py b/migrations/public/versions/a678c84d5633_licenseusage_correct_mb_fields_to_be_.py
new file mode 100644
index 0000000..38cc095
--- /dev/null
+++ b/migrations/public/versions/a678c84d5633_licenseusage_correct_mb_fields_to_be_.py
@@ -0,0 +1,46 @@
+"""LicenseUsage: correct MB fields to be floats instead of integers
+
+Revision ID: a678c84d5633
+Revises: 02debd224316
+Create Date: 2024-10-11 08:03:22.823327
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'a678c84d5633'
+down_revision = '02debd224316'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('license_usage', schema=None) as batch_op:
+        batch_op.alter_column('storage_mb_used',
+                              existing_type=sa.INTEGER(),
+                              type_=sa.Float(),
+                              existing_nullable=True)
+        batch_op.alter_column('embedding_mb_used',
+                              existing_type=sa.INTEGER(),
+                              type_=sa.Float(),
+                              existing_nullable=True)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('license_usage', schema=None) as batch_op:
+        batch_op.alter_column('embedding_mb_used',
+                              existing_type=sa.Float(),
+                              type_=sa.INTEGER(),
+                              existing_nullable=True)
+        batch_op.alter_column('storage_mb_used',
+                              existing_type=sa.Float(),
+                              type_=sa.INTEGER(),
+                              existing_nullable=True)
+
+    # ### end Alembic commands ###
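Why widening to `Float` matters: with `INTEGER` columns, sub-megabyte amounts are truncated to zero, so small documents and embedding payloads would never accrue any usage. A quick illustration:

```python
file_size_bytes = 512 * 1024                  # a 512 KB document
storage_mb = file_size_bytes / (1024 * 1024)  # 0.5 MB
print(int(storage_mb))   # 0   -- roughly what an INTEGER column would keep
print(storage_mb)        # 0.5 -- what the Float column preserves
```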
diff --git a/migrations/tenant/env.py b/migrations/tenant/env.py
index a14f041..e239846 100644
--- a/migrations/tenant/env.py
+++ b/migrations/tenant/env.py
@@ -124,31 +124,35 @@ def run_migrations_online():
     with connectable.connect() as connection:
         tenants = get_tenant_ids()
         for tenant in tenants:
-            logger.info(f"Migrating tenant: {tenant}")
-            # set search path on the connection, which ensures that
-            # PostgreSQL will emit all CREATE / ALTER / DROP statements
-            # in terms of this schema by default
-            connection.execute(text(f'SET search_path TO "{tenant}", public'))
-            # in SQLAlchemy v2+ the search path change needs to be committed
-            connection.commit()
+            try:
+                logger.info(f"Migrating tenant: {tenant}")
+                # set search path on the connection, which ensures that
+                # PostgreSQL will emit all CREATE / ALTER / DROP statements
+                # in terms of this schema by default
+                connection.execute(text(f'SET search_path TO "{tenant}", public'))
+                # in SQLAlchemy v2+ the search path change needs to be committed
+                connection.commit()

-            # make use of non-supported SQLAlchemy attribute to ensure
-            # the dialect reflects tables in terms of the current tenant name
-            connection.dialect.default_schema_name = str(tenant)
+                # make use of non-supported SQLAlchemy attribute to ensure
+                # the dialect reflects tables in terms of the current tenant name
+                connection.dialect.default_schema_name = str(tenant)

-            context.configure(
-                connection=connection,
-                target_metadata=get_metadata(),
-                # literal_binds=True,
-                include_object=include_object,
-            )
+                context.configure(
+                    connection=connection,
+                    target_metadata=get_metadata(),
+                    # literal_binds=True,
+                    include_object=include_object,
+                )

-            with context.begin_transaction():
-                context.run_migrations()
+                with context.begin_transaction():
+                    context.run_migrations()

-            # for checking migrate or upgrade is running
-            if getattr(config.cmd_opts, "autogenerate", False):
-                break
+                # for checking migrate or upgrade is running
+                if getattr(config.cmd_opts, "autogenerate", False):
+                    break
+            except Exception as e:
+                logger.error(f"Migration failed for tenant {tenant}: {e}; skipping this tenant")
+                continue


 if context.is_offline_mode():
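The per-tenant loop now logs and skips a failing tenant instead of aborting the whole migration run. The commit-after-`SET search_path` requirement mentioned in the comments can be reproduced in isolation; the DSN and schema name below are placeholders:

```python
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost/eveai")  # placeholder DSN

with engine.connect() as connection:
    # In SQLAlchemy 2.x a connection is implicitly inside a transaction, so the
    # search_path change must be committed or it is rolled back before use.
    connection.execute(text('SET search_path TO "tenant_42", public'))
    connection.commit()
    print(connection.execute(text("SHOW search_path")).scalar())  # "tenant_42", public
```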
Skipping.") continue # Move to the next item else: raise e # Handle other types of S3 errors diff --git a/requirements.txt b/requirements.txt index eb8af59..2f57f6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -81,3 +81,4 @@ anthropic~=0.34.2 prometheus-client~=0.20.0 flower~=2.0.1 psutil~=6.0.0 +celery-redbeat~=2.2.0 diff --git a/scripts/entrypoint_no_db.sh b/scripts/entrypoint_no_db.sh new file mode 100755 index 0000000..91efcc5 --- /dev/null +++ b/scripts/entrypoint_no_db.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e + +# Ensure the logs directory has the correct permissions +echo "Changing permissions on logs directory" +#chown -R appuser:appuser /app/logs +chmod -R 777 /app/logs + +# Switch to appuser and execute the command passed to the script +exec su -- appuser -c "$@" diff --git a/scripts/repopack_eveai.sh b/scripts/repopack_eveai.sh index 4ee5731..565ccec 100755 --- a/scripts/repopack_eveai.sh +++ b/scripts/repopack_eveai.sh @@ -1,17 +1,35 @@ #!/bin/bash +# Delete previous repopack files +rm -f *repo.txt -# Run repopack to generate the file -repopack +# Define the list of components +components=("docker" "eveai_api" "eveai_app" "eveai_beat" "eveai_chat" "eveai_chat_workers" "eveai_entitlements" "eveai_workers" "nginx" "full" "integrations") -# Check if repopack generated the eveai_repo.txt file -if [[ -f "eveai_repo.txt" ]]; then - # Get the current timestamp in the format YYYY-DD-MM_HH:MM:SS - timestamp=$(date +"%Y-%d-%m_%H-%M-%S") +# Get the current date and time in the format YYYY-MM-DD_HH-MM +timestamp=$(date +"%Y-%m-%d_%H-%M") - # Rename the file with the timestamp - mv eveai_repo.txt "${timestamp}_eveai_repo.txt" +# Loop through each component and perform the tasks +for component in "${components[@]}"; do + echo "Processing component: $component" - echo "File renamed to ${timestamp}_eveai_repo.txt" -else - echo "Error: eveai_repo.txt not found. repopack may have failed." 
diff --git a/scripts/repopack_eveai.sh b/scripts/repopack_eveai.sh
index 4ee5731..565ccec 100755
--- a/scripts/repopack_eveai.sh
+++ b/scripts/repopack_eveai.sh
@@ -1,17 +1,35 @@
 #!/bin/bash
+# Delete previous repopack files
+rm -f *repo.txt

-# Run repopack to generate the file
-repopack
+# Define the list of components
+components=("docker" "eveai_api" "eveai_app" "eveai_beat" "eveai_chat" "eveai_chat_workers" "eveai_entitlements" "eveai_workers" "nginx" "full" "integrations")

-# Check if repopack generated the eveai_repo.txt file
-if [[ -f "eveai_repo.txt" ]]; then
-    # Get the current timestamp in the format YYYY-DD-MM_HH:MM:SS
-    timestamp=$(date +"%Y-%d-%m_%H-%M-%S")
+# Get the current date and time in the format YYYY-MM-DD_HH-MM
+timestamp=$(date +"%Y-%m-%d_%H-%M")

-    # Rename the file with the timestamp
-    mv eveai_repo.txt "${timestamp}_eveai_repo.txt"
+# Loop through each component and perform the tasks
+for component in "${components[@]}"; do
+    echo "Processing component: $component"

-    echo "File renamed to ${timestamp}_eveai_repo.txt"
-else
-    echo "Error: eveai_repo.txt not found. repopack may have failed."
-fi
+    # Merge .repopackignore_base and .repopackignore_$component into .repopackignore
+    if [[ -f ".repopackignore_base" && -f ".repopackignore_$component" ]]; then
+        cat .repopackignore_base .repopackignore_$component > .repopackignore
+    else
+        echo "Warning: Missing .repopackignore_base or .repopackignore_$component for $component"
+        continue
+    fi
+
+    # Execute repopack
+    repopack
+
+    # Rename the resulting eveai_repo.txt file to <component>_YYYY-MM-DD_HH-MM_repo.txt
+    if [[ -f "eveai_repo.txt" ]]; then
+        mv eveai_repo.txt "${component}_${timestamp}_repo.txt"
+        echo "Renamed eveai_repo.txt to ${component}_${timestamp}_repo.txt"
+    else
+        echo "Error: repopack did not generate eveai_repo.txt for $component"
+    fi
+
+    echo "Finished processing $component"
+done
diff --git a/scripts/start_eveai_beat.sh b/scripts/start_eveai_beat.sh
new file mode 100755
index 0000000..233b724
--- /dev/null
+++ b/scripts/start_eveai_beat.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+cd "/app/" || exit 1
+export PROJECT_DIR="/app"
+export PYTHONPATH="$PROJECT_DIR/patched_packages:$PYTHONPATH:$PROJECT_DIR"  # Include the app directory in the Python path & patched packages
+
+# Ensure we can write the logs
+chown -R appuser:appuser /app/logs
+
+# Start Celery Beat
+celery -A eveai_beat.celery beat --scheduler=redbeat.RedBeatScheduler --loglevel=debug &
+
+# A worker for the 'llm_interactions' queue is not needed here; it runs in eveai_chat_workers
+# celery -A eveai_workers.celery worker --loglevel=info -Q llm_interactions --autoscale=8,2 --hostname=interactions_worker@%h &
+
+# Wait for all background processes to finish
+wait
diff --git a/scripts/start_eveai_entitlements.sh b/scripts/start_eveai_entitlements.sh
new file mode 100755
index 0000000..a84ad9a
--- /dev/null
+++ b/scripts/start_eveai_entitlements.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+cd "/app/" || exit 1
+export PROJECT_DIR="/app"
+export PYTHONPATH="$PROJECT_DIR/patched_packages:$PYTHONPATH:$PROJECT_DIR"  # Include the app directory in the Python path & patched packages
+
+# Ensure we can write the logs
+chown -R appuser:appuser /app/logs
+
+# Start a worker for the 'entitlements' queue with auto-scaling
+celery -A eveai_entitlements.celery worker --loglevel=debug -Q entitlements --autoscale=8,2 --hostname=entitlements_worker@%h &
+
+# A worker for the 'llm_interactions' queue is not needed here; it runs in eveai_chat_workers
+# celery -A eveai_workers.celery worker --loglevel=info -Q llm_interactions --autoscale=8,2 --hostname=interactions_worker@%h &
+
+# Wait for all background processes to finish
+wait
diff --git a/scripts/start_flower.sh b/scripts/start_flower.sh
old mode 100644
new mode 100755
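A note on the worker invocations in these start scripts: celery parses `--autoscale` as `max,min`, so `--autoscale=8,2` means grow to eight pool processes under load and keep two alive when idle:

```bash
# --autoscale=MAX,MIN: at most 8 pool processes, never fewer than 2.
celery -A eveai_entitlements.celery worker -Q entitlements --autoscale=8,2
```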