- Ensure very long chunks are split into smaller chunks.
- Ensure TrackedMistralAIEmbedding batches its input when needed so it executes correctly.
- Upgrade several packages to newer versions.
644 lines
27 KiB
Python
644 lines
27 KiB
Python
import os
|
|
import time
|
|
import uuid
|
|
from contextlib import contextmanager, asynccontextmanager
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List
|
|
from datetime import datetime as dt, timezone as tz
|
|
import logging
|
|
|
|
from flask import current_app
|
|
from prometheus_client import Counter, Histogram, Gauge, Summary, push_to_gateway, REGISTRY
|
|
|
|
from .business_event_context import BusinessEventContext
|
|
from common.models.entitlements import BusinessEventLog
|
|
from common.extensions import db
|
|
from .celery_utils import current_celery
|
|
from common.utils.prometheus_utils import sanitize_label
|
|
|
|
# Standard duration buckets (seconds) shared by every histogram below, so trace,
# span, and LLM latencies can be compared on the same scale in dashboards.
DURATION_BUCKETS = [0.1, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 240, 360, float('inf')]

# Prometheus metrics for business events.
# All label values are pre-stringified/sanitized by BusinessEvent before use.
TRACE_COUNTER = Counter(
    'eveai_business_events_total',
    'Total number of business events triggered',
    ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)

TRACE_DURATION = Histogram(
    'eveai_business_events_duration_seconds',
    'Duration of business events in seconds',
    ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version'],
    buckets=DURATION_BUCKETS
)

# Gauge: incremented when a BusinessEvent is constructed, decremented on trace exit.
CONCURRENT_TRACES = Gauge(
    'eveai_business_events_concurrent',
    'Number of concurrent business events',
    ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)

# Span metrics use the span name as the 'activity_name' label.
SPAN_COUNTER = Counter(
    'eveai_business_spans_total',
    'Total number of spans within business events',
    ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version']
)

SPAN_DURATION = Histogram(
    'eveai_business_spans_duration_seconds',
    'Duration of spans within business events in seconds',
    ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version'],
    buckets=DURATION_BUCKETS
)

CONCURRENT_SPANS = Gauge(
    'eveai_business_spans_concurrent',
    'Number of concurrent spans within business events',
    ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version']
)

# LLM usage metrics. 'token_type' distinguishes total/prompt/completion counts.
LLM_TOKENS_COUNTER = Counter(
    'eveai_llm_tokens_total',
    'Total number of tokens used in LLM calls',
    ['tenant_id', 'event_type', 'interaction_type', 'token_type', 'specialist_id', 'specialist_type',
     'specialist_type_version']
)

LLM_DURATION = Histogram(
    'eveai_llm_duration_seconds',
    'Duration of LLM API calls in seconds',
    ['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version'],
    buckets=DURATION_BUCKETS
)

