Files
eveAI/eveai_app/views/dynamic_form_base.py

663 lines
29 KiB
Python

from datetime import date
from flask_wtf import FlaskForm
from wtforms import (IntegerField, FloatField, BooleanField, StringField, TextAreaField, FileField,
validators, ValidationError)
from flask import current_app, request, session
import json
from wtforms.fields.choices import SelectField
from wtforms.fields.datetime import DateField
from wtforms.fields.simple import ColorField
from common.models.user import TenantMake
from common.utils.config_field_types import TaggingFields, json_to_patterns, patterns_to_json
class TaggingFieldsField(TextAreaField):
def __init__(self, *args, **kwargs):
kwargs['render_kw'] = {
'class': 'chunking-patterns-field',
'data-handle-enter': 'true'
}
super().__init__(*args, **kwargs)
class TaggingFieldsFilterField(TextAreaField):
"""Field for tagging fields filter conditions"""
def __init__(self, *args, **kwargs):
kwargs['render_kw'] = {
'class': 'json-editor',
'data-handle-enter': 'true'
}
super().__init__(*args, **kwargs)
class DynamicArgumentsField(TextAreaField):
"""Field for dynamic arguments configuration"""
def __init__(self, *args, **kwargs):
kwargs['render_kw'] = {
'class': 'json-editor',
'data-handle-enter': 'true'
}
super().__init__(*args, **kwargs)
class ChunkingPatternsField(TextAreaField):
def __init__(self, *args, **kwargs):
kwargs['render_kw'] = {
'class': 'chunking-patterns-field',
'data-handle-enter': 'true'
}
super().__init__(*args, **kwargs)
class OrderedListField(TextAreaField):
"""Field for ordered list data that will be rendered as a Tabulator table"""
def __init__(self, *args, **kwargs):
list_type = kwargs.pop('list_type', '')
# Behoud bestaande render_kw attributen als die er zijn
if 'render_kw' in kwargs:
existing_render_kw = kwargs['render_kw']
else:
existing_render_kw = {}
# Stel nieuwe render_kw samen
new_render_kw = {
'data-list-type': list_type,
'data-handle-enter': 'true'
}
# Voeg klasse toe en behoud bestaande klassen
if 'class' in existing_render_kw:
existing_classes = existing_render_kw['class']
if isinstance(existing_classes, list):
existing_classes += ' ordered-list-field'
new_render_kw['class'] = existing_classes
else:
# String classes samenvoegen
new_render_kw['class'] = f"{existing_classes} ordered-list-field"
else:
new_render_kw['class'] = 'ordered-list-field'
# Voeg alle bestaande attributen toe aan nieuwe render_kw
for key, value in existing_render_kw.items():
if key != 'class': # Klassen hebben we al verwerkt
new_render_kw[key] = value
# Update kwargs met de nieuwe gecombineerde render_kw
kwargs['render_kw'] = new_render_kw
super().__init__(*args, **kwargs)
class DynamicFormBase(FlaskForm):
def __init__(self, formdata=None, *args, **kwargs):
# Belangrijk: formdata doorgeven aan FlaskForm zodat WTForms POST-data kan binden
super(DynamicFormBase, self).__init__(formdata=formdata, *args, **kwargs)
# Maps collection names to lists of field names
self.dynamic_fields = {}
# Store formdata for later use
self.formdata = formdata
self.raw_formdata = request.form.to_dict()
def _create_field_validators(self, field_def):
"""Create validators based on field definition"""
validators_list = []
# Required validator
if field_def.get('required', False) and field_def.get('type', None) != 'boolean':
validators_list.append(validators.DataRequired())
else:
validators_list.append(validators.Optional())
# Type-specific validators
field_type = field_def.get('type')
if field_type in ['integer', 'float']:
min_value = field_def.get('min_value')
max_value = field_def.get('max_value')
if min_value is not None or max_value is not None:
validators_list.append(
validators.NumberRange(
min=min_value if min_value is not None else -float('inf'),
max=max_value if max_value is not None else float('inf'),
message=f"Value must be between {min_value or '-∞'} and {max_value or ''}"
)
)
elif field_type in ['string', 'str']:
validators_list.append(self._validate_string_not_empty)
elif field_type == 'tagging_fields':
validators_list.append(self._validate_tagging_fields)
elif field_type == 'tagging_fields_filter':
validators_list.append(self._validate_tagging_fields_filter)
elif field_type == 'dynamic_arguments':
validators_list.append(self._validate_dynamic_arguments)
elif field_type == 'ordered_list':
validators_list.append(self._validate_ordered_list)
return validators_list
def _validate_tagging_fields(self, form, field):
"""Validate the tagging fields structure"""
if not field.data:
return
try:
# Parse JSON data
fields_data = json.loads(field.data)
# Validate using TaggingFields model
try:
TaggingFields.from_dict(fields_data)
except ValueError as e:
raise ValidationError(str(e))
except json.JSONDecodeError:
raise ValidationError("Invalid JSON format")
except Exception as e:
raise ValidationError(f"Invalid field definition: {str(e)}")
def _validate_tagging_fields_filter(self, form, field):
"""Validate the tagging fields filter structure"""
if not field.data:
return
try:
# Parse JSON data
filter_data = json.loads(field.data)
# Basic validation of filter structure
self._validate_filter_condition(filter_data)
except json.JSONDecodeError:
raise ValidationError("Invalid JSON format")
except ValidationError as e:
# Re-raise ValidationError from _validate_filter_condition
raise e
except Exception as e:
raise ValidationError(f"Invalid filter definition: {str(e)}")
def _validate_string_not_empty(self, form, field):
"""Validator om te controleren of een StringField niet leeg is"""
if not field.data or field.data.strip() == "":
raise ValidationError("This field cannot be empty")
def _validate_filter_condition(self, condition):
"""Recursively validate a filter condition structure"""
# Check if this is a logical condition (AND/OR/NOT)
if 'logical' in condition:
if condition['logical'] not in ['and', 'or', 'not']:
raise ValidationError(f"Invalid logical operator: {condition['logical']}")
if 'conditions' not in condition:
raise ValidationError("Missing 'conditions' array for logical operator")
if not isinstance(condition['conditions'], list):
raise ValidationError("'conditions' must be an array")
# Special case for NOT which should have exactly one condition
if condition['logical'] == 'not' and len(condition['conditions']) != 1:
raise ValidationError("'not' operator must have exactly one condition")
# Validate each sub-condition
for sub_condition in condition['conditions']:
self._validate_filter_condition(sub_condition)
# Check if this is a field condition
elif 'field' in condition:
if 'operator' not in condition:
raise ValidationError(f"Missing 'operator' for field condition on {condition['field']}")
if 'value' not in condition:
raise ValidationError(f"Missing 'value' for field condition on {condition['field']}")
# Validate operator types
# This is a simplified check - in a real implementation, you would validate
# against the actual field type from the catalog definition
valid_operators = {
'string': ['eq', 'neq', 'contains', 'not_contains', 'starts_with',
'ends_with', 'in', 'not_in', 'regex', 'not_regex'],
'numeric': ['eq', 'neq', 'gt', 'gte', 'lt', 'lte', 'between',
'not_between', 'in', 'not_in'],
'enum': ['eq', 'neq', 'in', 'not_in']
}
# For now, we just check that the operator is one of the known types
all_operators = set().union(*valid_operators.values())
if condition['operator'] not in all_operators:
raise ValidationError(f"Unknown operator '{condition['operator']}' for field {condition['field']}")
# Validate variable references if present
if isinstance(condition['value'], str) and condition['value'].startswith('$'):
# This is a variable reference - no further validation needed at form time
pass
# Additional validation based on operator type could be added here
else:
raise ValidationError("Filter condition must have either 'logical' or 'field' property")
def _validate_dynamic_arguments(self, form, field):
"""Validate the dynamic arguments structure"""
if not field.data:
return
try:
# Parse JSON data
args_data = json.loads(field.data)
# Validate basic structure (should be an object with argument definitions)
if not isinstance(args_data, dict):
raise ValidationError("Dynamic arguments must be an object with argument definitions")
# Validate each argument definition
for arg_name, arg_def in args_data.items():
if not isinstance(arg_def, dict):
raise ValidationError(f"Argument definition for '{arg_name}' must be an object")
# Check required properties
if 'type' not in arg_def:
raise ValidationError(f"Argument '{arg_name}' missing required 'type' property")
# Validate type
if arg_def['type'] not in ['string', 'integer', 'float', 'boolean', 'date', 'enum', 'object', 'array']:
raise ValidationError(f"Argument '{arg_name}' has invalid type: {arg_def['type']}")
# Validate enum fields have allowed_values
if arg_def['type'] == 'enum' and 'allowed_values' not in arg_def:
raise ValidationError(f"Enum argument '{arg_name}' missing required 'allowed_values' list")
except json.JSONDecodeError:
raise ValidationError("Invalid JSON format")
except Exception as e:
raise ValidationError(f"Invalid argument definition: {str(e)}")
def _validate_ordered_list(self, form, field):
"""Validate the ordered list structure"""
if not field.data:
return
try:
# Parse JSON data
list_data = json.loads(field.data)
# Validate it's a list
if not isinstance(list_data, list):
raise ValidationError("Ordered list must be a list")
# Validate each item in the list is a dictionary
for i, item in enumerate(list_data):
if not isinstance(item, dict):
raise ValidationError(f"Item {i} in ordered list must be an object")
except json.JSONDecodeError:
raise ValidationError("Invalid JSON format")
except Exception as e:
raise ValidationError(f"Invalid ordered list: {str(e)}")
def _get_system_field(self, system_name):
"""Get the field class and kwargs for a system field. Add system field cases as you need them."""
field_class = None
extra_classes = ''
field_kwargs = {}
match system_name:
case 'tenant_make':
field_class = SelectField
tenant_id = session.get('tenant').get('id')
makes = TenantMake.query.filter_by(tenant_id=tenant_id).all()
choices = [(make.id, make.name) for make in makes]
extra_classes = ''
field_kwargs = {'choices': choices}
return field_class, extra_classes, field_kwargs
def add_dynamic_fields(self, collection_name, config, initial_data=None):
"""Add dynamic fields to the form based on the configuration.
Args:
collection_name: The name of the collection of fields to add
config: The full configuration object, which should contain the field definitions
for the collection_name and may also contain list_type definitions
initial_data: Optional initial data for the fields
"""
if isinstance(initial_data, str):
try:
initial_data = json.loads(initial_data)
except (json.JSONDecodeError, TypeError):
current_app.logger.error(f"Invalid JSON in initial_data: {initial_data}")
initial_data = {}
elif initial_data is None:
initial_data = {}
# Store the full configuration for later use in get_list_type_configs_js
if not hasattr(self, '_full_configs'):
self._full_configs = {}
self._full_configs[collection_name] = config
# Get the specific field configuration for this collection
field_config = config.get(collection_name, {})
if not field_config:
# Handle the case where config is already the specific field configuration
return
self.dynamic_fields[collection_name] = []
for field_name, field_def in field_config.items():
# Prefix the field name with the collection name
full_field_name = f"{collection_name}_{field_name}"
label = field_def.get('name', field_name)
field_type = field_def.get('type')
description = field_def.get('description', '')
default = field_def.get('default')
# Determine standard validators
field_validators = self._create_field_validators(field_def)
# Handle special case for field types
if field_type == 'tagging_fields':
field_class = TaggingFieldsField
extra_classes = 'json-editor'
field_kwargs = {}
elif field_type == 'tagging_fields_filter':
field_class = TaggingFieldsFilterField
extra_classes = 'json-editor'
field_kwargs = {}
elif field_type == 'dynamic_arguments':
field_class = DynamicArgumentsField
extra_classes = 'json-editor'
field_kwargs = {}
elif field_type == 'enum':
field_class = SelectField
allowed_values = field_def.get('allowed_values', [])
choices = [(str(val), str(val)) for val in allowed_values]
extra_classes = ''
field_kwargs = {'choices': choices}
elif field_type == 'chunking_patterns':
field_class = ChunkingPatternsField
extra_classes = ['monospace-text', 'pattern-input']
field_kwargs = {}
elif field_type == 'ordered_list':
field_class = OrderedListField
extra_classes = ''
list_type = field_def.get('list_type', '')
field_kwargs = {'list_type': list_type}
elif field_type == 'system':
field_class, extra_classes, field_kwargs = self._get_system_field(field_def.get('system_name', ''))
else:
extra_classes = ''
field_class = {
'integer': IntegerField,
'float': FloatField,
'boolean': BooleanField,
'string': StringField,
'str': StringField,
'text': TextAreaField,
'date': DateField,
'file': FileField,
'color': ColorField,
}.get(field_type, StringField)
field_kwargs = {}
# Prepare field data
field_data = None
if initial_data and field_name in initial_data:
current_app.logger.debug(f"Using initial data for field '{field_name}': {initial_data[field_name]}")
field_data = initial_data[field_name]
if field_type in ['tagging_fields', 'tagging_fields_filter', 'dynamic_arguments'] and isinstance(
field_data, dict):
try:
field_data = json.dumps(field_data, indent=2)
except (TypeError, ValueError) as e:
current_app.logger.error(f"Error converting initial data to JSON: {e}")
field_data = "{}"
elif field_type == 'ordered_list' and isinstance(field_data, list):
try:
field_data = json.dumps(field_data)
except (TypeError, ValueError) as e:
current_app.logger.error(f"Error converting ordered list data to JSON: {e}")
field_data = "[]"
elif field_type == 'chunking_patterns':
try:
field_data = json_to_patterns(field_data)
except (TypeError, ValueError) as e:
current_app.logger.error(f"Error converting initial data to a list of patterns: {e}")
field_data = {}
elif field_type == 'date' and isinstance(field_data, str):
try:
field_data = date.fromisoformat(field_data)
except ValueError:
current_app.logger.error(f"Error converting ISO date string '{field_data}' to date object")
field_data = None
elif default is not None:
field_data = default
# Create render_kw with classes and any other HTML attributes
render_kw = {'class': extra_classes} if extra_classes else {}
if description:
render_kw['title'] = description # For tooltip
render_kw['data-bs-toggle'] = 'tooltip'
render_kw['data-bs-placement'] = 'right'
# Add special styling for color fields to make them more compact and visible
if field_type == 'color':
render_kw['style'] = 'width: 100px; height: 40px;'
if 'class' in render_kw:
render_kw['class'] = f"{render_kw['class']} color-field"
else:
render_kw['class'] = 'color-field'
# Create the field
field_kwargs.update({
'label': label,
'description': description,
'validators': field_validators,
'default': field_data,
'render_kw': render_kw
})
unbound_field = field_class(**field_kwargs)
# Bind the field to the form
bound_field = unbound_field.bind(form=self, name=full_field_name)
# Process the field with formdata
if self.formdata and full_field_name in self.formdata:
bound_field.process(self.formdata)
else:
bound_field.process(formdata=None, data=field_data) # Use prepared field_data
# Add the field to the form
setattr(self, full_field_name, bound_field)
self._fields[full_field_name] = bound_field
self.dynamic_fields[collection_name].append(full_field_name)
def get_static_fields(self):
"""Return a list of static field instances."""
# Get names of dynamic fields
dynamic_field_names = set()
for field_list in self.dynamic_fields.values():
dynamic_field_names.update(field_list)
# Return all fields that are not dynamic
return [field for name, field in self._fields.items() if name not in dynamic_field_names]
def get_list_type_configs_js(self):
"""Generate JavaScript code for list type configurations used by ordered_list fields."""
from common.extensions import cache_manager
list_types = {}
# First check if we have any full configurations stored
if hasattr(self, '_full_configs'):
# Look for list types in the stored full configurations
for config_name, config in self._full_configs.items():
for key, value in config.items():
# Check if this is a list type definition (not a field definition)
if isinstance(value, dict) and all(isinstance(v, dict) for v in value.values()):
# This looks like a list type definition
list_types[key] = value
# Collect all list types used in ordered_list fields
for collection_name, field_names in self.dynamic_fields.items():
for full_field_name in field_names:
field = getattr(self, full_field_name)
if isinstance(field, OrderedListField):
list_type = field.render_kw.get('data-list-type')
if list_type and list_type not in list_types:
# First try to get from current_app.config
list_type_config = current_app.config.get('LIST_TYPES', {}).get(list_type)
if list_type_config:
list_types[list_type] = list_type_config
else:
# Try to find the list type in specialist configurations using the cache
try:
# Get all specialist types
specialist_types = cache_manager.specialists_types_cache.get_types()
# For each specialist type, check if it has the list type we're looking for
for specialist_type in specialist_types:
try:
# Get the latest version for this specialist type
latest_version = cache_manager.specialists_version_tree_cache.get_latest_version(specialist_type)
# Get the configuration for this specialist type and version
specialist_config = cache_manager.specialists_config_cache.get_config(specialist_type, latest_version)
# Check if this specialist has the list type we're looking for
if list_type in specialist_config:
list_types[list_type] = specialist_config[list_type]
break
except Exception as e:
current_app.logger.error(f"Error checking specialist {specialist_type}: {e}")
continue
except Exception as e:
current_app.logger.error(f"Error retrieving specialist configurations: {e}")
# If no list types found, return empty script
if not list_types:
return ""
# Generate JavaScript code
js_code = "<script>\n"
js_code += "window.listTypeConfigs = window.listTypeConfigs || {};\n"
for list_type, config in list_types.items():
js_code += f"window.listTypeConfigs['{list_type}'] = {json.dumps(config, indent=2)};\n"
js_code += "</script>\n"
return js_code
def get_dynamic_fields(self):
"""Return a dictionary of dynamic fields per collection."""
result = {}
for collection_name, field_names in self.dynamic_fields.items():
result[collection_name] = [getattr(self, name) for name in field_names]
return result
def get_dynamic_data(self, collection_name):
"""Retrieve the data from dynamic fields of a specific collection."""
data = {}
if collection_name not in self.dynamic_fields:
return data
prefix_length = len(collection_name) + 1 # +1 for the underscore
for full_field_name in self.dynamic_fields[collection_name]:
original_field_name = full_field_name[prefix_length:]
field = getattr(self, full_field_name)
# Parse JSON for special field types
if field.type == 'BooleanField':
data[original_field_name] = full_field_name in self.raw_formdata
elif isinstance(field, (TaggingFieldsField, TaggingFieldsFilterField, DynamicArgumentsField, OrderedListField)) and field.data:
try:
data[original_field_name] = json.loads(field.data)
except json.JSONDecodeError:
# Validation should catch this, but just in case
data[original_field_name] = field.data
elif isinstance(field, ChunkingPatternsField):
try:
data[original_field_name] = patterns_to_json(field.data)
except Exception as e:
current_app.logger.error(f"Error converting initial data to patterns: {e}")
elif isinstance(field, DateField):
if field.data:
data[original_field_name] = field.data.isoformat()
else:
data[original_field_name] = None
else:
data[original_field_name] = field.data
return data
# def validate_on_submit(self):
# """Aangepaste validate_on_submit die dynamische velden correct verwerkt"""
# if request.method == 'POST':
# # Update formdata met de huidige request data
# self.formdata = request.form
# self.raw_formdata = request.form.to_dict()
#
# # Verwerk alle dynamische velden opnieuw met de actuele formuliergegevens
# for collection_name, field_names in self.dynamic_fields.items():
# for full_field_name in field_names:
# field = self._fields.get(full_field_name)
# if field:
# # Log voor debug
# current_app.logger.debug(
# f"Re-processing dynamic field {full_field_name} with current form data")
# # Verwerk het veld opnieuw met de huidige request data
# field.process(self.formdata)
#
# # Nu voeren we de standaard validatie uit
# return super().validate()
# return False
def validate_tagging_fields(form, field):
"""Validate the tagging fields structure"""
if not field.data:
return
try:
# Parse JSON data
fields_data = json.loads(field.data)
# Validate it's a dictionary
if not isinstance(fields_data, dict):
raise ValidationError("Tagging fields must be a dictionary")
# Validate each field definition
for field_name, field_def in fields_data.items():
if not isinstance(field_def, dict):
raise ValidationError(f"Field definition for {field_name} must be a dictionary")
# Check required properties
if 'type' not in field_def:
raise ValidationError(f"Field {field_name} missing required 'type' property")
# Validate type
if field_def['type'] not in ['string', 'integer', 'float', 'date', 'enum', 'color']:
raise ValidationError(f"Field {field_name} has invalid type: {field_def['type']}")
# Validate enum fields have allowed_values
if field_def['type'] == 'enum':
if 'allowed_values' not in field_def:
raise ValidationError(f"Enum field {field_name} missing required 'allowed_values' list")
if not isinstance(field_def['allowed_values'], list):
raise ValidationError(f"Field {field_name} allowed_values must be a list")
# Validate numeric fields
if field_def['type'] in ['integer', 'float']:
if 'min_value' in field_def and 'max_value' in field_def:
min_val = float(field_def['min_value'])
max_val = float(field_def['max_value'])
if min_val >= max_val:
raise ValidationError(f"Field {field_name} min_value must be less than max_value")
except json.JSONDecodeError:
raise ValidationError("Invalid JSON format")
except (TypeError, ValueError) as e:
raise ValidationError(f"Invalid field definition: {str(e)}")