from typing import Any, Dict, List, Optional, Type, Union

from pydantic import BaseModel, Field


def _merge_value(
    target: Dict[str, Any],
    key: str,
    value: Any,
    merge_strategy: Dict[str, str],
    separator: str,
) -> None:
    """
    Stores ``value`` under ``key`` in ``target``, resolving collisions via ``merge_strategy``.

    :param target: Dictionary that is updated in place.
    :param key: Key being written.
    :param value: Incoming value for ``key``.
    :param merge_strategy: Maps a key to ``"add"``, ``"first"`` or ``"last"``.
    :param separator: String inserted between two strings joined by ``"add"``.
    """
    # A key that is new, or that has no strategy, is simply written
    # (so a duplicate without a strategy is overwritten by the latest value).
    if key not in target or key not in merge_strategy:
        target[key] = value
        return

    strategy = merge_strategy[key]
    if strategy == "last":
        target[key] = value
    elif strategy == "add":
        current = target[key]
        if isinstance(current, list) and isinstance(value, list):
            target[key] += value  # Concatenate lists (extends the existing list in place)
        elif isinstance(current, (int, float)) and isinstance(value, (int, float)):
            target[key] += value  # Sum numbers
        elif isinstance(current, str) and isinstance(value, str):
            target[key] += separator + value  # Concatenate strings
        # Any other type combination cannot be added: the existing value is kept.
    # "first" (and any unrecognised strategy name) keeps the existing value.


def _flatten_into(obj: BaseModel, flat: Dict[str, Any], merge_strategy: Dict[str, str]) -> None:
    """
    Writes the leaf attributes of ``obj`` and of every model nested in it into ``flat``.

    :param obj: Pydantic model instance to walk.
    :param flat: Accumulator dictionary, updated in place.
    :param merge_strategy: Maps a key to ``"add"``, ``"first"`` or ``"last"``.
    """
    # model_dump() keys are serialization aliases; map them back to attribute
    # names so the live (un-serialized) attribute value can be inspected.
    attribute_by_key = {
        (info.serialization_alias or info.alias or name): name
        for name, info in type(obj).model_fields.items()
    }

    for key, dumped in obj.model_dump(exclude_unset=True, by_alias=True).items():
        # model_dump() has already turned nested models into plain dicts, so the
        # decision to recurse must be taken on the attribute itself. If the
        # attribute cannot be resolved, fall back to treating the value as a leaf.
        raw = getattr(obj, attribute_by_key.get(key, key), dumped)

        if isinstance(raw, BaseModel):
            # Recursively flatten nested models
            _flatten_into(raw, flat, merge_strategy)
        elif isinstance(raw, list) and raw and all(isinstance(item, BaseModel) for item in raw):
            # Non-empty list of Pydantic models: flatten each element.
            # (An empty list is kept as a leaf instead of being dropped.)
            for item in raw:
                _flatten_into(item, flat, merge_strategy)
        else:
            # Leaf value: store the serialized form produced by model_dump().
            _merge_value(flat, key, dumped, merge_strategy, "\n")


def flatten_pydantic_model(model: BaseModel, merge_strategy: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """
    Flattens a nested Pydantic model by bringing all attributes to the highest level.

    Only fields that were explicitly set are included, and keys use the field
    aliases (``model_dump(exclude_unset=True, by_alias=True)``). Fields holding a
    nested model, or a non-empty list made up only of models, are replaced by the
    flattened attributes of those models; every other value is kept as a leaf
    under its own field name.

    When the same key occurs more than once, ``merge_strategy`` decides the result:

    * ``"add"``   - concatenate lists, sum numbers, join strings with a newline;
      values of any other (or mismatched) types keep the first occurrence.
    * ``"first"`` - keep the first occurrence.
    * ``"last"``  - keep the last occurrence.

    Duplicate keys without a strategy are overwritten by the last occurrence.

    :param model: Pydantic model instance to be flattened.
    :param merge_strategy: Dictionary defining how to handle duplicate attributes.
    :return: Flattened dictionary representation of the model.
    """
    strategy = {} if merge_strategy is None else merge_strategy
    flat_dict: Dict[str, Any] = {}
    _flatten_into(model, flat_dict, strategy)
    return flat_dict


def merge_dicts(base_dict: Dict[str, Any], new_data: Union[Dict[str, Any], BaseModel],
                merge_strategy: Dict[str, str]) \
        -> Dict[str, Any]:
    """
    Merges a Pydantic model (or dictionary) into an existing dictionary based on a merge strategy.

    ``base_dict`` is modified in place and also returned. A key present in both
    inputs is resolved by ``merge_strategy`` (``"add"``, ``"first"`` or ``"last"``);
    with ``"add"`` strings are joined with a single space. Keys without a strategy
    are overwritten by the new value.

    :param base_dict: The base dictionary to merge into.
    :param new_data: The new Pydantic model or dictionary to merge.
    :param merge_strategy: Dict defining how to merge duplicate attributes.
    :return: Updated dictionary after merging.
    """
    if isinstance(new_data, BaseModel):
        # Convert the model to a flat dict, resolving duplicates inside the
        # model with the same strategy that is used for the merge itself.
        new_data = flatten_pydantic_model(new_data, merge_strategy)

    for key, value in new_data.items():
        _merge_value(base_dict, key, value, merge_strategy, " ")

    return base_dict