Files
eveAI/common/utils/cache/config_cache.py

539 lines
22 KiB
Python

from typing import Dict, Any, Optional
from pathlib import Path
import yaml
from packaging import version
import os
from flask import current_app
from common.utils.cache.base import CacheHandler, CacheKey
from config.type_defs import agent_types, task_types, tool_types, specialist_types, retriever_types, prompt_types, \
catalog_types, partner_service_types, processor_types, customisation_types, specialist_form_types
def is_major_minor(version: str) -> bool:
parts = version.strip('.').split('.')
return len(parts) == 2 and all(part.isdigit() for part in parts)
class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration caching"""
def __init__(self, region, config_type: str):
"""
Args:
region: Cache region
config_type: Type of configuration (agents, tasks, etc.)
"""
super().__init__(region, f'config_{config_type}')
self.config_type = config_type
self._types_module = None # Set by subclasses
self._config_dir = None # Set by subclasses
self.version_tree_cache = None
self.configure_keys('type_name', 'version')
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the data to a cacheable format"""
# For configuration data, we can just return the dictionary as is
# since it's already in a serializable format
return instance
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Convert cached data back to usable format"""
# Similarly, we can return the data directly since it's already
# in the format we need
return data
def _should_cache(self, value: Dict[str, Any]) -> bool:
"""
Validate if the value should be cached
Args:
value: The value to be cached
Returns:
bool: True if the value should be cached
"""
return isinstance(value, dict) # Cache all dictionaries
def set_version_tree_cache(self, cache):
"""Set the version tree cache dependency."""
self.version_tree_cache = cache
def _load_specific_config(self, type_name: str, version_str: str = 'latest') -> Dict[str, Any]:
"""
Load a specific configuration version
Automatically handles global vs partner-specific configs
"""
version_tree = self.version_tree_cache.get_versions(type_name)
versions = version_tree['versions']
if version_str == 'latest':
version_str = version_tree['latest_version']
if version_str not in versions:
raise ValueError(f"Version {version_str} not found for {type_name}")
version_info = versions[version_str]
file_path = version_info['file_path']
partner = version_info.get('partner')
try:
with open(file_path) as f:
config = yaml.safe_load(f)
# Add partner information to the config
if partner:
config['partner'] = partner
return config
except Exception as e:
raise ValueError(f"Error loading config from {file_path}: {e}")
def get_config(self, type_name: str, version: Optional[str] = None) -> Dict[str, Any]:
"""
Get configuration for a specific type and version
If version not specified, returns latest
Args:
type_name: Configuration type name
version: Optional specific version to retrieve
Returns:
Configuration data
"""
if version is None:
version_str = self.version_tree_cache.get_latest_version(type_name)
elif is_major_minor(version):
version_str = self.version_tree_cache.get_latest_patch_version(type_name, version)
else:
version_str = version
result = self.get(
lambda type_name, version: self._load_specific_config(type_name, version),
type_name=type_name,
version=version_str
)
return result
class BaseConfigVersionTreeCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration version tree caching"""
def __init__(self, region, config_type: str):
"""
Args:
region: Cache region
config_type: Type of configuration (agents, tasks, etc.)
"""
super().__init__(region, f'config_{config_type}_version_tree')
self.config_type = config_type
self._types_module = None # Set by subclasses
self._config_dir = None # Set by subclasses
self.configure_keys('type_name')
def _load_version_tree(self, type_name: str) -> Dict[str, Any]:
"""
Load version tree for a specific type without loading full configurations
Checks both global and partner-specific directories
"""
# First check the global path
global_path = Path(self._config_dir) / "globals" / type_name
# If global path doesn't exist, check if the type exists directly in the root
# (for backward compatibility)
if not global_path.exists():
global_path = Path(self._config_dir) / type_name
if not global_path.exists():
# Check if it exists in any partner subdirectories
partner_dirs = [d for d in Path(self._config_dir).iterdir()
if d.is_dir() and d.name != "globals"]
for partner_dir in partner_dirs:
partner_type_path = partner_dir / type_name
if partner_type_path.exists():
# Found in partner directory
return self._load_versions_from_path(partner_type_path)
# If we get here, the type wasn't found anywhere
raise ValueError(f"No configuration found for type {type_name}")
return self._load_versions_from_path(global_path)
def _load_versions_from_path(self, path: Path) -> Dict[str, Any]:
"""Load all versions from a specific path"""
version_files = list(path.glob('*.yaml'))
if not version_files:
raise ValueError(f"No versions found in {path}")
versions = {}
latest_version = None
latest_version_obj = None
for file_path in version_files:
ver = file_path.stem # Get version from filename
try:
ver_obj = version.parse(ver)
# Only load minimal metadata for version tree
with open(file_path) as f:
yaml_data = yaml.safe_load(f)
metadata = yaml_data.get('metadata', {})
# Add partner information if available
partner = None
if "globals" not in str(file_path):
# Extract partner name from path
# Path format: config_dir/partner_name/type_name/version.yaml
partner = file_path.parent.parent.name
versions[ver] = {
'metadata': metadata,
'file_path': str(file_path),
'partner': partner
}
# Track latest version
if latest_version_obj is None or ver_obj > latest_version_obj:
latest_version = ver
latest_version_obj = ver_obj
except Exception as e:
current_app.logger.error(f"Error loading version {ver}: {e}")
continue
return {
'versions': versions,
'latest_version': latest_version
}
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the data to a cacheable format"""
# For configuration data, we can just return the dictionary as is
# since it's already in a serializable format
return instance
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Convert cached data back to usable format"""
# Similarly, we can return the data directly since it's already
# in the format we need
return data
def _should_cache(self, value: Dict[str, Any]) -> bool:
"""
Validate if the value should be cached
Args:
value: The value to be cached
Returns:
bool: True if the value should be cached
"""
return isinstance(value, dict) # Cache all dictionaries
def get_versions(self, type_name: str) -> Dict[str, Any]:
"""
Get version tree for a type
Args:
type_name: Type to get versions for
Returns:
Dict with version information
"""
return self.get(
lambda type_name: self._load_version_tree(type_name),
type_name=type_name
)
def get_latest_version(self, type_name: str) -> str:
"""
Get the latest version for a given type name.
Args:
type_name: Name of the configuration type
Returns:
Latest version string
Raises:
ValueError: If type not found or no versions available
"""
version_tree = self.get_versions(type_name)
if not version_tree or 'latest_version' not in version_tree:
raise ValueError(f"No versions found for {type_name}")
return version_tree['latest_version']
def get_latest_patch_version(self, type_name: str, major_minor: str) -> str:
"""
Get the latest patch version for a given major.minor version.
Args:
type_name: Name of the configuration type
major_minor: Major.minor version (e.g. "1.0")
Returns:
Latest patch version string (e.g. "1.0.3")
Raises:
ValueError: If type not found or no matching versions
"""
version_tree = self.get_versions(type_name)
if not version_tree or 'versions' not in version_tree:
raise ValueError(f"No versions found for {type_name}")
# Filter versions that match the major.minor prefix
matching_versions = [
ver for ver in version_tree['versions'].keys()
if ver.startswith(major_minor + '.')
]
if not matching_versions:
raise ValueError(f"No versions found for {type_name} with prefix {major_minor}")
# Return highest matching version
latest_patch = max(matching_versions, key=version.parse)
return latest_patch
class BaseConfigTypesCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration types caching"""
def __init__(self, region, config_type: str):
"""
Args:
region: Cache region
config_type: Type of configuration (agents, tasks, etc.)
"""
super().__init__(region, f'config_{config_type}_types')
self.config_type = config_type
self._types_module = None # Set by subclasses
self._config_dir = None # Set by subclasses
self.configure_keys()
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the data to a cacheable format"""
# For configuration data, we can just return the dictionary as is
# since it's already in a serializable format
return instance
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Convert cached data back to usable format"""
# Similarly, we can return the data directly since it's already
# in the format we need
return data
def _should_cache(self, value: Dict[str, Any]) -> bool:
"""
Validate if the value should be cached
Args:
value: The value to be cached
Returns:
bool: True if the value should be cached
"""
return isinstance(value, dict) # Cache all dictionaries
def _load_type_definitions(self) -> Dict[str, Dict[str, str]]:
"""Load type definitions from the corresponding type_defs module"""
if not self._types_module:
raise ValueError("_types_module must be set by subclass")
type_definitions = {
type_id: {
'name': info['name'],
'description': info['description'],
'partner': info.get('partner') # Include partner info if available
}
for type_id, info in self._types_module.items()
}
return type_definitions
def get_types(self) -> Dict[str, Dict[str, str]]:
"""Get dictionary of available types with name and description"""
result = self.get(
lambda type_name: self._load_type_definitions(),
type_name=f'{self.config_type}_types',
)
return result
def create_config_cache_handlers(config_type: str, config_dir: str, types_module: dict) -> tuple:
"""
Factory function to dynamically create the 3 cache handler classes for a given configuration type.
The following cache names are created:
- <config_type>_config_cache
- <config_type>_version_tree_cache
- <config_type>_types_cache
Args:
config_type: The configuration type (e.g., 'agents', 'tasks').
config_dir: The directory where configuration files are stored.
types_module: The types module defining the available types for this config.
Returns:
A tuple of dynamically created classes for config, version tree, and types handlers.
"""
class ConfigCacheHandler(BaseConfigCacheHandler):
handler_name = f"{config_type}_config_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
class VersionTreeCacheHandler(BaseConfigVersionTreeCacheHandler):
handler_name = f"{config_type}_version_tree_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
class TypesCacheHandler(BaseConfigTypesCacheHandler):
handler_name = f"{config_type}_types_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
return ConfigCacheHandler, VersionTreeCacheHandler, TypesCacheHandler
AgentConfigCacheHandler, AgentConfigVersionTreeCacheHandler, AgentConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='agents',
config_dir='config/agents',
types_module=agent_types.AGENT_TYPES
))
TaskConfigCacheHandler, TaskConfigVersionTreeCacheHandler, TaskConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='tasks',
config_dir='config/tasks',
types_module=task_types.TASK_TYPES
))
ToolConfigCacheHandler, ToolConfigVersionTreeCacheHandler, ToolConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='tools',
config_dir='config/tools',
types_module=tool_types.TOOL_TYPES
))
SpecialistConfigCacheHandler, SpecialistConfigVersionTreeCacheHandler, SpecialistConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='specialists',
config_dir='config/specialists',
types_module=specialist_types.SPECIALIST_TYPES
))
RetrieverConfigCacheHandler, RetrieverConfigVersionTreeCacheHandler, RetrieverConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='retrievers',
config_dir='config/retrievers',
types_module=retriever_types.RETRIEVER_TYPES
))
PromptConfigCacheHandler, PromptConfigVersionTreeCacheHandler, PromptConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='prompts',
config_dir='config/prompts',
types_module=prompt_types.PROMPT_TYPES
))
CatalogConfigCacheHandler, CatalogConfigVersionTreeCacheHandler, CatalogConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='catalogs',
config_dir='config/catalogs',
types_module=catalog_types.CATALOG_TYPES
))
ProcessorConfigCacheHandler, ProcessorConfigVersionTreeCacheHandler, ProcessorConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='processors',
config_dir='config/processors',
types_module=processor_types.PROCESSOR_TYPES
))
PartnerServiceConfigCacheHandler, PartnerServiceConfigVersionTreeCacheHandler, PartnerServiceConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='partner_services',
config_dir='config/partner_services',
types_module=partner_service_types.PARTNER_SERVICE_TYPES
))
CustomisationConfigCacheHandler, CustomisationConfigVersionTreeCacheHandler, CustomisationConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='customisations',
config_dir='config/customisations',
types_module=customisation_types.CUSTOMISATION_TYPES
)
)
SpecialistFormConfigCacheHandler, SpecialistFormConfigVersionTreeCacheHandler, SpecialistFormConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='specialist_forms',
config_dir='config/specialist_forms',
types_module=specialist_form_types.SPECIALIST_FORM_TYPES
)
)
def register_config_cache_handlers(cache_manager) -> None:
cache_manager.register_handler(AgentConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(AgentConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(AgentConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(TaskConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(TaskConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(TaskConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(ToolConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(ToolConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(ToolConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(RetrieverConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(RetrieverConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(RetrieverConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(PromptConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(PromptConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(PromptConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(CatalogConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(CatalogConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(CatalogConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(ProcessorConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(ProcessorConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(ProcessorConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(AgentConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(AgentConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(AgentConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(PartnerServiceConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(PartnerServiceConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(PartnerServiceConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(CustomisationConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(CustomisationConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(CustomisationConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistFormConfigCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistFormConfigTypesCacheHandler, 'eveai_config')
cache_manager.register_handler(SpecialistFormConfigVersionTreeCacheHandler, 'eveai_config')
cache_manager.agents_config_cache.set_version_tree_cache(cache_manager.agents_version_tree_cache)
cache_manager.tasks_config_cache.set_version_tree_cache(cache_manager.tasks_version_tree_cache)
cache_manager.tools_config_cache.set_version_tree_cache(cache_manager.tools_version_tree_cache)
cache_manager.specialists_config_cache.set_version_tree_cache(cache_manager.specialists_version_tree_cache)
cache_manager.retrievers_config_cache.set_version_tree_cache(cache_manager.retrievers_version_tree_cache)
cache_manager.prompts_config_cache.set_version_tree_cache(cache_manager.prompts_version_tree_cache)
cache_manager.catalogs_config_cache.set_version_tree_cache(cache_manager.catalogs_version_tree_cache)
cache_manager.processors_config_cache.set_version_tree_cache(cache_manager.processors_version_tree_cache)
cache_manager.partner_services_config_cache.set_version_tree_cache(cache_manager.partner_services_version_tree_cache)
cache_manager.customisations_config_cache.set_version_tree_cache(cache_manager.customisations_version_tree_cache)
cache_manager.specialist_forms_config_cache.set_version_tree_cache(cache_manager.specialist_forms_version_tree_cache)