- Adding Prometheus and grafana services in development

- Adding Prometheus metrics to the business events
- Ensure asynchronous behaviour of crewai specialists.
- Adapt Business events to working in mixed synchronous / asynchronous contexts
- Extend business events with specialist information
- Started adding a grafana dashboard (TBC)
This commit is contained in:
Josako
2025-03-24 16:39:22 +01:00
parent 238bdb58f4
commit b6ee7182de
25 changed files with 1337 additions and 83 deletions

View File

@@ -1,16 +1,81 @@
import os
import time
import uuid
from contextlib import contextmanager
from contextlib import contextmanager, asynccontextmanager
from datetime import datetime
from typing import Dict, Any, Optional, List
from datetime import datetime as dt, timezone as tz
import logging
from prometheus_client import Counter, Histogram, Gauge, Summary
from .business_event_context import BusinessEventContext
from common.models.entitlements import BusinessEventLog
from common.extensions import db
from .celery_utils import current_celery
from common.utils.performance_monitoring import EveAIMetrics
# Standard duration buckets for all histograms
DURATION_BUCKETS = EveAIMetrics.get_standard_buckets()
# Prometheus metrics for business events
TRACE_COUNTER = Counter(
'eveai_business_events_total',
'Total number of business events triggered',
['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
TRACE_DURATION = Histogram(
'eveai_business_events_duration_seconds',
'Duration of business events in seconds',
['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version'],
buckets=DURATION_BUCKETS
)
CONCURRENT_TRACES = Gauge(
'eveai_business_events_concurrent',
'Number of concurrent business events',
['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
SPAN_COUNTER = Counter(
'eveai_business_spans_total',
'Total number of spans within business events',
['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
SPAN_DURATION = Histogram(
'eveai_business_spans_duration_seconds',
'Duration of spans within business events in seconds',
['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version'],
buckets=DURATION_BUCKETS
)
CONCURRENT_SPANS = Gauge(
'eveai_business_spans_concurrent',
'Number of concurrent spans within business events',
['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
# LLM Usage metrics
LLM_TOKENS_COUNTER = Counter(
'eveai_llm_tokens_total',
'Total number of tokens used in LLM calls',
['tenant_id', 'event_type', 'interaction_type', 'token_type', 'specialist_id', 'specialist_type',
'specialist_type_version']
)
LLM_DURATION = Histogram(
'eveai_llm_duration_seconds',
'Duration of LLM API calls in seconds',
['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version'],
buckets=DURATION_BUCKETS
)
LLM_CALLS_COUNTER = Counter(
'eveai_llm_calls_total',
'Total number of LLM API calls',
['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version']
)
class BusinessEvent:
@@ -29,6 +94,9 @@ class BusinessEvent:
self.document_version_file_size = kwargs.get('document_version_file_size')
self.chat_session_id = kwargs.get('chat_session_id')
self.interaction_id = kwargs.get('interaction_id')
self.specialist_id = kwargs.get('specialist_id')
self.specialist_type = kwargs.get('specialist_type')
self.specialist_type_version = kwargs.get('specialist_type_version')
self.environment = os.environ.get("FLASK_ENV", "development")
self.span_counter = 0
self.spans = []
@@ -42,9 +110,42 @@ class BusinessEvent:
}
self._log_buffer = []
# Prometheus label values must be strings
self.tenant_id_str = str(self.tenant_id)
self.specialist_id_str = str(self.specialist_id) if self.specialist_id else ""
self.specialist_type_str = str(self.specialist_type) if self.specialist_type else ""
self.specialist_type_version_str = str(self.specialist_type_version) if self.specialist_type_version else ""
# Increment concurrent events gauge when initialized
CONCURRENT_TRACES.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
# Increment trace counter
TRACE_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
def update_attribute(self, attribute: str, value: any):
if hasattr(self, attribute):
setattr(self, attribute, value)
# Update string versions for Prometheus labels if needed
if attribute == 'specialist_id':
self.specialist_id_str = str(value) if value else ""
elif attribute == 'specialist_type':
self.specialist_type_str = str(value) if value else ""
elif attribute == 'specialist_type_version':
self.specialist_type_version_str = str(value) if value else ""
elif attribute == 'tenant_id':
self.tenant_id_str = str(value)
else:
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'")
@@ -56,6 +157,60 @@ class BusinessEvent:
self.llm_metrics['call_count'] += 1
self.llm_metrics['interaction_type'] = metrics['interaction_type']
# Track in Prometheus metrics
interaction_type = metrics['interaction_type']
# Track token usage
LLM_TOKENS_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
interaction_type=interaction_type,
token_type='total',
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc(metrics['total_tokens'])
LLM_TOKENS_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
interaction_type=interaction_type,
token_type='prompt',
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc(metrics['prompt_tokens'])
LLM_TOKENS_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
interaction_type=interaction_type,
token_type='completion',
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc(metrics['completion_tokens'])
# Track duration
LLM_DURATION.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
interaction_type=interaction_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).observe(metrics['time_elapsed'])
# Track call count
LLM_CALLS_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
interaction_type=interaction_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
def reset_llm_metrics(self):
self.llm_metrics['total_tokens'] = 0
self.llm_metrics['prompt_tokens'] = 0
@@ -86,6 +241,26 @@ class BusinessEvent:
# Track start time for the span
span_start_time = time.time()
# Increment span metrics - using span_name as activity_name for metrics
SPAN_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
# Increment concurrent spans gauge
CONCURRENT_SPANS.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
self.log(f"Start")
try:
@@ -94,6 +269,104 @@ class BusinessEvent:
# Calculate total time for this span
span_total_time = time.time() - span_start_time
# Observe span duration
SPAN_DURATION.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).observe(span_total_time)
# Decrement concurrent spans gauge
CONCURRENT_SPANS.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).dec()
if self.llm_metrics['call_count'] > 0:
self.log_final_metrics()
self.reset_llm_metrics()
self.log(f"End", extra_fields={'span_duration': span_total_time})
# Restore the previous span info
if self.spans:
self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
else:
self.span_id = None
self.span_name = None
self.parent_span_id = None
@asynccontextmanager
async def create_span_async(self, span_name: str):
"""Async version of create_span using async context manager"""
parent_span_id = self.span_id
self.span_counter += 1
new_span_id = str(uuid.uuid4())
# Save the current span info
self.spans.append((self.span_id, self.span_name, self.parent_span_id))
# Set the new span info
self.span_id = new_span_id
self.span_name = span_name
self.parent_span_id = parent_span_id
# Track start time for the span
span_start_time = time.time()
# Increment span metrics - using span_name as activity_name for metrics
SPAN_COUNTER.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
# Increment concurrent spans gauge
CONCURRENT_SPANS.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).inc()
self.log(f"Start")
try:
yield
finally:
# Calculate total time for this span
span_total_time = time.time() - span_start_time
# Observe span duration
SPAN_DURATION.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).observe(span_total_time)
# Decrement concurrent spans gauge
CONCURRENT_SPANS.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
activity_name=span_name,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).dec()
if self.llm_metrics['call_count'] > 0:
self.log_final_metrics()
self.reset_llm_metrics()
@@ -119,6 +392,9 @@ class BusinessEvent:
'document_version_file_size': self.document_version_file_size,
'chat_session_id': self.chat_session_id,
'interaction_id': self.interaction_id,
'specialist_id': self.specialist_id,
'specialist_type': self.specialist_type,
'specialist_type_version': self.specialist_type_version,
'environment': self.environment,
'message': message,
}
@@ -149,6 +425,9 @@ class BusinessEvent:
'document_version_file_size': self.document_version_file_size,
'chat_session_id': self.chat_session_id,
'interaction_id': self.interaction_id,
'specialist_id': self.specialist_id,
'specialist_type': self.specialist_type,
'specialist_type_version': self.specialist_type_version,
'environment': self.environment,
'llm_metrics_total_tokens': metrics['total_tokens'],
'llm_metrics_prompt_tokens': metrics['prompt_tokens'],
@@ -174,6 +453,9 @@ class BusinessEvent:
'document_version_file_size': self.document_version_file_size,
'chat_session_id': self.chat_session_id,
'interaction_id': self.interaction_id,
'specialist_id': self.specialist_id,
'specialist_type': self.specialist_type,
'specialist_type_version': self.specialist_type_version,
'environment': self.environment,
'llm_metrics_total_tokens': self.llm_metrics['total_tokens'],
'llm_metrics_prompt_tokens': self.llm_metrics['prompt_tokens'],
@@ -203,6 +485,9 @@ class BusinessEvent:
document_version_file_size=entry.pop('document_version_file_size', None),
chat_session_id=entry.pop('chat_session_id', None),
interaction_id=entry.pop('interaction_id', None),
specialist_id=entry.pop('specialist_id', None),
specialist_type=entry.pop('specialist_type', None),
specialist_type_version=entry.pop('specialist_type_version', None),
environment=entry.pop('environment', None),
llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None),
llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None),
@@ -249,6 +534,24 @@ class BusinessEvent:
def __exit__(self, exc_type, exc_val, exc_tb):
trace_total_time = time.time() - self.trace_start_time
# Record trace duration
TRACE_DURATION.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).observe(trace_total_time)
# Decrement concurrent traces gauge
CONCURRENT_TRACES.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).dec()
if self.llm_metrics['call_count'] > 0:
self.log_final_metrics()
self.reset_llm_metrics()
@@ -256,3 +559,37 @@ class BusinessEvent:
self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
self._flush_log_buffer()
return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb)
async def __aenter__(self):
self.trace_start_time = time.time()
self.log(f'Starting Trace for {self.event_type}')
return await BusinessEventContext(self).__aenter__()
async def __aexit__(self, exc_type, exc_val, exc_tb):
trace_total_time = time.time() - self.trace_start_time
# Record trace duration
TRACE_DURATION.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).observe(trace_total_time)
# Decrement concurrent traces gauge
CONCURRENT_TRACES.labels(
tenant_id=self.tenant_id_str,
event_type=self.event_type,
specialist_id=self.specialist_id_str,
specialist_type=self.specialist_type_str,
specialist_type_version=self.specialist_type_version_str
).dec()
if self.llm_metrics['call_count'] > 0:
self.log_final_metrics()
self.reset_llm_metrics()
self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
self._flush_log_buffer()
return await BusinessEventContext(self).__aexit__(exc_type, exc_val, exc_tb)