- Finish editing of Specialists with overview, agent - task - tool editor

- Split different caching mechanisms (types, version tree, config) into separate cachers
- Improve resource usage on starting components, and correct gevent usage
- Refine repopack usage for eveai_app (too large)
- Change nginx dockerfile to allow for specialist overviews being served statically
This commit is contained in:
Josako
2025-01-23 09:43:48 +01:00
parent 7bddeb0ebd
commit d106520d22
39 changed files with 1312 additions and 281 deletions

View File

@@ -73,6 +73,7 @@ class EveAITask(db.Model):
description = db.Column(db.Text, nullable=True)
type = db.Column(db.String(50), nullable=False, default="STANDARD_RAG")
type_version = db.Column(db.String(20), nullable=True, default="1.0.0")
task_description = db.Column(db.Text, nullable=True)
expected_output = db.Column(db.Text, nullable=True)
tuning = db.Column(db.Boolean, nullable=True, default=False)
configuration = db.Column(JSONB, nullable=True)

View File

@@ -1,7 +1,8 @@
from typing import Any, Dict, List, Optional, TypeVar, Generic, Type
from dataclasses import dataclass
from flask import Flask
from flask import Flask, current_app
from dogpile.cache import CacheRegion
from abc import ABC, abstractmethod
T = TypeVar('T') # Generic type parameter for cached data
@@ -47,6 +48,46 @@ class CacheHandler(Generic[T]):
self.prefix = prefix
self._key_components = [] # List of required key components
@abstractmethod
def _to_cache_data(self, instance: T) -> Any:
    """
    Convert the data to a cacheable format for internal use.

    Subclasses must return something the cache backend can serialize.

    Args:
        instance: The data to be cached.

    Returns:
        A serializable format of the instance.
    """
    pass
@abstractmethod
def _from_cache_data(self, data: Any, **kwargs) -> T:
    """
    Convert cached data back to usable format for internal use.

    Args:
        data: The cached data.
        **kwargs: Additional context (the identifiers used to build the key
            are forwarded here by ``get``).

    Returns:
        The data in its usable format.
    """
    pass
@abstractmethod
def _should_cache(self, value: T) -> bool:
    """
    Validate if the value should be cached for internal use.

    Used as dogpile's ``should_cache_fn`` so failed/empty loads are not stored.

    Args:
        value: The value to be cached.

    Returns:
        True if the value should be cached, False otherwise.
    """
    pass
def configure_keys(self, *components: str):
"""
Configure required components for cache key generation.
@@ -77,8 +118,13 @@ class CacheHandler(Generic[T]):
if missing:
raise ValueError(f"Missing key components: {missing}")
region_name = getattr(self.region, 'name', 'default_region')
current_app.logger.debug(f"Generating cache key in region {region_name} with prefix {self.prefix} "
f"for {self._key_components}")
key = CacheKey({k: identifiers[k] for k in self._key_components})
return f"{self.prefix}:{str(key)}"
return f"{region_name}_{self.prefix}:{str(key)}"
def get(self, creator_func, **identifiers) -> T:
"""
@@ -92,18 +138,19 @@ class CacheHandler(Generic[T]):
Cached or newly created value
"""
cache_key = self.generate_key(**identifiers)
current_app.logger.debug(f"Cache key: {cache_key}")
def creator():
instance = creator_func(**identifiers)
return self.to_cache_data(instance)
return self._to_cache_data(instance)
cached_data = self.region.get_or_create(
cache_key,
creator,
should_cache_fn=self.should_cache
should_cache_fn=self._should_cache
)
return self.from_cache_data(cached_data, **identifiers)
return self._from_cache_data(cached_data, **identifiers)
def invalidate(self, **identifiers):
"""
@@ -128,3 +175,21 @@ class CacheHandler(Generic[T]):
except ValueError:
pass # Skip if cache key can't be generated from provided identifiers
def invalidate_region(self):
    """
    Invalidate all cache entries within this region.

    Deletes every Redis key that starts with the same
    ``<region name>_<prefix>:`` string that ``generate_key`` produces.

    Raises:
        NotImplementedError: If the backend is not Redis-backed
            (i.e. exposes no ``client`` attribute).
    """
    # Must mirror generate_key(): keys are prefixed with the region *name*.
    # Interpolating self.region directly would embed the CacheRegion object's
    # repr in the pattern, which never matches any stored key.
    region_name = getattr(self.region, 'name', 'default_region')
    pattern = f"{region_name}_{self.prefix}:*"
    # Assuming Redis backend with dogpile, use direct Redis access
    if hasattr(self.region.backend, 'client'):
        redis_client = self.region.backend.client
        # NOTE(review): KEYS blocks Redis while scanning; fine for the
        # startup-time invalidation seen here, consider SCAN for large keyspaces.
        keys_to_delete = redis_client.keys(pattern)
        if keys_to_delete:
            redis_client.delete(*keys_to_delete)
    else:
        # Fallback for other backends
        raise NotImplementedError("Region invalidation is only supported for Redis backend.")

