- Traicie Selection Specialist Round Trip - Session improvements + debugging enabled - Tone of Voice & Language Level definitions introduced
151 lines
5.7 KiB
Python
151 lines
5.7 KiB
Python
import importlib
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, List
|
|
from flask import current_app
|
|
|
|
from common.extensions import cache_manager
|
|
from common.models.interaction import SpecialistRetriever, Specialist
|
|
from common.models.user import Tenant
|
|
from common.utils.execution_progress import ExecutionProgressTracker
|
|
from config.logging_config import TuningLogger
|
|
from eveai_chat_workers.retrievers.base import BaseRetriever
|
|
from eveai_chat_workers.retrievers.registry import RetrieverRegistry
|
|
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult
|
|
|
|
|
|
class BaseSpecialistExecutor(ABC):
    """Base class for all specialists.

    On construction it loads the tenant and specialist records, creates a
    per-session tuning logger and an execution progress tracker. It also
    offers helpers shared by concrete executors (retriever initialisation,
    tuning logging, progress updates, system-variable substitution).

    Subclasses must implement ``type``, ``type_version`` and
    ``execute_specialist``.
    """

    def __init__(self, tenant_id: int, specialist_id: int, session_id: str, task_id: str):
        """Load the tenant and specialist and prepare logging/progress helpers.

        Args:
            tenant_id: Identifier of the tenant, fetched via ``Tenant.query.get_or_404``.
            specialist_id: Identifier of the specialist, fetched via
                ``Specialist.query.get_or_404``.
            session_id: Session identifier; also used in the tuning log file name.
            task_id: Identifier sent along with every progress update.

        Raises:
            Exception: Re-raised from ``_setup_tuning_logger`` when the tuning
                logger cannot be created.
        """
        self.tenant_id = tenant_id
        self.tenant = Tenant.query.get_or_404(tenant_id)
        self.specialist_id = specialist_id
        self.specialist = Specialist.query.get_or_404(specialist_id)
        self.session_id = session_id
        self.task_id = task_id
        # Tuning output is off by default; log_tuning() is a no-op until this
        # flag is switched on.
        self.tuning = False
        self.tuning_logger: TuningLogger = None
        self._setup_tuning_logger()
        self.ept = ExecutionProgressTracker()

    @property
    @abstractmethod
    def type(self) -> str:
        """The type of the specialist"""
        raise NotImplementedError

    @property
    @abstractmethod
    def type_version(self) -> str:
        """The type version of the specialist"""
        raise NotImplementedError

    def _initialize_retrievers(self) -> List[BaseRetriever]:
        """Initialize all retrievers associated with this specialist.

        Returns:
            One retriever instance per ``SpecialistRetriever`` row linked to
            ``self.specialist_id``, in the order returned by the query.
        """
        retrievers = []

        # Get retriever associations from database
        specialist_retrievers = (
            SpecialistRetriever.query
            .filter_by(specialist_id=self.specialist_id)
            .all()
        )

        self.log_tuning("_initialize_retrievers", {"Nr of retrievers": len(specialist_retrievers)})

        for spec_retriever in specialist_retrievers:
            # Get retriever configuration from database
            retriever = spec_retriever.retriever
            # The registry maps the stored retriever type to its implementing class.
            retriever_class = RetrieverRegistry.get_retriever_class(retriever.type)
            self.log_tuning("_initialize_retrievers", {
                "Retriever id": spec_retriever.retriever_id,
                "Retriever Type": retriever.type,
                "Retriever Class": str(retriever_class),
            })

            # Initialize retriever with its configuration
            retrievers.append(
                retriever_class(
                    tenant_id=self.tenant_id,
                    retriever_id=retriever.id,
                )
            )

        return retrievers

    def _setup_tuning_logger(self):
        """Create the tuning logger for this session.

        The logger writes to ``logs/tuning_<session_id>.log``.

        Raises:
            Exception: Any error raised while creating the logger is logged to
                the application logger and then re-raised.
        """
        try:
            self.tuning_logger = TuningLogger(
                'tuning',
                tenant_id=self.tenant_id,
                specialist_id=self.specialist_id,
                session_id=self.session_id,
                log_file=f"logs/tuning_{self.session_id}.log"
            )
            # Verify logger is working with a test message.
            # Note: __init__ sets self.tuning to False right before calling this
            # method, so the message is only emitted when tuning was enabled
            # before a (re-)setup.
            if self.tuning:
                self.tuning_logger.log_tuning('specialist', "Tuning logger initialized")
        except Exception as e:
            current_app.logger.error(f"Failed to setup tuning logger: {str(e)}")
            raise

    def log_tuning(self, message: str, data: Dict[str, Any] = None) -> None:
        """Write a tuning entry when tuning is enabled.

        Does nothing unless ``self.tuning`` is truthy and a tuning logger
        exists. Errors raised by the tuning logger are reported to the
        application logger and not propagated.

        Args:
            message: Message to log.
            data: Optional extra data passed through to the tuning logger.
        """
        if self.tuning and self.tuning_logger:
            try:
                self.tuning_logger.log_tuning('specialist', message, data)
            except Exception as e:
                current_app.logger.error(f"Processor: Error in tuning logging: {e}")

    def update_progress(self, processing_type, data) -> None:
        """Send a progress update for this executor's task.

        Args:
            processing_type: Passed unchanged to the progress tracker.
            data: Payload passed unchanged to the progress tracker.
        """
        self.ept.send_update(self.task_id, processing_type, data)

    def _replace_system_variables(self, text: str) -> str:
        """
        Replace all system variables in the text with their corresponding values.
        System variables are in the format 'tenant_<attribute_name>'

        Only public tenant attributes are considered: names starting with an
        underscore (such as SQLAlchemy's ``_sa_instance_state`` bookkeeping
        entry) are skipped so ORM internals can never be substituted into text.

        Args:
            text: The text containing system variables to replace

        Returns:
            str: The text with all system variables replaced. Falsy input
            (``None`` or an empty string) is returned unchanged.
        """
        if not text:
            return text

        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        from common.utils.model_utils import replace_variable_in_template

        # Format of variables: tenant_name, tenant_code, etc.
        result = text

        # Walk over the attributes stored on the tenant object.
        for attr_name, attr_value in vars(self.tenant).items():
            # vars() exposes the raw instance __dict__, which for an ORM model
            # also contains private state; never treat that as a variable.
            if attr_name.startswith("_"):
                continue
            variable = f"tenant_{attr_name}"
            if variable in result:
                result = replace_variable_in_template(result, variable, str(attr_value))

        return result

    @abstractmethod
    def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult:
        """Execute the specialist's logic"""
        raise NotImplementedError
