import importlib from abc import ABC, abstractmethod from typing import Dict, Any, List from flask import current_app from common.extensions import cache_manager from common.models.interaction import SpecialistRetriever, Specialist from common.models.user import Tenant from common.utils.execution_progress import ExecutionProgressTracker from config.logging_config import TuningLogger from eveai_chat_workers.retrievers.base import BaseRetriever from eveai_chat_workers.retrievers.registry import RetrieverRegistry from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult class BaseSpecialistExecutor(ABC): """Base class for all specialists""" def __init__(self, tenant_id: int, specialist_id: int, session_id: str, task_id: str): self.tenant_id = tenant_id self.tenant = Tenant.query.get_or_404(tenant_id) self.specialist_id = specialist_id self.specialist = Specialist.query.get_or_404(specialist_id) self.session_id = session_id self.task_id = task_id self.tuning = False self.tuning_logger: TuningLogger = None self._setup_tuning_logger() self.ept = ExecutionProgressTracker() @property @abstractmethod def type(self) -> str: """The type of the specialist""" raise NotImplementedError @property @abstractmethod def type_version(self) -> str: """The type version of the specialist""" raise NotImplementedError def _initialize_retrievers(self) -> List[BaseRetriever]: """Initialize all retrievers associated with this specialist""" retrievers = [] # Get retriever associations from database specialist_retrievers = ( SpecialistRetriever.query .filter_by(specialist_id=self.specialist_id) .all() ) self.log_tuning("_initialize_retrievers", {"Nr of retrievers": len(specialist_retrievers)}) for spec_retriever in specialist_retrievers: # Get retriever configuration from database retriever = spec_retriever.retriever retriever_class = RetrieverRegistry.get_retriever_class(retriever.type) self.log_tuning("_initialize_retrievers", { "Retriever id": spec_retriever.retriever_id, "Retriever Type": retriever.type, "Retriever Class": str(retriever_class), }) # Initialize retriever with its configuration retrievers.append( retriever_class( tenant_id=self.tenant_id, retriever_id=retriever.id, ) ) return retrievers def _setup_tuning_logger(self): try: self.tuning_logger = TuningLogger( 'tuning', tenant_id=self.tenant_id, specialist_id=self.specialist_id, session_id=self.session_id, log_file=f"logs/tuning_{self.session_id}.log" ) # Verify logger is working with a test message if self.tuning: self.tuning_logger.log_tuning('specialist', "Tuning logger initialized") except Exception as e: current_app.logger.error(f"Failed to setup tuning logger: {str(e)}") raise def log_tuning(self, message: str, data: Dict[str, Any] = None) -> None: if self.tuning and self.tuning_logger: try: self.tuning_logger.log_tuning('specialist', message, data) except Exception as e: current_app.logger.error(f"Processor: Error in tuning logging: {e}") def update_progress(self, processing_type, data) -> None: self.ept.send_update(self.task_id, processing_type, data) def _replace_system_variables(self, text: str) -> str: """ Replace all system variables in the text with their corresponding values. System variables are in the format 'tenant_' Args: text: The text containing system variables to replace Returns: str: The text with all system variables replaced """ if not text: return text from common.utils.model_utils import replace_variable_in_template # Find all tenant_* variables and replace them with tenant attribute values # Format of variables: tenant_name, tenant_code, etc. result = text # Get all attributes of the tenant object tenant_attrs = vars(self.tenant) # Replace all tenant_* variables for attr_name, attr_value in tenant_attrs.items(): variable = f"tenant_{attr_name}" if variable in result: result = replace_variable_in_template(result, variable, str(attr_value)) return result @abstractmethod def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult: """Execute the specialist's logic""" raise NotImplementedError def get_specialist_class(specialist_type: str, type_version: str): major_minor = '_'.join(type_version.split('.')[:2]) specialist_config = cache_manager.specialists_config_cache.get_config(specialist_type, type_version) partner = specialist_config.get("partner", None) current_app.logger.debug(f"Specialist partner for {specialist_type} {type_version} is {partner}") if partner: module_path = f"eveai_chat_workers.specialists.{partner}.{specialist_type}.{major_minor}" else: module_path = f"eveai_chat_workers.specialists.{specialist_type}.{major_minor}" current_app.logger.debug(f"Importing specialist class from {module_path}") module = importlib.import_module(module_path) return module.SpecialistExecutor