import io
import os
from datetime import datetime as dt, timezone as tz, datetime

from flask import current_app
from celery import states
from sqlalchemy import or_, and_, text
from sqlalchemy.exc import SQLAlchemyError, InterfaceError, OperationalError

from common.extensions import db
from common.models.user import Tenant
from common.models.entitlements import BusinessEventLog, LicenseUsage, License
from common.services.entitlements import LicensePeriodServices
from common.utils.celery_utils import current_celery
from common.utils.eveai_exceptions import EveAINoLicenseForTenant, EveAIException, EveAINoActiveLicense
from common.utils.database import Database


# Keys that every log entry must carry; a missing one raises KeyError.
_REQUIRED_EVENT_FIELDS = ('timestamp', 'event_type', 'tenant_id', 'trace_id')

# Keys that may be absent from a log entry; absent ones are stored as None.
_OPTIONAL_EVENT_FIELDS = (
    'span_id',
    'span_name',
    'parent_span_id',
    'document_version_id',
    'document_version_file_size',
    'specialist_id',
    'specialist_type',
    'specialist_type_version',
    'chat_session_id',
    'interaction_id',
    'environment',
    'llm_metrics_total_tokens',
    'llm_metrics_prompt_tokens',
    'llm_metrics_completion_tokens',
    'llm_metrics_nr_of_pages',
    'llm_metrics_total_time',
    'llm_metrics_call_count',
    'llm_interaction_type',
    'message',
)


# Healthcheck task
@current_celery.task(name='ping', queue='entitlements')
def ping():
    """Return 'pong'; used as a healthcheck on the 'entitlements' queue."""
    return 'pong'


def _build_event_log(entry):
    """Create a BusinessEventLog from one log entry dict without mutating it."""
    fields = {name: entry[name] for name in _REQUIRED_EVENT_FIELDS}
    fields.update({name: entry.get(name) for name in _OPTIONAL_EVENT_FIELDS})
    return BusinessEventLog(**fields)


@current_celery.task(bind=True, name='persist_business_events', queue='entitlements',
                     autoretry_for=(InterfaceError, OperationalError),
                     retry_backoff=True, retry_jitter=True, max_retries=5)
def persist_business_events(self, log_entries):
    """
    Persist multiple business event logs to the database in a single transaction
    and book their usage on the tenant's current license period.

    The task is retried automatically (with backoff and jitter, at most 5 times)
    when the database raises InterfaceError or OperationalError.

    Args:
        log_entries: List of log event dictionaries to persist. Each entry must
            contain 'timestamp', 'event_type', 'tenant_id' and 'trace_id'; all
            other recognised keys are optional.

    Raises:
        KeyError: If an entry lacks one of the required keys.
        EveAIException: If no license period can be found for the tenant.
        SQLAlchemyError: If persisting the logs or the license usage fails.
    """
    if not log_entries:
        # Nothing to persist, and no tenant to book usage on.
        current_app.logger.info("No business event logs to persist")
        return

    # Entries are read, never popped: the argument objects must stay intact in
    # case the task is retried with the same payload.
    event_logs = [_build_event_log(entry) for entry in log_entries]

    # Perform a bulk insert of all entries
    try:
        db.session.add_all(event_logs)
        db.session.commit()
    except SQLAlchemyError as e:
        # Leave the session usable for the retry; the bare raise keeps the
        # original exception type so autoretry_for still matches.
        db.session.rollback()
        current_app.logger.error(f"Error trying to persist business event logs: {str(e)}")
        raise
    current_app.logger.info(f"Successfully persisted {len(event_logs)} business event logs")

    # NOTE(review): the tenant is taken from the first entry only, so the whole
    # batch is presumably expected to belong to one tenant - confirm with the producer.
    tenant_id = event_logs[0].tenant_id
    try:
        license_period = LicensePeriodServices.find_current_license_period_for_usage(tenant_id)
    except EveAIException as e:
        current_app.logger.error(f"Failed to find license period for tenant {tenant_id}: {str(e)}")
        raise

    lic_usage = license_period.license_usage
    if not lic_usage:
        # First usage in this license period: create the usage record.
        lic_usage = LicenseUsage(
            tenant_id=tenant_id,
            license_period_id=license_period.id,
        )
        try:
            db.session.add(lic_usage)
            db.session.commit()
            current_app.logger.info(f"Created new license usage for tenant {tenant_id}")
        except SQLAlchemyError as e:
            db.session.rollback()
            current_app.logger.error(f"Error trying to create license usage for tenant {tenant_id}: {str(e)}")
            raise

    process_logs_for_license_usage(tenant_id, lic_usage, event_logs)


