- Prometheus metrics are now pushed via the Pushgateway, because each worker process may have its own registry that Prometheus does not scrape
This commit is contained in:
@@ -6,16 +6,18 @@ from datetime import datetime
|
||||
from typing import Dict, Any, Optional, List
|
||||
from datetime import datetime as dt, timezone as tz
|
||||
import logging
|
||||
from prometheus_client import Counter, Histogram, Gauge, Summary
|
||||
|
||||
from flask import current_app
|
||||
from prometheus_client import Counter, Histogram, Gauge, Summary, push_to_gateway, REGISTRY
|
||||
|
||||
from .business_event_context import BusinessEventContext
|
||||
from common.models.entitlements import BusinessEventLog
|
||||
from common.extensions import db
|
||||
from .celery_utils import current_celery
|
||||
from common.utils.performance_monitoring import EveAIMetrics
|
||||
from common.utils.prometheus_utils import sanitize_label
|
||||
|
||||
# Standard duration buckets for all histograms
|
||||
DURATION_BUCKETS = EveAIMetrics.get_standard_buckets()
|
||||
DURATION_BUCKETS = [0.1, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 240, 360, float('inf')]
|
||||
|
||||
# Prometheus metrics for business events
|
||||
TRACE_COUNTER = Counter(
|
||||
@@ -112,14 +114,24 @@ class BusinessEvent:
|
||||
|
||||
# Prometheus label values must be strings
|
||||
self.tenant_id_str = str(self.tenant_id)
|
||||
self.event_type_str = sanitize_label(self.event_type)
|
||||
self.specialist_id_str = str(self.specialist_id) if self.specialist_id else ""
|
||||
self.specialist_type_str = str(self.specialist_type) if self.specialist_type else ""
|
||||
self.specialist_type_version_str = str(self.specialist_type_version) if self.specialist_type_version else ""
|
||||
self.specialist_type_version_str = sanitize_label(str(self.specialist_type_version)) \
|
||||
if self.specialist_type_version else ""
|
||||
self.span_name_str = ""
|
||||
|
||||
current_app.logger.debug(f"Labels for metrics: "
|
||||
f"tenant_id={self.tenant_id_str}, "
|
||||
f"event_type={self.event_type_str},"
|
||||
f"specialist_id={self.specialist_id_str}, "
|
||||
f"specialist_type={self.specialist_type_str}, " +
|
||||
f"specialist_type_version={self.specialist_type_version_str}")
|
||||
|
||||
# Increment concurrent events gauge when initialized
|
||||
CONCURRENT_TRACES.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -128,12 +140,14 @@ class BusinessEvent:
|
||||
# Increment trace counter
|
||||
TRACE_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).inc()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
def update_attribute(self, attribute: str, value: any):
|
||||
if hasattr(self, attribute):
|
||||
setattr(self, attribute, value)
|
||||
@@ -143,9 +157,13 @@ class BusinessEvent:
|
||||
elif attribute == 'specialist_type':
|
||||
self.specialist_type_str = str(value) if value else ""
|
||||
elif attribute == 'specialist_type_version':
|
||||
self.specialist_type_version_str = str(value) if value else ""
|
||||
self.specialist_type_version_str = sanitize_label(str(value)) if value else ""
|
||||
elif attribute == 'tenant_id':
|
||||
self.tenant_id_str = str(value)
|
||||
elif attribute == 'event_type':
|
||||
self.event_type_str = sanitize_label(value)
|
||||
elif attribute == 'span_name':
|
||||
self.span_name_str = sanitize_label(value)
|
||||
else:
|
||||
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'")
|
||||
|
||||
@@ -158,13 +176,21 @@ class BusinessEvent:
|
||||
self.llm_metrics['interaction_type'] = metrics['interaction_type']
|
||||
|
||||
# Track in Prometheus metrics
|
||||
interaction_type = metrics['interaction_type']
|
||||
interaction_type_str = sanitize_label(metrics['interaction_type']) if metrics['interaction_type'] else ""
|
||||
|
||||
current_app.logger.debug(f"Labels for metrics: "
|
||||
f"tenant_id={self.tenant_id_str}, "
|
||||
f"event_type={self.event_type_str},"
|
||||
f"interaction_type={interaction_type_str}, "
|
||||
f"specialist_id={self.specialist_id_str}, "
|
||||
f"specialist_type={self.specialist_type_str}, "
|
||||
f"specialist_type_version={self.specialist_type_version_str}")
|
||||
|
||||
# Track token usage
|
||||
LLM_TOKENS_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
interaction_type=interaction_type,
|
||||
event_type=self.event_type_str,
|
||||
interaction_type=interaction_type_str,
|
||||
token_type='total',
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
@@ -173,8 +199,8 @@ class BusinessEvent:
|
||||
|
||||
LLM_TOKENS_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
interaction_type=interaction_type,
|
||||
event_type=self.event_type_str,
|
||||
interaction_type=interaction_type_str,
|
||||
token_type='prompt',
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
@@ -183,8 +209,8 @@ class BusinessEvent:
|
||||
|
||||
LLM_TOKENS_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
interaction_type=interaction_type,
|
||||
event_type=self.event_type_str,
|
||||
interaction_type=interaction_type_str,
|
||||
token_type='completion',
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
@@ -194,8 +220,8 @@ class BusinessEvent:
|
||||
# Track duration
|
||||
LLM_DURATION.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
interaction_type=interaction_type,
|
||||
event_type=self.event_type_str,
|
||||
interaction_type=interaction_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -204,13 +230,15 @@ class BusinessEvent:
|
||||
# Track call count
|
||||
LLM_CALLS_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
interaction_type=interaction_type,
|
||||
event_type=self.event_type_str,
|
||||
interaction_type=interaction_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).inc()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
def reset_llm_metrics(self):
|
||||
self.llm_metrics['total_tokens'] = 0
|
||||
self.llm_metrics['prompt_tokens'] = 0
|
||||
@@ -236,16 +264,25 @@ class BusinessEvent:
|
||||
# Set the new span info
|
||||
self.span_id = new_span_id
|
||||
self.span_name = span_name
|
||||
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
||||
self.parent_span_id = parent_span_id
|
||||
|
||||
# Track start time for the span
|
||||
span_start_time = time.time()
|
||||
|
||||
current_app.logger.debug(f"Labels for metrics: "
|
||||
f"tenant_id={self.tenant_id_str}, "
|
||||
f"event_type={self.event_type_str}, "
|
||||
f"activity_name={self.span_name_str}, "
|
||||
f"specialist_id={self.specialist_id_str}, "
|
||||
f"specialist_type={self.specialist_type_str}, "
|
||||
f"specialist_type_version={self.specialist_type_version_str}")
|
||||
|
||||
# Increment span metrics - using span_name as activity_name for metrics
|
||||
SPAN_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -254,13 +291,15 @@ class BusinessEvent:
|
||||
# Increment concurrent spans gauge
|
||||
CONCURRENT_SPANS.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).inc()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
self.log(f"Start")
|
||||
|
||||
try:
|
||||
@@ -272,8 +311,8 @@ class BusinessEvent:
|
||||
# Observe span duration
|
||||
SPAN_DURATION.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -282,13 +321,15 @@ class BusinessEvent:
|
||||
# Decrement concurrent spans gauge
|
||||
CONCURRENT_SPANS.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).dec()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
if self.llm_metrics['call_count'] > 0:
|
||||
self.log_final_metrics()
|
||||
self.reset_llm_metrics()
|
||||
@@ -296,10 +337,12 @@ class BusinessEvent:
|
||||
# Restore the previous span info
|
||||
if self.spans:
|
||||
self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
|
||||
# The pop above restored self.span_name to the enclosing span; build the label from it,
# not from the local span_name, which still names the span that just ended.
self.span_name_str = sanitize_label(self.span_name) if self.span_name else ""
|
||||
else:
|
||||
self.span_id = None
|
||||
self.span_name = None
|
||||
self.parent_span_id = None
|
||||
self.span_name_str = ""
|
||||
|
||||
@asynccontextmanager
|
||||
async def create_span_async(self, span_name: str):
|
||||
@@ -314,16 +357,25 @@ class BusinessEvent:
|
||||
# Set the new span info
|
||||
self.span_id = new_span_id
|
||||
self.span_name = span_name
|
||||
self.span_name_str = sanitize_label(span_name) if span_name else ""
|
||||
self.parent_span_id = parent_span_id
|
||||
|
||||
# Track start time for the span
|
||||
span_start_time = time.time()
|
||||
|
||||
current_app.logger.debug(f"Labels for metrics: "
|
||||
f"tenant_id={self.tenant_id_str}, "
|
||||
f"event_type={self.event_type_str}, "
|
||||
f"activity_name={self.span_name_str}, "
|
||||
f"specialist_id={self.specialist_id_str}, "
|
||||
f"specialist_type={self.specialist_type_str}, "
|
||||
f"specialist_type_version={self.specialist_type_version_str}")
|
||||
|
||||
# Increment span metrics - using span_name as activity_name for metrics
|
||||
SPAN_COUNTER.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -332,13 +384,15 @@ class BusinessEvent:
|
||||
# Increment concurrent spans gauge
|
||||
CONCURRENT_SPANS.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).inc()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
self.log(f"Start")
|
||||
|
||||
try:
|
||||
@@ -350,8 +404,8 @@ class BusinessEvent:
|
||||
# Observe span duration
|
||||
SPAN_DURATION.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -360,13 +414,15 @@ class BusinessEvent:
|
||||
# Decrement concurrent spans gauge
|
||||
CONCURRENT_SPANS.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
activity_name=span_name,
|
||||
event_type=self.event_type_str,
|
||||
activity_name=self.span_name_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).dec()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
if self.llm_metrics['call_count'] > 0:
|
||||
self.log_final_metrics()
|
||||
self.reset_llm_metrics()
|
||||
@@ -374,10 +430,12 @@ class BusinessEvent:
|
||||
# Restore the previous span info
|
||||
if self.spans:
|
||||
self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
|
||||
# The pop above restored self.span_name to the enclosing span; build the label from it,
# not from the local span_name, which still names the span that just ended.
self.span_name_str = sanitize_label(self.span_name) if self.span_name else ""
|
||||
else:
|
||||
self.span_id = None
|
||||
self.span_name = None
|
||||
self.parent_span_id = None
|
||||
self.span_name_str = ""
|
||||
|
||||
def log(self, message: str, level: str = 'info', extra_fields: Dict[str, Any] = None):
|
||||
log_data = {
|
||||
@@ -526,6 +584,17 @@ class BusinessEvent:
|
||||
# Clear the buffer after sending
|
||||
self._log_buffer = []
|
||||
|
||||
def _push_to_gateway(self):
    """Push this process's default Prometheus registry to the Pushgateway.

    The gateway address is read from ``PUSH_GATEWAY_URL`` and the job name
    from ``COMPONENT_NAME`` in the Flask app config. Any failure is logged
    and swallowed, so a metrics delivery problem never propagates to the
    caller.
    """
    # Function-scope imports keep this method self-contained.
    import os
    import socket

    try:
        push_to_gateway(
            current_app.config['PUSH_GATEWAY_URL'],
            job=current_app.config['COMPONENT_NAME'],
            registry=REGISTRY,
            # push_to_gateway replaces every metric stored under the same
            # job + grouping key. Without a per-process key, worker processes
            # that share one job name would overwrite each other's registries.
            # NOTE(review): groups of exited workers stay in the gateway until
            # deleted - confirm the cleanup policy.
            grouping_key={
                'instance': socket.gethostname(),
                'pid': str(os.getpid()),
            },
        )
    except Exception as e:
        current_app.logger.error(f"Failed to push metrics to Prometheus Push Gateway: {e}")
|
||||
|
||||
def __enter__(self):
|
||||
self.trace_start_time = time.time()
|
||||
self.log(f'Starting Trace for {self.event_type}')
|
||||
@@ -537,7 +606,7 @@ class BusinessEvent:
|
||||
# Record trace duration
|
||||
TRACE_DURATION.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -546,18 +615,22 @@ class BusinessEvent:
|
||||
# Decrement concurrent traces gauge
|
||||
CONCURRENT_TRACES.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).dec()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
if self.llm_metrics['call_count'] > 0:
|
||||
self.log_final_metrics()
|
||||
self.reset_llm_metrics()
|
||||
|
||||
self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time})
|
||||
self._flush_log_buffer()
|
||||
|
||||
|
||||
return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
async def __aenter__(self):
|
||||
@@ -571,7 +644,7 @@ class BusinessEvent:
|
||||
# Record trace duration
|
||||
TRACE_DURATION.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
@@ -580,12 +653,14 @@ class BusinessEvent:
|
||||
# Decrement concurrent traces gauge
|
||||
CONCURRENT_TRACES.labels(
|
||||
tenant_id=self.tenant_id_str,
|
||||
event_type=self.event_type,
|
||||
event_type=self.event_type_str,
|
||||
specialist_id=self.specialist_id_str,
|
||||
specialist_type=self.specialist_type_str,
|
||||
specialist_type_version=self.specialist_type_version_str
|
||||
).dec()
|
||||
|
||||
self._push_to_gateway()
|
||||
|
||||
if self.llm_metrics['call_count'] > 0:
|
||||
self.log_final_metrics()
|
||||
self.reset_llm_metrics()
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
import time
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
from prometheus_client import Counter, Histogram, Summary, start_http_server, Gauge
|
||||
from flask import current_app, g, request, Flask
|
||||
|
||||
|
||||
class EveAIMetrics:
    """
    Shared Prometheus metrics infrastructure.

    An instance binds to a Flask app and starts the Prometheus HTTP server
    at most once. The static helpers provide the common histogram buckets
    and stringify label values.

    Metrics that belong to a single component are expected to live in that
    component's own module.
    """

    def __init__(self, app: Flask = None):
        self.app = app
        self._metrics_server_started = False
        if app is None:
            # Deferred setup: the caller invokes init_app() later.
            return
        self.init_app(app)

    def init_app(self, app: Flask):
        """Bind ``app`` to this instance and start the Prometheus server."""
        self.app = app
        self._start_metrics_server()

    def _start_metrics_server(self):
        """Start the Prometheus metrics HTTP server unless this instance already did."""
        if self._metrics_server_started:
            return

        try:
            # Port comes from the app config, falling back to 8000.
            metrics_port = self.app.config.get('PROMETHEUS_PORT', 8000)
            start_http_server(metrics_port)
            self.app.logger.info(f"Prometheus metrics server started on port {metrics_port}")
            self._metrics_server_started = True
        except Exception as e:
            # Best effort: a failing metrics server is logged, not raised.
            self.app.logger.error(f"Failed to start metrics server: {e}")

    @staticmethod
    def get_standard_buckets():
        """
        Return the standard duration buckets for histogram metrics.

        Sharing one bucket list keeps histograms comparable across components.
        """
        sub_minute = [0.1, 0.5, 1, 2.5, 5, 10, 15, 30]
        minute_and_up = [60, 120, 240, 360, float('inf')]
        return sub_minute + minute_and_up

    @staticmethod
    def sanitize_label_values(labels_dict):
        """
        Stringify every label value, as Prometheus requires string labels.

        Args:
            labels_dict: Dictionary of label name to label value

        Returns:
            New dictionary with the same keys; ``None`` becomes ``""`` and
            every other value is passed through ``str()``
        """
        stringified = {}
        for label_name, raw_value in labels_dict.items():
            stringified[label_name] = "" if raw_value is None else str(raw_value)
        return stringified
|
||||
11
common/utils/prometheus_utils.py
Normal file
11
common/utils/prometheus_utils.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from flask import current_app
|
||||
from prometheus_client import push_to_gateway
|
||||
|
||||
|
||||
def sanitize_label(value):
    """Return ``value`` as a string made only of ASCII letters, digits and underscores.

    ``None`` maps to the empty string. Any other value is converted with
    ``str()`` and every character outside ``[a-zA-Z0-9_]`` (spaces, dots,
    dashes, ...) is replaced by an underscore.
    """
    import re

    if value is None:
        return ""

    text = str(value)
    disallowed = re.compile(r'[^a-zA-Z0-9_]')
    return disallowed.sub('_', text)
|
||||
Reference in New Issue
Block a user