diff --git a/common/langchain/templates/template_manager.py b/common/langchain/templates/template_manager.py index 5dd8d67..6b3c064 100644 --- a/common/langchain/templates/template_manager.py +++ b/common/langchain/templates/template_manager.py @@ -28,7 +28,6 @@ class TemplateManager: # Initialize template manager base_dir = "/app" self.templates_dir = os.path.join(base_dir, 'config', 'prompts') - app.logger.debug(f'Loading templates from {self.templates_dir}') self.app = app self._templates = self._load_templates() # Log available templates for each supported model diff --git a/common/models/entitlements.py b/common/models/entitlements.py index cf3dc26..6b6fdad 100644 --- a/common/models/entitlements.py +++ b/common/models/entitlements.py @@ -11,7 +11,7 @@ class BusinessEventLog(db.Model): tenant_id = db.Column(db.Integer, nullable=False) trace_id = db.Column(db.String(50), nullable=False) span_id = db.Column(db.String(50)) - span_name = db.Column(db.String(50)) + span_name = db.Column(db.String(255)) parent_span_id = db.Column(db.String(50)) document_version_id = db.Column(db.Integer) document_version_file_size = db.Column(db.Float) diff --git a/common/utils/business_event.py b/common/utils/business_event.py index c238c66..e308399 100644 --- a/common/utils/business_event.py +++ b/common/utils/business_event.py @@ -2,13 +2,14 @@ import os import uuid from contextlib import contextmanager from datetime import datetime -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from datetime import datetime as dt, timezone as tz import logging from .business_event_context import BusinessEventContext from common.models.entitlements import BusinessEventLog from common.extensions import db +from .celery_utils import current_celery class BusinessEvent: @@ -38,6 +39,7 @@ class BusinessEvent: 'call_count': 0, 'interaction_type': None } + self._log_buffer = [] def update_attribute(self, attribute: str, value: any): if hasattr(self, attribute): @@ -80,7 +82,7 @@ class BusinessEvent: self.span_name = span_name self.parent_span_id = parent_span_id - self.log(f"Starting span {span_name}") + self.log(f"Start") try: yield @@ -88,7 +90,7 @@ class BusinessEvent: if self.llm_metrics['call_count'] > 0: self.log_final_metrics() self.reset_llm_metrics() - self.log(f"Ending span {span_name}") + self.log(f"End") # Restore the previous span info if self.spans: self.span_id, self.span_name, self.parent_span_id = self.spans.pop() @@ -98,8 +100,8 @@ class BusinessEvent: self.parent_span_id = None def log(self, message: str, level: str = 'info'): - logger = logging.getLogger('business_events') log_data = { + 'timestamp': dt.now(tz=tz.utc), 'event_type': self.event_type, 'tenant_id': self.tenant_id, 'trace_id': self.trace_id, @@ -111,34 +113,16 @@ class BusinessEvent: 'chat_session_id': self.chat_session_id, 'interaction_id': self.interaction_id, 'environment': self.environment, + 'message': message, } - # log to Graylog - getattr(logger, level)(message, extra=log_data) - - # Log to database - event_log = BusinessEventLog( - timestamp=dt.now(tz=tz.utc), - event_type=self.event_type, - tenant_id=self.tenant_id, - trace_id=self.trace_id, - span_id=self.span_id, - span_name=self.span_name, - parent_span_id=self.parent_span_id, - document_version_id=self.document_version_id, - document_version_file_size=self.document_version_file_size, - chat_session_id=self.chat_session_id, - interaction_id=self.interaction_id, - environment=self.environment, - message=message - ) - db.session.add(event_log) - db.session.commit() + self._log_buffer.append(log_data) def log_llm_metrics(self, metrics: dict, level: str = 'info'): self.update_llm_metrics(metrics) message = "LLM Metrics" logger = logging.getLogger('business_events') log_data = { + 'timestamp': dt.now(tz=tz.utc), 'event_type': self.event_type, 'tenant_id': self.tenant_id, 'trace_id': self.trace_id, @@ -155,38 +139,15 @@ class BusinessEvent: 'llm_metrics_completion_tokens': metrics['completion_tokens'], 'llm_metrics_total_time': metrics['time_elapsed'], 'llm_interaction_type': metrics['interaction_type'], + 'message': message, } - # log to Graylog - getattr(logger, level)(message, extra=log_data) - - # Log to database - event_log = BusinessEventLog( - timestamp=dt.now(tz=tz.utc), - event_type=self.event_type, - tenant_id=self.tenant_id, - trace_id=self.trace_id, - span_id=self.span_id, - span_name=self.span_name, - parent_span_id=self.parent_span_id, - document_version_id=self.document_version_id, - document_version_file_size=self.document_version_file_size, - chat_session_id=self.chat_session_id, - interaction_id=self.interaction_id, - environment=self.environment, - llm_metrics_total_tokens=metrics['total_tokens'], - llm_metrics_prompt_tokens=metrics['prompt_tokens'], - llm_metrics_completion_tokens=metrics['completion_tokens'], - llm_metrics_total_time=metrics['time_elapsed'], - llm_interaction_type=metrics['interaction_type'], - message=message - ) - db.session.add(event_log) - db.session.commit() + self._log_buffer.append(log_data) def log_final_metrics(self, level: str = 'info'): logger = logging.getLogger('business_events') message = "Final LLM Metrics" log_data = { + 'timestamp': dt.now(tz=tz.utc), 'event_type': self.event_type, 'tenant_id': self.tenant_id, 'trace_id': self.trace_id, @@ -204,34 +165,65 @@ class BusinessEvent: 'llm_metrics_total_time': self.llm_metrics['total_time'], 'llm_metrics_call_count': self.llm_metrics['call_count'], 'llm_interaction_type': self.llm_metrics['interaction_type'], + 'message': message, } - # log to Graylog - getattr(logger, level)(message, extra=log_data) + self._log_buffer.append(log_data) - # Log to database - event_log = BusinessEventLog( - timestamp=dt.now(tz=tz.utc), - event_type=self.event_type, - tenant_id=self.tenant_id, - trace_id=self.trace_id, - span_id=self.span_id, - span_name=self.span_name, - parent_span_id=self.parent_span_id, - document_version_id=self.document_version_id, - document_version_file_size=self.document_version_file_size, - chat_session_id=self.chat_session_id, - interaction_id=self.interaction_id, - environment=self.environment, - llm_metrics_total_tokens=self.llm_metrics['total_tokens'], - llm_metrics_prompt_tokens=self.llm_metrics['prompt_tokens'], - llm_metrics_completion_tokens=self.llm_metrics['completion_tokens'], - llm_metrics_total_time=self.llm_metrics['total_time'], - llm_metrics_call_count=self.llm_metrics['call_count'], - llm_interaction_type=self.llm_metrics['interaction_type'], - message=message - ) - db.session.add(event_log) - db.session.commit() + @staticmethod + def _direct_db_persist(log_entries: List[Dict[str, Any]]): + """Fallback method to directly persist logs to DB if async fails""" + try: + db_entries = [] + for entry in log_entries: + event_log = BusinessEventLog( + timestamp=entry.pop('timestamp'), + event_type=entry.pop('event_type'), + tenant_id=entry.pop('tenant_id'), + trace_id=entry.pop('trace_id'), + span_id=entry.pop('span_id', None), + span_name=entry.pop('span_name', None), + parent_span_id=entry.pop('parent_span_id', None), + document_version_id=entry.pop('document_version_id', None), + document_version_file_size=entry.pop('document_version_file_size', None), + chat_session_id=entry.pop('chat_session_id', None), + interaction_id=entry.pop('interaction_id', None), + environment=entry.pop('environment', None), + llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None), + llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None), + llm_metrics_completion_tokens=entry.pop('llm_metrics_completion_tokens', None), + llm_metrics_total_time=entry.pop('llm_metrics_total_time', None), + llm_metrics_call_count=entry.pop('llm_metrics_call_count', None), + llm_interaction_type=entry.pop('llm_interaction_type', None), + message=entry.pop('message', None) + ) + db_entries.append(event_log) + + # Bulk insert + db.session.bulk_save_objects(db_entries) + db.session.commit() + except Exception as e: + logger = logging.getLogger('business_events') + logger.error(f"Failed to persist logs directly to DB: {e}") + db.session.rollback() + + def _flush_log_buffer(self): + """Flush the log buffer to the database via a Celery task""" + if self._log_buffer: + try: + # Send to Celery task + current_celery.send_task( + 'persist_business_events', + args=[self._log_buffer], + queue='entitlements' # Or dedicated log queue + ) + except Exception as e: + # Fallback to direct DB write in case of issues with Celery + logger = logging.getLogger('business_events') + logger.error(f"Failed to send logs to Celery. Falling back to direct DB: {e}") + self._direct_db_persist(self._log_buffer) + + # Clear the buffer after sending + self._log_buffer = [] def __enter__(self): self.log(f'Starting Trace for {self.event_type}') @@ -242,4 +234,5 @@ class BusinessEvent: self.log_final_metrics() self.reset_llm_metrics() self.log(f'Ending Trace for {self.event_type}') + self._flush_log_buffer() return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb) diff --git a/common/utils/cache/base.py b/common/utils/cache/base.py index aa334f3..036a3d9 100644 --- a/common/utils/cache/base.py +++ b/common/utils/cache/base.py @@ -120,9 +120,6 @@ class CacheHandler(Generic[T]): region_name = getattr(self.region, 'name', 'default_region') - current_app.logger.debug(f"Generating cache key in region {region_name} with prefix {self.prefix} " - f"for {self._key_components}") - key = CacheKey({k: identifiers[k] for k in self._key_components}) return f"{region_name}_{self.prefix}:{str(key)}" @@ -138,13 +135,10 @@ class CacheHandler(Generic[T]): Cached or newly created value """ cache_key = self.generate_key(**identifiers) - current_app.logger.debug(f"Getting Cache key: {cache_key}") def creator(): instance = creator_func(**identifiers) - current_app.logger.debug("Caching object created and received. Now serializing...") serialized_instance = self._to_cache_data(instance) - current_app.logger.debug(f"Caching object serialized and received:\n{serialized_instance}") return serialized_instance cached_data = self.region.get_or_create( diff --git a/common/utils/cache/config_cache.py b/common/utils/cache/config_cache.py index a267145..402a6a0 100644 --- a/common/utils/cache/config_cache.py +++ b/common/utils/cache/config_cache.py @@ -69,10 +69,8 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]): Returns: Configuration data """ - current_app.logger.debug(f"Loading specific configuration for {type_name}, version: {version_str} - no cache") version_tree = self.version_tree_cache.get_versions(type_name) versions = version_tree['versions'] - current_app.logger.debug(f"Loaded specific versions for {type_name}, versions: {versions}") if version_str == 'latest': version_str = version_tree['latest_version'] @@ -81,12 +79,10 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]): raise ValueError(f"Version {version_str} not found for {type_name}") file_path = versions[version_str]['file_path'] - current_app.logger.debug(f'Trying to load configuration from {file_path}') try: with open(file_path) as f: config = yaml.safe_load(f) - current_app.logger.debug(f"Loaded config for {type_name}{version_str}: {config}") return config except Exception as e: raise ValueError(f"Error loading config from {file_path}: {e}") @@ -103,8 +99,6 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]): Returns: Configuration data """ - current_app.logger.debug(f"Trying to retrieve config for {self.config_type}, type name: {type_name}, " - f"version: {version}") if version is None: version_str = self.version_tree_cache.get_latest_version(type_name) elif is_major_minor(version): @@ -145,7 +139,6 @@ class BaseConfigVersionTreeCacheHandler(CacheHandler[Dict[str, Any]]): Returns: Dict containing available versions and their metadata """ - current_app.logger.debug(f"Loading version tree for {type_name} - no cache") type_path = Path(self._config_dir) / type_name if not type_path.exists(): raise ValueError(f"No configuration found for type {type_name}") @@ -180,8 +173,6 @@ class BaseConfigVersionTreeCacheHandler(CacheHandler[Dict[str, Any]]): current_app.logger.error(f"Error loading version {ver}: {e}") continue - current_app.logger.debug(f"Loaded versions for {type_name}: {versions}") - current_app.logger.debug(f"Latest version for {type_name}: {latest_version}") return { 'versions': versions, 'latest_version': latest_version @@ -221,7 +212,6 @@ class BaseConfigVersionTreeCacheHandler(CacheHandler[Dict[str, Any]]): Returns: Dict with version information """ - current_app.logger.debug(f"Trying to get version tree for {self.config_type}, {type_name}") return self.get( lambda type_name: self._load_version_tree(type_name), type_name=type_name @@ -319,7 +309,6 @@ class BaseConfigTypesCacheHandler(CacheHandler[Dict[str, Any]]): def _load_type_definitions(self) -> Dict[str, Dict[str, str]]: """Load type definitions from the corresponding type_defs module""" - current_app.logger.debug(f"Loading type definitions for {self.config_type} - no cache") if not self._types_module: raise ValueError("_types_module must be set by subclass") @@ -331,13 +320,10 @@ class BaseConfigTypesCacheHandler(CacheHandler[Dict[str, Any]]): for type_id, info in self._types_module.items() } - current_app.logger.debug(f"Loaded type definitions: {type_definitions}") - return type_definitions def get_types(self) -> Dict[str, Dict[str, str]]: """Get dictionary of available types with name and description""" - current_app.logger.debug(f"Trying to retrieve type definitions for {self.config_type}") result = self.get( lambda type_name: self._load_type_definitions(), type_name=f'{self.config_type}_types', diff --git a/common/utils/cache/crewai_config_processor.py b/common/utils/cache/crewai_config_processor.py index f0397d7..3269252 100644 --- a/common/utils/cache/crewai_config_processor.py +++ b/common/utils/cache/crewai_config_processor.py @@ -211,8 +211,6 @@ class BaseCrewAIConfigProcessor: tasks=self._process_task_configs(specialist_config), tools=self._process_tool_configs(specialist_config) ) - current_app.logger.debug(f"Processed config for tenant {self.tenant_id}, specialist {self.specialist_id}:\n" - f"{processed_config}") return processed_config except Exception as e: diff --git a/common/utils/celery_utils.py b/common/utils/celery_utils.py index 47ee9d9..271cf39 100644 --- a/common/utils/celery_utils.py +++ b/common/utils/celery_utils.py @@ -58,39 +58,6 @@ def init_celery(celery, app, is_beat=False): celery.Task = ContextTask -# Original init_celery before updating for beat -# def init_celery(celery, app): -# celery_app.main = app.name -# app.logger.debug(f'CELERY_BROKER_URL: {app.config["CELERY_BROKER_URL"]}') -# app.logger.debug(f'CELERY_RESULT_BACKEND: {app.config["CELERY_RESULT_BACKEND"]}') -# celery_config = { -# 'broker_url': app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), -# 'result_backend': app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), -# 'task_serializer': app.config.get('CELERY_TASK_SERIALIZER', 'json'), -# 'result_serializer': app.config.get('CELERY_RESULT_SERIALIZER', 'json'), -# 'accept_content': app.config.get('CELERY_ACCEPT_CONTENT', ['json']), -# 'timezone': app.config.get('CELERY_TIMEZONE', 'UTC'), -# 'enable_utc': app.config.get('CELERY_ENABLE_UTC', True), -# 'task_routes': {'eveai_worker.tasks.create_embeddings': {'queue': 'embeddings', -# 'routing_key': 'embeddings.create_embeddings'}}, -# } -# celery_app.conf.update(**celery_config) -# -# # Setting up Celery task queues -# celery_app.conf.task_queues = ( -# Queue('default', routing_key='task.#'), -# Queue('embeddings', routing_key='embeddings.#', queue_arguments={'x-max-priority': 10}), -# Queue('llm_interactions', routing_key='llm_interactions.#', queue_arguments={'x-max-priority': 5}), -# ) -# -# # Ensuring tasks execute with Flask application context -# class ContextTask(celery.Task): -# def __call__(self, *args, **kwargs): -# with app.app_context(): -# return self.run(*args, **kwargs) -# -# celery.Task = ContextTask - def make_celery(app_name, config): return celery_app diff --git a/common/utils/debug_utils.py b/common/utils/debug_utils.py index 2e55be8..f3f5c33 100644 --- a/common/utils/debug_utils.py +++ b/common/utils/debug_utils.py @@ -5,58 +5,11 @@ import json def log_request_middleware(app): - # @app.before_request - # def log_request_info(): - # start_time = time.time() - # app.logger.debug(f"Request URL: {request.url}") - # app.logger.debug(f"Request Method: {request.method}") - # app.logger.debug(f"Request Headers: {request.headers}") - # app.logger.debug(f"Time taken for logging request info: {time.time() - start_time} seconds") - # try: - # app.logger.debug(f"Request Body: {request.get_data()}") - # except Exception as e: - # app.logger.error(f"Error reading request body: {e}") - # app.logger.debug(f"Time taken for logging request body: {time.time() - start_time} seconds") - - # @app.before_request - # def check_csrf_token(): - # start_time = time.time() - # if request.method == "POST": - # csrf_token = request.form.get("csrf_token") - # app.logger.debug(f"CSRF Token: {csrf_token}") - # app.logger.debug(f"Time taken for logging CSRF token: {time.time() - start_time} seconds") - - # @app.before_request - # def log_user_info(): - # if current_user and current_user.is_authenticated: - # app.logger.debug(f"Before: User ID: {current_user.id}") - # app.logger.debug(f"Before: User Email: {current_user.email}") - # app.logger.debug(f"Before: User Roles: {current_user.roles}") - # else: - # app.logger.debug("After: No user logged in") @app.before_request def log_session_state_before(): pass - # @app.after_request - # def log_response_info(response): - # start_time = time.time() - # app.logger.debug(f"Response Status: {response.status}") - # app.logger.debug(f"Response Headers: {response.headers}") - # - # app.logger.debug(f"Time taken for logging response info: {time.time() - start_time} seconds") - # return response - - # @app.after_request - # def log_user_after_request(response): - # if current_user and current_user.is_authenticated: - # app.logger.debug(f"After: User ID: {current_user.id}") - # app.logger.debug(f"after: User Email: {current_user.email}") - # app.logger.debug(f"After: User Roles: {current_user.roles}") - # else: - # app.logger.debug("After: No user logged in") - @app.after_request def log_session_state_after(response): return response @@ -149,8 +102,3 @@ def register_request_debugger(app): # Format the debug info as a pretty-printed JSON string with indentation formatted_debug_info = json.dumps(debug_info, indent=2, sort_keys=True) - # Log everything in a single statement - app.logger.debug( - "Request Debug Information\n", - extra={"request_debug\n": formatted_debug_info} - ) diff --git a/common/utils/document_utils.py b/common/utils/document_utils.py index f555e35..1d0e0b5 100644 --- a/common/utils/document_utils.py +++ b/common/utils/document_utils.py @@ -367,7 +367,6 @@ def mark_tenant_storage_dirty(tenant_id): def cope_with_local_url(url): - current_app.logger.debug(f'Incomming URL: {url}') parsed_url = urlparse(url) # Check if this is an internal WordPress URL (TESTING) and rewrite it if parsed_url.netloc in [current_app.config['EXTERNAL_WORDPRESS_BASE_URL']]: @@ -376,7 +375,6 @@ def cope_with_local_url(url): netloc=f"{current_app.config['WORDPRESS_HOST']}:{current_app.config['WORDPRESS_PORT']}" ) url = urlunparse(parsed_url) - current_app.logger.debug(f'Translated Wordpress URL to: {url}') return url @@ -412,10 +410,6 @@ def lookup_document(tenant_id: int, lookup_criteria: dict, metadata_type: str) - for key, value in lookup_criteria.items(): query = query.filter(metadata_field[key].astext == str(value)) - # Log the final SQL query - current_app.logger.debug( - f"Final SQL query: {query.statement.compile(compile_kwargs={'literal_binds': True})}") - # Get first result result = query.first() diff --git a/common/utils/specialist_utils.py b/common/utils/specialist_utils.py index 7a56770..f0b3200 100644 --- a/common/utils/specialist_utils.py +++ b/common/utils/specialist_utils.py @@ -99,13 +99,11 @@ def _create_agent( timestamp: Optional[dt] = None ) -> EveAIAgent: """Create an agent with the given configuration.""" - current_app.logger.debug(f"Creating agent {agent_type} {agent_version} with {name}, {description}") if timestamp is None: timestamp = dt.now(tz=tz.utc) # Get agent configuration from cache agent_config = cache_manager.agents_config_cache.get_config(agent_type, agent_version) - current_app.logger.debug(f"Agent Config: {agent_config}") agent = EveAIAgent( specialist_id=specialist_id, @@ -142,7 +140,6 @@ def _create_task( # Get task configuration from cache task_config = cache_manager.tasks_config_cache.get_config(task_type, task_version) - current_app.logger.debug(f"Task Config: {task_config}") task = EveAITask( specialist_id=specialist_id, @@ -180,7 +177,6 @@ def _create_tool( # Get tool configuration from cache tool_config = cache_manager.tools_config_cache.get_config(tool_type, tool_version) - current_app.logger.debug(f"Tool Config: {tool_config}") tool = EveAITool( specialist_id=specialist_id, diff --git a/common/utils/startup_eveai.py b/common/utils/startup_eveai.py index 6ae12af..f333424 100644 --- a/common/utils/startup_eveai.py +++ b/common/utils/startup_eveai.py @@ -28,15 +28,10 @@ def perform_startup_invalidation(app): try: # Check if invalidation was already performed if not redis_client.get(marker_key): - app.logger.debug(f"Performing cache invalidation at startup time {startup_time}") - app.logger.debug(f"Current cache keys: {redis_client.keys('*')}") - # Perform invalidation cache_manager.invalidate_region('eveai_config') cache_manager.invalidate_region('eveai_chat_workers') - app.logger.debug(f"Cache keys after invalidation: {redis_client.keys('*')}") - redis_client.setex(marker_key, 180, str(startup_time)) app.logger.info("Startup cache invalidation completed") else: diff --git a/eveai_api/__init__.py b/eveai_api/__init__.py index ad2f983..cd04054 100644 --- a/eveai_api/__init__.py +++ b/eveai_api/__init__.py @@ -66,7 +66,6 @@ def create_app(config_file=None): @app.before_request def check_cors(): if request.method == 'OPTIONS': - app.logger.debug("Handling OPTIONS request") return '', 200 # Allow OPTIONS to pass through origin = request.headers.get('Origin') diff --git a/eveai_api/api/auth.py b/eveai_api/api/auth.py index d6207aa..c10ea2e 100644 --- a/eveai_api/api/auth.py +++ b/eveai_api/api/auth.py @@ -83,7 +83,6 @@ class Token(Resource): expires_delta=expires_delta, additional_claims=additional_claims ) - current_app.logger.debug(f"Created token: {access_token}") return { 'access_token': access_token, 'expires_in': expires_delta.total_seconds() @@ -164,10 +163,6 @@ class Services(Resource): """ Get allowed services for the current token """ - # Log the incoming authorization header - auth_header = request.headers.get('Authorization') - current_app.logger.debug(f"Received Authorization header: {auth_header}") - claims = get_jwt() tenant_id = get_jwt_identity() diff --git a/eveai_api/api/document_api.py b/eveai_api/api/document_api.py index 47f4ac4..7c0d2f3 100644 --- a/eveai_api/api/document_api.py +++ b/eveai_api/api/document_api.py @@ -345,7 +345,6 @@ class DocumentResource(Resource): def put(self, document_id): """Edit a document. The content of the document will not be refreshed!""" try: - current_app.logger.debug(f'Editing document {document_id}') data = request.json tenant_id = get_jwt_identity() updated_doc, error = edit_document(tenant_id, document_id, data.get('name', None), diff --git a/eveai_api/api/specialist_execution_api.py b/eveai_api/api/specialist_execution_api.py index 0f302ca..11f88e1 100644 --- a/eveai_api/api/specialist_execution_api.py +++ b/eveai_api/api/specialist_execution_api.py @@ -118,7 +118,6 @@ class SpecialistArgument(Resource): if specialist: configuration = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.type_version) - current_app.logger.debug(f"Configuration returned: {configuration}") if configuration: if 'arguments' in configuration: return { diff --git a/eveai_app/views/user_views.py b/eveai_app/views/user_views.py index 903eb25..f8aaa4d 100644 --- a/eveai_app/views/user_views.py +++ b/eveai_app/views/user_views.py @@ -36,9 +36,7 @@ def log_after_request(response): @roles_required('Super User') def tenant(): form = TenantForm() - current_app.logger.debug(f'Tenant form: {form}') if form.validate_on_submit(): - current_app.logger.debug(f'Tenant form submitted: {form.data}') # Handle the required attributes new_tenant = Tenant() form.populate_obj(new_tenant) diff --git a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py index fd8ac5b..9a2f0db 100644 --- a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py @@ -198,7 +198,13 @@ class SPINFlowState(EveAIFlowState): class SPINFlow(EveAICrewAIFlow[SPINFlowState]): - def __init__(self, specialist_executor, rag_crew, spin_crew, identification_crew, rag_consolidation_crew, **kwargs): + def __init__(self, + specialist_executor: CrewAIBaseSpecialistExecutor, + rag_crew: EveAICrewAICrew, + spin_crew: EveAICrewAICrew, + identification_crew: EveAICrewAICrew, + rag_consolidation_crew: EveAICrewAICrew, + **kwargs): super().__init__(specialist_executor, "SPIN Specialist Flow", **kwargs) self.specialist_executor = specialist_executor self.rag_crew = rag_crew diff --git a/eveai_chat_workers/specialists/crewai_base_classes.py b/eveai_chat_workers/specialists/crewai_base_classes.py index 8b47ce7..6f997b8 100644 --- a/eveai_chat_workers/specialists/crewai_base_classes.py +++ b/eveai_chat_workers/specialists/crewai_base_classes.py @@ -1,12 +1,16 @@ import json +import time from crewai import Agent, Task, Crew, Flow from crewai.agents.parser import AgentAction, AgentFinish +from crewai.crews import CrewOutput from crewai.tools import BaseTool from flask import current_app from pydantic import BaseModel, create_model, Field, ConfigDict from typing import Dict, Type, get_type_hints, Optional, List, Any, Callable +from common.utils.business_event_context import current_event + class EveAICrewAIAgent(Agent): specialist: Any = Field(default=None, exclude=True) @@ -36,26 +40,27 @@ class EveAICrewAIAgent(Agent): Returns: Output of the agent """ - self.specialist.log_tuning("EveAI Agent Task Start", - {"name": self.name, - 'task': task.name, - }) - self.specialist.update_progress("EveAI Agent Task Start", - {"name": self.name, - 'task': task.name, - }) + with current_event.create_span(f"Task Execution {task.name} by {self.name}"): + self.specialist.log_tuning("EveAI Agent Task Start", + {"name": self.name, + 'task': task.name, + }) + self.specialist.update_progress("EveAI Agent Task Start", + {"name": self.name, + 'task': task.name, + }) - result = super().execute_task(task, context, tools) + result = super().execute_task(task, context, tools) - self.specialist.log_tuning("EveAI Agent Task Complete", - {"name": self.name, - 'task': task.name, - 'result': result, - }) - self.specialist.update_progress("EveAI Agent Task Complete", - {"name": self.name, - 'task': task.name, - }) + self.specialist.log_tuning("EveAI Agent Task Complete", + {"name": self.name, + 'task': task.name, + 'result': result, + }) + self.specialist.update_progress("EveAI Agent Task Complete", + {"name": self.name, + 'task': task.name, + }) return result @@ -75,26 +80,6 @@ class EveAICrewAITask(Task): self.specialist.update_progress("EveAI Task Initialisation", {"name": name}) -# def create_task_callback(task: EveAICrewAITask): -# def task_callback(output): -# # Todo Check if required with new version of crewai -# if isinstance(output, BaseModel): -# task.specialist.log_tuning(f"TASK CALLBACK: EveAICrewAITask {task.name} Output:", -# {'output': output.model_dump()}) -# if output.output_format == "pydantic" and not output.pydantic: -# try: -# raw_json = json.loads(output.raw) -# output_pydantic = task.output_pydantic(**raw_json) -# output.pydantic = output_pydantic -# task.specialist.log_tuning(f"TASK CALLBACK: EveAICrewAITask {task.name} Converted Output", -# {'output': output_pydantic.model_dump()}) -# except Exception as e: -# task.specialist.log_tuning(f"TASK CALLBACK: EveAICrewAITask {task.name} Output Conversion Error: " -# f"{str(e)}", {}) -# -# return task_callback - - class EveAICrewAICrew(Crew): specialist: Any = Field(default=None, exclude=True) name: str = Field(default=None, exclude=True) @@ -107,6 +92,24 @@ class EveAICrewAICrew(Crew): self.specialist.log_tuning("Initializing EveAICrewAICrew", {"name": self.name}) self.specialist.update_progress("EveAI Crew Initialisation", {"name": self.name}) + def kickoff( + self, + inputs: Optional[Dict[str, Any]] = None, + ) -> CrewOutput: + with current_event.create_span(f"Crew {self.name} kickoff"): + start_time = time.time() + results = super().kickoff(inputs) + end_time = time.time() + metrics = { + "total_tokens": self.usage_metrics.total_tokens, + "prompt_tokens": self.usage_metrics.prompt_tokens, + "completion_tokens": self.usage_metrics.completion_tokens, + "time_elapsed": end_time - start_time, + "interaction_type": "Crew Execution" + } + current_event.log_llm_metrics(metrics) + + return results class EveAICrewAIFlow(Flow): specialist: Any = Field(default=None, exclude=True) diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index fa9b333..d3742cd 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -244,11 +244,6 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict session_id, create_params={'timezone': user_timezone} ) - if cached_session: - current_app.logger.debug(f"Cached Session successfully retrieved for {session_id}: {cached_session.id}") - else: - current_app.logger.debug(f"No Cached Session retrieved for {session_id}") - # Get specialist from database specialist = Specialist.query.get_or_404(specialist_id) diff --git a/eveai_entitlements/tasks.py b/eveai_entitlements/tasks.py index 2847de6..79a8110 100644 --- a/eveai_entitlements/tasks.py +++ b/eveai_entitlements/tasks.py @@ -30,6 +30,8 @@ def update_usages(): error_list = [] for tenant_id in tenant_ids: + if tenant_id == 1: + continue try: Database(tenant_id).switch_schema() check_and_create_license_usage_for_tenant(tenant_id) @@ -65,6 +67,50 @@ def update_usages(): return "Update Usages taks completed successfully" +@current_celery.task(name='persist_business_events', queue='entitlements') +def persist_business_events(log_entries): + """ + Persist multiple business event logs to the database in a single transaction + + Args: + log_entries: List of log event dictionaries to persist + """ + try: + db_entries = [] + for entry in log_entries: + event_log = BusinessEventLog( + timestamp=entry.pop('timestamp'), + event_type=entry.pop('event_type'), + tenant_id=entry.pop('tenant_id'), + trace_id=entry.pop('trace_id'), + span_id=entry.pop('span_id', None), + span_name=entry.pop('span_name', None), + parent_span_id=entry.pop('parent_span_id', None), + document_version_id=entry.pop('document_version_id', None), + document_version_file_size=entry.pop('document_version_file_size', None), + chat_session_id=entry.pop('chat_session_id', None), + interaction_id=entry.pop('interaction_id', None), + environment=entry.pop('environment', None), + llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None), + llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None), + llm_metrics_completion_tokens=entry.pop('llm_metrics_completion_tokens', None), + llm_metrics_total_time=entry.pop('llm_metrics_total_time', None), + llm_metrics_call_count=entry.pop('llm_metrics_call_count', None), + llm_interaction_type=entry.pop('llm_interaction_type', None), + message=entry.pop('message', None) + ) + db_entries.append(event_log) + + # Perform a bulk insert of all entries + db.session.bulk_save_objects(db_entries) + db.session.commit() + current_app.logger.info(f"Successfully persisted {len(db_entries)} business event logs") + + except Exception as e: + current_app.logger.error(f"Failed to persist business event logs: {e}") + db.session.rollback() + + def get_all_tenant_ids(): tenant_ids = db.session.query(Tenant.id).all() return [tenant_id[0] for tenant_id in tenant_ids] # Extract tenant_id from tuples @@ -193,6 +239,18 @@ def process_logs_for_license_usage(tenant_id, license_usage_id, logs): interaction_completion_tokens_used += log.llm_metrics_completion_tokens interaction_total_tokens_used += log.llm_metrics_total_tokens + # Case for 'Specialist Execution' event + elif log.event_type == 'Execute Specialist': + if log.message == 'Final LLM Metrics': + if log.span_name == 'Specialist Retrieval': # This is embedding + embedding_prompt_tokens_used += log.llm_metrics_prompt_tokens + embedding_completion_tokens_used += log.llm_metrics_completion_tokens + embedding_total_tokens_used += log.llm_metrics_total_tokens + else: # This is an interaction + interaction_prompt_tokens_used += log.llm_metrics_prompt_tokens + interaction_completion_tokens_used += log.llm_metrics_completion_tokens + interaction_total_tokens_used += log.llm_metrics_total_tokens + # Mark the log as processed by setting the license_usage_id log.license_usage_id = license_usage_id diff --git a/migrations/public/versions/4d2842d9c1d0_enlarging_span_name_field_for_business_.py b/migrations/public/versions/4d2842d9c1d0_enlarging_span_name_field_for_business_.py new file mode 100644 index 0000000..e1f67d0 --- /dev/null +++ b/migrations/public/versions/4d2842d9c1d0_enlarging_span_name_field_for_business_.py @@ -0,0 +1,38 @@ +"""Enlarging span_name field for business events + +Revision ID: 4d2842d9c1d0 +Revises: b02d9ad000f4 +Create Date: 2025-03-06 14:27:37.986152 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '4d2842d9c1d0' +down_revision = 'b02d9ad000f4' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('business_event_log', schema=None) as batch_op: + batch_op.alter_column('span_name', + existing_type=sa.VARCHAR(length=50), + type_=sa.String(length=255), + existing_nullable=True) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('business_event_log', schema=None) as batch_op: + batch_op.alter_column('span_name', + existing_type=sa.String(length=255), + type_=sa.VARCHAR(length=50), + existing_nullable=True) + + # ### end Alembic commands ###