def process_logs_for_license_usage(tenant_id, license_usage, logs):
    """
    Accumulate the usage reported by the given logs onto a LicenseUsage record.

    Each log is linked to the license usage through its license_usage_id. The
    accumulated totals and the log updates are committed together.

    Args:
        tenant_id: Identifier of the tenant; used for log messages only.
        license_usage: LicenseUsage record whose counters are incremented.
        logs: Iterable of BusinessEventLog objects to process.

    Raises:
        SQLAlchemyError: If committing the updates fails (the session is rolled back).
    """
    # Keys match the LicenseUsage attribute names they are added to.
    totals = {
        'embedding_mb_used': 0,
        'embedding_prompt_tokens_used': 0,
        'embedding_completion_tokens_used': 0,
        'embedding_total_tokens_used': 0,
        'interaction_prompt_tokens_used': 0,
        'interaction_completion_tokens_used': 0,
        'interaction_total_tokens_used': 0,
    }
    current_app.logger.info(f"Processing {len(logs)} logs for tenant {tenant_id}")
    recalculate_storage = False

    # Process each log
    for log in logs:
        current_app.logger.debug(f"Processing log for tenant {tenant_id}: {log.id} - {log.event_type} - {log.message}")

        # 'embedding' or 'interaction' when the log carries LLM token metrics to book.
        bucket = None
        is_final_metrics = log.message == 'Final LLM Metrics'

        if log.event_type == 'Create Embeddings':
            current_app.logger.debug("In Create Embeddings")
            recalculate_storage = True
            if log.message == 'Starting Trace for Create Embeddings':
                # The file size is optional on the log; treat a missing value as 0.
                totals['embedding_mb_used'] += log.document_version_file_size or 0
            elif is_final_metrics:
                bucket = 'embedding'
        elif log.event_type == 'Ask Question':
            if is_final_metrics:
                bucket = 'interaction'
        elif log.event_type == 'Execute Specialist':
            if is_final_metrics:
                # Retrieval spans count as embedding usage, everything else as interaction.
                bucket = 'embedding' if log.span_name == 'Specialist Retrieval' else 'interaction'

        if bucket is not None:
            # Token metrics are optional on the log; treat missing values as 0.
            totals[f'{bucket}_prompt_tokens_used'] += log.llm_metrics_prompt_tokens or 0
            totals[f'{bucket}_completion_tokens_used'] += log.llm_metrics_completion_tokens or 0
            totals[f'{bucket}_total_tokens_used'] += log.llm_metrics_total_tokens or 0

        # Mark the log as processed by setting the license_usage_id
        log.license_usage_id = license_usage.id
        db.session.add(log)

    current_app.logger.debug(f"Finished processing {len(logs)} logs for tenant {tenant_id}")
    current_app.logger.debug(f"Embedding MB Used: {totals['embedding_mb_used']}")
    current_app.logger.debug(f"Embedding Prompt Tokens Used: {totals['embedding_prompt_tokens_used']}")
    current_app.logger.debug(f"Embedding Completion Tokens Used: {totals['embedding_completion_tokens_used']}")
    current_app.logger.debug(f"Embedding Total Tokens Used: {totals['embedding_total_tokens_used']}")

    # Update the LicenseUsage record with the accumulated values
    for attribute, amount in totals.items():
        # NOTE(review): guards against counters that are still None (e.g. a column
        # without a default) - confirm against the LicenseUsage model.
        current = getattr(license_usage, attribute) or 0
        setattr(license_usage, attribute, current + amount)

    if recalculate_storage:
        license_usage.recalculate_storage()

    # Commit the updates to the LicenseUsage and log records
    try:
        db.session.add(license_usage)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        current_app.logger.error(f"Error trying to update license usage and logs for tenant {tenant_id}: {e}")
        raise