|
|
|
|
|
|
def get_specialist_class(specialist_type: str, type_version: str):
    """Import and return the ``SpecialistExecutor`` class for a specialist.

    The module lives under ``eveai_chat_workers.specialists``. When the cached
    specialist configuration names a partner, the partner is inserted as an
    extra package level before the specialist type. The module name itself is
    built from the first two dot-separated parts of ``type_version`` joined by
    an underscore (e.g. ``"1.2.3"`` becomes ``"1_2"``).

    Args:
        specialist_type: Specialist type, used as a package name.
        type_version: Dotted version string of the specialist type.

    Returns:
        The ``SpecialistExecutor`` attribute of the imported module.
    """
    version_parts = type_version.split('.')[:2]
    major_minor = '_'.join(version_parts)

    config = cache_manager.specialists_config_cache.get_config(specialist_type, type_version)
    partner = config.get("partner", None)
    current_app.logger.debug(f"Specialist partner for {specialist_type} {type_version} is {partner}")

    # Partner specialists sit one package level deeper than the built-in ones.
    module_path = (
        f"eveai_chat_workers.specialists.{partner}.{specialist_type}.{major_minor}"
        if partner
        else f"eveai_chat_workers.specialists.{specialist_type}.{major_minor}"
    )
    current_app.logger.debug(f"Importing specialist class from {module_path}")

    specialist_module = importlib.import_module(module_path)
    return specialist_module.SpecialistExecutor
|