Files
eveAI/eveai_entitlements/tasks.py
Josako b4f7b210e0 - Improvement of Entitlements Domain
- Introduction of LicensePeriod
  - Introduction of Payments
  - Introduction of Invoices
- Services definitions for Entitlements Domain
2025-05-16 09:06:13 +02:00

187 lines
8.2 KiB
Python

import io
import os
from datetime import datetime as dt, timezone as tz, datetime
from flask import current_app
from sqlalchemy import or_, and_, text
from sqlalchemy.exc import SQLAlchemyError
from common.extensions import db
from common.models.user import Tenant
from common.models.entitlements import BusinessEventLog, LicenseUsage, License
from common.services.entitlements.license_period_services import LicensePeriodServices
from common.utils.celery_utils import current_celery
from common.utils.eveai_exceptions import EveAINoLicenseForTenant, EveAIException, EveAINoActiveLicense
from common.utils.database import Database
# Healthcheck task
@current_celery.task(name='ping', queue='entitlements')
def ping():
return 'pong'
@current_celery.task(name='persist_business_events', queue='entitlements')
def persist_business_events(log_entries):
"""
Persist multiple business event logs to the database in a single transaction
Args:
log_entries: List of log event dictionaries to persist
"""
try:
event_logs = []
for entry in log_entries:
event_log = BusinessEventLog(
timestamp=entry.pop('timestamp'),
event_type=entry.pop('event_type'),
tenant_id=entry.pop('tenant_id'),
trace_id=entry.pop('trace_id'),
span_id=entry.pop('span_id', None),
span_name=entry.pop('span_name', None),
parent_span_id=entry.pop('parent_span_id', None),
document_version_id=entry.pop('document_version_id', None),
document_version_file_size=entry.pop('document_version_file_size', None),
specialist_id=entry.pop('specialist_id', None),
specialist_type=entry.pop('specialist_type', None),
specialist_type_version=entry.pop('specialist_type_version', None),
chat_session_id=entry.pop('chat_session_id', None),
interaction_id=entry.pop('interaction_id', None),
environment=entry.pop('environment', None),
llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None),
llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None),
llm_metrics_completion_tokens=entry.pop('llm_metrics_completion_tokens', None),
llm_metrics_nr_of_pages=entry.pop('llm_metrics_nr_of_pages', None),
llm_metrics_total_time=entry.pop('llm_metrics_total_time', None),
llm_metrics_call_count=entry.pop('llm_metrics_call_count', None),
llm_interaction_type=entry.pop('llm_interaction_type', None),
message=entry.pop('message', None)
)
event_logs.append(event_log)
# Perform a bulk insert of all entries
db.session.bulk_save_objects(event_logs)
db.session.commit()
current_app.logger.info(f"Successfully persisted {len(event_logs)} business event logs")
tenant_id = event_logs[0].tenant_id
try:
license_period = LicensePeriodServices.find_current_license_period_for_usage(tenant_id)
except EveAIException as e:
current_app.logger.error(f"Failed to find license period for tenant {tenant_id}: {str(e)}")
return
lic_usage = None
if not license_period.license_usage:
lic_usage = LicenseUsage(
tenant_id=tenant_id,
license_period_id=license_period.id,
)
try:
db.session.add(lic_usage)
db.session.commit()
current_app.logger.info(f"Created new license usage for tenant {tenant_id}")
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f"Error trying to create license usage for tenant {tenant_id}: {str(e)}")
return
else:
lic_usage = license_period.license_usage
process_logs_for_license_usage(tenant_id, lic_usage, event_logs)
except Exception as e:
current_app.logger.error(f"Failed to persist business event logs: {e}")
db.session.rollback()
def process_logs_for_license_usage(tenant_id, license_usage, logs):
# Initialize variables to accumulate usage data
embedding_mb_used = 0
embedding_prompt_tokens_used = 0
embedding_completion_tokens_used = 0
embedding_total_tokens_used = 0
interaction_prompt_tokens_used = 0
interaction_completion_tokens_used = 0
interaction_total_tokens_used = 0
recalculate_storage = False
# Process each log
for log in logs:
# Case for 'Create Embeddings' event
if log.event_type == 'Create Embeddings':
recalculate_storage = True
if log.message == 'Starting Trace for Create Embeddings':
embedding_mb_used += log.document_version_file_size
elif log.message == 'Final LLM Metrics':
embedding_prompt_tokens_used += log.llm_metrics_prompt_tokens
embedding_completion_tokens_used += log.llm_metrics_completion_tokens
embedding_total_tokens_used += log.llm_metrics_total_tokens
# Case for 'Ask Question' event
elif log.event_type == 'Ask Question':
if log.message == 'Final LLM Metrics':
interaction_prompt_tokens_used += log.llm_metrics_prompt_tokens
interaction_completion_tokens_used += log.llm_metrics_completion_tokens
interaction_total_tokens_used += log.llm_metrics_total_tokens
# Case for 'Specialist Execution' event
elif log.event_type == 'Execute Specialist':
if log.message == 'Final LLM Metrics':
if log.span_name == 'Specialist Retrieval': # This is embedding
embedding_prompt_tokens_used += log.llm_metrics_prompt_tokens
embedding_completion_tokens_used += log.llm_metrics_completion_tokens
embedding_total_tokens_used += log.llm_metrics_total_tokens
else: # This is an interaction
interaction_prompt_tokens_used += log.llm_metrics_prompt_tokens
interaction_completion_tokens_used += log.llm_metrics_completion_tokens
interaction_total_tokens_used += log.llm_metrics_total_tokens
# Mark the log as processed by setting the license_usage_id
log.license_usage_id = license_usage.id
# Update the LicenseUsage record with the accumulated values
license_usage.embedding_mb_used += embedding_mb_used
license_usage.embedding_prompt_tokens_used += embedding_prompt_tokens_used
license_usage.embedding_completion_tokens_used += embedding_completion_tokens_used
license_usage.embedding_total_tokens_used += embedding_total_tokens_used
license_usage.interaction_prompt_tokens_used += interaction_prompt_tokens_used
license_usage.interaction_completion_tokens_used += interaction_completion_tokens_used
license_usage.interaction_total_tokens_used += interaction_total_tokens_used
if recalculate_storage:
recalculate_storage_for_tenant(tenant_id)
# Commit the updates to the LicenseUsage and log records
try:
db.session.add(license_usage)
for log in logs:
db.session.add(log)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f"Error trying to update license usage and logs for tenant {tenant_id}: {e}")
raise e
def recalculate_storage_for_tenant(tenant_id):
Database(tenant_id).switch_schema()
# Perform a SUM operation to get the total file size from document_versions
total_storage = db.session.execute(text(f"""
SELECT SUM(file_size)
FROM document_version
""")).scalar()
# Update the LicenseUsage with the recalculated storage
license_usage = db.session.query(LicenseUsage).filter_by(tenant_id=tenant_id).first()
license_usage.storage_mb_used = total_storage
# Commit the changes
try:
db.session.add(license_usage)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f"Error trying to update tenant {tenant_id} for Dirty Storage. ")