import importlib
import json
from abc import ABC, abstractmethod, abstractproperty
from typing import Dict, Any, List, Optional, Tuple

from flask import current_app
from sqlalchemy import func, or_, desc
from sqlalchemy.exc import SQLAlchemyError

from common.extensions import db, cache_manager
from common.models.document import Document, DocumentVersion, Catalog, Retriever
from common.utils.model_utils import get_embedding_model_and_class
from eveai_chat_workers.retrievers.retriever_typing import RetrieverResult, RetrieverArguments, RetrieverMetadata
from config.logging_config import TuningLogger


class BaseRetriever(ABC):
    """Base class for all retrievers.

    Loads the ``Retriever`` record for ``retriever_id``, sets up the tuning
    logger and resolves the embedding model for the retriever's catalog.
    Concrete retrievers must provide ``type``, ``type_version`` and
    ``retrieve``.
    """

    def __init__(self, tenant_id: int, retriever_id: int):
        """Initialise the retriever from its database record.

        Args:
            tenant_id: Identifier of the tenant the retriever belongs to.
            retriever_id: Primary key of the ``Retriever`` record to load.

        Raises:
            Exception: Whatever ``_setup_tuning_logger`` re-raises when the
                tuning logger cannot be created. The record lookup uses
                ``get_or_404``, so a missing record also aborts construction.
        """
        self.tenant_id = tenant_id
        self.retriever_id = retriever_id
        self.retriever = Retriever.query.get_or_404(retriever_id)
        self.catalog_id = self.retriever.catalog_id
        # `tuning` must be set before `_setup_tuning_logger()` runs, because
        # that method reads it to decide whether to emit the test message.
        self.tuning = self.retriever.tuning
        self.tuning_logger = None
        self._setup_tuning_logger()
        self.embedding_model, self.embedding_model_class = (
            get_embedding_model_and_class(tenant_id=tenant_id, catalog_id=self.catalog_id))

    @property
    @abstractmethod
    def type(self) -> str:
        """The type of the retriever"""
        raise NotImplementedError

    # Declared as an abstract property so it mirrors `type`; concrete
    # subclasses override this name, so their behaviour is unaffected.
    @property
    @abstractmethod
    def type_version(self) -> str:
        """The type version of the retriever"""
        raise NotImplementedError

    def _setup_tuning_logger(self):
        """Create the tuning logger and, when tuning is on, log a test message.

        Raises:
            Exception: Any error raised while creating or exercising the
                logger is written to the application log and re-raised.
        """
        try:
            self.tuning_logger = TuningLogger(
                'tuning',
                tenant_id=self.tenant_id,
                retriever_id=self.retriever_id,
            )
            # Verify logger is working with a test message
            if self.tuning:
                self.tuning_logger.log_tuning('retriever', "Tuning logger initialized")
        except Exception as e:
            current_app.logger.error(f"Failed to setup tuning logger: {str(e)}")
            raise

    def _parse_metadata(self, metadata: Any) -> Dict[str, Any]:
        """
        Parse metadata ensuring it's a dictionary

        Args:
            metadata: Input metadata which could be string, dict, or None

        Returns:
            Dict[str, Any]: Parsed metadata as dictionary. An empty dictionary
            is returned for ``None``, for strings that are not valid JSON, for
            JSON that does not decode to an object, and for any other input
            type (the last three cases also log a warning).
        """
        if metadata is None:
            return {}

        if isinstance(metadata, dict):
            return metadata

        if isinstance(metadata, str):
            try:
                parsed = json.loads(metadata)
            except json.JSONDecodeError:
                current_app.logger.warning(f"Failed to parse metadata JSON string: {metadata}")
                return {}
            # Valid JSON is not necessarily an object: "[1, 2]", "42" or
            # "null" decode to non-dict values, which must not leak out of a
            # method that promises a dictionary.
            if isinstance(parsed, dict):
                return parsed
            current_app.logger.warning(
                f"Metadata JSON string did not decode to a dictionary: {type(parsed)}")
            return {}

        current_app.logger.warning(f"Unexpected metadata type: {type(metadata)}")
        return {}

    def log_tuning(self, message: str, data: Optional[Dict[str, Any]] = None) -> None:
        """Write a message to the tuning log when tuning is enabled.

        Logging is best-effort: a failure inside the tuning logger is reported
        to the application log and never propagated to the caller.

        Args:
            message: Text to log.
            data: Optional extra data passed through to the tuning logger.
        """
        if self.tuning and self.tuning_logger:
            try:
                self.tuning_logger.log_tuning('retriever', message, data)
            except Exception as e:
                # NOTE(review): the "Processor:" prefix looks copied from
                # another component - confirm before changing the message.
                current_app.logger.error(f"Processor: Error in tuning logging: {e}")

    @abstractmethod
    def retrieve(self, arguments: RetrieverArguments) -> List[RetrieverResult]:
        """
        Retrieve relevant documents based on provided arguments

        Args:
            arguments: Arguments for the retrieval operation

        Returns:
            List[RetrieverResult]: List of retrieved documents/content
        """
        raise NotImplementedError


def get_retriever_class(retriever_type: str, type_version: str):
    """Import and return the executor class for a retriever type and version.

    The module is looked up under ``eveai_chat_workers.retrievers``, in the
    partner's package when the cached retriever configuration names a partner
    and in ``globals`` otherwise. Only the first two components of
    ``type_version`` are used, joined with an underscore (``"1.0.3"`` becomes
    ``"1_0"``).

    Args:
        retriever_type: Retriever type, used as a package name in the path.
        type_version: Dotted version string of the retriever type.

    Returns:
        The ``RetrieverExecutor`` attribute of the imported module.

    Raises:
        ImportError: If the computed module path cannot be imported.
        AttributeError: If the module does not define ``RetrieverExecutor``.
    """
    major_minor = '_'.join(type_version.split('.')[:2])
    retriever_config = cache_manager.retrievers_config_cache.get_config(retriever_type, type_version)
    partner = retriever_config.get("partner", None)
    if partner:
        module_path = f"eveai_chat_workers.retrievers.{partner}.{retriever_type}.{major_minor}"
    else:
        module_path = f"eveai_chat_workers.retrievers.globals.{retriever_type}.{major_minor}"
    module = importlib.import_module(module_path)
    return module.RetrieverExecutor