LLM_CALLS_COUNTER = Counter(
    'eveai_llm_calls_total',
    'Total number of LLM API calls',
    ['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
|
|
|
|
|
|
class BusinessEvent:
    # The BusinessEvent class itself is a context manager, but it doesn't use the @contextmanager decorator.
    # Instead, it defines __enter__ and __exit__ methods explicitly. This is because we're doing something a bit more
    # complex - we're interacting with the BusinessEventContext and the _business_event_stack.

    def __init__(self, event_type: str, tenant_id: int, **kwargs):
        """Create a new business-event trace.

        Side effects: increments the trace counter and the concurrent-traces
        gauge immediately and pushes metrics to the Prometheus push gateway
        (i.e. before __enter__ is called).

        Recognized kwargs: document_version_id, document_version_file_size,
        chat_session_id, interaction_id, specialist_id, specialist_type,
        specialist_type_version.
        """
        self.event_type = event_type
        self.tenant_id = tenant_id
        # A fresh trace id per event; span ids are created by create_span().
        self.trace_id = str(uuid.uuid4())
        self.span_id = None
        self.span_name = None
        self.parent_span_id = None
        self.document_version_id = kwargs.get('document_version_id')
        self.document_version_file_size = kwargs.get('document_version_file_size')
        self.chat_session_id = kwargs.get('chat_session_id')
        self.interaction_id = kwargs.get('interaction_id')
        self.specialist_id = kwargs.get('specialist_id')
        self.specialist_type = kwargs.get('specialist_type')
        self.specialist_type_version = kwargs.get('specialist_type_version')
        self.environment = os.environ.get("FLASK_ENV", "development")
        self.span_counter = 0
        # Stack of (span_id, span_name, parent_span_id) tuples for nested spans.
        self.spans = []
        # Aggregated LLM usage for the current span/trace; reset after logging.
        self.llm_metrics = {
            'total_tokens': 0,
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'nr_of_pages': 0,
            'total_time': 0,
            'call_count': 0,
            'interaction_type': None
        }
        # Structured log records buffered until _flush_log_buffer() on trace exit.
        self._log_buffer = []

        # Prometheus label values must be strings
        self.tenant_id_str = str(self.tenant_id)
        self.event_type_str = sanitize_label(self.event_type)
        self.specialist_id_str = str(self.specialist_id) if self.specialist_id else ""
        self.specialist_type_str = str(self.specialist_type) if self.specialist_type else ""
        self.specialist_type_version_str = sanitize_label(str(self.specialist_type_version)) \
            if self.specialist_type_version else ""
        self.span_name_str = ""

        # Increment concurrent events gauge when initialized
        CONCURRENT_TRACES.labels(
            tenant_id=self.tenant_id_str,
            event_type=self.event_type_str,
            specialist_id=self.specialist_id_str,
            specialist_type=self.specialist_type_str,
            specialist_type_version=self.specialist_type_version_str
        ).inc()

        # Increment trace counter
        TRACE_COUNTER.labels(
            tenant_id=self.tenant_id_str,
            event_type=self.event_type_str,
            specialist_id=self.specialist_id_str,
            specialist_type=self.specialist_type_str,
            specialist_type_version=self.specialist_type_version_str
        ).inc()

        self._push_to_gateway()
|
|
|
|
def update_attribute(self, attribute: str, value: any):
|
|
if hasattr(self, attribute):
|
|
setattr(self, attribute, value)
|
|
# Update string versions for Prometheus labels if needed
|
|
if attribute == 'specialist_id':
|
|
self.specialist_id_str = str(value) if value else ""
|
|
elif attribute == 'specialist_type':
|
|
self.specialist_type_str = str(value) if value else ""
|
|
elif attribute == 'specialist_type_version':
|
|
self.specialist_type_version_str = sanitize_label(str(value)) if value else ""
|
|
elif attribute == 'tenant_id':
|
|
self.tenant_id_str = str(value)
|
|
elif attribute == 'event_type':
|
|
self.event_type_str = sanitize_label(value)
|
|
elif attribute == 'span_name':
|
|
self.span_name_str = sanitize_label(value)
|
|
else:
|
|
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'")
|
|
|
|
def update_llm_metrics(self, metrics: dict):
|
|
self.llm_metrics['total_tokens'] += metrics.get('total_tokens', 0)
|
|
self.llm_metrics['prompt_tokens'] += metrics.get('prompt_tokens', 0)
|
|
self.llm_metrics['completion_tokens'] += metrics.get('completion_tokens', 0)
|
|
self.llm_metrics['nr_of_pages'] += metrics.get('nr_of_pages', 0)
|
|
self.llm_metrics['total_time'] += metrics.get('time_elapsed', 0)
|
|
self.llm_metrics['call_count'] += 1
|
|
self.llm_metrics['interaction_type'] = metrics['interaction_type']
|
|
|
|
# Track in Prometheus metrics
|
|
interaction_type_str = sanitize_label(metrics['interaction_type']) if metrics['interaction_type'] else ""
|
|
|
|
# Track token usage
|
|
LLM_TOKENS_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
interaction_type=interaction_type_str,
|
|
token_type='total',
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc(metrics.get('total_tokens', 0))
|
|
|
|
LLM_TOKENS_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
interaction_type=interaction_type_str,
|
|
token_type='prompt',
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc(metrics.get('prompt_tokens', 0))
|
|
|
|
LLM_TOKENS_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
interaction_type=interaction_type_str,
|
|
token_type='completion',
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc(metrics.get('completion_tokens', 0))
|
|
|
|
# Track duration
|
|
LLM_DURATION.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
interaction_type=interaction_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).observe(metrics.get('time_elapsed', 0))
|
|
|
|
# Track call count
|
|
LLM_CALLS_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
interaction_type=interaction_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc()
|
|
|
|
self._push_to_gateway()
|
|
|
|
def reset_llm_metrics(self):
|
|
self.llm_metrics['total_tokens'] = 0
|
|
self.llm_metrics['prompt_tokens'] = 0
|
|
self.llm_metrics['completion_tokens'] = 0
|
|
self.llm_metrics['nr_of_pages'] = 0
|
|
self.llm_metrics['total_time'] = 0
|
|
self.llm_metrics['call_count'] = 0
|
|
self.llm_metrics['interaction_type'] = None
|
|
|
|
@contextmanager
|
|
def create_span(self, span_name: str):
|
|
# The create_span method is designed to be used as a context manager. We want to perform some actions when
|
|
# entering the span (like setting the span ID and name) and some actions when exiting the span (like removing
|
|
# these temporary attributes). The @contextmanager decorator allows us to write this method in a way that
|
|
# clearly separates the "entry" and "exit" logic, with the yield statement in between.
|
|
|
|
parent_span_id = self.span_id
|
|
self.span_counter += 1
|
|
new_span_id = str(uuid.uuid4())
|
|
|
|
# Save the current span info
|
|
self.spans.append((self.span_id, self.span_name, self.parent_span_id))
|
|
|
|
# Set the new span info
|
|
self.span_id = new_span_id
|
|
self.span_name = span_name
|
|
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
|
self.parent_span_id = parent_span_id
|
|
|
|
# Track start time for the span
|
|
span_start_time = time.time()
|
|
|
|
# Increment span metrics - using span_name as activity_name for metrics
|
|
SPAN_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc()
|
|
|
|
# Increment concurrent spans gauge
|
|
CONCURRENT_SPANS.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc()
|
|
|
|
self._push_to_gateway()
|
|
|
|
self.log(f"Start")
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
# Calculate total time for this span
|
|
span_total_time = time.time() - span_start_time
|
|
|
|
# Observe span duration
|
|
SPAN_DURATION.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).observe(span_total_time)
|
|
|
|
# Decrement concurrent spans gauge
|
|
CONCURRENT_SPANS.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).dec()
|
|
|
|
self._push_to_gateway()
|
|
|
|
if self.llm_metrics['call_count'] > 0:
|
|
self.log_final_metrics()
|
|
self.reset_llm_metrics()
|
|
self.log(f"End", extra_fields={'span_duration': span_total_time})
|
|
# Restore the previous span info
|
|
if self.spans:
|
|
self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
|
|
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
|
else:
|
|
self.span_id = None
|
|
self.span_name = None
|
|
self.parent_span_id = None
|
|
self.span_name_str = ""
|
|
|
|
@asynccontextmanager
|
|
async def create_span_async(self, span_name: str):
|
|
"""Async version of create_span using async context manager"""
|
|
parent_span_id = self.span_id
|
|
self.span_counter += 1
|
|
new_span_id = str(uuid.uuid4())
|
|
|
|
# Save the current span info
|
|
self.spans.append((self.span_id, self.span_name, self.parent_span_id))
|
|
|
|
# Set the new span info
|
|
self.span_id = new_span_id
|
|
self.span_name = span_name
|
|
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
|
self.parent_span_id = parent_span_id
|
|
|
|
# Track start time for the span
|
|
span_start_time = time.time()
|
|
|
|
# Increment span metrics - using span_name as activity_name for metrics
|
|
SPAN_COUNTER.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc()
|
|
|
|
# Increment concurrent spans gauge
|
|
CONCURRENT_SPANS.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).inc()
|
|
|
|
self._push_to_gateway()
|
|
|
|
self.log(f"Start")
|
|
|
|
try:
|
|
yield
|
|
finally:
|
|
# Calculate total time for this span
|
|
span_total_time = time.time() - span_start_time
|
|
|
|
# Observe span duration
|
|
SPAN_DURATION.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).observe(span_total_time)
|
|
|
|
# Decrement concurrent spans gauge
|
|
CONCURRENT_SPANS.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
activity_name=self.span_name_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).dec()
|
|
|
|
self._push_to_gateway()
|
|
|
|
if self.llm_metrics['call_count'] > 0:
|
|
self.log_final_metrics()
|
|
self.reset_llm_metrics()
|
|
self.log(f"End", extra_fields={'span_duration': span_total_time})
|
|
# Restore the previous span info
|
|
if self.spans:
|
|
self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
|
|
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
|
else:
|
|
self.span_id = None
|
|
self.span_name = None
|
|
self.parent_span_id = None
|
|
self.span_name_str = ""
|
|
|
|
def log(self, message: str, level: str = 'info', extra_fields: Dict[str, Any] = None):
|
|
log_data = {
|
|
'timestamp': dt.now(tz=tz.utc),
|
|
'event_type': self.event_type,
|
|
'tenant_id': self.tenant_id,
|
|
'trace_id': self.trace_id,
|
|
'span_id': self.span_id,
|
|
'span_name': self.span_name,
|
|
'parent_span_id': self.parent_span_id,
|
|
'document_version_id': self.document_version_id,
|
|
'document_version_file_size': self.document_version_file_size,
|
|
'chat_session_id': self.chat_session_id,
|
|
'interaction_id': self.interaction_id,
|
|
'specialist_id': self.specialist_id,
|
|
'specialist_type': self.specialist_type,
|
|
'specialist_type_version': self.specialist_type_version,
|
|
'environment': self.environment,
|
|
'message': message,
|
|
}
|
|
# Add any extra fields
|
|
if extra_fields:
|
|
for key, value in extra_fields.items():
|
|
# For span/trace duration, use the llm_metrics_total_time field
|
|
if key == 'span_duration' or key == 'trace_duration':
|
|
log_data['llm_metrics_total_time'] = value
|
|
else:
|
|
log_data[key] = value
|
|
|
|
self._log_buffer.append(log_data)
|
|
|
|
def log_llm_metrics(self, metrics: dict, level: str = 'info'):
|
|
self.update_llm_metrics(metrics)
|
|
message = "LLM Metrics"
|
|
logger = logging.getLogger('business_events')
|
|
log_data = {
|
|
'timestamp': dt.now(tz=tz.utc),
|
|
'event_type': self.event_type,
|
|
'tenant_id': self.tenant_id,
|
|
'trace_id': self.trace_id,
|
|
'span_id': self.span_id,
|
|
'span_name': self.span_name,
|
|
'parent_span_id': self.parent_span_id,
|
|
'document_version_id': self.document_version_id,
|
|
'document_version_file_size': self.document_version_file_size,
|
|
'chat_session_id': self.chat_session_id,
|
|
'interaction_id': self.interaction_id,
|
|
'specialist_id': self.specialist_id,
|
|
'specialist_type': self.specialist_type,
|
|
'specialist_type_version': self.specialist_type_version,
|
|
'environment': self.environment,
|
|
'llm_metrics_total_tokens': metrics.get('total_tokens', 0),
|
|
'llm_metrics_prompt_tokens': metrics.get('prompt_tokens', 0),
|
|
'llm_metrics_completion_tokens': metrics.get('completion_tokens', 0),
|
|
'llm_metrics_nr_of_pages': metrics.get('nr_of_pages', 0),
|
|
'llm_metrics_total_time': metrics.get('time_elapsed', 0),
|
|
'llm_interaction_type': metrics['interaction_type'],
|
|
'message': message,
|
|
}
|
|
self._log_buffer.append(log_data)
|
|
|
|
def log_final_metrics(self, level: str = 'info'):
|
|
logger = logging.getLogger('business_events')
|
|
message = "Final LLM Metrics"
|
|
log_data = {
|
|
'timestamp': dt.now(tz=tz.utc),
|
|
'event_type': self.event_type,
|
|
'tenant_id': self.tenant_id,
|
|
'trace_id': self.trace_id,
|
|
'span_id': self.span_id,
|
|
'span_name': self.span_name,
|
|
'parent_span_id': self.parent_span_id,
|
|
'document_version_id': self.document_version_id,
|
|
'document_version_file_size': self.document_version_file_size,
|
|
'chat_session_id': self.chat_session_id,
|
|
'interaction_id': self.interaction_id,
|
|
'specialist_id': self.specialist_id,
|
|
'specialist_type': self.specialist_type,
|
|
'specialist_type_version': self.specialist_type_version,
|
|
'environment': self.environment,
|
|
'llm_metrics_total_tokens': self.llm_metrics['total_tokens'],
|
|
'llm_metrics_prompt_tokens': self.llm_metrics['prompt_tokens'],
|
|
'llm_metrics_completion_tokens': self.llm_metrics['completion_tokens'],
|
|
'llm_metrics_nr_of_pages': self.llm_metrics['nr_of_pages'],
|
|
'llm_metrics_total_time': self.llm_metrics['total_time'],
|
|
'llm_metrics_call_count': self.llm_metrics['call_count'],
|
|
'llm_interaction_type': self.llm_metrics['interaction_type'],
|
|
'message': message,
|
|
}
|
|
self._log_buffer.append(log_data)
|
|
|
|
    @staticmethod
    def _direct_db_persist(log_entries: List[Dict[str, Any]]):
        """Fallback: synchronously bulk-insert log records into the DB when
        the Celery task cannot be queued. Rolls back and logs on failure
        (entries are dropped in that case).

        NOTE(review): ``entry.pop(...)`` mutates the caller's dicts; fine
        while the buffer is cleared right after flushing — confirm if reused.
        NOTE(review): ``llm_metrics_nr_of_pages`` is buffered by the loggers
        but never persisted here — verify whether BusinessEventLog lacks the
        column or this is an omission.
        """
        try:
            db_entries = []
            for entry in log_entries:
                event_log = BusinessEventLog(
                    timestamp=entry.pop('timestamp'),
                    event_type=entry.pop('event_type'),
                    tenant_id=entry.pop('tenant_id'),
                    trace_id=entry.pop('trace_id'),
                    span_id=entry.pop('span_id', None),
                    span_name=entry.pop('span_name', None),
                    parent_span_id=entry.pop('parent_span_id', None),
                    document_version_id=entry.pop('document_version_id', None),
                    document_version_file_size=entry.pop('document_version_file_size', None),
                    chat_session_id=entry.pop('chat_session_id', None),
                    interaction_id=entry.pop('interaction_id', None),
                    specialist_id=entry.pop('specialist_id', None),
                    specialist_type=entry.pop('specialist_type', None),
                    specialist_type_version=entry.pop('specialist_type_version', None),
                    environment=entry.pop('environment', None),
                    llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None),
                    llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None),
                    llm_metrics_completion_tokens=entry.pop('llm_metrics_completion_tokens', None),
                    llm_metrics_total_time=entry.pop('llm_metrics_total_time', None),
                    llm_metrics_call_count=entry.pop('llm_metrics_call_count', None),
                    llm_interaction_type=entry.pop('llm_interaction_type', None),
                    message=entry.pop('message', None)
                )
                db_entries.append(event_log)

            # Bulk insert
            db.session.bulk_save_objects(db_entries)
            db.session.commit()
        except Exception as e:
            logger = logging.getLogger('business_events')
            logger.error(f"Failed to persist logs directly to DB: {e}")
            db.session.rollback()
