- License Usage Calculation implemented

- View License Usages
- Celery Beat container added
- First schedule in Celery Beat for calculating usage (hourly; see the sketch after this list)
- repopack can now split for different components
- Various fixes as a consequence of changing file_location / file_name ==> bucket_name / object_name
- Celery Routing / Queuing updated
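
A minimal sketch of how the hourly usage schedule might be registered, assuming a task named eveai_entitlements.tasks.calculate_usage (the real task name is not shown in this commit); RedBeat picks entries up from the standard beat_schedule setting:

from celery.schedules import crontab

# Hypothetical schedule entry; the task name and queue wiring are assumptions.
celery_app.conf.beat_schedule = {
    'calculate-license-usage-hourly': {
        'task': 'eveai_entitlements.tasks.calculate_usage',  # assumed task name
        'schedule': crontab(minute=0),  # top of every hour
        'options': {'queue': 'entitlements', 'routing_key': 'entitlements.calculate_usage'},
    },
}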
Josako
2024-10-11 16:33:36 +02:00
parent 5ffad160b1
commit 9f5f090f0c
57 changed files with 935 additions and 174 deletions

View File

@@ -1,14 +1,16 @@
from celery import Celery
from kombu import Queue
from werkzeug.local import LocalProxy
from redbeat import RedBeatScheduler
celery_app = Celery()
def init_celery(celery, app):
def init_celery(celery, app, is_beat=False):
celery_app.main = app.name
app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}')
app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}')
celery_config = {
'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
@@ -17,19 +19,40 @@ def init_celery(celery, app):
'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings',
'routing_key': 'embeddings.create_embeddings'}},
}
if is_beat:
# Add configurations specific to Beat scheduler
celery_config['beat_scheduler'] = 'redbeat.RedBeatScheduler'
celery_config['redbeat_lock_key'] = 'redbeat::lock'
celery_config['beat_max_loop_interval'] = 10 # Adjust as needed
celery_app.conf.update(**celery_config)
# Setting up Celery task queues
celery_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
)
# Task queues for workers only
if not is_beat:
celery_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
Queue('entitlements', routing_key='entitlements.#', queue_arguments={'x-max-priority': 10}),
)
celery_app.conf.task_routes = {
'eveai_workers.*': { # All tasks from eveai_workers module
'queue': 'embeddings',
'routing_key': 'embeddings.#',
},
'eveai_chat_workers.*': { # All tasks from eveai_chat_workers module
'queue': 'llm_interactions',
'routing_key': 'llm_interactions.#',
},
'eveai_entitlements.*': { # All tasks from eveai_entitlements module
'queue': 'entitlements',
'routing_key': 'entitlements.#',
}
}
# Ensuring tasks execute with Flask application context
# Ensure tasks execute with Flask context
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
@@ -37,6 +60,39 @@ def init_celery(celery, app):
celery.Task = ContextTask
# Original init_celery before updating for beat
# def init_celery(celery, app):
# celery_app.main = app.name
# app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}')
# app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}')
# celery_config = {
# 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
# 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
# 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'),
# 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'),
# 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']),
# 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'),
# 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True),
# 'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings',
# 'routing_key': 'embeddings.create_embeddings'}},
# }
# celery_app.conf.update(**celery_config)
#
# # Setting up Celery task queues
# celery_app.conf.task_queues = (
# Queue('default', routing_key='task.#'),
# Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}),
# Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}),
# )
#
# # Ensuring tasks execute with Flask application context
# class ContextTask(celery.Task):
# def __call__(self, *args, **kwargs):
# with app.app_context():
# return self.run(*args, **kwargs)
#
# celery.Task = ContextTask
def make_celery(app_name, config):
return celery_app
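
For context, a beat container entrypoint could wire this up roughly as follows (a sketch; the application factory and module paths are assumptions, not part of this diff):

# beat_entrypoint.py -- hypothetical; shows how is_beat=True would be used
from eveai_app import create_app                             # assumed Flask app factory
from eveai_app.celery_utils import celery_app, init_celery   # assumed module path

flask_app = create_app()
init_celery(celery_app, flask_app, is_beat=True)  # scheduler config only, no worker queues

# The container would then run: celery -A beat_entrypoint:celery_app beat -l info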

View File

@@ -99,12 +99,12 @@ def upload_file_for_version(doc_vers, file, extension, tenant_id):
doc_vers.doc_id,
doc_vers.language,
doc_vers.id,
doc_vers.file_name,
f"{doc_vers.id}.{extension}",
file
)
doc_vers.bucket_name = bn
doc_vers.object_name = on
doc_vers.file_size_mb = size / 1048576 # Convert bytes to MB
doc_vers.file_size = size / 1048576 # Convert bytes to MB
db.session.commit()
current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
@@ -222,10 +222,9 @@ def process_multiple_urls(urls, tenant_id, api_input):
def start_embedding_task(tenant_id, doc_vers_id):
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
tenant_id,
doc_vers_id,
])
task = current_celery.send_task('create_embeddings',
args=[tenant_id, doc_vers_id,],
queue='embeddings')
current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
f'Document Version {doc_vers_id}. '
f'Embedding creation task: {task.id}')
@@ -321,16 +320,16 @@ def refresh_document_with_info(doc_id, api_input):
upload_file_for_version(new_doc_vers, file_content, extension, doc.tenant_id)
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
doc.tenant_id,
new_doc_vers.id,
])
task = current_celery.send_task('create_embeddings', args=[doc.tenant_id, new_doc_vers.id,], queue='embeddings')
current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} '
f'with task id: {task.id}.')
return new_doc_vers, task.id
# Update the existing refresh_document function to use the new refresh_document_with_info
def refresh_document(doc_id):
current_app.logger.info(f'Refreshing document {doc_id}')
doc = Document.query.get_or_404(doc_id)
old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first()
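
With the task_routes table added in the Celery config above, entitlement tasks would not need an explicit queue= at the call site; a sketch, assuming a task named eveai_entitlements.tasks.calculate_usage:

# Routed to the 'entitlements' queue by the 'eveai_entitlements.*' rule (task name assumed).
task = current_celery.send_task('eveai_entitlements.tasks.calculate_usage', args=[tenant_id])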

View File

@@ -54,9 +54,7 @@ class MinioClient:
except S3Error as err:
raise Exception(f"Error occurred while uploading file: {err}")
def download_document_file(self, tenant_id, document_id, language, version_id, filename):
bucket_name = self.generate_bucket_name(tenant_id)
object_name = self.generate_object_name(document_id, language, version_id, filename)
def download_document_file(self, tenant_id, bucket_name, object_name):
try:
response = self.client.get_object(bucket_name, object_name)
return response.read()
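
Callers now pass the stored names straight through instead of re-deriving them; a sketch of the updated call site, assuming doc_vers was populated by upload_file_for_version above:

# minio_client: an existing MinioClient instance (construction not shown in this diff)
content = minio_client.download_document_file(
    tenant_id,
    doc_vers.bucket_name,
    doc_vers.object_name,
)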

View File

@@ -6,7 +6,6 @@ def prefixed_url_for(endpoint, **values):
prefix = request.headers.get('X-Forwarded-Prefix', '')
scheme = request.headers.get('X-Forwarded-Proto', request.scheme)
host = request.headers.get('Host', request.host)
current_app.logger.debug(f'prefix: {prefix}, scheme: {scheme}, host: {host}')
external = values.pop('_external', False)
generated_url = url_for(endpoint, **values)