411 lines
17 KiB
Python
411 lines
17 KiB
Python
from flask_wtf import FlaskForm
|
|
from wtforms import (IntegerField, FloatField, BooleanField, StringField, TextAreaField, FileField,
|
|
validators, ValidationError)
|
|
from flask import current_app
|
|
import json
|
|
|
|
from wtforms.fields.choices import SelectField
|
|
from wtforms.fields.datetime import DateField
|
|
from common.utils.config_field_types import TaggingFields, json_to_patterns, patterns_to_json
|
|
|
|
|
|
class TaggingFieldsField(TextAreaField):
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['render_kw'] = {
|
|
'class': 'chunking-patterns-field',
|
|
'data-handle-enter': 'true'
|
|
}
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
class TaggingFieldsFilterField(TextAreaField):
|
|
"""Field for tagging fields filter conditions"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['render_kw'] = {
|
|
'class': 'json-editor',
|
|
'data-handle-enter': 'true'
|
|
}
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
class DynamicArgumentsField(TextAreaField):
|
|
"""Field for dynamic arguments configuration"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['render_kw'] = {
|
|
'class': 'json-editor',
|
|
'data-handle-enter': 'true'
|
|
}
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
class ChunkingPatternsField(TextAreaField):
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['render_kw'] = {
|
|
'class': 'chunking-patterns-field',
|
|
'data-handle-enter': 'true'
|
|
}
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
class DynamicFormBase(FlaskForm):
|
|
def __init__(self, formdata=None, *args, **kwargs):
|
|
super(DynamicFormBase, self).__init__(*args, **kwargs)
|
|
# Maps collection names to lists of field names
|
|
self.dynamic_fields = {}
|
|
# Store formdata for later use
|
|
self.formdata = formdata
|
|
|
|
def _create_field_validators(self, field_def):
|
|
"""Create validators based on field definition"""
|
|
validators_list = []
|
|
|
|
# Required validator
|
|
if field_def.get('required', False):
|
|
validators_list.append(validators.InputRequired())
|
|
else:
|
|
validators_list.append(validators.Optional())
|
|
|
|
# Type-specific validators
|
|
field_type = field_def.get('type')
|
|
if field_type in ['integer', 'float']:
|
|
min_value = field_def.get('min_value')
|
|
max_value = field_def.get('max_value')
|
|
if min_value is not None or max_value is not None:
|
|
validators_list.append(
|
|
validators.NumberRange(
|
|
min=min_value if min_value is not None else -float('inf'),
|
|
max=max_value if max_value is not None else float('inf'),
|
|
message=f"Value must be between {min_value or '-∞'} and {max_value or '∞'}"
|
|
)
|
|
)
|
|
elif field_type == 'tagging_fields':
|
|
validators_list.append(self._validate_tagging_fields)
|
|
elif field_type == 'tagging_fields_filter':
|
|
validators_list.append(self._validate_tagging_fields_filter)
|
|
elif field_type == 'dynamic_arguments':
|
|
validators_list.append(self._validate_dynamic_arguments)
|
|
|
|
return validators_list
|
|
|
|
def _validate_tagging_fields(self, form, field):
|
|
"""Validate the tagging fields structure"""
|
|
if not field.data:
|
|
return
|
|
|
|
try:
|
|
# Parse JSON data
|
|
fields_data = json.loads(field.data)
|
|
|
|
# Validate using TaggingFields model
|
|
try:
|
|
TaggingFields.from_dict(fields_data)
|
|
except ValueError as e:
|
|
raise ValidationError(str(e))
|
|
|
|
except json.JSONDecodeError:
|
|
raise ValidationError("Invalid JSON format")
|
|
except Exception as e:
|
|
raise ValidationError(f"Invalid field definition: {str(e)}")
|
|
|
|
def _validate_tagging_fields_filter(self, form, field):
|
|
"""Validate the tagging fields filter structure"""
|
|
if not field.data:
|
|
return
|
|
|
|
try:
|
|
# Parse JSON data
|
|
filter_data = json.loads(field.data)
|
|
|
|
# Basic validation of filter structure
|
|
self._validate_filter_condition(filter_data)
|
|
|
|
except json.JSONDecodeError:
|
|
raise ValidationError("Invalid JSON format")
|
|
except ValidationError as e:
|
|
# Re-raise ValidationError from _validate_filter_condition
|
|
raise e
|
|
except Exception as e:
|
|
raise ValidationError(f"Invalid filter definition: {str(e)}")
|
|
|
|
def _validate_filter_condition(self, condition):
|
|
"""Recursively validate a filter condition structure"""
|
|
# Check if this is a logical condition (AND/OR/NOT)
|
|
if 'logical' in condition:
|
|
if condition['logical'] not in ['and', 'or', 'not']:
|
|
raise ValidationError(f"Invalid logical operator: {condition['logical']}")
|
|
|
|
if 'conditions' not in condition:
|
|
raise ValidationError("Missing 'conditions' array for logical operator")
|
|
|
|
if not isinstance(condition['conditions'], list):
|
|
raise ValidationError("'conditions' must be an array")
|
|
|
|
# Special case for NOT which should have exactly one condition
|
|
if condition['logical'] == 'not' and len(condition['conditions']) != 1:
|
|
raise ValidationError("'not' operator must have exactly one condition")
|
|
|
|
# Validate each sub-condition
|
|
for sub_condition in condition['conditions']:
|
|
self._validate_filter_condition(sub_condition)
|
|
|
|
# Check if this is a field condition
|
|
elif 'field' in condition:
|
|
if 'operator' not in condition:
|
|
raise ValidationError(f"Missing 'operator' for field condition on {condition['field']}")
|
|
|
|
if 'value' not in condition:
|
|
raise ValidationError(f"Missing 'value' for field condition on {condition['field']}")
|
|
|
|
# Validate operator types
|
|
# This is a simplified check - in a real implementation, you would validate
|
|
# against the actual field type from the catalog definition
|
|
valid_operators = {
|
|
'string': ['eq', 'neq', 'contains', 'not_contains', 'starts_with',
|
|
'ends_with', 'in', 'not_in', 'regex', 'not_regex'],
|
|
'numeric': ['eq', 'neq', 'gt', 'gte', 'lt', 'lte', 'between',
|
|
'not_between', 'in', 'not_in'],
|
|
'enum': ['eq', 'neq', 'in', 'not_in']
|
|
}
|
|
|
|
# For now, we just check that the operator is one of the known types
|
|
all_operators = set().union(*valid_operators.values())
|
|
if condition['operator'] not in all_operators:
|
|
raise ValidationError(f"Unknown operator '{condition['operator']}' for field {condition['field']}")
|
|
|
|
# Validate variable references if present
|
|
if isinstance(condition['value'], str) and condition['value'].startswith('$'):
|
|
# This is a variable reference - no further validation needed at form time
|
|
pass
|
|
|
|
# Additional validation based on operator type could be added here
|
|
|
|
else:
|
|
raise ValidationError("Filter condition must have either 'logical' or 'field' property")
|
|
|
|
def _validate_dynamic_arguments(self, form, field):
|
|
"""Validate the dynamic arguments structure"""
|
|
if not field.data:
|
|
return
|
|
|
|
try:
|
|
# Parse JSON data
|
|
args_data = json.loads(field.data)
|
|
|
|
# Validate basic structure (should be an object with argument definitions)
|
|
if not isinstance(args_data, dict):
|
|
raise ValidationError("Dynamic arguments must be an object with argument definitions")
|
|
|
|
# Validate each argument definition
|
|
for arg_name, arg_def in args_data.items():
|
|
if not isinstance(arg_def, dict):
|
|
raise ValidationError(f"Argument definition for '{arg_name}' must be an object")
|
|
|
|
# Check required properties
|
|
if 'type' not in arg_def:
|
|
raise ValidationError(f"Argument '{arg_name}' missing required 'type' property")
|
|
|
|
# Validate type
|
|
if arg_def['type'] not in ['string', 'integer', 'float', 'boolean', 'date', 'enum', 'object', 'array']:
|
|
raise ValidationError(f"Argument '{arg_name}' has invalid type: {arg_def['type']}")
|
|
|
|
# Validate enum fields have allowed_values
|
|
if arg_def['type'] == 'enum' and 'allowed_values' not in arg_def:
|
|
raise ValidationError(f"Enum argument '{arg_name}' missing required 'allowed_values' list")
|
|
|
|
except json.JSONDecodeError:
|
|
raise ValidationError("Invalid JSON format")
|
|
except Exception as e:
|
|
raise ValidationError(f"Invalid argument definition: {str(e)}")
|
|
|
|
def add_dynamic_fields(self, collection_name, config, initial_data=None):
|
|
"""Add dynamic fields to the form based on the configuration."""
|
|
self.dynamic_fields[collection_name] = []
|
|
for field_name, field_def in config.items():
|
|
# Prefix the field name with the collection name
|
|
full_field_name = f"{collection_name}_{field_name}"
|
|
label = field_def.get('name', field_name)
|
|
field_type = field_def.get('type')
|
|
description = field_def.get('description', '')
|
|
default = field_def.get('default')
|
|
|
|
# Determine standard validators
|
|
field_validators = self._create_field_validators(field_def)
|
|
|
|
# Handle special case for field types
|
|
if field_type == 'tagging_fields':
|
|
field_class = TaggingFieldsField
|
|
extra_classes = 'json-editor'
|
|
field_kwargs = {}
|
|
elif field_type == 'tagging_fields_filter':
|
|
field_class = TaggingFieldsFilterField
|
|
extra_classes = 'json-editor'
|
|
field_kwargs = {}
|
|
elif field_type == 'dynamic_arguments':
|
|
field_class = DynamicArgumentsField
|
|
extra_classes = 'json-editor'
|
|
field_kwargs = {}
|
|
elif field_type == 'enum':
|
|
field_class = SelectField
|
|
allowed_values = field_def.get('allowed_values', [])
|
|
choices = [(str(val), str(val)) for val in allowed_values]
|
|
extra_classes = ''
|
|
field_kwargs = {'choices': choices}
|
|
elif field_type == 'chunking_patterns':
|
|
field_class = ChunkingPatternsField
|
|
extra_classes = ['monospace-text', 'pattern-input']
|
|
field_kwargs = {}
|
|
else:
|
|
extra_classes = ''
|
|
field_class = {
|
|
'integer': IntegerField,
|
|
'float': FloatField,
|
|
'boolean': BooleanField,
|
|
'string': StringField,
|
|
'text': TextAreaField,
|
|
'date': DateField,
|
|
'file': FileField,
|
|
}.get(field_type, StringField)
|
|
field_kwargs = {}
|
|
|
|
# Prepare field data
|
|
field_data = None
|
|
if initial_data and field_name in initial_data:
|
|
field_data = initial_data[field_name]
|
|
if field_type in ['tagging_fields', 'tagging_fields_filter', 'dynamic_arguments'] and isinstance(
|
|
field_data, dict):
|
|
try:
|
|
field_data = json.dumps(field_data, indent=2)
|
|
except (TypeError, ValueError) as e:
|
|
current_app.logger.error(f"Error converting initial data to JSON: {e}")
|
|
field_data = "{}"
|
|
elif field_type == 'chunking_patterns':
|
|
try:
|
|
field_data = json_to_patterns(field_data)
|
|
except (TypeError, ValueError) as e:
|
|
current_app.logger.error(f"Error converting initial data to a list of patterns: {e}")
|
|
field_data = {}
|
|
elif default is not None:
|
|
field_data = default
|
|
|
|
# Create render_kw with classes and any other HTML attributes
|
|
render_kw = {'class': extra_classes} if extra_classes else {}
|
|
if description:
|
|
render_kw['title'] = description # For tooltip
|
|
render_kw['data-bs-toggle'] = 'tooltip'
|
|
render_kw['data-bs-placement'] = 'right'
|
|
|
|
# Create the field
|
|
field_kwargs.update({
|
|
'label': label,
|
|
'description': description,
|
|
'validators': field_validators,
|
|
'default': field_data,
|
|
'render_kw': render_kw
|
|
})
|
|
|
|
unbound_field = field_class(**field_kwargs)
|
|
|
|
# Bind the field to the form
|
|
bound_field = unbound_field.bind(form=self, name=full_field_name)
|
|
|
|
# Process the field with formdata
|
|
if self.formdata and full_field_name in self.formdata:
|
|
bound_field.process(self.formdata)
|
|
else:
|
|
bound_field.process(formdata=None, data=field_data) # Use prepared field_data
|
|
|
|
# Add the field to the form
|
|
setattr(self, full_field_name, bound_field)
|
|
self._fields[full_field_name] = bound_field
|
|
self.dynamic_fields[collection_name].append(full_field_name)
|
|
|
|
def get_static_fields(self):
|
|
"""Return a list of static field instances."""
|
|
# Get names of dynamic fields
|
|
dynamic_field_names = set()
|
|
for field_list in self.dynamic_fields.values():
|
|
dynamic_field_names.update(field_list)
|
|
|
|
# Return all fields that are not dynamic
|
|
return [field for name, field in self._fields.items() if name not in dynamic_field_names]
|
|
|
|
def get_dynamic_fields(self):
|
|
"""Return a dictionary of dynamic fields per collection."""
|
|
result = {}
|
|
for collection_name, field_names in self.dynamic_fields.items():
|
|
result[collection_name] = [getattr(self, name) for name in field_names]
|
|
return result
|
|
|
|
def get_dynamic_data(self, collection_name):
|
|
"""Retrieve the data from dynamic fields of a specific collection."""
|
|
data = {}
|
|
if collection_name not in self.dynamic_fields:
|
|
return data
|
|
prefix_length = len(collection_name) + 1 # +1 for the underscore
|
|
for full_field_name in self.dynamic_fields[collection_name]:
|
|
original_field_name = full_field_name[prefix_length:]
|
|
field = getattr(self, full_field_name)
|
|
# Parse JSON for special field types
|
|
if isinstance(field, (TaggingFieldsField, TaggingFieldsFilterField, DynamicArgumentsField)) and field.data:
|
|
try:
|
|
data[original_field_name] = json.loads(field.data)
|
|
except json.JSONDecodeError:
|
|
# Validation should catch this, but just in case
|
|
data[original_field_name] = field.data
|
|
elif isinstance(field, ChunkingPatternsField):
|
|
try:
|
|
data[original_field_name] = patterns_to_json(field.data)
|
|
except Exception as e:
|
|
current_app.logger.error(f"Error converting initial data to patterns: {e}")
|
|
else:
|
|
data[original_field_name] = field.data
|
|
return data
|
|
|
|
|
|
def validate_tagging_fields(form, field):
|
|
"""Validate the tagging fields structure"""
|
|
if not field.data:
|
|
return
|
|
|
|
try:
|
|
# Parse JSON data
|
|
fields_data = json.loads(field.data)
|
|
|
|
# Validate it's a dictionary
|
|
if not isinstance(fields_data, dict):
|
|
raise ValidationError("Tagging fields must be a dictionary")
|
|
|
|
# Validate each field definition
|
|
for field_name, field_def in fields_data.items():
|
|
if not isinstance(field_def, dict):
|
|
raise ValidationError(f"Field definition for {field_name} must be a dictionary")
|
|
|
|
# Check required properties
|
|
if 'type' not in field_def:
|
|
raise ValidationError(f"Field {field_name} missing required 'type' property")
|
|
|
|
# Validate type
|
|
if field_def['type'] not in ['string', 'integer', 'float', 'date', 'enum']:
|
|
raise ValidationError(f"Field {field_name} has invalid type: {field_def['type']}")
|
|
|
|
# Validate enum fields have allowed_values
|
|
if field_def['type'] == 'enum':
|
|
if 'allowed_values' not in field_def:
|
|
raise ValidationError(f"Enum field {field_name} missing required 'allowed_values' list")
|
|
if not isinstance(field_def['allowed_values'], list):
|
|
raise ValidationError(f"Field {field_name} allowed_values must be a list")
|
|
|
|
# Validate numeric fields
|
|
if field_def['type'] in ['integer', 'float']:
|
|
if 'min_value' in field_def and 'max_value' in field_def:
|
|
min_val = float(field_def['min_value'])
|
|
max_val = float(field_def['max_value'])
|
|
if min_val >= max_val:
|
|
raise ValidationError(f"Field {field_name} min_value must be less than max_value")
|
|
|
|
except json.JSONDecodeError:
|
|
raise ValidationError("Invalid JSON format")
|
|
except (TypeError, ValueError) as e:
|
|
raise ValidationError(f"Invalid field definition: {str(e)}") |