"""Business event tracing: Prometheus metrics plus buffered, persisted event logs."""
import os
import time
import uuid
from contextlib import contextmanager, asynccontextmanager
from datetime import datetime
from typing import Dict, Any, Optional, List
from datetime import datetime as dt, timezone as tz
import logging

from flask import current_app
from prometheus_client import Counter, Histogram, Gauge, Summary, push_to_gateway, REGISTRY

from .business_event_context import BusinessEventContext
from common.models.entitlements import BusinessEventLog
from common.extensions import db
from .celery_utils import current_celery
from common.utils.prometheus_utils import sanitize_label

# Logger used for persistence failures and as a fallback when no Flask app is available
_logger = logging.getLogger('business_events')

# Standard duration buckets for all histograms
DURATION_BUCKETS = [0.1, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 240, 360, float('inf')]

# Label names shared by the metric families below
_TRACE_LABEL_NAMES = ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
_SPAN_LABEL_NAMES = ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type',
                     'specialist_type_version']
_LLM_LABEL_NAMES = ['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type',
                    'specialist_type_version']
_LLM_TOKEN_LABEL_NAMES = ['tenant_id', 'event_type', 'interaction_type', 'token_type', 'specialist_id',
                          'specialist_type', 'specialist_type_version']

# Prometheus metrics for business events
TRACE_COUNTER = Counter(
    'eveai_business_events_total',
    'Total number of business events triggered',
    _TRACE_LABEL_NAMES
)

TRACE_DURATION = Histogram(
    'eveai_business_events_duration_seconds',
    'Duration of business events in seconds',
    _TRACE_LABEL_NAMES,
    buckets=DURATION_BUCKETS
)

CONCURRENT_TRACES = Gauge(
    'eveai_business_events_concurrent',
    'Number of concurrent business events',
    _TRACE_LABEL_NAMES
)

SPAN_COUNTER = Counter(
    'eveai_business_spans_total',
    'Total number of spans within business events',
    _SPAN_LABEL_NAMES
)

SPAN_DURATION = Histogram(
    'eveai_business_spans_duration_seconds',
    'Duration of spans within business events in seconds',
    _SPAN_LABEL_NAMES,
    buckets=DURATION_BUCKETS
)

CONCURRENT_SPANS = Gauge(
    'eveai_business_spans_concurrent',
    'Number of concurrent spans within business events',
    _SPAN_LABEL_NAMES
)

# LLM Usage metrics
LLM_TOKENS_COUNTER = Counter(
    'eveai_llm_tokens_total',
    'Total number of tokens used in LLM calls',
    _LLM_TOKEN_LABEL_NAMES
)

LLM_DURATION = Histogram(
    'eveai_llm_duration_seconds',
    'Duration of LLM API calls in seconds',
    _LLM_LABEL_NAMES,
    buckets=DURATION_BUCKETS
)

LLM_CALLS_COUNTER = Counter(
    'eveai_llm_calls_total',
    'Total number of LLM API calls',
    _LLM_LABEL_NAMES
)