View File

@@ -5,11 +5,15 @@ from packaging import version
import os
from flask import current_app
from common.utils.cache.base import CacheHandler
from common.utils.cache.base import CacheHandler, CacheKey
from config.type_defs import agent_types, task_types, tool_types, specialist_types
def is_major_minor(version: str) -> bool:
    """Return True when *version* (ignoring edge dots) is exactly <major>.<minor> with numeric parts."""
    pieces = version.strip('.').split('.')
    if len(pieces) != 2:
        return False
    return all(piece.isdigit() for piece in pieces)
class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration caching"""
@@ -23,18 +27,111 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
self.config_type = config_type
self._types_module = None # Set by subclasses
self._config_dir = None # Set by subclasses
self.version_tree_cache = None
self.configure_keys('type_name', 'version')
def configure_keys_for_operation(self, operation: str):
    """Configure the required cache-key components for the given operation."""
    # Dispatch table instead of match/case; same operations, same components.
    components_by_operation = {
        'get_types': ('type_name',),              # type definitions only need the type name
        'get_versions': ('type_name',),           # version tree likewise
        'get_config': ('type_name', 'version'),   # a specific config needs both
    }
    if operation not in components_by_operation:
        raise ValueError(f"Unknown operation: {operation}")
    self.configure_keys(*components_by_operation[operation])
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
    """Return the configuration dict unchanged; it is already serializable."""
    return instance
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
    """Return cached data as-is; plain dicts need no reconstruction."""
    return data
def _should_cache(self, value: Dict[str, Any]) -> bool:
    """
    Decide whether a freshly loaded value may be stored in the cache.

    Args:
        value: The value to be cached

    Returns:
        bool: True for any dict, False for everything else
    """
    return isinstance(value, dict)
def set_version_tree_cache(self, cache):
    """Inject the version-tree cache this handler resolves versions through."""
    self.version_tree_cache = cache
def _load_specific_config(self, type_name: str, version_str: str) -> Dict[str, Any]:
    """
    Load a specific configuration version from its YAML file (cache-miss path).

    Args:
        type_name: Type name
        version_str: Version string, or 'latest' to resolve via the version tree

    Returns:
        Configuration data

    Raises:
        ValueError: If the version is unknown for this type, or the YAML
            file cannot be read/parsed.
    """
    current_app.logger.debug(f"Loading specific configuration for {type_name}, version: {version_str} - no cache")
    # The version tree (separate cache) maps version strings to file paths.
    version_tree = self.version_tree_cache.get_versions(type_name)
    versions = version_tree['versions']
    if version_str == 'latest':
        version_str = version_tree['latest_version']
    if version_str not in versions:
        raise ValueError(f"Version {version_str} not found for {type_name}")
    file_path = versions[version_str]['file_path']
    try:
        with open(file_path) as f:
            config = yaml.safe_load(f)
        current_app.logger.debug(f"Loaded config for {type_name}{version_str}: {config}")
        return config
    except Exception as e:
        # Chain the original error so the YAML/IO cause stays visible in tracebacks.
        raise ValueError(f"Error loading config from {file_path}: {e}") from e
def get_config(self, type_name: str, version: Optional[str] = None) -> Dict[str, Any]:
    """
    Get configuration for a specific type and version
    If version not specified, returns latest

    Args:
        type_name: Configuration type name
        version: Optional specific version to retrieve; may be a full
            version string, a major.minor prefix, or None for the latest.

    Returns:
        Configuration data
    """
    current_app.logger.debug(f"Trying to retrieve config for {self.config_type}, type name: {type_name}, "
                             f"version: {version}")
    # Resolve the request to a concrete patch version first, so the cache
    # key is always exact and 'latest' never leaks into stored keys.
    if version is None:
        version_str = self.version_tree_cache.get_latest_version(type_name)
    elif is_major_minor(version):
        # major.minor given - resolve to its newest patch release
        version_str = self.version_tree_cache.get_latest_patch_version(type_name, version)
    else:
        version_str = version
    result = self.get(
        lambda type_name, version: self._load_specific_config(type_name, version),
        type_name=type_name,
        version=version_str
    )
    return result
class BaseConfigVersionTreeCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration version tree caching"""
def __init__(self, region, config_type: str):
    """
    Build a version-tree cache handler for one configuration family.

    Args:
        region: Cache region
        config_type: Type of configuration (agents, tasks, etc.)
    """
    super().__init__(region, f'config_{config_type}_version_tree')
    # Both attributes are populated by concrete subclasses.
    self._types_module = None
    self._config_dir = None
    self.config_type = config_type
    # Version trees are keyed on the type name alone.
    self.configure_keys('type_name')
