import os
from typing import Dict, Any, Optional, Tuple

import langcodes
from flask import current_app
from crewai import LLM
from langchain_core.language_models import BaseChatModel
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_mistralai import ChatMistralAI

from common.langchain.llm_metrics_handler import LLMMetricsHandler
from common.langchain.tracked_transcription import TrackedOpenAITranscription
from common.eveai_model.tracked_mistral_embeddings import TrackedMistralAIEmbeddings
from common.models.user import Tenant
from common.models.document import EmbeddingMistral
from common.utils.eveai_exceptions import EveAITenantNotFound, EveAIInvalidEmbeddingModel
from common.extensions import template_manager
from config.model_config import MODEL_CONFIG

# Module-level caches for LLM clients, keyed by (full model name, temperature)
embedding_llm_model_cache: Dict[Tuple[str, float], BaseChatModel] = {}
crewai_llm_model_cache: Dict[Tuple[str, float], LLM] = {}

llm_metrics_handler = LLMMetricsHandler()


def create_language_template(template: str, language: str) -> str:
    """
    Replace the language placeholder in a template with the specified language

    Args:
        template: Template string with a {language} placeholder
        language: Language code to insert

    Returns:
        str: Template with the language placeholder replaced
    """
    try:
        full_language = langcodes.Language.make(language=language)
        language_template = template.replace('{language}', full_language.display_name())
    except ValueError:
        # Fall back to the raw language code if it cannot be parsed
        language_template = template.replace('{language}', language)
    return language_template


def replace_variable_in_template(template: str, variable: str, value: str) -> str:
    """
    Replace a variable placeholder in a template with the specified value

    Args:
        template: Template string with a variable placeholder
        variable: Variable placeholder to replace (e.g. "{tenant_context}")
        value: Value to insert (an empty string is used when value is None)

    Returns:
        str: Template with the variable placeholder replaced
    """
    return template.replace(variable, value or "")


def get_embedding_model_and_class(tenant_id, catalog_id, full_embedding_name):
    """
    Retrieve the embedding model and the embedding model class used to store embeddings

    Args:
        tenant_id: ID of the tenant
        catalog_id: ID of the catalog
        full_embedding_name: The full name of the embedding model: <provider>.<model_name>
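            e.g. "mistral.mistral-embed"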

    Returns:
        embedding_model, embedding_model_class
    """
    embedding_provider, embedding_model_name = full_embedding_name.split('.')

    # Determine the embedding model to be used
    if embedding_provider == "mistral":
        api_key = current_app.config['MISTRAL_API_KEY']
        embedding_model = TrackedMistralAIEmbeddings(
            model=embedding_model_name
        )
    else:
        raise EveAIInvalidEmbeddingModel(tenant_id, catalog_id)

    # Determine the embedding model class used to store embeddings
    if embedding_model_name == "mistral-embed":
        embedding_model_class = EmbeddingMistral
    else:
        raise EveAIInvalidEmbeddingModel(tenant_id, catalog_id)

    return embedding_model, embedding_model_class


def get_embedding_llm(full_model_name='mistral.mistral-small-latest', temperature=0.3):
    """Return a cached chat model for the given '<provider>.<model>' name and temperature."""
    llm = embedding_llm_model_cache.get((full_model_name, temperature))
    if not llm:
        llm_provider, llm_model_name = full_model_name.split('.')
        if llm_provider == "openai":
            llm = ChatOpenAI(
                api_key=current_app.config['OPENAI_API_KEY'],
                model=llm_model_name,
                temperature=temperature,
                callbacks=[llm_metrics_handler]
            )
        elif llm_provider == "mistral":
            llm = ChatMistralAI(
                api_key=current_app.config['MISTRAL_API_KEY'],
                model=llm_model_name,
                temperature=temperature,
                callbacks=[llm_metrics_handler]
            )
        else:
            # Guard against silently caching and returning None for unsupported providers
            raise ValueError(f"Unsupported LLM provider: {llm_provider}")
        embedding_llm_model_cache[(full_model_name, temperature)] = llm
    return llm


def get_crewai_llm(full_model_name='mistral.mistral-large-latest', temperature=0.3):
    """Return a cached CrewAI LLM for the given '<provider>.<model>' name and temperature."""
    llm = crewai_llm_model_cache.get((full_model_name, temperature))
    if not llm:
        llm_provider, llm_model_name = full_model_name.split('.')
        # CrewAI expects "<provider>/<model>" rather than "<provider>.<model>"
        crew_full_model_name = f"{llm_provider}/{llm_model_name}"
        api_key = None
        if llm_provider == "openai":
            api_key = current_app.config['OPENAI_API_KEY']
        elif llm_provider == "mistral":
            api_key = current_app.config['MISTRAL_API_KEY']
        llm = LLM(
            model=crew_full_model_name,
            temperature=temperature,
            api_key=api_key
        )
        crewai_llm_model_cache[(full_model_name, temperature)] = llm
    return llm


class ModelVariables:
    """Manages model-related variables and configurations"""

    def __init__(self, tenant_id: int, variables: Optional[Dict[str, Any]] = None):
        """
        Initialize ModelVariables for a tenant, optionally with pre-built variables

        Args:
            tenant_id: ID of the tenant
            variables: Optional pre-built variables dict; derived from the tenant
                and application config when omitted
        """
        current_app.logger.info(f'Model variables initialized with tenant {tenant_id} and variables \n{variables}')
        self.tenant_id = tenant_id
        self._variables = variables if variables is not None else self._initialize_variables()
        current_app.logger.info(f'Model _variables initialized to {self._variables}')
        self._llm_instances = {}
        self.llm_metrics_handler = LLMMetricsHandler()
        self._transcription_model = None

    def _initialize_variables(self) -> Dict[str, Any]:
        """Initialize the variables dictionary"""
        variables = {}
        tenant = Tenant.query.get(self.tenant_id)
        if not tenant:
            raise EveAITenantNotFound(self.tenant_id)

        # Set model provider and model name
        variables['llm_provider'], variables['llm_model'] = tenant.llm_model.split('.')
        variables['llm_full_model'] = tenant.llm_model

        # Set model-specific configurations
        model_config = MODEL_CONFIG.get(variables['llm_provider'], {}).get(variables['llm_model'], {})
        variables.update(model_config)

        # Additional configurations
        variables['annotation_chunk_length'] = current_app.config['ANNOTATION_TEXT_CHUNK_LENGTH'][tenant.llm_model]
        variables['max_compression_duration'] = current_app.config['MAX_COMPRESSION_DURATION']
        variables['max_transcription_duration'] = current_app.config['MAX_TRANSCRIPTION_DURATION']
        variables['compression_cpu_limit'] = current_app.config['COMPRESSION_CPU_LIMIT']
        variables['compression_process_delay'] = current_app.config['COMPRESSION_PROCESS_DELAY']

        return variables

    @property
    def annotation_chunk_length(self):
        return self._variables['annotation_chunk_length']

    @property
    def max_compression_duration(self):
        return self._variables['max_compression_duration']

    @property
    def max_transcription_duration(self):
        return self._variables['max_transcription_duration']

    @property
    def compression_cpu_limit(self):
        return self._variables['compression_cpu_limit']

    @property
    def compression_process_delay(self):
        return self._variables['compression_process_delay']

    def get_llm(self, temperature: float = 0.3, **kwargs) -> Any:
        """
        Get an LLM instance with a specific configuration

        Args:
            temperature: The temperature for the LLM
            **kwargs: Additional configuration parameters

        Returns:
            An instance of the configured LLM
        """
        cache_key = f"{temperature}_{hash(frozenset(kwargs.items()))}"
        if cache_key not in self._llm_instances:
            provider = self._variables['llm_provider']
            model = self._variables['llm_model']

            if provider == 'openai':
                self._llm_instances[cache_key] = ChatOpenAI(
                    api_key=os.getenv('OPENAI_API_KEY'),
                    model=model,
                    temperature=temperature,
                    callbacks=[self.llm_metrics_handler],
                    **kwargs
                )
            elif provider == 'anthropic':
                self._llm_instances[cache_key] = ChatAnthropic(
                    api_key=os.getenv('ANTHROPIC_API_KEY'),
                    model=current_app.config['ANTHROPIC_LLM_VERSIONS'][model],
                    temperature=temperature,
                    callbacks=[self.llm_metrics_handler],
                    **kwargs
                )
            else:
                raise ValueError(f"Unsupported LLM provider: {provider}")

        return self._llm_instances[cache_key]

    @property
    def transcription_model(self) -> TrackedOpenAITranscription:
        """Get the transcription model instance"""
        if self._transcription_model is None:
            api_key = os.getenv('OPENAI_API_KEY')
            self._transcription_model = TrackedOpenAITranscription(
                api_key=api_key,
                model='whisper-1'
            )
        return self._transcription_model

    # Deprecated accessors: transcription is handled by TrackedOpenAITranscription
    @property
    def transcription_client(self):
        raise DeprecationWarning("Use transcription_model instead")

    def transcribe(self, *args, **kwargs):
        raise DeprecationWarning("Use transcription_model.transcribe() instead")

    def get_template(self, template_name: str, version: Optional[str] = None) -> str:
        """
        Get a template for the tenant's configured LLM

        Args:
            template_name: Name of the template to retrieve
            version: Optional specific version to retrieve

        Returns:
            The template content
        """
        try:
            template = template_manager.get_template(
                self._variables['llm_full_model'],
                template_name,
                version
            )
            return template.content
        except Exception as e:
            current_app.logger.error(f"Error getting template {template_name}: {str(e)}")
            # Fall back to templates stored in the variables dict if template_manager fails
            if template_name in self._variables.get('templates', {}):
                return self._variables['templates'][template_name]
            raise


# Helper function to build ModelVariables for a tenant
def get_model_variables(tenant_id: int) -> ModelVariables:
    return ModelVariables(tenant_id=tenant_id)
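

# Usage sketch (comments only, not executed): illustrates how the helpers above are
# typically combined. Assumes an active Flask application context, a hypothetical
# tenant id of 1 whose llm_model uses the 'openai' or 'anthropic' provider, and a
# hypothetical template named 'summarize_answer' registered with template_manager.
#
#     model_vars = get_model_variables(tenant_id=1)
#     llm = model_vars.get_llm(temperature=0.2)
#
#     prompt = model_vars.get_template('summarize_answer')
#     prompt = create_language_template(prompt, 'en')
#     prompt = replace_variable_in_template(prompt, '{tenant_context}', 'Example tenant context')
#
#     crew_llm = get_crewai_llm('mistral.mistral-large-latest', temperature=0.3)
#     embedding_model, embedding_cls = get_embedding_model_and_class(
#         tenant_id=1, catalog_id=1, full_embedding_name='mistral.mistral-embed'
#     )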