import uuid
from datetime import datetime as dt, timezone as tz
from typing import Dict, Any, Tuple, Optional

from flask import current_app
from sqlalchemy.exc import SQLAlchemyError

from common.extensions import db, cache_manager
from common.models.interaction import (
    Specialist, EveAIAgent, EveAITask, EveAITool
)
from common.utils.celery_utils import current_celery
from common.utils.model_logging_utils import set_logging_information, update_logging_information


class SpecialistServices:
    """Static helpers for starting sessions, dispatching and initializing specialists."""

    @staticmethod
    def start_session() -> str:
        """Return a new session identifier of the form ``CHAT_SESSION_<uuid4>``."""
        return f"CHAT_SESSION_{uuid.uuid4()}"

    @staticmethod
    def execute_specialist(tenant_id, specialist_id, specialist_arguments, session_id,
                           user_timezone) -> Dict[str, Any]:
        """
        Queue the 'execute_specialist' Celery task on the 'llm_interactions' queue.

        The arguments are forwarded positionally, in the order of this
        signature, as the task's ``args``.

        Returns:
            Dict with the queued task's id under 'task_id' and the constant
            'queued' under 'status'.
        """
        task = current_celery.send_task(
            'execute_specialist',
            args=[tenant_id,
                  specialist_id,
                  specialist_arguments,
                  session_id,
                  user_timezone,
                  ],
            queue='llm_interactions'
        )

        return {
            'task_id': task.id,
            'status': 'queued',
        }

    @staticmethod
    def initialize_specialist(specialist_id: int, specialist_type: str, specialist_version: str):
        """
        Initialize an agentic specialist by creating all its components based on configuration.

        Args:
            specialist_id: ID of the specialist to initialize
            specialist_type: Type of the specialist
            specialist_version: Version of the specialist type to use

        Raises:
            ValueError: If no configuration or no specialist is found
            KeyError: If the configuration has no 'framework' entry
            SQLAlchemyError: If database operations fail
        """
        config = cache_manager.specialists_config_cache.get_config(specialist_type, specialist_version)
        if not config:
            raise ValueError(f"No configuration found for {specialist_type} version {specialist_version}")

        # Read once, before touching the database, so a configuration without
        # a framework fails at the same point as before.
        framework = config['framework']

        # Langchain does not require additional items to be initialized.
        # All configuration is in the specialist. The specialist is still
        # looked up so an unknown ID is reported for every framework.
        specialist = Specialist.query.get(specialist_id)
        if not specialist:
            raise ValueError(f"Specialist with ID {specialist_id} not found")

        if framework == 'crewai':
            SpecialistServices.initialize_crewai_specialist(specialist, config)

    @staticmethod
    def initialize_crewai_specialist(specialist: Specialist, config: Dict[str, Any]):
        """
        Create the agents, tasks and tools listed in ``config`` for a crewai specialist.

        All components share one UTC timestamp and are committed together. On
        any error the session is rolled back, the error is logged and the
        exception is re-raised.

        Args:
            specialist: Specialist the components are attached to (by ``specialist.id``)
            config: Specialist configuration; the optional 'agents', 'tasks'
                and 'tools' entries are lists of dicts with 'type' and
                'version' and optionally 'name' and 'description'

        Raises:
            SQLAlchemyError: If database operations fail
            Exception: Any other error raised while creating the components
        """
        timestamp = dt.now(tz=tz.utc)

        try:
            # A missing, empty or null section means "nothing to create".
            # Initialize agents
            for agent_config in config.get('agents') or []:
                SpecialistServices._create_agent(
                    specialist_id=specialist.id,
                    agent_type=agent_config['type'],
                    agent_version=agent_config['version'],
                    name=agent_config.get('name'),
                    description=agent_config.get('description'),
                    timestamp=timestamp
                )

            # Initialize tasks
            for task_config in config.get('tasks') or []:
                SpecialistServices._create_task(
                    specialist_id=specialist.id,
                    task_type=task_config['type'],
                    task_version=task_config['version'],
                    name=task_config.get('name'),
                    description=task_config.get('description'),
                    timestamp=timestamp
                )

            # Initialize tools
            for tool_config in config.get('tools') or []:
                SpecialistServices._create_tool(
                    specialist_id=specialist.id,
                    tool_type=tool_config['type'],
                    tool_version=tool_config['version'],
                    name=tool_config.get('name'),
                    description=tool_config.get('description'),
                    timestamp=timestamp
                )

            db.session.commit()
            current_app.logger.info(f"Successfully initialized crewai specialist {specialist.id}")

        except SQLAlchemyError as e:
            db.session.rollback()
            current_app.logger.error(f"Database error initializing crewai specialist {specialist.id}: {str(e)}")
            raise
        except Exception as e:
            db.session.rollback()
            current_app.logger.error(f"Error initializing crewai specialist {specialist.id}: {str(e)}")
            raise

    @staticmethod
    def _resolve_name_and_description(
            component_config: Optional[Dict[str, Any]],
            component_type: str,
            name: Optional[str],
            description: Optional[str]
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Pick the name and description for a component, preferring explicit values.

        Falls back to the cached component configuration: its 'name' entry
        (or ``component_type`` when there is none) and the 'description'
        entry of its 'metadata' (or '' when there is none). A configuration
        that is None, or that has no usable 'metadata', is treated as empty
        instead of raising AttributeError.
        """
        config = component_config or {}
        metadata = config.get('metadata') or {}
        resolved_name = name or config.get('name', component_type)
        resolved_description = description or metadata.get('description', '')
        return resolved_name, resolved_description

    @staticmethod
    def _create_agent(
            specialist_id: int,
            agent_type: str,
            agent_version: str,
            name: Optional[str] = None,
            description: Optional[str] = None,
            timestamp: Optional[dt] = None
    ) -> EveAIAgent:
        """
        Create an agent with the given configuration and add it to the session.

        The agent is not committed here; the caller owns the transaction.
        ``timestamp`` defaults to the current UTC time.
        """
        if timestamp is None:
            timestamp = dt.now(tz=tz.utc)

        # Get agent configuration from cache
        agent_config = cache_manager.agents_config_cache.get_config(agent_type, agent_version)
        agent_name, agent_description = SpecialistServices._resolve_name_and_description(
            agent_config, agent_type, name, description
        )

        agent = EveAIAgent(
            specialist_id=specialist_id,
            name=agent_name,
            description=agent_description,
            type=agent_type,
            type_version=agent_version,
            role=None,
            goal=None,
            backstory=None,
            tuning=False,
            configuration=None,
            arguments=None
        )

        set_logging_information(agent, timestamp)

        db.session.add(agent)
        # NOTE(review): nothing has been flushed yet, so a database-generated
        # primary key would presumably still be None in this log line — confirm.
        current_app.logger.info(f"Created agent {agent.id} of type {agent_type}")
        return agent

    @staticmethod
    def _create_task(
            specialist_id: int,
            task_type: str,
            task_version: str,
            name: Optional[str] = None,
            description: Optional[str] = None,
            timestamp: Optional[dt] = None
    ) -> EveAITask:
        """
        Create a task with the given configuration and add it to the session.

        The task is not committed here; the caller owns the transaction.
        ``timestamp`` defaults to the current UTC time.
        """
        if timestamp is None:
            timestamp = dt.now(tz=tz.utc)

        # Get task configuration from cache
        task_config = cache_manager.tasks_config_cache.get_config(task_type, task_version)
        task_name, task_summary = SpecialistServices._resolve_name_and_description(
            task_config, task_type, name, description
        )

        task = EveAITask(
            specialist_id=specialist_id,
            name=task_name,
            description=task_summary,
            type=task_type,
            type_version=task_version,
            task_description=None,
            expected_output=None,
            tuning=False,
            configuration=None,
            arguments=None,
            context=None,
            asynchronous=False,
        )

        set_logging_information(task, timestamp)

        db.session.add(task)
        # NOTE(review): nothing has been flushed yet, so a database-generated
        # primary key would presumably still be None in this log line — confirm.
        current_app.logger.info(f"Created task {task.id} of type {task_type}")
        return task

    @staticmethod
    def _create_tool(
            specialist_id: int,
            tool_type: str,
            tool_version: str,
            name: Optional[str] = None,
            description: Optional[str] = None,
            timestamp: Optional[dt] = None
    ) -> EveAITool:
        """
        Create a tool with the given configuration and add it to the session.

        The tool is not committed here; the caller owns the transaction.
        ``timestamp`` defaults to the current UTC time.
        """
        if timestamp is None:
            timestamp = dt.now(tz=tz.utc)

        # Get tool configuration from cache
        tool_config = cache_manager.tools_config_cache.get_config(tool_type, tool_version)
        tool_name, tool_description = SpecialistServices._resolve_name_and_description(
            tool_config, tool_type, name, description
        )

        tool = EveAITool(
            specialist_id=specialist_id,
            name=tool_name,
            description=tool_description,
            type=tool_type,
            type_version=tool_version,
            tuning=False,
            configuration=None,
            arguments=None,
        )

        set_logging_information(tool, timestamp)

        db.session.add(tool)
        # NOTE(review): nothing has been flushed yet, so a database-generated
        # primary key would presumably still be None in this log line — confirm.
        current_app.logger.info(f"Created tool {tool.id} of type {tool_type}")
        return tool

    @staticmethod
    def get_specialist_system_field(specialist_id, config_name, system_name):
        """
        Return the specialist's stored value for a system-typed configuration field.

        The specialist's type configuration is looked up under ``config_name``.
        The stored value is returned only when that entry has type 'system'
        and its ``system_name`` equals ``system_name``.

        Returns:
            The value stored under ``config_name`` in the specialist's
            configuration, or None when the field is absent, is not a
            matching system field, or the specialist stores no configuration.

        Raises:
            ValueError: If the specialist or its type configuration is not found
        """
        specialist = Specialist.query.get(specialist_id)
        if not specialist:
            raise ValueError(f"Specialist with ID {specialist_id} not found")

        config = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.version)
        if not config:
            raise ValueError(f"No configuration found for {specialist.type} version {specialist.version}")

        potential_field = config.get(config_name, None)
        if not potential_field:
            return None

        # NOTE(review): attribute access assumes the field definition is an
        # object exposing .type / .system_name rather than a plain dict — confirm.
        if potential_field.type == 'system' and potential_field.system_name == system_name:
            # A specialist without stored configuration has no value to return.
            stored_configuration = specialist.configuration or {}
            return stored_configuration.get(config_name, None)

        return None