def _load_version_tree(self, type_name: str) -> Dict[str, Any]:
"""
@@ -46,6 +143,7 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
Returns:
Dict containing available versions and their metadata
"""
current_app.logger.debug(f"Loading version tree for {type_name} - no cache")
type_path = Path(self._config_dir) / type_name
if not type_path.exists():
raise ValueError(f"No configuration found for type {type_name}")
@@ -81,25 +179,25 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
continue
current_app.logger.debug(f"Loaded versions for {type_name}: {versions}")
current_app.logger.debug(f"Loaded versions for {type_name}: {latest_version}")
current_app.logger.debug(f"Latest version for {type_name}: {latest_version}")
return {
'versions': versions,
'latest_version': latest_version
}
def to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the data to a cacheable format"""
# For configuration data, we can just return the dictionary as is
# since it's already in a serializable format
return instance
def from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Convert cached data back to usable format"""
# Similarly, we can return the data directly since it's already
# in the format we need
return data
def should_cache(self, value: Dict[str, Any]) -> bool:
def _should_cache(self, value: Dict[str, Any]) -> bool:
"""
Validate if the value should be cached
@@ -109,65 +207,7 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
Returns:
bool: True if the value should be cached
"""
if not isinstance(value, dict):
return False
# For type definitions
if 'name' in value and 'description' in value:
return True
# For configurations
if 'versions' in value and 'latest_version' in value:
return True
return False
def _load_type_definitions(self) -> Dict[str, Dict[str, str]]:
"""Load type definitions from the corresponding type_defs module"""
if not self._types_module:
raise ValueError("_types_module must be set by subclass")
type_definitions = {
type_id: {
'name': info['name'],
'description': info['description']
}
for type_id, info in self._types_module.items()
}
current_app.logger.debug(f"Loaded type definitions: {type_definitions}")
return type_definitions
def _load_specific_config(self, type_name: str, version_str: str) -> Dict[str, Any]:
"""
Load a specific configuration version
Args:
type_name: Type name
version_str: Version string
Returns:
Configuration data
"""
version_tree = self.get_versions(type_name)
versions = version_tree['versions']
if version_str == 'latest':
version_str = version_tree['latest_version']
if version_str not in versions:
raise ValueError(f"Version {version_str} not found for {type_name}")
file_path = versions[version_str]['file_path']
try:
with open(file_path) as f:
config = yaml.safe_load(f)
current_app.logger.debug(f"Loaded config for {type_name}{version_str}: {config}")
return config
except Exception as e:
raise ValueError(f"Error loading config from {file_path}: {e}")
return isinstance(value, dict) # Cache all dictionaries
def get_versions(self, type_name: str) -> Dict[str, Any]:
"""
@@ -179,7 +219,7 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
Returns:
Dict with version information
"""
self.configure_keys_for_operation('get_versions')
current_app.logger.debug(f"Trying to get version tree for {self.config_type}, {type_name}")
return self.get(
lambda type_name: self._load_version_tree(type_name),
type_name=type_name
@@ -235,72 +275,146 @@ class BaseConfigCacheHandler(CacheHandler[Dict[str, Any]]):
latest_patch = max(matching_versions, key=version.parse)
return latest_patch
class BaseConfigTypesCacheHandler(CacheHandler[Dict[str, Any]]):
"""Base handler for configuration types caching"""
def __init__(self, region, config_type: str):
    """
    Build a types cache handler for one configuration family.

    Args:
        region: Cache region
        config_type: Type of configuration (agents, tasks, etc.)
    """
    super().__init__(region, f'config_{config_type}_types')
    # Both attributes are populated by concrete subclasses.
    self._types_module = None
    self._config_dir = None
    self.config_type = config_type
    # The type listing is a singleton per config_type: no key components.
    self.configure_keys()
def _to_cache_data(self, instance: Dict[str, Any]) -> Dict[str, Any]:
    """Return the type-definition dict unchanged; it is already serializable."""
    return instance
def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
    """Return cached data as-is; plain dicts need no reconstruction."""
    return data
def _should_cache(self, value: Dict[str, Any]) -> bool:
    """
    Decide whether a freshly loaded value may be stored in the cache.

    Args:
        value: The value to be cached

    Returns:
        bool: True for any dict, False for everything else
    """
    return isinstance(value, dict)
def _load_type_definitions(self) -> Dict[str, Dict[str, str]]:
    """Build the {type_id: {'name', 'description'}} mapping from the type_defs module."""
    current_app.logger.debug(f"Loading type definitions for {self.config_type} - no cache")
    if not self._types_module:
        raise ValueError("_types_module must be set by subclass")
    # Keep only the display fields; drop everything else the module carries.
    type_definitions: Dict[str, Dict[str, str]] = {}
    for type_id, info in self._types_module.items():
        type_definitions[type_id] = {
            'name': info['name'],
            'description': info['description'],
        }
    current_app.logger.debug(f"Loaded type definitions: {type_definitions}")
    return type_definitions
def get_types(self) -> Dict[str, Dict[str, str]]:
"""Get dictionary of available types with name and description"""
self.configure_keys_for_operation('get_types')
result = self.get(
current_app.logger.debug(f"Trying to retrieve type definitions for {self.config_type}")
result = self.get(
lambda type_name: self._load_type_definitions(),
type_name=f'{self.config_type}_types',
)
return result
def get_config(self, type_name: str, version: Optional[str] = None) -> Dict[str, Any]:
"""
Get configuration for a specific type and version
If version not specified, returns latest
Args:
type_name: Configuration type name
version: Optional specific version to retrieve
Returns:
Configuration data
"""
self.configure_keys_for_operation('get_config')
version_str = version or 'latest'
return self.get(
lambda type_name, version: self._load_specific_config(type_name, version),
type_name=type_name,
version=version_str
)
def create_config_cache_handlers(config_type: str, config_dir: str, types_module: dict) -> tuple:
"""
Factory function to dynamically create the 3 cache handler classes for a given configuration type.
The following cache names are created:
- <config_type>_config_cache
- <config_type>_version_tree_cache
- <config_type>_types_cache
class AgentConfigCacheHandler(BaseConfigCacheHandler):
"""Handler for agent configurations"""
handler_name = 'agent_config_cache'
Args:
config_type: The configuration type (e.g., 'agents', 'tasks').
config_dir: The directory where configuration files are stored.
types_module: The types module defining the available types for this config.
def __init__(self, region):
super().__init__(region, 'agents')
self._types_module = agent_types.AGENT_TYPES
self._config_dir = os.path.join('config', 'agents')
Returns:
A tuple of dynamically created classes for config, version tree, and types handlers.
"""
class ConfigCacheHandler(BaseConfigCacheHandler):
handler_name = f"{config_type}_config_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
class VersionTreeCacheHandler(BaseConfigVersionTreeCacheHandler):
handler_name = f"{config_type}_version_tree_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
class TypesCacheHandler(BaseConfigTypesCacheHandler):
handler_name = f"{config_type}_types_cache"
def __init__(self, region):
super().__init__(region, config_type)
self._types_module = types_module
self._config_dir = config_dir
return ConfigCacheHandler, VersionTreeCacheHandler, TypesCacheHandler
class TaskConfigCacheHandler(BaseConfigCacheHandler):
"""Handler for task configurations"""
handler_name = 'task_config_cache'
def __init__(self, region):
super().__init__(region, 'tasks')
self._types_module = task_types.TASK_TYPES
self._config_dir = os.path.join('config', 'tasks')
AgentConfigCacheHandler, AgentConfigVersionTreeCacheHandler, AgentConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='agents',
config_dir='config/agents',
types_module=agent_types.AGENT_TYPES
))
class ToolConfigCacheHandler(BaseConfigCacheHandler):
"""Handler for tool configurations"""
handler_name = 'tool_config_cache'
def __init__(self, region):
super().__init__(region, 'tools')
self._types_module = tool_types.TOOL_TYPES
self._config_dir = os.path.join('config', 'tools')
TaskConfigCacheHandler, TaskConfigVersionTreeCacheHandler, TaskConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='tasks',
config_dir='config/tasks',
types_module=task_types.TASK_TYPES
))
class SpecialistConfigCacheHandler(BaseConfigCacheHandler):
"""Handler for specialist configurations"""
handler_name = 'specialist_config_cache'
ToolConfigCacheHandler, ToolConfigVersionTreeCacheHandler, ToolConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='tools',
config_dir='config/tools',
types_module=tool_types.TOOL_TYPES
))
def __init__(self, region):
super().__init__(region, 'specialists')
self._types_module = specialist_types.SPECIALIST_TYPES
self._config_dir = os.path.join('config', 'specialists')
SpecialistConfigCacheHandler, SpecialistConfigVersionTreeCacheHandler, SpecialistConfigTypesCacheHandler = (
create_config_cache_handlers(
config_type='specialists',
config_dir='config/specialists',
types_module=specialist_types.SPECIALIST_TYPES
))

