diff --git a/.gitignore b/.gitignore index 80f2042..d150bb9 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,5 @@ scripts/__pycache__/run_eveai_app.cpython-312.pyc /tests/interactive_client/specialist_client.log /.repopackignore /patched_packages/crewai/ +/docker/prometheus/data/ +/docker/grafana/data/ diff --git a/common/eveai_model/eveai_embedding_base.py b/common/eveai_model/eveai_embedding_base.py index e79b350..2746d99 100644 --- a/common/eveai_model/eveai_embedding_base.py +++ b/common/eveai_model/eveai_embedding_base.py @@ -5,7 +5,7 @@ from typing import List class EveAIEmbeddings: @abstractmethod def embed_documents(self, texts: List[str]) -> List[List[float]]: - pass + raise NotImplementedError def embed_query(self, text: str) -> List[float]: return self.embed_documents([text])[0] \ No newline at end of file diff --git a/common/extensions.py b/common/extensions.py index d2ae2d6..c38fa7f 100644 --- a/common/extensions.py +++ b/common/extensions.py @@ -15,6 +15,7 @@ from .langchain.templates.template_manager import TemplateManager from .utils.cache.eveai_cache_manager import EveAICacheManager from .utils.simple_encryption import SimpleEncryption from .utils.minio_utils import MinioClient +from .utils.performance_monitoring import EveAIMetrics # Create extensions @@ -33,6 +34,6 @@ simple_encryption = SimpleEncryption() minio_client = MinioClient() metrics = PrometheusMetrics.for_app_factory() template_manager = TemplateManager() -# Caching classes cache_manager = EveAICacheManager() +eveai_metrics = EveAIMetrics() diff --git a/common/models/entitlements.py b/common/models/entitlements.py index 6b6fdad..3690574 100644 --- a/common/models/entitlements.py +++ b/common/models/entitlements.py @@ -15,6 +15,9 @@ class BusinessEventLog(db.Model): parent_span_id = db.Column(db.String(50)) document_version_id = db.Column(db.Integer) document_version_file_size = db.Column(db.Float) + specialist_id = db.Column(db.Integer) + specialist_type = db.Column(db.String(50)) + specialist_type_version = db.Column(db.String(20)) chat_session_id = db.Column(db.String(50)) interaction_id = db.Column(db.Integer) environment = db.Column(db.String(20)) diff --git a/common/utils/business_event.py b/common/utils/business_event.py index 317493f..1cb8feb 100644 --- a/common/utils/business_event.py +++ b/common/utils/business_event.py @@ -1,16 +1,81 @@ import os import time import uuid -from contextlib import contextmanager +from contextlib import contextmanager, asynccontextmanager from datetime import datetime from typing import Dict, Any, Optional, List from datetime import datetime as dt, timezone as tz import logging +from prometheus_client import Counter, Histogram, Gauge, Summary from .business_event_context import BusinessEventContext from common.models.entitlements import BusinessEventLog from common.extensions import db from .celery_utils import current_celery +from common.utils.performance_monitoring import EveAIMetrics + +# Standard duration buckets for all histograms +DURATION_BUCKETS = EveAIMetrics.get_standard_buckets() + +# Prometheus metrics for business events +TRACE_COUNTER = Counter( + 'eveai_business_events_total', + 'Total number of business events triggered', + ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version'] +) + +TRACE_DURATION = Histogram( + 'eveai_business_events_duration_seconds', + 'Duration of business events in seconds', + ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version'], + buckets=DURATION_BUCKETS 
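+    # Note: DURATION_BUCKETS comes from EveAIMetrics.get_standard_buckets()
+    # (0.1 s up to 360 s, plus +Inf), so trace, span and LLM histograms share one bucket layout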
+) + +CONCURRENT_TRACES = Gauge( + 'eveai_business_events_concurrent', + 'Number of concurrent business events', + ['tenant_id', 'event_type', 'specialist_id', 'specialist_type', 'specialist_type_version'] +) + +SPAN_COUNTER = Counter( + 'eveai_business_spans_total', + 'Total number of spans within business events', + ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version'] +) + +SPAN_DURATION = Histogram( + 'eveai_business_spans_duration_seconds', + 'Duration of spans within business events in seconds', + ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version'], + buckets=DURATION_BUCKETS +) + +CONCURRENT_SPANS = Gauge( + 'eveai_business_spans_concurrent', + 'Number of concurrent spans within business events', + ['tenant_id', 'event_type', 'activity_name', 'specialist_id', 'specialist_type', 'specialist_type_version'] +) + +# LLM Usage metrics +LLM_TOKENS_COUNTER = Counter( + 'eveai_llm_tokens_total', + 'Total number of tokens used in LLM calls', + ['tenant_id', 'event_type', 'interaction_type', 'token_type', 'specialist_id', 'specialist_type', + 'specialist_type_version'] +) + +LLM_DURATION = Histogram( + 'eveai_llm_duration_seconds', + 'Duration of LLM API calls in seconds', + ['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version'], + buckets=DURATION_BUCKETS +) + +LLM_CALLS_COUNTER = Counter( + 'eveai_llm_calls_total', + 'Total number of LLM API calls', + ['tenant_id', 'event_type', 'interaction_type', 'specialist_id', 'specialist_type', 'specialist_type_version'] +) class BusinessEvent: @@ -29,6 +94,9 @@ class BusinessEvent: self.document_version_file_size = kwargs.get('document_version_file_size') self.chat_session_id = kwargs.get('chat_session_id') self.interaction_id = kwargs.get('interaction_id') + self.specialist_id = kwargs.get('specialist_id') + self.specialist_type = kwargs.get('specialist_type') + self.specialist_type_version = kwargs.get('specialist_type_version') self.environment = os.environ.get("FLASK_ENV", "development") self.span_counter = 0 self.spans = [] @@ -42,9 +110,42 @@ class BusinessEvent: } self._log_buffer = [] + # Prometheus label values must be strings + self.tenant_id_str = str(self.tenant_id) + self.specialist_id_str = str(self.specialist_id) if self.specialist_id else "" + self.specialist_type_str = str(self.specialist_type) if self.specialist_type else "" + self.specialist_type_version_str = str(self.specialist_type_version) if self.specialist_type_version else "" + + # Increment concurrent events gauge when initialized + CONCURRENT_TRACES.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + + # Increment trace counter + TRACE_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + def update_attribute(self, attribute: str, value: any): if hasattr(self, attribute): setattr(self, attribute, value) + # Update string versions for Prometheus labels if needed + if attribute == 'specialist_id': + self.specialist_id_str = str(value) if value else "" + elif attribute == 'specialist_type': + self.specialist_type_str = str(value) if value else "" + elif attribute == 
'specialist_type_version': + self.specialist_type_version_str = str(value) if value else "" + elif attribute == 'tenant_id': + self.tenant_id_str = str(value) else: raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'") @@ -56,6 +157,60 @@ class BusinessEvent: self.llm_metrics['call_count'] += 1 self.llm_metrics['interaction_type'] = metrics['interaction_type'] + # Track in Prometheus metrics + interaction_type = metrics['interaction_type'] + + # Track token usage + LLM_TOKENS_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + interaction_type=interaction_type, + token_type='total', + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc(metrics['total_tokens']) + + LLM_TOKENS_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + interaction_type=interaction_type, + token_type='prompt', + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc(metrics['prompt_tokens']) + + LLM_TOKENS_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + interaction_type=interaction_type, + token_type='completion', + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc(metrics['completion_tokens']) + + # Track duration + LLM_DURATION.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + interaction_type=interaction_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).observe(metrics['time_elapsed']) + + # Track call count + LLM_CALLS_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + interaction_type=interaction_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + def reset_llm_metrics(self): self.llm_metrics['total_tokens'] = 0 self.llm_metrics['prompt_tokens'] = 0 @@ -86,6 +241,26 @@ class BusinessEvent: # Track start time for the span span_start_time = time.time() + # Increment span metrics - using span_name as activity_name for metrics + SPAN_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + + # Increment concurrent spans gauge + CONCURRENT_SPANS.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + self.log(f"Start") try: @@ -94,6 +269,104 @@ class BusinessEvent: # Calculate total time for this span span_total_time = time.time() - span_start_time + # Observe span duration + SPAN_DURATION.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).observe(span_total_time) + + # Decrement concurrent spans gauge + CONCURRENT_SPANS.labels( + tenant_id=self.tenant_id_str, + 
event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).dec() + + if self.llm_metrics['call_count'] > 0: + self.log_final_metrics() + self.reset_llm_metrics() + self.log(f"End", extra_fields={'span_duration': span_total_time}) + # Restore the previous span info + if self.spans: + self.span_id, self.span_name, self.parent_span_id = self.spans.pop() + else: + self.span_id = None + self.span_name = None + self.parent_span_id = None + + @asynccontextmanager + async def create_span_async(self, span_name: str): + """Async version of create_span using async context manager""" + parent_span_id = self.span_id + self.span_counter += 1 + new_span_id = str(uuid.uuid4()) + + # Save the current span info + self.spans.append((self.span_id, self.span_name, self.parent_span_id)) + + # Set the new span info + self.span_id = new_span_id + self.span_name = span_name + self.parent_span_id = parent_span_id + + # Track start time for the span + span_start_time = time.time() + + # Increment span metrics - using span_name as activity_name for metrics + SPAN_COUNTER.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + + # Increment concurrent spans gauge + CONCURRENT_SPANS.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).inc() + + self.log(f"Start") + + try: + yield + finally: + # Calculate total time for this span + span_total_time = time.time() - span_start_time + + # Observe span duration + SPAN_DURATION.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).observe(span_total_time) + + # Decrement concurrent spans gauge + CONCURRENT_SPANS.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + activity_name=span_name, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).dec() + if self.llm_metrics['call_count'] > 0: self.log_final_metrics() self.reset_llm_metrics() @@ -119,6 +392,9 @@ class BusinessEvent: 'document_version_file_size': self.document_version_file_size, 'chat_session_id': self.chat_session_id, 'interaction_id': self.interaction_id, + 'specialist_id': self.specialist_id, + 'specialist_type': self.specialist_type, + 'specialist_type_version': self.specialist_type_version, 'environment': self.environment, 'message': message, } @@ -149,6 +425,9 @@ class BusinessEvent: 'document_version_file_size': self.document_version_file_size, 'chat_session_id': self.chat_session_id, 'interaction_id': self.interaction_id, + 'specialist_id': self.specialist_id, + 'specialist_type': self.specialist_type, + 'specialist_type_version': self.specialist_type_version, 'environment': self.environment, 'llm_metrics_total_tokens': metrics['total_tokens'], 'llm_metrics_prompt_tokens': metrics['prompt_tokens'], @@ -174,6 +453,9 @@ class BusinessEvent: 'document_version_file_size': 
self.document_version_file_size, 'chat_session_id': self.chat_session_id, 'interaction_id': self.interaction_id, + 'specialist_id': self.specialist_id, + 'specialist_type': self.specialist_type, + 'specialist_type_version': self.specialist_type_version, 'environment': self.environment, 'llm_metrics_total_tokens': self.llm_metrics['total_tokens'], 'llm_metrics_prompt_tokens': self.llm_metrics['prompt_tokens'], @@ -203,6 +485,9 @@ class BusinessEvent: document_version_file_size=entry.pop('document_version_file_size', None), chat_session_id=entry.pop('chat_session_id', None), interaction_id=entry.pop('interaction_id', None), + specialist_id=entry.pop('specialist_id', None), + specialist_type=entry.pop('specialist_type', None), + specialist_type_version=entry.pop('specialist_type_version', None), environment=entry.pop('environment', None), llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None), llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None), @@ -249,6 +534,24 @@ class BusinessEvent: def __exit__(self, exc_type, exc_val, exc_tb): trace_total_time = time.time() - self.trace_start_time + # Record trace duration + TRACE_DURATION.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).observe(trace_total_time) + + # Decrement concurrent traces gauge + CONCURRENT_TRACES.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).dec() + if self.llm_metrics['call_count'] > 0: self.log_final_metrics() self.reset_llm_metrics() @@ -256,3 +559,37 @@ class BusinessEvent: self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time}) self._flush_log_buffer() return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb) + + async def __aenter__(self): + self.trace_start_time = time.time() + self.log(f'Starting Trace for {self.event_type}') + return await BusinessEventContext(self).__aenter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + trace_total_time = time.time() - self.trace_start_time + + # Record trace duration + TRACE_DURATION.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).observe(trace_total_time) + + # Decrement concurrent traces gauge + CONCURRENT_TRACES.labels( + tenant_id=self.tenant_id_str, + event_type=self.event_type, + specialist_id=self.specialist_id_str, + specialist_type=self.specialist_type_str, + specialist_type_version=self.specialist_type_version_str + ).dec() + + if self.llm_metrics['call_count'] > 0: + self.log_final_metrics() + self.reset_llm_metrics() + + self.log(f'Ending Trace for {self.event_type}', extra_fields={'trace_duration': trace_total_time}) + self._flush_log_buffer() + return await BusinessEventContext(self).__aexit__(exc_type, exc_val, exc_tb) \ No newline at end of file diff --git a/common/utils/business_event_context.py b/common/utils/business_event_context.py index 42a85e2..f413fc8 100644 --- a/common/utils/business_event_context.py +++ b/common/utils/business_event_context.py @@ -1,9 +1,22 @@ from werkzeug.local import LocalProxy, LocalStack +import asyncio +from contextvars import 
ContextVar +import contextvars +# Keep existing stack for backward compatibility _business_event_stack = LocalStack() +# Add contextvar for async support +_business_event_contextvar = ContextVar('business_event', default=None) + def _get_current_event(): + # Try contextvar first (for async) + event = _business_event_contextvar.get() + if event is not None: + return event + + # Fall back to the stack-based approach (for sync) top = _business_event_stack.top if top is None: raise RuntimeError("No business event context found. Are you sure you're in a business event?") @@ -16,10 +29,24 @@ current_event = LocalProxy(_get_current_event) class BusinessEventContext: def __init__(self, event): self.event = event + self._token = None # For storing contextvar token def __enter__(self): _business_event_stack.push(self.event) + self._token = _business_event_contextvar.set(self.event) return self.event def __exit__(self, exc_type, exc_val, exc_tb): _business_event_stack.pop() + if self._token is not None: + _business_event_contextvar.reset(self._token) + + async def __aenter__(self): + _business_event_stack.push(self.event) + self._token = _business_event_contextvar.set(self.event) + return self.event + + async def __aexit__(self, exc_type, exc_val, exc_tb): + _business_event_stack.pop() + if self._token is not None: + _business_event_contextvar.reset(self._token) \ No newline at end of file diff --git a/common/utils/cache/base.py b/common/utils/cache/base.py index 036a3d9..4e7cd01 100644 --- a/common/utils/cache/base.py +++ b/common/utils/cache/base.py @@ -59,7 +59,7 @@ class CacheHandler(Generic[T]): Returns: A serializable format of the instance. """ - pass + raise NotImplementedError @abstractmethod def _from_cache_data(self, data: Any, **kwargs) -> T: @@ -73,7 +73,7 @@ class CacheHandler(Generic[T]): Returns: The data in its usable format. """ - pass + raise NotImplementedError @abstractmethod def _should_cache(self, value: T) -> bool: @@ -86,7 +86,7 @@ class CacheHandler(Generic[T]): Returns: True if the value should be cached, False otherwise. """ - pass + raise NotImplementedError def configure_keys(self, *components: str): """ diff --git a/common/utils/performance_monitoring.py b/common/utils/performance_monitoring.py new file mode 100644 index 0000000..3727000 --- /dev/null +++ b/common/utils/performance_monitoring.py @@ -0,0 +1,59 @@ +import time +import threading +from contextlib import contextmanager +from functools import wraps +from prometheus_client import Counter, Histogram, Summary, start_http_server, Gauge +from flask import current_app, g, request, Flask + + +class EveAIMetrics: + """ + Central class for Prometheus metrics infrastructure. + This class initializes the Prometheus HTTP server and provides + shared functionality for metrics across components. + + Component-specific metrics should be defined in their respective modules. 
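+
+    A minimal usage sketch (assuming the Flask app object is available as
+    ``app``, as in the component factories):
+
+        from common.extensions import eveai_metrics
+
+        eveai_metrics.init_app(app)  # starts the server on PROMETHEUS_PORT (default 8000)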
+ """ + + def __init__(self, app: Flask = None): + self.app = app + self._metrics_server_started = False + if app is not None: + self.init_app(app) + + def init_app(self, app: Flask): + """Initialize metrics with Flask app and start Prometheus server""" + self.app = app + self._start_metrics_server() + + def _start_metrics_server(self): + """Start the Prometheus metrics HTTP server if not already running""" + if not self._metrics_server_started: + try: + metrics_port = self.app.config.get('PROMETHEUS_PORT', 8000) + start_http_server(metrics_port) + self.app.logger.info(f"Prometheus metrics server started on port {metrics_port}") + self._metrics_server_started = True + except Exception as e: + self.app.logger.error(f"Failed to start metrics server: {e}") + + @staticmethod + def get_standard_buckets(): + """ + Return the standard duration buckets for histogram metrics. + Components should use these for consistency across the system. + """ + return [0.1, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 240, 360, float('inf')] + + @staticmethod + def sanitize_label_values(labels_dict): + """ + Convert all label values to strings as required by Prometheus. + + Args: + labels_dict: Dictionary of label name to label value + + Returns: + Dictionary with all values converted to strings + """ + return {k: str(v) if v is not None else "" for k, v in labels_dict.items()} diff --git a/docker/compose_dev.yaml b/docker/compose_dev.yaml index edf392b..6537859 100644 --- a/docker/compose_dev.yaml +++ b/docker/compose_dev.yaml @@ -80,6 +80,8 @@ services: - linux/arm64 ports: - 5001:5001 + expose: + - 8000 environment: <<: *common-variables COMPONENT_NAME: eveai_app @@ -115,6 +117,8 @@ services: platforms: - linux/amd64 - linux/arm64 + expose: + - 8000 environment: <<: *common-variables COMPONENT_NAME: eveai_workers @@ -177,6 +181,8 @@ services: platforms: - linux/amd64 - linux/arm64 + expose: + - 8000 environment: <<: *common-variables COMPONENT_NAME: eveai_chat_workers @@ -205,6 +211,8 @@ services: - linux/arm64 ports: - 5003:5003 + expose: + - 8000 environment: <<: *common-variables COMPONENT_NAME: eveai_api @@ -266,6 +274,8 @@ services: platforms: - linux/amd64 - linux/arm64 + expose: + - 8000 environment: <<: *common-variables COMPONENT_NAME: eveai_entitlements @@ -361,6 +371,42 @@ services: networks: - eveai-network + prometheus: + image: prom/prometheus:latest + container_name: prometheus + ports: + - "9090:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - ./prometheus/data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - '--web.enable-lifecycle' + restart: unless-stopped + networks: + - eveai-network + + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "3000:3000" + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning + - ./grafana/data:/var/lib/grafana + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + restart: unless-stopped + depends_on: + - prometheus + networks: + - eveai-network + networks: eveai-network: driver: bridge diff --git a/docker/grafana/provisioning/dashboards/eveai-system-dashboard.json b/docker/grafana/provisioning/dashboards/eveai-system-dashboard.json new file mode 100644 index 0000000..d48aed7 --- /dev/null +++ 
b/docker/grafana/provisioning/dashboards/eveai-system-dashboard.json @@ -0,0 +1,627 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "color": { + "fixedColor": "#76599a", + "mode": "fixed" + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "sum(increase(eveai_business_events_total[$__interval])) by (event_type)", + "refId": "A" + } + ], + "title": "Business Events by Type", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 5 + }, + { + "color": "red", + "value": 10 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.5.3", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "sum(eveai_business_events_concurrent)", + "refId": "A" + } + ], + "title": "Concurrent Business Events", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + } + }, + "mappings": [] + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "options": { + "legend": { + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "pieType": "pie", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.5.3", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "sum(increase(eveai_business_events_total[$__range])) by (specialist_type)", + "refId": "A" + } + ], + "title": 
"Events by Specialist Type", + "type": "piechart" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "histogram_quantile(0.95, sum(rate(eveai_business_events_duration_seconds_bucket[$__interval])) by (le, event_type))", + "refId": "A" + } + ], + "title": "Business Event Duration (95th percentile)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 16 + }, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "sum(increase(eveai_business_spans_total[$__interval])) by (activity_name)", + "refId": "A" + } + ], + "title": "Activity Execution Count", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 60, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 0, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + 
"mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "sum(increase(eveai_llm_tokens_total[$__interval])) by (token_type)", + "refId": "A" + } + ], + "title": "LLM Token Usage by Type", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "histogram_quantile(0.95, sum(rate(eveai_llm_duration_seconds_bucket[$__interval])) by (le, interaction_type))", + "refId": "A" + } + ], + "title": "LLM Duration by Interaction Type (95th percentile)", + "type": "timeseries" + } + ], + "refresh": "15m", + "schemaVersion": 38, + "style": "dark", + "tags": ["eveai", "system"], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "PBFA97CFB590B2093" + }, + "hide": 0, + "includeAll": false, + "label": "Datasource", + "multi": false, + "name": "datasource", + "options": [], + "query": "prometheus", + "queryValue": "", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": "", + "title": "EveAI System Dashboard", + "uid": "eveai-system-dashboard", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/docker/grafana/provisioning/datasources/prometheus.yml b/docker/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 0000000..86fd346 --- /dev/null +++ b/docker/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,8 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true diff --git a/docker/prometheus/prometheus.yml b/docker/prometheus/prometheus.yml new file mode 100644 index 
0000000..6c3860d --- /dev/null +++ b/docker/prometheus/prometheus.yml @@ -0,0 +1,34 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + scrape_timeout: 10s + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'eveai_app' + static_configs: + - targets: ['eveai_app:8000'] + scrape_interval: 10s + + - job_name: 'eveai_workers' + static_configs: + - targets: ['eveai_workers:8000'] + scrape_interval: 10s + + - job_name: 'eveai_chat_workers' + static_configs: + - targets: ['eveai_chat_workers:8000'] + scrape_interval: 10s + + - job_name: 'eveai_api' + static_configs: + - targets: ['eveai_api:8000'] + scrape_interval: 10s + + - job_name: 'eveai_entitlements' + static_configs: + - targets: ['eveai_entitlements:8000'] + scrape_interval: 10s diff --git a/eveai_chat_workers/__init__.py b/eveai_chat_workers/__init__.py index 35ea409..31356a9 100644 --- a/eveai_chat_workers/__init__.py +++ b/eveai_chat_workers/__init__.py @@ -5,7 +5,7 @@ import os from common.langchain.templates.template_manager import TemplateManager from common.utils.celery_utils import make_celery, init_celery -from common.extensions import db, template_manager, cache_manager +from common.extensions import db, template_manager, cache_manager, eveai_metrics from config.logging_config import LOGGING from config.config import get_config @@ -45,6 +45,7 @@ def register_extensions(app): db.init_app(app) cache_manager.init_app(app) template_manager.init_app(app) + eveai_metrics.init_app(app) def register_cache_handlers(app): diff --git a/eveai_chat_workers/retrievers/base.py b/eveai_chat_workers/retrievers/base.py index bbf32e1..9b6bede 100644 --- a/eveai_chat_workers/retrievers/base.py +++ b/eveai_chat_workers/retrievers/base.py @@ -27,7 +27,7 @@ class BaseRetriever(ABC): @abstractmethod def type(self) -> str: """The type of the retriever""" - pass + raise NotImplementedError def _setup_tuning_logger(self): try: @@ -86,4 +86,4 @@ class BaseRetriever(ABC): Returns: List[Dict[str, Any]]: List of retrieved documents/content """ - pass + raise NotImplementedError diff --git a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py index d5df0b8..87bfe82 100644 --- a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py @@ -123,10 +123,10 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]): return "" @listen(process_inputs) - def execute_rag(self): + async def execute_rag(self): inputs = self.state.input.model_dump() try: - crew_output = self.rag_crew.kickoff(inputs=inputs) + crew_output = await self.rag_crew.kickoff_async(inputs=inputs) self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump()) output_pydantic = crew_output.pydantic if not output_pydantic: @@ -139,13 +139,13 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]): self.exception_raised = True raise e - def kickoff(self, inputs=None): - with current_event.create_span("RAG Specialist Execution"): + async def execute_async(self, inputs=None): + async with current_event.create_span_async("RAG Specialist Execution"): self.specialist_executor.log_tuning("Inputs retrieved", inputs) self.state.input = RAGSpecialistInput.model_validate(inputs) self.specialist.update_progress("EveAI Flow Start", {"name": "RAG"}) try: - result = super().kickoff() + result = await super().kickoff_async() except Exception as e: current_app.logger.error(f"Error kicking of Flow: {str(e)}") diff --git 
a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py index 28e6371..2b0d70e 100644 --- a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py @@ -1,3 +1,4 @@ +import asyncio import json from os import wait from typing import Optional, List @@ -136,8 +137,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor): "nr_of_questions": self.specialist.configuration.get('nr_of_questions', ''), "identification": arguments.identification, } - # crew_results = self.rag_crew.kickoff(inputs=flow_inputs) - # current_app.logger.debug(f"Test Crew Output received: {crew_results}") + flow_results = self.flow.kickoff(inputs=flow_inputs) flow_state = self.flow.state @@ -214,10 +214,10 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): return "" @listen(process_inputs) - def execute_rag(self): + async def execute_rag(self): inputs = self.state.input.model_dump() try: - crew_output = self.rag_crew.kickoff(inputs=inputs) + crew_output = await self.rag_crew.kickoff_async(inputs=inputs) self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump()) output_pydantic = crew_output.pydantic if not output_pydantic: @@ -231,10 +231,11 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): raise e @listen(process_inputs) - def execute_spin(self): + async def execute_spin(self): inputs = self.state.input.model_dump() try: - crew_output = self.spin_crew.kickoff(inputs=inputs) + crew_output = await self.spin_crew.kickoff_async(inputs=inputs) + current_app.logger.info(f"SPIN Crew Executed, output: {crew_output.model_dump()}") self.specialist_executor.log_tuning("Spin Crew Output", crew_output.model_dump()) output_pydantic = crew_output.pydantic if not output_pydantic: @@ -248,10 +249,10 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): raise e @listen(process_inputs) - def execute_identification(self): + async def execute_identification(self): inputs = self.state.input.model_dump() try: - crew_output = self.identification_crew.kickoff(inputs=inputs) + crew_output = await self.identification_crew.kickoff_async(inputs=inputs) self.specialist_executor.log_tuning("Identification Crew Output", crew_output.model_dump()) output_pydantic = crew_output.pydantic if not output_pydantic: @@ -265,7 +266,7 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): raise e @listen(and_(execute_rag, execute_spin, execute_identification)) - def consolidate(self): + async def consolidate(self): inputs = self.state.input.model_dump() if self.state.rag_output: inputs["prepared_answers"] = self.state.rag_output.answer @@ -277,7 +278,7 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): current_app.logger.debug(f"Additional Questions: {additional_questions}") inputs["additional_questions"] = additional_questions try: - crew_output = self.rag_consolidation_crew.kickoff(inputs=inputs) + crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs) self.specialist_executor.log_tuning("RAG Consolidation Crew Output", crew_output.model_dump()) output_pydantic = crew_output.pydantic if not output_pydantic: @@ -290,13 +291,16 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): self.exception_raised = True raise e - def kickoff(self, inputs=None): - with current_event.create_span("SPIN Specialist Execution"): + async def execute_async(self, inputs=None): + current_app.logger.debug(f"Async kickoff {self.name}") + async with current_event.create_span_async("SPIN Specialist Execution"): 
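+            # current_event resolves through the ContextVar set by BusinessEventContext,
+            # so this span is preserved across the await points inside the flow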
self.specialist_executor.log_tuning("Inputs retrieved", inputs) self.state.input = SPINSpecialistInput.model_validate(inputs) self.specialist.update_progress("EveAI Flow Start", {"name": "SPIN"}) try: - result = super().kickoff() + current_app.logger.debug(f"Async super kickoff {self.name}") + result = await super().kickoff_async() + current_app.logger.debug(f"Async super kickoff {self.name} ended") except Exception as e: current_app.logger.error(f"Error kicking of Flow: {str(e)}") diff --git a/eveai_chat_workers/specialists/base_specialist.py b/eveai_chat_workers/specialists/base_specialist.py index c046230..cb252a0 100644 --- a/eveai_chat_workers/specialists/base_specialist.py +++ b/eveai_chat_workers/specialists/base_specialist.py @@ -28,13 +28,13 @@ class BaseSpecialistExecutor(ABC): @abstractmethod def type(self) -> str: """The type of the specialist""" - pass + raise NotImplementedError @property @abstractmethod def type_version(self) -> str: """The type version of the specialist""" - pass + raise NotImplementedError def _initialize_retrievers(self) -> List[BaseRetriever]: """Initialize all retrievers associated with this specialist""" @@ -96,7 +96,7 @@ class BaseSpecialistExecutor(ABC): @abstractmethod def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult: """Execute the specialist's logic""" - pass + raise NotImplementedError def get_specialist_class(specialist_type: str, type_version: str): diff --git a/eveai_chat_workers/specialists/crewai_base_classes.py b/eveai_chat_workers/specialists/crewai_base_classes.py index 6f997b8..16750f5 100644 --- a/eveai_chat_workers/specialists/crewai_base_classes.py +++ b/eveai_chat_workers/specialists/crewai_base_classes.py @@ -1,5 +1,7 @@ +import asyncio import json import time +from abc import abstractmethod from crewai import Agent, Task, Crew, Flow from crewai.agents.parser import AgentAction, AgentFinish @@ -21,8 +23,6 @@ class EveAICrewAIAgent(Agent): super().__init__(**kwargs) self.specialist = specialist self.name = name - self.specialist.log_tuning("Initializing EveAICrewAIAgent", {"name": name}) - self.specialist.update_progress("EveAI Agent Initialisation", {"name": self.name}) def execute_task( self, @@ -40,28 +40,30 @@ class EveAICrewAIAgent(Agent): Returns: Output of the agent """ - with current_event.create_span(f"Task Execution {task.name} by {self.name}"): - self.specialist.log_tuning("EveAI Agent Task Start", - {"name": self.name, - 'task': task.name, - }) - self.specialist.update_progress("EveAI Agent Task Start", - {"name": self.name, - 'task': task.name, - }) + current_app.logger.debug(f"Task Execution {task.name} by {self.name}") + # with current_event.create_span(f"Task Execution {task.name} by {self.name}"): + self.specialist.log_tuning("EveAI Agent Task Start", + {"name": self.name, + 'task': task.name, + }) + self.specialist.update_progress("EveAI Agent Task Start", + {"name": self.name, + 'task': task.name, + }) - result = super().execute_task(task, context, tools) + result = super().execute_task(task, context, tools) - self.specialist.log_tuning("EveAI Agent Task Complete", - {"name": self.name, - 'task': task.name, - 'result': result, - }) - self.specialist.update_progress("EveAI Agent Task Complete", - {"name": self.name, - 'task': task.name, - }) + self.specialist.log_tuning("EveAI Agent Task Complete", + {"name": self.name, + 'task': task.name, + 'result': result, + }) + self.specialist.update_progress("EveAI Agent Task Complete", + {"name": self.name, + 'task': task.name, + }) + 
current_app.logger.debug(f"Task Execution Ended {task.name} by {self.name}") return result @@ -76,8 +78,6 @@ class EveAICrewAITask(Task): # current_app.logger.debug(f"Task pydantic class for {name}: {"class", self.output_pydantic}") self.specialist = specialist self.name = name - self.specialist.log_tuning("Initializing EveAICrewAITask", {"name": name}) - self.specialist.update_progress("EveAI Task Initialisation", {"name": name}) class EveAICrewAICrew(Crew): @@ -89,12 +89,10 @@ class EveAICrewAICrew(Crew): super().__init__(**kwargs) self.specialist = specialist self.name = name - self.specialist.log_tuning("Initializing EveAICrewAICrew", {"name": self.name}) - self.specialist.update_progress("EveAI Crew Initialisation", {"name": self.name}) def kickoff( - self, - inputs: Optional[Dict[str, Any]] = None, + self, + inputs: Optional[Dict[str, Any]] = None, ) -> CrewOutput: with current_event.create_span(f"Crew {self.name} kickoff"): start_time = time.time() @@ -111,6 +109,26 @@ class EveAICrewAICrew(Crew): return results + async def kickoff_async( + self, + inputs: Optional[Dict[str, Any]] = None, + ) -> CrewOutput: + async with current_event.create_span_async(f"Crew {self.name} kickoff"): + start_time = time.time() + results = await super().kickoff_async(inputs) + end_time = time.time() + metrics = { + "total_tokens": self.usage_metrics.total_tokens, + "prompt_tokens": self.usage_metrics.prompt_tokens, + "completion_tokens": self.usage_metrics.completion_tokens, + "time_elapsed": end_time - start_time, + "interaction_type": "Crew Execution" + } + current_event.log_llm_metrics(metrics) + + return results + + class EveAICrewAIFlow(Flow): specialist: Any = Field(default=None, exclude=True) name: str = Field(default=None, exclude=True) @@ -123,10 +141,14 @@ class EveAICrewAIFlow(Flow): self.specialist.log_tuning("Initializing EveAICrewAIFlow", {"name": self.name}) self.specialist.update_progress("EveAI Flow Initialisation", {"name": self.name}) + def kickoff(self, inputs=None): + result = asyncio.run(self.execute_async(inputs=inputs)) + + @abstractmethod + async def execute_async(self, inputs=None): + raise NotImplementedError + class EveAIFlowState(BaseModel): """Base class for all EveAI flow states""" pass - - - diff --git a/eveai_chat_workers/specialists/crewai_base_specialist.py b/eveai_chat_workers/specialists/crewai_base_specialist.py index c4aaa10..a1bca7d 100644 --- a/eveai_chat_workers/specialists/crewai_base_specialist.py +++ b/eveai_chat_workers/specialists/crewai_base_specialist.py @@ -90,6 +90,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): def _config_task_agents(self): """Configure the task agents by adding task-agent combinations. Use _add_task_agent() """ + raise NotImplementedError @property def task_agents(self) -> Dict[str, str]: @@ -103,6 +104,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): @abstractmethod def _config_pydantic_outputs(self): """Configure the task pydantic outputs by adding task-output combinations. Use _add_pydantic_output()""" + raise NotImplementedError @property def task_pydantic_outputs(self): @@ -203,6 +205,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): def _instantiate_specialist(self): """Instantiate a crew (or flow) to set up the complete specialist, using the assets (agents, tasks, tools). The assets can be retrieved using their type name in lower case, e.g. 
rag_agent""" + raise NotImplementedError def _detail_question(self, language: str, question: str) -> str: """Detail question based on conversation history""" @@ -211,9 +214,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): # Get LLM and template llm = self.model_variables.get_llm(temperature=0.3) template = cache_manager.prompts_config_cache.get_config('history').get('content', '') - current_app.logger.debug(f"History Template: {template}") language_template = create_language_template(template, language) - current_app.logger.debug(f"History Language Template: {template}") # Create prompt history_prompt = ChatPromptTemplate.from_template(language_template) @@ -226,7 +227,6 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): ) # Execute chain - current_app.logger.debug(f"Formatted History: {self.formatted_history}") detailed_question = chain.invoke({ "history": self.formatted_history, "question": question @@ -254,7 +254,6 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): "num_retrievers": len(self.retrievers), "all arguments": arguments.model_dump(), }) - current_app.logger.debug(f"Retrieving context from arguments: {arguments}") original_query = arguments.query detailed_query = self._detail_question(arguments.language, original_query) @@ -289,7 +288,6 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): retriever_args = RetrieverArguments(**current_retriever_args) # Each retriever gets its own specific arguments - current_app.logger.debug(f"Retrieving context {retriever_id} with arguments {retriever_args}") retriever_result = retriever.retrieve(retriever_args) all_context.extend(retriever_result) @@ -326,7 +324,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): @abstractmethod def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult: - pass + raise NotImplementedError def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult: # Detail the incoming query diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index 0ce79d4..4b8edb9 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -226,26 +226,39 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict task_id = self.request.id ept = ExecutionProgressTracker() ept.send_update(task_id, "EveAI Specialist Started", {}) - with BusinessEvent("Execute Specialist", tenant_id=tenant_id, chat_session_id=session_id) as event: + + # Prepare context + try: + # Retrieve the tenant + tenant = Tenant.query.get(tenant_id) + if not tenant: + raise Exception(f'Tenant {tenant_id} not found.') + + # Switch to correct database schema + Database(str(tenant_id)).switch_schema() + + # Get specialist from database + specialist = Specialist.query.get_or_404(specialist_id) + except Exception as e: + ept.send_update(task_id, "EveAI Specialist Error", {'Error': str(e)}) + current_app.logger.error(f'execute_specialist: Error executing specialist: {e}') + raise + + with BusinessEvent("Execute Specialist", + tenant_id=tenant_id, + chat_session_id=session_id, + specialist_id=specialist_id, + specialist_type=specialist.type, + specialist_type_version=specialist.type_version) as event: current_app.logger.info( f'execute_specialist: Processing request for tenant {tenant_id} using specialist {specialist_id}') try: - # Retrieve the tenant - tenant = Tenant.query.get(tenant_id) - if not tenant: - raise Exception(f'Tenant {tenant_id} not found.') - - # Switch to correct 
database schema - Database(tenant_id).switch_schema() - # Ensure we have a session cached_session = cache_manager.chat_session_cache.get_cached_session( session_id, create_params={'timezone': user_timezone} ) - # Get specialist from database - specialist = Specialist.query.get_or_404(specialist_id) # Prepare complete arguments try: diff --git a/eveai_entitlements/tasks.py b/eveai_entitlements/tasks.py index 79a8110..8950976 100644 --- a/eveai_entitlements/tasks.py +++ b/eveai_entitlements/tasks.py @@ -88,6 +88,9 @@ def persist_business_events(log_entries): parent_span_id=entry.pop('parent_span_id', None), document_version_id=entry.pop('document_version_id', None), document_version_file_size=entry.pop('document_version_file_size', None), + specialist_id=entry.pop('specialist_id', None), + specialist_type=entry.pop('specialist_type', None), + specialist_type_version=entry.pop('specialist_type_version', None), chat_session_id=entry.pop('chat_session_id', None), interaction_id=entry.pop('interaction_id', None), environment=entry.pop('environment', None), diff --git a/eveai_workers/processors/base_processor.py b/eveai_workers/processors/base_processor.py index b072841..fe8777e 100644 --- a/eveai_workers/processors/base_processor.py +++ b/eveai_workers/processors/base_processor.py @@ -40,7 +40,7 @@ class BaseProcessor(ABC): @abstractmethod def process(self): - pass + raise NotImplementedError @property def configuration(self): diff --git a/migrations/public/versions/03a1e7633c01_adding_specialist_information_to_.py b/migrations/public/versions/03a1e7633c01_adding_specialist_information_to_.py new file mode 100644 index 0000000..b60bf79 --- /dev/null +++ b/migrations/public/versions/03a1e7633c01_adding_specialist_information_to_.py @@ -0,0 +1,40 @@ +"""Adding specialist information to BusinessEventLog + +Revision ID: 03a1e7633c01 +Revises: 4d2842d9c1d0 +Create Date: 2025-03-24 14:28:57.200173 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '03a1e7633c01' +down_revision = '4d2842d9c1d0' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('business_event_log', schema=None) as batch_op: + batch_op.add_column(sa.Column('specialist_id', sa.Integer(), nullable=True)) + batch_op.add_column(sa.Column('specialist_type', sa.String(length=50), nullable=True)) + batch_op.add_column(sa.Column('specialist_type_version', sa.String(length=20), nullable=True)) + batch_op.drop_constraint('business_event_log_license_usage_id_fkey', type_='foreignkey') + batch_op.create_foreign_key(None, 'license_usage', ['license_usage_id'], ['id'], referent_schema='public') + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
###
+    with op.batch_alter_table('business_event_log', schema=None) as batch_op:
+        batch_op.drop_constraint(None, type_='foreignkey')
+        batch_op.create_foreign_key('business_event_log_license_usage_id_fkey', 'license_usage', ['license_usage_id'], ['id'])
+        batch_op.drop_column('specialist_type_version')
+        batch_op.drop_column('specialist_type')
+        batch_op.drop_column('specialist_id')
+
+    # ### end Alembic commands ###
diff --git a/requirements.txt b/requirements.txt
index 5e29037..3f1708e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -70,19 +70,17 @@ pillow~=10.4.0
 pdfplumber~=0.11.4
 PyPDF2~=3.0.1
 flask-restx~=1.3.0
-prometheus-flask-exporter~=0.23.1
 flask-healthz~=1.0.1
 langsmith~=0.1.121
 anthropic~=0.34.2
-prometheus-client~=0.20.0
+prometheus-client~=0.21.1
+prometheus-flask-exporter~=0.23.2
 flower~=2.0.1
 psutil~=6.0.0
 celery-redbeat~=2.2.0
 WTForms-SQLAlchemy~=0.4.1
 packaging~=24.1
 typing_extensions~=4.12.2
-prometheus_flask_exporter~=0.23.1
-prometheus_client~=0.20.0
 babel~=2.16.0
 dogpile.cache~=1.3.3
 python-docx~=1.1.2
@@ -90,4 +88,5 @@ crewai~=0.108.0
 sseclient~=0.0.27
 termcolor~=2.5.0
 mistral-common~=1.5.3
-mistralai~=1.5.0
\ No newline at end of file
+mistralai~=1.5.0
+contextvars~=2.4
\ No newline at end of file
diff --git a/scripts/reload-prometheus.sh b/scripts/reload-prometheus.sh
new file mode 100755
index 0000000..daab010
--- /dev/null
+++ b/scripts/reload-prometheus.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+# Check if the service name is provided
+if [ -z "$1" ]; then
+    echo "Usage: $0 <service-name> (usually 'prometheus')"
+    exit 1
+fi
+
+SERVICE_NAME=$1
+
+# Get the container ID of the service
+CONTAINER_ID=$(docker-compose ps -q "$SERVICE_NAME")
+
+# Check if the container ID is found
+if [ -z "$CONTAINER_ID" ]; then
+    echo "Service $SERVICE_NAME not found or not running."
+    exit 1
+fi
+
+# Reload Prometheus configuration
+echo "Reloading Prometheus configuration..."
+curl -X POST http://localhost:9090/-/reload
+
+# Output the result
+if [ $? -eq 0 ]; then
+    echo "Prometheus configuration reloaded successfully."
+else
+    echo "Failed to reload Prometheus configuration."
+    exit 1
+fi
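
Taken together, the async pieces above compose as sketched below. This is an illustration only, not part of the diff: the tenant/specialist values are hypothetical, and `flow` stands for any EveAICrewAIFlow subclass that implements execute_async(), as the RAG and SPIN specialists do.

    from common.utils.business_event import BusinessEvent
    from common.utils.business_event_context import current_event

    async def run_specialist(flow, inputs):
        # BusinessEvent now supports `async with`; BusinessEventContext sets a
        # ContextVar so current_event also resolves inside coroutines.
        async with BusinessEvent("Execute Specialist", tenant_id=1,
                                 specialist_id=7, specialist_type="RAG_SPECIALIST",
                                 specialist_type_version="1.0"):
            async with current_event.create_span_async("Example Span"):
                return await flow.execute_async(inputs=inputs)

    # Sync callers keep using flow.kickoff(), which wraps execute_async() in asyncio.run().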