# common/utils/cache/chat_session_handler.py from typing import Dict, List, Any, Optional from datetime import datetime as dt, timezone as tz from dataclasses import dataclass from flask import current_app from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import joinedload from common.extensions import db, cache_manager from common.models.interaction import ChatSession, Interaction from common.utils.cache.base import CacheHandler @dataclass class CachedInteraction: """Lightweight representation of an interaction for history purposes""" specialist_arguments: Dict[str, Any] # Contains the original question and other arguments specialist_results: Dict[str, Any] # Contains detailed question, answer and other results @dataclass class CachedSession: """Cached representation of a chat session with its interactions""" id: int session_id: str interactions: List[CachedInteraction] timezone: str class ChatSessionCacheHandler(CacheHandler[CachedSession]): """Handles caching of chat sessions focused on interaction history""" handler_name = 'chat_session_cache' def __init__(self, region): super().__init__(region, 'chat_session') self.configure_keys('session_id') def get_cached_session(self, session_id: str, *, create_params: Optional[Dict[str, Any]] = None) -> CachedSession: """ Get or create a cached session with its interaction history. If not in cache, loads from database and caches it. Args: session_id: The session identifier create_params: Optional parameters for session creation if it doesn't exist. Must include 'timezone' if provided. Returns: CachedSession with interaction history """ def creator_func(session_id: str) -> CachedSession: # Load session and interactions from database session = ( ChatSession.query .options(joinedload(ChatSession.interactions)) .filter_by(session_id=session_id) .first() ) if not session: if not create_params: raise ValueError(f"Chat session {session_id} not found and no creation parameters provided") if 'timezone' not in create_params: raise ValueError("timezone is required in create_params for new session creation") # Create new session session = ChatSession( session_id=session_id, session_start=dt.now(tz.utc), timezone=create_params['timezone'] ) try: db.session.add(session) db.session.commit() except SQLAlchemyError as e: db.session.rollback() raise ValueError(f"Failed to create new session: {str(e)}") # Convert to cached format cached_interactions = [ CachedInteraction( specialist_arguments=interaction.specialist_arguments, specialist_results=interaction.specialist_results ) for interaction in session.interactions if interaction.specialist_results is not None # Only include completed interactions ] cached_session = CachedSession( id=session.id, session_id=session_id, interactions=cached_interactions, timezone=session.timezone ) return cached_session return self.get(creator_func, session_id=session_id) def add_completed_interaction(self, session_id: str, interaction: Interaction) -> None: """ Add a completed interaction to the cached session history. Should only be called once the interaction has an answer. Args: session_id: The session identifier interaction: The completed interaction to add Note: Only adds the interaction if it has an answer """ if not interaction.specialist_results: return # Skip incomplete interactions try: cached_session = self.get_cached_session(session_id) # Add new interaction to cache cached_session.interactions.append( CachedInteraction( specialist_arguments=interaction.specialist_arguments, specialist_results=interaction.specialist_results, ) ) # Update cache directly with modified session using region's set() key = self.generate_key(session_id=session_id) self.region.set(key, self._to_cache_data(cached_session)) except ValueError: # If session not in cache yet, load it fresh from DB self.get_cached_session(session_id) def _to_cache_data(self, instance: CachedSession) -> Dict[str, Any]: """Convert CachedSession to cache data""" cached_data = { 'id': instance.id, 'session_id': instance.session_id, 'timezone': instance.timezone, 'interactions': [ { 'specialist_arguments': interaction.specialist_arguments, 'specialist_results': interaction.specialist_results, } for interaction in instance.interactions ], 'last_updated': dt.now(tz=tz.utc).isoformat() } return cached_data def _from_cache_data(self, data: Dict[str, Any], session_id: str, **kwargs) -> CachedSession: """Create CachedSession from cache data""" interactions = [ CachedInteraction( specialist_arguments=int_data['specialist_arguments'], specialist_results=int_data['specialist_results'] ) for int_data in data['interactions'] ] return CachedSession( id=data['id'], session_id=data['session_id'], interactions=interactions, timezone=data['timezone'] ) def _should_cache(self, value: Dict[str, Any]) -> bool: """Validate cache data""" required_fields = {'id', 'session_id', 'timezone', 'interactions'} return all(field in value for field in required_fields) def register_chat_session_cache_handlers(cache_manager): cache_manager.register_handler(ChatSessionCacheHandler, 'eveai_chat_workers') # Helper function similar to get_model_variables def get_chat_history(session_id: str) -> CachedSession: """ Get cached chat history for a session, loading from database if needed Args: session_id: Session ID to look up Returns: CachedSession with interaction history Raises: ValueError: If session doesn't exist """ return cache_manager.chat_session_cache.get_cached_session(session_id)