View File

@@ -65,11 +65,7 @@ def create_cache_regions(app):
eveai_config_region = make_region(name='eveai_config').configure(
'dogpile.cache.redis',
arguments={
**redis_config,
'redis_expiration_time': None, # No expiration in Redis
'key_mangler': lambda key: f"startup_{startup_time}:{key}" # Prefix all keys
},
arguments=redis_config,
replace_existing_backend=True
)
regions['eveai_config'] = eveai_config_region

View File

@@ -23,7 +23,7 @@ def initialize_specialist(specialist_id: int, specialist_type: str, specialist_v
ValueError: If specialist not found or invalid configuration
SQLAlchemyError: If database operations fail
"""
config = cache_manager.specialist_config_cache.get_config(specialist_type, specialist_version)
config = cache_manager.specialists_config_cache.get_config(specialist_type, specialist_version)
if not config:
raise ValueError(f"No configuration found for {specialist_type} version {specialist_version}")
if config['framework'] == 'langchain':
@@ -99,16 +99,18 @@ def _create_agent(
timestamp: Optional[dt] = None
) -> EveAIAgent:
"""Create an agent with the given configuration."""
current_app.logger.debug(f"Creating agent {agent_type} {agent_version} with {name}, {description}")
if timestamp is None:
timestamp = dt.now(tz=tz.utc)
# Get agent configuration from cache
agent_config = cache_manager.agent_config_cache.get_config(agent_type, agent_version)
agent_config = cache_manager.agents_config_cache.get_config(agent_type, agent_version)
current_app.logger.debug(f"Agent Config: {agent_config}")
agent = EveAIAgent(
specialist_id=specialist_id,
name=name or agent_config.get('name', agent_type),
description=description or agent_config.get('description', ''),
description=description or agent_config.get('metadata').get('description', ''),
type=agent_type,
type_version=agent_version,
role=None,
@@ -122,6 +124,7 @@ def _create_agent(
set_logging_information(agent, timestamp)
db.session.add(agent)
current_app.logger.info(f"Created agent {agent.id} of type {agent_type}")
return agent
@@ -138,14 +141,16 @@ def _create_task(
timestamp = dt.now(tz=tz.utc)
# Get task configuration from cache
task_config = cache_manager.task_config_cache.get_config(task_type, task_version)
task_config = cache_manager.tasks_config_cache.get_config(task_type, task_version)
current_app.logger.debug(f"Task Config: {task_config}")
task = EveAITask(
specialist_id=specialist_id,
name=name or task_config.get('name', task_type),
description=description or task_config.get('description', ''),
description=description or task_config.get('metadata').get('description', ''),
type=task_type,
type_version=task_version,
task_description=None,
expected_output=None,
tuning=False,
configuration=None,
@@ -157,6 +162,7 @@ def _create_task(
set_logging_information(task, timestamp)
db.session.add(task)
current_app.logger.info(f"Created task {task.id} of type {task_type}")
return task
@@ -173,12 +179,13 @@ def _create_tool(
timestamp = dt.now(tz=tz.utc)
# Get tool configuration from cache
tool_config = cache_manager.tool_config_cache.get_config(tool_type, tool_version)
tool_config = cache_manager.tools_config_cache.get_config(tool_type, tool_version)
current_app.logger.debug(f"Tool Config: {tool_config}")
tool = EveAITool(
specialist_id=specialist_id,
name=name or tool_config.get('name', tool_type),
description=description or tool_config.get('description', ''),
description=description or tool_config.get('metadata').get('description', ''),
type=tool_type,
type_version=tool_version,
tuning=False,
@@ -189,4 +196,5 @@ def _create_tool(
set_logging_information(tool, timestamp)
db.session.add(tool)
current_app.logger.info(f"Created tool {tool.id} of type {tool_type}")
return tool

View File

@@ -28,10 +28,15 @@ def perform_startup_invalidation(app):
try:
# Check if invalidation was already performed
if not redis_client.get(marker_key):
app.logger.debug(f"Performing cache invalidation at startup time {startup_time}")
app.logger.debug(f"Current cache keys: {redis_client.keys('*')}")
# Perform invalidation
cache_manager.invalidate_region('eveai_config')
# Set marker with a 3-minute expiry (longer than any reasonable startup sequence)
redis_client.setex(marker_key, 300, str(startup_time))
app.logger.debug(f"Cache keys after invalidation: {redis_client.keys('*')}")
redis_client.setex(marker_key, 180, str(startup_time))
app.logger.info("Startup cache invalidation completed")
else:
app.logger.info("Startup cache invalidation already performed")