Files
eveAI/eveai_client/platform/config/config_manager.py
Josako 5c982fcc2c - Added EveAI Client to project
- Improvements to EntitlementsDomain & Services
- Prechecks in Document domain
- Add audit information to LicenseUsage
2025-05-17 15:56:14 +02:00

54 lines
1.9 KiB
Python

from typing import Dict, Any
from datetime import datetime, timedelta
from flask import Flask
from eveai_client.platform.config.config import Config
class ConfigManager:
    """Manage specialist configurations and their in-memory cache.

    Configurations are kept in ``specialist_configs``, keyed by specialist
    id. Each entry is a dict with the keys ``'config'`` (the cached
    configuration) and ``'updated_at'`` (naive UTC datetime of the last
    save). Entries older than ``config_cache_expiry`` seconds are treated
    as missing.
    """

    def __init__(self):
        """Create an empty cache using the expiry from ``Config``."""
        self.specialist_configs = {}
        self.config_cache_expiry = Config.CONFIG_CACHE_EXPIRY

    def init_app(self, app: "Flask") -> None:
        """Take the cache expiry from the Flask application's config.

        Args:
            app: Application whose ``config`` mapping may define
                ``CONFIG_CACHE_EXPIRY`` (seconds). When the key is absent
                the expiry that is already set is kept.
        """
        self.config_cache_expiry = app.config.get(
            "CONFIG_CACHE_EXPIRY", self.config_cache_expiry
        )

    def get_specialist_config(self, specialist_id: int) -> Dict[str, Any]:
        """Return the cached configuration for a specialist.

        Args:
            specialist_id: Identifier used as the cache key.

        Returns:
            The cached configuration when a non-empty, unexpired entry
            exists; otherwise an empty dict.
        """
        # An empty dict signals "nothing cached"; it will be fetched by the
        # API client.
        return self._get_cached_config(specialist_id) or {}

    def save_specialist_config(self, specialist_id: int, config: Dict[str, Any]) -> None:
        """Store a configuration in the cache and stamp it with the current UTC time.

        Args:
            specialist_id: Identifier used as the cache key.
            config: Configuration to cache; stored by reference, not copied.
        """
        self.specialist_configs[specialist_id] = {
            'config': config,
            'updated_at': self._utcnow(),
        }

    def _get_cached_config(self, specialist_id: int) -> Dict[str, Any]:
        """Return the cached configuration if present and not expired.

        Expired entries are removed from the cache so they do not
        accumulate.

        Args:
            specialist_id: Identifier used as the cache key.

        Returns:
            The cached configuration object itself (not a copy), or an
            empty dict when there is no entry or the entry has expired.
        """
        cached_entry = self.specialist_configs.get(specialist_id)
        if cached_entry is None:
            return {}

        age = self._utcnow() - cached_entry['updated_at']
        if age > timedelta(seconds=self.config_cache_expiry):
            # Cache expired: drop the stale entry instead of keeping it around.
            self.specialist_configs.pop(specialist_id, None)
            return {}

        return cached_entry['config']

    def clear_cache(self) -> None:
        """Clear all cached configurations."""
        self.specialist_configs = {}

    def get_available_specialists(self) -> list:
        """Return ``Config.SPECIALISTS``, the specialists declared in the config."""
        return Config.SPECIALISTS

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Replaces the deprecated ``datetime.utcnow()`` while producing the
        same kind of value (naive, UTC), so previously stored timestamps
        remain comparable.
        """
        # Local import: the module header only imports datetime and timedelta.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)