Files
eveAI/common/utils/pydantic_utils.py
Josako 25213f2004 - Implementation of specialist execution api, including SSE protocol
- eveai_chat becomes deprecated and should be replaced with SSE
- Adaptation of STANDARD_RAG specialist
- Base class definition that allows specialists to be implemented with the crewai framework
- Implementation of SPIN_SPECIALIST
- Implementation of test app for testing specialists (test_specialist_client). Also serves as an example for future SSE-based client
- Improvements to startup scripts to better handle and scale multiple connections
- Small improvements to the interaction forms and views
- Caching implementation improved and augmented with additional caches
2025-02-20 05:50:16 +01:00

79 lines
3.7 KiB
Python

from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Type, Union
def flatten_pydantic_model(model: BaseModel, merge_strategy: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """
    Flattens a nested Pydantic model by bringing all attributes to the highest level.

    Only explicitly set fields are included, and keys use field aliases where defined.
    Nested models (and non-empty lists consisting solely of models) are expanded in
    place, so their own field name does not appear in the result. Every other value is
    emitted exactly as ``model_dump`` serialises it.

    :param model: Pydantic model instance to be flattened.
    :param merge_strategy: Dictionary defining how to handle duplicate attributes, keyed by
        attribute name. Supported values are "add" (concatenate lists, sum numbers, join
        strings with a newline), "first" (keep the first occurrence) and "last" (keep the
        latest occurrence). Duplicates without an entry are overwritten by the latest value.
    :return: Flattened dictionary representation of the model.
    """
    # A None default avoids sharing one mutable dict across calls.
    strategies = merge_strategy or {}
    flat_dict: Dict[str, Any] = {}

    def store(key: str, value: Any) -> None:
        """Write a leaf value into flat_dict, resolving duplicates via the merge strategy."""
        if key not in flat_dict or key not in strategies:
            flat_dict[key] = value
            return

        strategy = strategies[key]
        current = flat_dict[key]
        if strategy == "add":
            # Mismatched types are left untouched: the existing value wins.
            if isinstance(current, list) and isinstance(value, list):
                flat_dict[key] = current + value  # Concatenate lists
            elif isinstance(current, (int, float)) and isinstance(value, (int, float)):
                flat_dict[key] = current + value  # Sum numbers
            elif isinstance(current, str) and isinstance(value, str):
                flat_dict[key] = current + "\n" + value  # Concatenate strings
        elif strategy == "last":
            flat_dict[key] = value
        # "first" (and any unrecognised strategy) keeps the existing value.

    def recursive_flatten(obj: BaseModel) -> None:
        """Walk the set fields of obj, descending into nested models and storing leaves."""
        # model_dump() converts nested models into plain dicts, so the dumped values cannot
        # be used to detect nesting. Dump once by field name (to look up the raw attribute)
        # and once by alias (for output keys and serialised leaf values); both dumps list
        # the same fields in the same order.
        by_name = obj.model_dump(exclude_unset=True)
        by_alias = obj.model_dump(exclude_unset=True, by_alias=True)

        for field_name, (key, dumped_value) in zip(by_name, by_alias.items()):
            raw_value = getattr(obj, field_name, dumped_value)
            if isinstance(raw_value, BaseModel):
                # Recursively flatten nested models
                recursive_flatten(raw_value)
            elif (
                isinstance(raw_value, list)
                and raw_value  # all() is True for [], which would silently drop the field
                and all(isinstance(item, BaseModel) for item in raw_value)
            ):
                # If it's a list of Pydantic models, flatten each element
                for item in raw_value:
                    recursive_flatten(item)
            else:
                store(key, dumped_value)

    recursive_flatten(model)
    return flat_dict
def merge_dicts(
    base_dict: Dict[str, Any],
    new_data: Union[Dict[str, Any], BaseModel],
    merge_strategy: Dict[str, str],
) -> Dict[str, Any]:
    """
    Merges a Pydantic model (or dictionary) into an existing dictionary based on a merge strategy.

    ``base_dict`` is updated in place and also returned. Values already stored in it are
    never mutated in place; merged lists are rebuilt as new list objects, so lists owned by
    earlier ``new_data`` arguments are left untouched.

    :param base_dict: The base dictionary to merge into.
    :param new_data: The new Pydantic model or dictionary to merge. A model is flattened
        first, using the same merge strategy for duplicates inside the model.
    :param merge_strategy: Dict defining how to merge duplicate attributes, keyed by attribute
        name. Supported values are "add" (concatenate lists, sum numbers, join strings with a
        space), "first" (keep the existing value) and "last" (overwrite with the new value).
        Duplicates without an entry are overwritten by the new value.
    :return: Updated dictionary after merging.
    """
    # Tolerate None so that "no strategy" does not raise on the membership test below.
    strategies = merge_strategy or {}

    if isinstance(new_data, BaseModel):
        # Forward the strategy so duplicates inside the model follow the same rules
        # instead of always falling back to last-wins.
        new_data = flatten_pydantic_model(new_data, strategies)

    for key, value in new_data.items():
        if key not in base_dict or key not in strategies:
            base_dict[key] = value  # Add new field (or overwrite one without a strategy)
            continue

        strategy = strategies[key]
        current = base_dict[key]
        if strategy == "add":
            # Mismatched types are left untouched: the existing value wins.
            if isinstance(current, list) and isinstance(value, list):
                # Build a new list: `current` may be the very list object the caller
                # passed in an earlier merge, and += would modify it behind their back.
                base_dict[key] = current + value
            elif isinstance(current, (int, float)) and isinstance(value, (int, float)):
                base_dict[key] = current + value  # Sum numbers
            elif isinstance(current, str) and isinstance(value, str):
                # NOTE(review): strings are joined with a space here, whereas
                # flatten_pydantic_model joins with a newline - confirm which is intended.
                base_dict[key] = current + " " + value
        elif strategy == "last":
            base_dict[key] = value  # Always overwrite with latest value
        # "first" (and any unrecognised strategy) keeps the existing value.

    return base_dict