- Improvements to EntitlementsDomain & Services - Prechecks in Document domain - Add audit information to LicenseUsage
54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

from flask import Flask

from eveai_client.platform.config.config import Config
|
|
|
|
|
|
class ConfigManager:
    """Manage specialist configurations and a time-limited in-memory cache of them.

    Attributes:
        specialist_configs: Maps a specialist id to a cache entry of the form
            ``{'config': <config dict>, 'updated_at': <naive UTC datetime>}``.
        config_cache_expiry: Maximum age of a cache entry, in seconds.
    """

    def __init__(self):
        """Initialize an empty cache with the default expiry from ``Config``."""
        self.specialist_configs = {}
        self.config_cache_expiry = Config.CONFIG_CACHE_EXPIRY

    def init_app(self, app: Flask) -> None:
        """Override the cache expiry with the application's setting.

        Args:
            app: Application whose ``config`` mapping holds ``CONFIG_CACHE_EXPIRY``.

        Raises:
            KeyError: If ``CONFIG_CACHE_EXPIRY`` is missing from ``app.config``.
        """
        self.config_cache_expiry = app.config["CONFIG_CACHE_EXPIRY"]

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. The tzinfo is
        stripped so that stored ``updated_at`` values stay naive, exactly as
        they were when ``utcnow()`` produced them.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def get_specialist_config(self, specialist_id: int) -> Dict[str, Any]:
        """Return the cached configuration for a specialist.

        Only the cache is consulted here. On a miss (unknown id, expired
        entry, or an empty cached config) an empty dict is returned, which
        signals the caller that the config still has to be fetched by the
        API client.

        Args:
            specialist_id: Identifier of the specialist.

        Returns:
            The cached configuration, or ``{}`` when nothing usable is cached.
        """
        return self._get_cached_config(specialist_id) or {}

    def save_specialist_config(self, specialist_id: int, config: Dict[str, Any]) -> None:
        """Store a specialist configuration in the cache, stamped with the current UTC time.

        Args:
            specialist_id: Identifier of the specialist.
            config: Configuration to cache; it is stored by reference, not copied.
        """
        self.specialist_configs[specialist_id] = {
            'config': config,
            'updated_at': self._utcnow(),
        }

    def _get_cached_config(self, specialist_id: int) -> Dict[str, Any]:
        """Return the cached configuration if present and not expired.

        An entry older than ``config_cache_expiry`` seconds is treated as a
        miss and removed from the cache, so stale entries do not accumulate.

        Args:
            specialist_id: Identifier of the specialist.

        Returns:
            The cached configuration, or ``{}`` on a miss or an expired entry.
        """
        entry = self.specialist_configs.get(specialist_id)
        if entry is None:
            return {}

        age = self._utcnow() - entry['updated_at']
        if age > timedelta(seconds=self.config_cache_expiry):
            # Expired: evict so the stale entry does not linger until clear_cache().
            self.specialist_configs.pop(specialist_id, None)
            return {}

        return entry['config']

    def clear_cache(self) -> None:
        """Drop every cached configuration."""
        self.specialist_configs = {}

    def get_available_specialists(self) -> list:
        """Return ``Config.SPECIALISTS`` as-is (no copy is made)."""
        return Config.SPECIALISTS
|