|
|
|
|
    def _flush_log_buffer(self):
        """Flush the buffered log records to the database via a Celery task.

        Falls back to a synchronous direct DB write if the task cannot be
        queued. The buffer is cleared after either path.
        """
        if self._log_buffer:
            try:
                # Send to Celery task
                current_celery.send_task(
                    'persist_business_events',
                    args=[self._log_buffer],
                    queue='entitlements'  # Or dedicated log queue
                )
            except Exception as e:
                # Fallback to direct DB write in case of issues with Celery
                logger = logging.getLogger('business_events')
                logger.error(f"Failed to send logs to Celery. Falling back to direct DB: {e}")
                self._direct_db_persist(self._log_buffer)

            # Clear the buffer after sending
            self._log_buffer = []
|
|
|
|
    def _push_to_gateway(self):
        """Push the process-wide metric registry to the Prometheus push
        gateway; failures are logged and swallowed so metrics issues never
        break the business flow.
        """
        try:
            push_to_gateway(
                current_app.config['PUSH_GATEWAY_URL'],
                job=current_app.config['COMPONENT_NAME'],
                registry=REGISTRY
            )
        except Exception as e:
            current_app.logger.error(f"Failed to push metrics to Prometheus Push Gateway: {e}")
|
|
|
|
def __enter__(self):
|
|
self.trace_start_time = time.time()
|
|
self.log(f'Starting Trace for {self.event_type}')
|
|
return BusinessEventContext(self).__enter__()
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
trace_total_time = time.time() - self.trace_start_time
|
|
|
|
# Record trace duration
|
|
TRACE_DURATION.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).observe(trace_total_time)
|
|
|
|
# Decrement concurrent traces gauge
|
|
CONCURRENT_TRACES.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).dec()
|
|
|
|
self._push_to_gateway()
|
|
|
|
if self.llm_metrics['call_count'] > 0:
|
|
self.log_final_metrics()
|
|
self.reset_llm_metrics()
|
|
|
|
self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
|
|
self._flush_log_buffer()
|
|
|
|
|
|
return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb)
|
|
|
|
async def __aenter__(self):
|
|
self.trace_start_time = time.time()
|
|
self.log(f'Starting Trace for {self.event_type}')
|
|
return await BusinessEventContext(self).__aenter__()
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
trace_total_time = time.time() - self.trace_start_time
|
|
|
|
# Record trace duration
|
|
TRACE_DURATION.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).observe(trace_total_time)
|
|
|
|
# Decrement concurrent traces gauge
|
|
CONCURRENT_TRACES.labels(
|
|
tenant_id=self.tenant_id_str,
|
|
event_type=self.event_type_str,
|
|
specialist_id=self.specialist_id_str,
|
|
specialist_type=self.specialist_type_str,
|
|
specialist_type_version=self.specialist_type_version_str
|
|
).dec()
|
|
|
|
self._push_to_gateway()
|
|
|
|
if self.llm_metrics['call_count'] > 0:
|
|
self.log_final_metrics()
|
|
self.reset_llm_metrics()
|
|
|
|
self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
|
|
self._flush_log_buffer()
|
|
return await BusinessEventContext(self).__aexit__(exc_type, exc_val, exc_tb) |