class BusinessEvent:
    """A traced business event that emits Prometheus metrics and buffered log records.

    The class is a (sync and async) context manager implemented with explicit
    ``__enter__``/``__exit__`` methods rather than ``@contextmanager`` because it
    delegates to ``BusinessEventContext`` on entry and exit. Log records are
    collected in an in-memory buffer and flushed when the trace ends.
    """

    def __init__(self, event_type: str, tenant_id: int, **kwargs):
        """Create the event, count it and mark it as concurrently running.

        Args:
            event_type: Name of the business event; also used (sanitized) as a metric label.
            tenant_id: Tenant the event belongs to.
            **kwargs: Optional context values. Recognised keys are
                ``document_version_id``, ``document_version_file_size``,
                ``chat_session_id``, ``interaction_id``, ``specialist_id``,
                ``specialist_type`` and ``specialist_type_version``.
        """
        self.event_type = event_type
        self.tenant_id = tenant_id
        self.trace_id = str(uuid.uuid4())
        self.span_id = None
        self.span_name = None
        self.parent_span_id = None
        self.document_version_id = kwargs.get('document_version_id')
        self.document_version_file_size = kwargs.get('document_version_file_size')
        self.chat_session_id = kwargs.get('chat_session_id')
        self.interaction_id = kwargs.get('interaction_id')
        self.specialist_id = kwargs.get('specialist_id')
        self.specialist_type = kwargs.get('specialist_type')
        self.specialist_type_version = kwargs.get('specialist_type_version')
        self.environment = os.environ.get("FLASK_ENV", "development")
        self.span_counter = 0
        self.spans = []
        self.llm_metrics = self._empty_llm_metrics()
        self._log_buffer = []

        # Prometheus label values must be strings
        self.tenant_id_str = str(self.tenant_id)
        self.event_type_str = sanitize_label(self.event_type)
        self.specialist_id_str = str(self.specialist_id) if self.specialist_id else ""
        self.specialist_type_str = str(self.specialist_type) if self.specialist_type else ""
        self.specialist_type_version_str = sanitize_label(str(self.specialist_type_version)) \
            if self.specialist_type_version else ""
        self.span_name_str = ""

        # Remember the exact label set the gauge is incremented with, so the matching
        # decrement at trace end hits the same series even if labels change in between.
        self._trace_gauge_labels = self._trace_labels()

        # Increment concurrent events gauge when initialized
        CONCURRENT_TRACES.labels(**self._trace_gauge_labels).inc()

        # Increment trace counter
        TRACE_COUNTER.labels(**self._trace_gauge_labels).inc()

        self._push_to_gateway()

    @staticmethod
    def _empty_llm_metrics() -> Dict[str, Any]:
        """Return the zeroed LLM metrics accumulator."""
        return {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'nr_of_pages': 0,
            'total_time': 0,
            'call_count': 0,
            'interaction_type': None
        }

    def _trace_labels(self) -> Dict[str, str]:
        """Return the current label values for trace-level metrics."""
        return {
            'tenant_id': self.tenant_id_str,
            'event_type': self.event_type_str,
            'specialist_id': self.specialist_id_str,
            'specialist_type': self.specialist_type_str,
            'specialist_type_version': self.specialist_type_version_str,
        }

    def _span_labels(self) -> Dict[str, str]:
        """Return the current label values for span-level metrics."""
        labels = self._trace_labels()
        labels['activity_name'] = self.span_name_str
        return labels

    def update_attribute(self, attribute: str, value: Any):
        """Set an existing attribute and refresh its Prometheus label string, if any.

        Args:
            attribute: Name of an attribute that already exists on the instance.
            value: New value for the attribute.

        Raises:
            AttributeError: If the instance has no attribute named ``attribute``.
        """
        if not hasattr(self, attribute):
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'")

        setattr(self, attribute, value)

        # Update string versions for Prometheus labels if needed
        if attribute == 'specialist_id':
            self.specialist_id_str = str(value) if value else ""
        elif attribute == 'specialist_type':
            self.specialist_type_str = str(value) if value else ""
        elif attribute == 'specialist_type_version':
            self.specialist_type_version_str = sanitize_label(str(value)) if value else ""
        elif attribute == 'tenant_id':
            self.tenant_id_str = str(value)
        elif attribute == 'event_type':
            self.event_type_str = sanitize_label(value)
        elif attribute == 'span_name':
            # Same falsy guard as create_span uses for the span label
            self.span_name_str = sanitize_label(value) if value else ""

    def update_llm_metrics(self, metrics: dict):
        """Accumulate one LLM call into the running totals and Prometheus metrics.

        Args:
            metrics: Mapping that may contain ``total_tokens``, ``prompt_tokens``,
                ``completion_tokens``, ``nr_of_pages``, ``time_elapsed`` and
                ``interaction_type``. Missing numeric keys count as 0; a missing
                ``interaction_type`` is treated as ``None``.
        """
        total_tokens = metrics.get('total_tokens', 0)
        prompt_tokens = metrics.get('prompt_tokens', 0)
        completion_tokens = metrics.get('completion_tokens', 0)
        time_elapsed = metrics.get('time_elapsed', 0)
        # Read with .get so a missing key cannot leave the totals half-updated
        interaction_type = metrics.get('interaction_type')

        self.llm_metrics['total_tokens'] += total_tokens
        self.llm_metrics['prompt_tokens'] += prompt_tokens
        self.llm_metrics['completion_tokens'] += completion_tokens
        self.llm_metrics['nr_of_pages'] += metrics.get('nr_of_pages', 0)
        self.llm_metrics['total_time'] += time_elapsed
        self.llm_metrics['call_count'] += 1
        self.llm_metrics['interaction_type'] = interaction_type

        # Track in Prometheus metrics
        llm_labels = self._trace_labels()
        llm_labels['interaction_type'] = sanitize_label(interaction_type) if interaction_type else ""

        # Track token usage
        token_amounts = (
            ('total', total_tokens),
            ('prompt', prompt_tokens),
            ('completion', completion_tokens),
        )
        for token_type, amount in token_amounts:
            LLM_TOKENS_COUNTER.labels(token_type=token_type, **llm_labels).inc(amount)

        # Track duration
        LLM_DURATION.labels(**llm_labels).observe(time_elapsed)

        # Track call count
        LLM_CALLS_COUNTER.labels(**llm_labels).inc()

        self._push_to_gateway()

    def reset_llm_metrics(self):
        """Reset the running LLM totals to their zero state (in place)."""
        self.llm_metrics.update(self._empty_llm_metrics())

    def _begin_span(self, span_name: str):
        """Enter a span: push the current span state, activate the new one, emit metrics.

        Returns:
            A ``(start_time, gauge_labels)`` tuple to hand to ``_end_span``.
        """
        parent_span_id = self.span_id
        self.span_counter += 1

        # Save the current span info
        self.spans.append((self.span_id, self.span_name, self.parent_span_id))

        # Set the new span info
        self.span_id = str(uuid.uuid4())
        self.span_name = span_name
        self.span_name_str = sanitize_label(span_name) if span_name else ""
        self.parent_span_id = parent_span_id

        # Track start time for the span
        span_start_time = time.time()

        # Span metrics use span_name as activity_name
        gauge_labels = self._span_labels()
        SPAN_COUNTER.labels(**gauge_labels).inc()
        CONCURRENT_SPANS.labels(**gauge_labels).inc()

        self._push_to_gateway()

        self.log("Start")
        return span_start_time, gauge_labels

    def _end_span(self, span_start_time: float, gauge_labels: Dict[str, str]):
        """Leave a span: emit duration metrics, log, and restore the enclosing span state."""
        span_total_time = time.time() - span_start_time

        # Observe span duration
        SPAN_DURATION.labels(**self._span_labels()).observe(span_total_time)

        # Decrement the same gauge series that was incremented on entry
        CONCURRENT_SPANS.labels(**gauge_labels).dec()

        self._push_to_gateway()

        if self.llm_metrics['call_count'] > 0:
            self.log_final_metrics()
            self.reset_llm_metrics()
        self.log("End", extra_fields={'span_duration': span_total_time})

        # Restore the previous span info
        if self.spans:
            self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
        else:
            self.span_id = None
            self.span_name = None
            self.parent_span_id = None
        # The label must follow the restored span, not the one that just ended
        self.span_name_str = sanitize_label(self.span_name) if self.span_name else ""

    @contextmanager
    def create_span(self, span_name: str):
        """Context manager that runs its body inside a new span named ``span_name``.

        Spans may be nested; on exit the enclosing span (if any) becomes current again.
        """
        span_start_time, gauge_labels = self._begin_span(span_name)
        try:
            yield
        finally:
            self._end_span(span_start_time, gauge_labels)

    @asynccontextmanager
    async def create_span_async(self, span_name: str):
        """Async version of create_span using async context manager"""
        span_start_time, gauge_labels = self._begin_span(span_name)
        try:
            yield
        finally:
            self._end_span(span_start_time, gauge_labels)

    def _base_log_data(self) -> Dict[str, Any]:
        """Return the fields common to every buffered log record, stamped with the current UTC time."""
        return {
            'timestamp': dt.now(tz=tz.utc),
            'event_type': self.event_type,
            'tenant_id': self.tenant_id,
            'trace_id': self.trace_id,
            'span_id': self.span_id,
            'span_name': self.span_name,
            'parent_span_id': self.parent_span_id,
            'document_version_id': self.document_version_id,
            'document_version_file_size': self.document_version_file_size,
            'chat_session_id': self.chat_session_id,
            'interaction_id': self.interaction_id,
            'specialist_id': self.specialist_id,
            'specialist_type': self.specialist_type,
            'specialist_type_version': self.specialist_type_version,
            'environment': self.environment,
        }

    def log(self, message: str, level: str = 'info', extra_fields: Optional[Dict[str, Any]] = None):
        """Append a log record for the current trace/span to the buffer.

        Args:
            message: Text stored with the record.
            level: Currently unused; kept for interface compatibility.
            extra_fields: Additional fields merged into the record. ``span_duration``
                and ``trace_duration`` are stored under ``llm_metrics_total_time``.
        """
        log_data = self._base_log_data()
        log_data['message'] = message

        # Add any extra fields
        if extra_fields:
            for key, value in extra_fields.items():
                # For span/trace duration, use the llm_metrics_total_time field
                if key in ('span_duration', 'trace_duration'):
                    log_data['llm_metrics_total_time'] = value
                else:
                    log_data[key] = value

        self._log_buffer.append(log_data)

    def log_llm_metrics(self, metrics: dict, level: str = 'info'):
        """Accumulate one LLM call's metrics and buffer a record describing that call.

        Args:
            metrics: Same mapping as accepted by ``update_llm_metrics``.
            level: Currently unused; kept for interface compatibility.
        """
        self.update_llm_metrics(metrics)

        log_data = self._base_log_data()
        log_data.update({
            'llm_metrics_total_tokens': metrics.get('total_tokens', 0),
            'llm_metrics_prompt_tokens': metrics.get('prompt_tokens', 0),
            'llm_metrics_completion_tokens': metrics.get('completion_tokens', 0),
            'llm_metrics_nr_of_pages': metrics.get('nr_of_pages', 0),
            'llm_metrics_total_time': metrics.get('time_elapsed', 0),
            'llm_interaction_type': metrics.get('interaction_type'),
            'message': "LLM Metrics",
        })
        self._log_buffer.append(log_data)

    def log_final_metrics(self, level: str = 'info'):
        """Buffer a record with the LLM totals accumulated since the last reset.

        Args:
            level: Currently unused; kept for interface compatibility.
        """
        log_data = self._base_log_data()
        log_data.update({
            'llm_metrics_total_tokens': self.llm_metrics['total_tokens'],
            'llm_metrics_prompt_tokens': self.llm_metrics['prompt_tokens'],
            'llm_metrics_completion_tokens': self.llm_metrics['completion_tokens'],
            'llm_metrics_nr_of_pages': self.llm_metrics['nr_of_pages'],
            'llm_metrics_total_time': self.llm_metrics['total_time'],
            'llm_metrics_call_count': self.llm_metrics['call_count'],
            'llm_interaction_type': self.llm_metrics['interaction_type'],
            'message': "Final LLM Metrics",
        })
        self._log_buffer.append(log_data)

    @staticmethod
    def _direct_db_persist(log_entries: List[Dict[str, Any]]):
        """Fallback method to directly persist logs to DB if async fails.

        Errors are logged and the session rolled back; nothing is raised. The
        entries are read without being modified.
        """
        try:
            db_entries = [
                BusinessEventLog(
                    # The first four fields are required; a missing one aborts the batch
                    timestamp=entry['timestamp'],
                    event_type=entry['event_type'],
                    tenant_id=entry['tenant_id'],
                    trace_id=entry['trace_id'],
                    span_id=entry.get('span_id'),
                    span_name=entry.get('span_name'),
                    parent_span_id=entry.get('parent_span_id'),
                    document_version_id=entry.get('document_version_id'),
                    document_version_file_size=entry.get('document_version_file_size'),
                    chat_session_id=entry.get('chat_session_id'),
                    interaction_id=entry.get('interaction_id'),
                    specialist_id=entry.get('specialist_id'),
                    specialist_type=entry.get('specialist_type'),
                    specialist_type_version=entry.get('specialist_type_version'),
                    environment=entry.get('environment'),
                    llm_metrics_total_tokens=entry.get('llm_metrics_total_tokens'),
                    llm_metrics_prompt_tokens=entry.get('llm_metrics_prompt_tokens'),
                    llm_metrics_completion_tokens=entry.get('llm_metrics_completion_tokens'),
                    llm_metrics_total_time=entry.get('llm_metrics_total_time'),
                    llm_metrics_call_count=entry.get('llm_metrics_call_count'),
                    llm_interaction_type=entry.get('llm_interaction_type'),
                    message=entry.get('message')
                )
                for entry in log_entries
            ]

            # Bulk insert
            db.session.bulk_save_objects(db_entries)
            db.session.commit()
        except Exception as e:
            _logger.error(f"Failed to persist logs directly to DB: {e}")
            db.session.rollback()

    def _flush_log_buffer(self):
        """Flush the log buffer to the database via a Celery task"""
        if not self._log_buffer:
            return

        try:
            # Send to Celery task
            current_celery.send_task(
                'persist_business_events',
                args=[self._log_buffer],
                queue='entitlements'  # Or dedicated log queue
            )
        except Exception as e:
            # Fallback to direct DB write in case of issues with Celery
            _logger.error(f"Failed to send logs to Celery. Falling back to direct DB: {e}")
            self._direct_db_persist(self._log_buffer)

        # Clear the buffer after sending
        self._log_buffer = []

    def _push_to_gateway(self):
        """Push all registered metrics to the Prometheus Push Gateway (best effort).

        Failures are logged and never raised, including when no Flask application
        context is active.
        """
        try:
            push_to_gateway(
                current_app.config['PUSH_GATEWAY_URL'],
                job=current_app.config['COMPONENT_NAME'],
                registry=REGISTRY
            )
        except Exception as e:
            message = f"Failed to push metrics to Prometheus Push Gateway: {e}"
            try:
                current_app.logger.error(message)
            except RuntimeError:
                # No application context: current_app is unusable, so log through stdlib
                _logger.error(message)

    def _finish_trace(self):
        """Record trace duration, release the concurrency gauge and flush buffered logs."""
        trace_total_time = time.time() - self.trace_start_time

        # Record trace duration
        TRACE_DURATION.labels(**self._trace_labels()).observe(trace_total_time)

        # Decrement the same gauge series that __init__ incremented
        CONCURRENT_TRACES.labels(**self._trace_gauge_labels).dec()

        self._push_to_gateway()

        if self.llm_metrics['call_count'] > 0:
            self.log_final_metrics()
            self.reset_llm_metrics()

        self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
        self._flush_log_buffer()

    def __enter__(self):
        """Start the trace and return the result of entering a ``BusinessEventContext`` for this event."""
        self.trace_start_time = time.time()
        self.log(f'Starting Trace for {self.event_type}')
        return BusinessEventContext(self).__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the trace, then delegate exit handling to ``BusinessEventContext``."""
        self._finish_trace()
        return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb)

    async def __aenter__(self):
        """Async counterpart of ``__enter__``."""
        self.trace_start_time = time.time()
        self.log(f'Starting Trace for {self.event_type}')
        return await BusinessEventContext(self).__aenter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async counterpart of ``__exit__``."""
        self._finish_trace()
        return await BusinessEventContext(self).__aexit__(exc_type, exc_val, exc_tb)