From d35ec9f5aeb96cac76ed24aa9dd40ead6038ad6f Mon Sep 17 00:00:00 2001
From: Josako
Date: Thu, 5 Dec 2024 15:19:37 +0100
Subject: [PATCH] - Addition of general chunking parameters
 chunking_heading_level and chunking_patterns
- Addition of Processor types docx and markdown

---
 common/utils/config_field_types.py                  |  51 ++++++-
 common/utils/document_utils.py                      | 114 +++++++++++++++-
 config/config.py                                    |   6 +-
 config/type_defs/processor_types.py                 | 114 +++++++++++++++-
 eveai_api/api/document_api.py                       | 120 +++++++++++++++-
 eveai_app/views/document_forms.py                   |  15 +-
 eveai_app/views/document_views.py                   |   5 +-
 eveai_app/views/dynamic_form_base.py                |  62 ++++++++-
 eveai_workers/processors/audio_processor.py         |  12 +-
 eveai_workers/processors/base_processor.py          |   8 +-
 eveai_workers/processors/docx_processor.py          | 129 ++++++++++++++++++
 eveai_workers/processors/html_processor.py          |   6 +-
 .../processors/markdown_processor.py                |  48 +++++++
 eveai_workers/processors/pdf_processor.py           |   4 +-
 .../processors/transcription_processor.py           |   4 +-
 eveai_workers/tasks.py                              |  83 ++++++++---
 requirements.txt                                    |   3 +-
 17 files changed, 718 insertions(+), 66 deletions(-)
 create mode 100644 eveai_workers/processors/docx_processor.py
 create mode 100644 eveai_workers/processors/markdown_processor.py

diff --git a/common/utils/config_field_types.py b/common/utils/config_field_types.py
index 6346a10..5fee4d9 100644
--- a/common/utils/config_field_types.py
+++ b/common/utils/config_field_types.py
@@ -64,6 +64,20 @@ class TaggingFields(BaseModel):
     }


+class ChunkingPatternsField(BaseModel):
+    """Represents a set of chunking patterns"""
+    patterns: List[str]
+
+    @field_validator('patterns')
+    def validate_patterns(cls, patterns):
+        for pattern in patterns:
+            try:
+                re.compile(pattern)
+            except re.error as e:
+                raise ValueError(f"Invalid regex pattern '{pattern}': {str(e)}")
+        return patterns
+
+
 class ArgumentConstraint(BaseModel):
     """Base class for all argument constraints"""
     description: Optional[str] = None
@@ -610,4 +624,39 @@ def _generate_yaml_docs(fields: Dict[str, Any], version: str) -> str:
         }
     }

-    return yaml.dump(doc, sort_keys=False, default_flow_style=False)
\ No newline at end of file
+    return yaml.dump(doc, sort_keys=False, default_flow_style=False)
+
+
+def patterns_to_json(text_area_content: str) -> str:
+    """Convert line-based patterns to a JSON string"""
+    text_area_content = text_area_content.strip()
+    if len(text_area_content) == 0:
+        return json.dumps([])
+    # Split on newlines and remove empty lines
+    patterns = [line.strip() for line in text_area_content.split('\n') if line.strip()]
+    return json.dumps(patterns)
+
+
+def json_to_patterns(json_content: str) -> str:
+    """Convert a JSON patterns list to text area content"""
+    try:
+        patterns = json.loads(json_content)
+        if not isinstance(patterns, list):
+            raise ValueError("JSON must contain a list of patterns")
+        # Join with newlines
+        return '\n'.join(patterns)
+    except json.JSONDecodeError as e:
+        raise ValueError(f"Invalid JSON format: {e}")
+
+
+def json_to_pattern_list(json_content: str) -> list:
+    """Convert a JSON patterns list to a Python list of patterns"""
+    try:
+        patterns = json.loads(json_content)
+        if not isinstance(patterns, list):
+            raise ValueError("JSON must contain a list of patterns")
+        # Unescape if needed
+        patterns = [pattern.replace('\\\\', '\\') for pattern in patterns]
+        return patterns
+    except json.JSONDecodeError as e:
+        raise ValueError(f"Invalid JSON format: {e}")
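These three helpers are the glue between the pattern textarea in the admin UI and the JSON string that ends up in the processor configuration. A quick round trip, assuming the module path from this patch:

```python
from common.utils.config_field_types import (
    patterns_to_json, json_to_patterns, json_to_pattern_list,
)

# One pattern per line, as a user would type it in the textarea
text_area = "^Chapter \\d+\n^Appendix [A-Z]\n"

stored = patterns_to_json(text_area)       # JSON string, as persisted in configuration
assert json_to_patterns(stored) == text_area.strip()   # back to textarea content
patterns = json_to_pattern_list(stored)    # ['^Chapter \\d+', '^Appendix [A-Z]']
```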
diff --git a/common/utils/document_utils.py b/common/utils/document_utils.py
index 1a6ed1f..c97b552 100644
--- a/common/utils/document_utils.py
+++ b/common/utils/document_utils.py
@@ -12,7 +12,7 @@ import requests
 from urllib.parse import urlparse, unquote, urlunparse
 import os
 from .eveai_exceptions import (EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
-                               EveAIInvalidCatalog, EveAIInvalidDocument, EveAIInvalidDocumentVersion)
+                               EveAIInvalidCatalog, EveAIInvalidDocument, EveAIInvalidDocumentVersion, EveAIException)
 from ..models.user import Tenant
@@ -219,12 +219,6 @@ def start_embedding_task(tenant_id, doc_vers_id):
     return task.id


-def validate_file_type(extension):
-    if extension not in current_app.config['SUPPORTED_FILE_TYPES']:
-        raise EveAIUnsupportedFileType(f"Filetype {extension} is currently not supported. "
-                                       f"Supported filetypes: {', '.join(current_app.config['SUPPORTED_FILE_TYPES'])}")
-
-
 def get_filename_from_url(url):
     parsed_url = urlparse(url)
     path_parts = parsed_url.path.split('/')
@@ -363,3 +357,109 @@ def cope_with_local_url(url):
     return url


+def lookup_document(tenant_id: int, lookup_criteria: dict, metadata_type: str) -> tuple[Document, DocumentVersion]:
+    """
+    Look up a document using metadata criteria
+
+    Args:
+        tenant_id: ID of the tenant
+        lookup_criteria: Dictionary of key-value pairs to match in metadata
+        metadata_type: Which metadata to search in ('user_metadata' or 'system_metadata')
+
+    Returns:
+        Tuple of (Document, DocumentVersion) if found
+
+    Raises:
+        ValueError: If an invalid metadata_type is provided
+        EveAIException: If the lookup fails
+    """
+    if metadata_type not in ['user_metadata', 'system_metadata']:
+        raise ValueError(f"Invalid metadata_type: {metadata_type}")
+
+    try:
+        # Query for the latest document version matching the criteria
+        query = (db.session.query(Document, DocumentVersion)
+                 .join(DocumentVersion)
+                 .filter(Document.id == DocumentVersion.doc_id)
+                 .order_by(DocumentVersion.id.desc()))
+
+        # Add metadata filtering using PostgreSQL JSONB operators
+        metadata_field = getattr(DocumentVersion, metadata_type)
+        for key, value in lookup_criteria.items():
+            query = query.filter(metadata_field[key].astext == str(value))
+
+        # Get first result
+        result = query.first()
+
+        if not result:
+            raise EveAIException(
+                f"No document found matching criteria in {metadata_type}",
+                status_code=404
+            )
+
+        return result
+
+    except EveAIException:
+        # Re-raise unchanged so the 404 above is not converted into a 500 below
+        raise
+    except SQLAlchemyError as e:
+        current_app.logger.error(f'Database error during document lookup for tenant {tenant_id}: {e}')
+        raise EveAIException(
+            "Database error during document lookup",
+            status_code=500
+        )
+    except Exception as e:
+        current_app.logger.error(f'Error during document lookup for tenant {tenant_id}: {e}')
+        raise EveAIException(
+            "Error during document lookup",
+            status_code=500
+        )
+
+
+def refresh_document_with_content(doc_id: int, tenant_id: int, file_content: bytes, api_input: dict) -> tuple:
+    """
+    Refresh document with new content
+
+    Args:
+        doc_id: Document ID
+        tenant_id: Tenant ID
+        file_content: New file content
+        api_input: Additional document information
+
+    Returns:
+        Tuple of (new_version, task_id)
+    """
+    doc = Document.query.get(doc_id)
+    if not doc:
+        raise EveAIInvalidDocument(tenant_id, doc_id)
+
+    old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first()
+
+    # Create new version with same file type as original
+    extension = old_doc_vers.file_type
+
+    new_doc_vers = create_version_for_document(
+        doc, tenant_id,
+        '',  # No URL for content-based updates
+        old_doc_vers.sub_file_type,
+        api_input.get('language', old_doc_vers.language),
+        api_input.get('user_context', old_doc_vers.user_context),
+        api_input.get('user_metadata', old_doc_vers.user_metadata),
+        api_input.get('catalog_properties', old_doc_vers.catalog_properties),
+    )
+
+    try:
+        db.session.add(new_doc_vers)
+        db.session.commit()
+    except SQLAlchemyError as e:
+        db.session.rollback()
+        current_app.logger.error(f'Database error during document refresh for tenant {tenant_id}: {e}')
+        # Raise rather than returning (None, error); callers rely on the
+        # documented (new_version, task_id) return value
+        raise EveAIException("Database error during document refresh", status_code=500)
+
+    # Upload new content
+    upload_file_for_version(new_doc_vers, file_content, extension, tenant_id)
+
+    # Start embedding task
+    task = current_celery.send_task('create_embeddings', args=[tenant_id, new_doc_vers.id], queue='embeddings')
+    current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} '
+                            f'with task id: {task.id}.')
+
+    return new_doc_vers, task.id
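The `astext` comparison is what makes `lookup_document` filter inside the JSONB metadata documents. A hypothetical call (inside an app context; the IDs and criteria are illustrative):

```python
from common.utils.document_utils import lookup_document

doc, version = lookup_document(
    tenant_id=42,  # illustrative
    lookup_criteria={"external_id": "123", "source": "zapier"},
    metadata_type="user_metadata",
)
# Each criterion compiles to roughly:
#   document_version.user_metadata ->> 'external_id' = '123'
# and the DESC ordering on DocumentVersion.id means the newest match wins.
print(doc.id, version.id)
```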
diff --git a/config/config.py b/config/config.py
index 48650cf..3f0e96d 100644
--- a/config/config.py
+++ b/config/config.py
@@ -55,7 +55,6 @@ class Config(object):

     # file upload settings
     MAX_CONTENT_LENGTH = 50 * 1024 * 1024
-    UPLOAD_EXTENSIONS = ['.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif']

     # supported languages
     SUPPORTED_LANGUAGES = ['en', 'fr', 'nl', 'de', 'es']
@@ -143,10 +142,7 @@ class Config(object):
     LANGCHAIN_ENDPOINT = 'https://api.smith.langchain.com'
     LANGCHAIN_PROJECT = "eveai"

-
-    SUPPORTED_FILE_TYPES = ['pdf', 'html', 'md', 'txt', 'mp3', 'mp4', 'ogg', 'srt']
-
-    TENANT_TYPES = ['Active', 'Demo', 'Inactive', 'Test', 'Wordpress Starter']
+    TENANT_TYPES = ['Active', 'Demo', 'Inactive', 'Test']

     # The maximum number of seconds allowed for audio compression (to save resources)
     MAX_COMPRESSION_DURATION = 60*10  # 10 minutes
"type": "integer", + "description": "Maximum heading level to consider for chunking (1-6)", + "required": False, + "default": 2 + }, + } + }, + "DOCX_PROCESSOR": { + "name": "DOCX Processor", + "file_types": "docx", + "Description": "A processor for DOCX files", + "configuration": { + "chunking_patterns": { + "name": "Chunking Patterns", + "description": "A list of Patterns used to chunk files into logical pieces", + "type": "chunking_patterns", + "required": False + }, + "chunking_heading_level": { + "name": "Chunking Heading Level", + "type": "integer", + "description": "Maximum heading level to consider for chunking (1-6)", + "required": False, + "default": 2 + }, + "extract_comments": { + "name": "Extract Comments", + "type": "boolean", + "description": "Whether to include document comments in the markdown", + "required": False, + "default": False + }, + "extract_headers_footers": { + "name": "Extract Headers/Footers", + "type": "boolean", + "description": "Whether to include headers and footers in the markdown", + "required": False, + "default": False + }, + "preserve_formatting": { + "name": "Preserve Formatting", + "type": "boolean", + "description": "Whether to preserve bold, italic, and other text formatting", + "required": False, + "default": True + }, + "list_style": { + "name": "List Style", + "type": "enum", + "description": "How to format lists in markdown", + "required": False, + "default": "dash", + "allowed_values": ["dash", "asterisk", "plus"] + }, + "image_handling": { + "name": "Image Handling", + "type": "enum", + "description": "How to handle embedded images", + "required": False, + "default": "skip", + "allowed_values": ["skip", "extract", "placeholder"] + }, + "table_alignment": { + "name": "Table Alignment", + "type": "enum", + "description": "How to align table contents", + "required": False, + "default": "left", + "allowed_values": ["left", "center", "preserve"] + } + } + } } diff --git a/eveai_api/api/document_api.py b/eveai_api/api/document_api.py index 3e5e004..fac912e 100644 --- a/eveai_api/api/document_api.py +++ b/eveai_api/api/document_api.py @@ -11,9 +11,10 @@ from common.utils.document_utils import ( create_document_stack, process_url, start_embedding_task, validate_file_type, EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType, get_documents_list, edit_document, refresh_document, edit_document_version, - refresh_document_with_info + refresh_document_with_info, lookup_document ) from common.utils.eveai_exceptions import EveAIException +from eveai_api.api.auth import requires_service def validate_date(date_str): @@ -59,6 +60,7 @@ add_document_response = document_ns.model('AddDocumentResponse', { @document_ns.route('/add_document') class AddDocument(Resource): @jwt_required() + @requires_service('DOCAPI') @document_ns.expect(upload_parser) @document_ns.response(201, 'Document added successfully', add_document_response) @document_ns.response(400, 'Validation Error') @@ -134,6 +136,7 @@ add_url_response = document_ns.model('AddURLResponse', { @document_ns.route('/add_url') class AddURL(Resource): @jwt_required() + @requires_service('DOCAPI') @document_ns.expect(add_url_model) @document_ns.response(201, 'Document added successfully', add_url_response) @document_ns.response(400, 'Validation Error') @@ -190,6 +193,7 @@ document_list_model = document_ns.model('DocumentList', { @document_ns.route('/list') class DocumentList(Resource): @jwt_required() + @requires_service('DOCAPI') @document_ns.doc('list_documents') 
diff --git a/eveai_api/api/document_api.py b/eveai_api/api/document_api.py
index 3e5e004..fac912e 100644
--- a/eveai_api/api/document_api.py
+++ b/eveai_api/api/document_api.py
@@ -11,9 +11,10 @@ from common.utils.document_utils import (
     create_document_stack, process_url, start_embedding_task,
     EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
     get_documents_list, edit_document, refresh_document, edit_document_version,
-    refresh_document_with_info
+    refresh_document_with_info, lookup_document, refresh_document_with_content
 )
 from common.utils.eveai_exceptions import EveAIException
+from eveai_api.api.auth import requires_service


 def validate_date(date_str):
@@ -59,6 +60,7 @@ add_document_response = document_ns.model('AddDocumentResponse', {
 @document_ns.route('/add_document')
 class AddDocument(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.expect(upload_parser)
     @document_ns.response(201, 'Document added successfully', add_document_response)
     @document_ns.response(400, 'Validation Error')
@@ -134,6 +136,7 @@ add_url_response = document_ns.model('AddURLResponse', {
 @document_ns.route('/add_url')
 class AddURL(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.expect(add_url_model)
     @document_ns.response(201, 'Document added successfully', add_url_response)
     @document_ns.response(400, 'Validation Error')
@@ -190,6 +193,7 @@ document_list_model = document_ns.model('DocumentList', {
 @document_ns.route('/list')
 class DocumentList(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.doc('list_documents')
     @document_ns.marshal_list_with(document_list_model, envelope='documents')
     def get(self):
@@ -210,6 +214,7 @@ edit_document_model = document_ns.model('EditDocument', {
 @document_ns.route('/<int:document_id>')
 class DocumentResource(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.doc('edit_document')
     @document_ns.expect(edit_document_model)
     @document_ns.response(200, 'Document updated successfully')
@@ -232,6 +237,7 @@ class DocumentResource(Resource):
             return e.to_dict(), e.status_code

     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.doc('refresh_document')
     @document_ns.response(200, 'Document refreshed successfully')
     def post(self, document_id):
@@ -253,6 +259,7 @@ edit_document_version_model = document_ns.model('EditDocumentVersion', {
 @document_ns.route('/version/<int:version_id>')
 class DocumentVersionResource(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.doc('edit_document_version')
     @document_ns.expect(edit_document_version_model)
     @document_ns.response(200, 'Document version updated successfully')
@@ -280,6 +287,7 @@ refresh_document_model = document_ns.model('RefreshDocument', {
 @document_ns.route('/<int:document_id>/refresh')
 class RefreshDocument(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.response(200, 'Document refreshed successfully')
     @document_ns.response(404, 'Document not found')
     def post(self, document_id):
@@ -310,6 +318,7 @@ class RefreshDocument(Resource):
 @document_ns.route('/<int:document_id>/refresh_with_info')
 class RefreshDocumentWithInfo(Resource):
     @jwt_required()
+    @requires_service('DOCAPI')
     @document_ns.expect(refresh_document_model)
     @document_ns.response(200, 'Document refreshed successfully')
     @document_ns.response(400, 'Validation Error')
@@ -338,3 +347,112 @@ class RefreshDocumentWithInfo(Resource):
         except Exception as e:
             current_app.logger.error(f'Error refreshing document with info: {str(e)}')
             return {'message': 'Internal server error'}, 500


+# Define models for lookup requests
+lookup_model = document_ns.model('DocumentLookup', {
+    'lookup_criteria': fields.Raw(required=True,
+                                  description='JSON object containing key-value pairs to match in metadata. '
+                                              'Example: {"external_id": "123", "source": "zapier", "source_type": "google_docs"}'),
+    'metadata_type': fields.String(required=True, enum=['user_metadata', 'system_metadata'],
+                                   description='Which metadata field to search in')
+})
+
+lookup_response = document_ns.model('DocumentLookupResponse', {
+    'document_id': fields.Integer(description='ID of the found document'),
+    'document_version_id': fields.Integer(description='ID of the latest document version'),
+    'name': fields.String(description='Document name'),
+    'metadata': fields.Raw(description='Full metadata of the found document')
+})
+
+
+@document_ns.route('/lookup')
+class DocumentLookup(Resource):
+    @jwt_required()
+    @requires_service('DOCAPI')
+    @document_ns.expect(lookup_model)
+    @document_ns.response(200, 'Document found', lookup_response)
+    @document_ns.response(404, 'No document found matching criteria')
+    def post(self):
+        """
+        Look up a document using metadata criteria
+        """
+        tenant_id = get_jwt_identity()
+        try:
+            data = request.json
+            document, version = lookup_document(
+                tenant_id,
+                data['lookup_criteria'],
+                data['metadata_type']
+            )
+
+            return {
+                'document_id': document.id,
+                'document_version_id': version.id,
+                'name': document.name,
+                'metadata': getattr(version, data['metadata_type'])
+            }
+
+        except EveAIException as e:
+            return e.to_dict(), e.status_code
+
+        except KeyError as e:
+            return {'message': f'Missing required field: {str(e)}'}, 400
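Exercised from a client, the lookup endpoint takes the JWT plus the two body fields. A sketch with an illustrative base URL and token (the real namespace prefix depends on the deployment):

```python
import requests

resp = requests.post(
    "https://api.example.com/document/lookup",   # illustrative URL
    headers={"Authorization": "Bearer <JWT>"},
    json={
        "lookup_criteria": {"external_id": "123", "source": "zapier"},
        "metadata_type": "user_metadata",
    },
)
if resp.status_code == 200:
    body = resp.json()  # document_id, document_version_id, name, metadata
    print(body["document_id"], body["document_version_id"])
else:
    print(resp.status_code, resp.json().get("message"))
```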
+refresh_content_model = document_ns.model('RefreshDocumentContent', {
+    'file_content': fields.Raw(required=True, description='The new file content'),
+    'language': fields.String(required=False, description='Language of the document'),
+    'user_context': fields.String(required=False, description='User context for the document'),
+    'user_metadata': fields.Raw(required=False, description='Custom metadata fields'),
+    'catalog_properties': fields.Raw(required=False, description='Catalog-specific properties'),
+    'trigger_service': fields.String(required=False, description='Service that triggered the update')
+})
+
+
+@document_ns.route('/<int:document_id>/refresh_content')
+class RefreshDocumentContent(Resource):
+    @jwt_required()
+    @requires_service('DOCAPI')
+    @document_ns.expect(refresh_content_model)
+    @document_ns.response(200, 'Document refreshed successfully')
+    def post(self, document_id):
+        """Refresh a document with new content"""
+        tenant_id = get_jwt_identity()
+        try:
+            data = request.json
+            file_content = data['file_content']
+
+            # Merge the user_metadata supplied in the request (if any)
+            # with the Zapier-specific bookkeeping fields
+            user_metadata = data.get('user_metadata', {})
+            user_metadata.update({
+                'source': 'zapier',
+                'trigger_service': data.get('trigger_service')
+            })
+            data['user_metadata'] = user_metadata
+
+            # Keep catalog_properties separate; validation against the
+            # catalog configuration could be added here
+
+            new_version, task_id = refresh_document_with_content(
+                document_id,
+                tenant_id,
+                file_content,
+                data
+            )
+
+            return {
+                'message': f'Document refreshed successfully. New version: {new_version.id}. Task ID: {task_id}',
+                'document_id': document_id,
+                'document_version_id': new_version.id,
+                'task_id': task_id
+            }, 200
+
+        except EveAIException as e:
+            return e.to_dict(), e.status_code
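The companion refresh_content call. Note that `file_content` travels inside the JSON body, so binary payloads need a text encoding; the patch itself does not pin one down, so the plain-text payload below is an assumption:

```python
import requests

payload = {
    # Assumption: text content sent as-is; binary content would need e.g.
    # base64 plus matching decode logic server-side (not shown in this patch)
    "file_content": "# Updated doc\n\nNew body text.",
    "language": "en",
    "user_metadata": {"external_id": "123"},
    "trigger_service": "google_docs",
}
resp = requests.post(
    "https://api.example.com/document/17/refresh_content",  # illustrative URL/id
    headers={"Authorization": "Bearer <JWT>"},
    json=payload,
)
print(resp.json())  # message, document_id, document_version_id, task_id
```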
diff --git a/eveai_app/views/document_forms.py b/eveai_app/views/document_forms.py
index fcc812c..71346ad 100644
--- a/eveai_app/views/document_forms.py
+++ b/eveai_app/views/document_forms.py
@@ -15,14 +15,6 @@ from config.type_defs.retriever_types import RETRIEVER_TYPES
 from .dynamic_form_base import DynamicFormBase


-def allowed_file(form, field):
-    if field.data:
-        filename = field.data.filename
-        allowed_extensions = current_app.config.get('SUPPORTED_FILE_TYPES', [])
-        if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions):
-            raise ValidationError('Unsupported file type.')
-
-
 def validate_json(form, field):
     if field.data:
         try:
@@ -101,7 +93,10 @@ class ProcessorForm(FlaskForm):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         # Dynamically populate the 'type' field using the constructor
-        self.type.choices = [(key, value['name']) for key, value in PROCESSOR_TYPES.items()]
+        self.type.choices = sorted(
+            [(key, value['name']) for key, value in PROCESSOR_TYPES.items()],
+            key=lambda x: x[1],
+        )


 class EditProcessorForm(DynamicFormBase):
@@ -177,7 +172,7 @@ class EditRetrieverForm(DynamicFormBase):


 class AddDocumentForm(DynamicFormBase):
-    file = FileField('File', validators=[FileRequired(), allowed_file])
+    file = FileField('File', validators=[FileRequired()])
     catalog = StringField('Catalog', render_kw={'readonly': True})
     sub_file_type = StringField('Sub File Type', validators=[Optional(), Length(max=50)])
     name = StringField('Name', validators=[Length(max=100)])
diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py
index 2af18d2..9d8ed42 100644
--- a/eveai_app/views/document_views.py
+++ b/eveai_app/views/document_views.py
@@ -14,7 +14,7 @@ import json
 from common.models.document import Document, DocumentVersion, Catalog, Retriever, Processor
 from common.extensions import db
 from common.models.interaction import Specialist, SpecialistRetriever
-from common.utils.document_utils import validate_file_type, create_document_stack, start_embedding_task, process_url, \
+from common.utils.document_utils import create_document_stack, start_embedding_task, process_url, \
     edit_document, \
     edit_document_version, refresh_document
 from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \
@@ -391,9 +391,6 @@ def add_document():
         sub_file_type = form.sub_file_type.data
         filename = secure_filename(file.filename)
         extension = filename.rsplit('.', 1)[1].lower()
-
-        validate_file_type(extension)
-
         catalog_properties = {}
         document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations']
         for config in document_version_configurations:
diff --git a/eveai_app/views/dynamic_form_base.py b/eveai_app/views/dynamic_form_base.py
index 760de9e..bee14ab 100644
--- a/eveai_app/views/dynamic_form_base.py
+++ b/eveai_app/views/dynamic_form_base.py
@@ -5,7 +5,46 @@ import json
 from wtforms.fields.choices import SelectField
 from wtforms.fields.datetime import DateField

-from common.utils.config_field_types import TaggingFields
+from common.utils.config_field_types import TaggingFields, json_to_patterns, patterns_to_json
+
+
+class TaggingFieldsField(TextAreaField):
+    def __init__(self, *args, **kwargs):
+        kwargs['render_kw'] = {
+            'class': 'chunking-patterns-field',
+            'data-handle-enter': 'true'
+        }
+        super().__init__(*args, **kwargs)
+
+    # def _value(self):
+    #     if self.data:
+    #         return json.dumps(self.data)
+    #     return ''
+    #
+    # def process_formdata(self, valuelist):
+    #     if valuelist and valuelist[0]:
+    #         try:
+    #             self.data = json.loads(valuelist[0])
+    #         except json.JSONDecodeError as e:
+    #             raise ValueError('Not valid JSON content')
+
+
+class ChunkingPatternsField(TextAreaField):
+    def __init__(self, *args, **kwargs):
+        kwargs['render_kw'] = {
+            'class': 'chunking-patterns-field',
+            'data-handle-enter': 'true'
+        }
+        super().__init__(*args, **kwargs)
+
+    # def _value(self):
+    #     if self.data:
+    #         return '\n'.join(self.data)
+    #     return ''
+    #
+    # def process_formdata(self, valuelist):
+    #     if valuelist and valuelist[0]:
+    #         self.data = [line.strip() for line in valuelist[0].split('\n') if line.strip()]


 class DynamicFormBase(FlaskForm):
@@ -80,7 +119,7 @@ class DynamicFormBase(FlaskForm):

         # Handle special case for tagging_fields
         if field_type == 'tagging_fields':
-            field_class = TextAreaField
+            field_class = TaggingFieldsField
             extra_classes = 'json-editor'
             field_kwargs = {}
         elif field_type == 'enum':
             allowed_values = field_config.get('allowed_values', [])
             choices = [(str(val), str(val)) for val in allowed_values]
             extra_classes = ''
             field_kwargs = {'choices': choices}
+        elif field_type == 'chunking_patterns':
+            field_class = ChunkingPatternsField
+            extra_classes = 'monospace-text pattern-input'
+            field_kwargs = {}
         else:
             extra_classes = ''
             field_class = {
@@ -111,6 +154,12 @@ class DynamicFormBase(FlaskForm):
             except (TypeError, ValueError) as e:
                 current_app.logger.error(f"Error converting initial data to JSON: {e}")
                 field_data = "{}"
+        elif field_type == 'chunking_patterns':
+            try:
+                field_data = json_to_patterns(field_data)
+            except (TypeError, ValueError) as e:
+                current_app.logger.error(f"Error converting initial data to a list of patterns: {e}")
+                field_data = ''
         elif default is not None:
             field_data = default
@@ -173,12 +222,17 @@ class DynamicFormBase(FlaskForm):
                 original_field_name = full_field_name[prefix_length:]
                 field = getattr(self, full_field_name)
                 # Parse JSON for tagging_fields type
-                if isinstance(field, TextAreaField) and field.data:
+                if isinstance(field, TaggingFieldsField) and field.data:
                     try:
                         data[original_field_name] = json.loads(field.data)
                     except json.JSONDecodeError:
                         # Validation should catch this, but just in case
                         data[original_field_name] = field.data
+                elif isinstance(field, ChunkingPatternsField):
+                    try:
+                        data[original_field_name] = patterns_to_json(field.data or '')
+                    except Exception as e:
+                        current_app.logger.error(f"Error converting patterns to JSON: {e}")
+                        data[original_field_name] = field.data
                 else:
                     data[original_field_name] = field.data
         return data
@@ -230,5 +284,3 @@ def validate_tagging_fields(form, field):

     except (TypeError, ValueError) as e:
         raise ValidationError(f"Invalid field definition: {str(e)}")
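Both custom textarea fields above keep WTForms' default string handling and leave the JSON conversion to DynamicFormBase; the parsing hooks are left commented out. Realized in the field itself, they would look roughly like this (a sketch, not part of the patch — note the different class name):

```python
from wtforms import TextAreaField

class ChunkingPatternsListField(TextAreaField):
    """Hypothetical variant that holds a list of patterns instead of raw text."""

    def _value(self):
        # Render one pattern per line in the textarea
        return '\n'.join(self.data) if self.data else ''

    def process_formdata(self, valuelist):
        if valuelist and valuelist[0]:
            self.data = [line.strip() for line in valuelist[0].split('\n') if line.strip()]
        else:
            self.data = []
```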
diff --git a/eveai_workers/processors/audio_processor.py b/eveai_workers/processors/audio_processor.py
index 30518ee..c8063a0 100644
--- a/eveai_workers/processors/audio_processor.py
+++ b/eveai_workers/processors/audio_processor.py
@@ -46,7 +46,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
         try:
             audio_info = AudioSegment.from_file(temp_file_path, format=self.document_version.file_type)
             total_duration = len(audio_info)
-            self._log_tuning("_compress_audio", {
+            self.log_tuning("_compress_audio", {
                 "Audio Duration (ms)": total_duration,
             })
             segment_length = self.max_compression_duration * 1000  # Convert to milliseconds
@@ -55,7 +55,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
             compressed_segments = AudioSegment.empty()

             for i in range(total_chunks):
-                self._log_tuning("_compress_audio", {
+                self.log_tuning("_compress_audio", {
                     "Segment Nr": f"{i + 1} of {total_chunks}"
                 })
@@ -87,7 +87,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
                 compressed_filename,
                 compressed_buffer.read()
             )
-            self._log_tuning("_compress_audio", {
+            self.log_tuning("_compress_audio", {
                 "Compressed audio to MinIO": compressed_filename
             })
@@ -172,14 +172,14 @@ class AudioProcessor(TranscriptionBaseProcessor):

                     transcriptions.append(trans)

-                    self._log_tuning("_transcribe_audio", {
+                    self.log_tuning("_transcribe_audio", {
                         "Chunk Nr": f"{i + 1} of {total_chunks}",
                         "Segment Duration": segment_duration,
                         "Transcription": trans,
                     })
                 else:
                     self._log("Warning: Received empty transcription", level='warning')
-                    self._log_tuning("_transcribe_audio", {"ERROR": "No transcription"})
+                    self.log_tuning("_transcribe_audio", {"ERROR": "No transcription"})

             except Exception as e:
                 self._log(f"Error during transcription: {str(e)}", level='error')
@@ -202,7 +202,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
             transcription_filename,
             full_transcription.encode('utf-8')
         )
-        self._log_tuning(f"Saved transcription to MinIO: {transcription_filename}")
+        self.log_tuning(f"Saved transcription to MinIO: {transcription_filename}")

         return full_transcription
diff --git a/eveai_workers/processors/base_processor.py b/eveai_workers/processors/base_processor.py
index 5cf6358..b072841 100644
--- a/eveai_workers/processors/base_processor.py
+++ b/eveai_workers/processors/base_processor.py
@@ -17,7 +17,7 @@ class BaseProcessor(ABC):
         self.tuning_logger = None
         self._setup_tuning_logger()

-        self._log_tuning("Processor initialized", {
+        self.log_tuning("Processor initialized", {
             "processor_type": processor.type if processor else None,
             "document_version": document_version.id if document_version else None,
             "catalog": catalog.id if catalog else None
@@ -42,6 +42,10 @@ class BaseProcessor(ABC):
     def process(self):
         pass

+    @property
+    def configuration(self):
+        return self.processor.configuration
+
     def _save_markdown(self, markdown):
         markdown_filename = f"{self.document_version.id}.md"
         minio_client.upload_document_file(
@@ -78,7 +82,7 @@ class BaseProcessor(ABC):

         return markdown

-    def _log_tuning(self, message: str, data: Dict[str, Any] = None) -> None:
+    def log_tuning(self, message: str, data: Dict[str, Any] = None) -> None:
         if self.tuning and self.tuning_logger:
             try:
                 self.tuning_logger.log_tuning('processor', message, data)
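With `_log_tuning` made public as `log_tuning` (tasks.py calls it from outside, below) and the new `configuration` property, a minimal subclass only needs `process`. A sketch, assuming the constructor signature shown above:

```python
from eveai_workers.processors.base_processor import BaseProcessor

class PlainTextProcessor(BaseProcessor):
    """Hypothetical minimal processor; not part of this patch."""

    def process(self):
        # self.configuration proxies self.processor.configuration (new property)
        heading_level = self.configuration.get('chunking_heading_level', 2)
        # log_tuning is now public, so callers outside the class can use it too
        self.log_tuning("PlainTextProcessor.process", {"heading_level": heading_level})
        markdown = "# Untitled\n\nPlaceholder body."
        self._save_markdown(markdown)  # persists <version_id>.md to MinIO
        return markdown, "Untitled"
```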
diff --git a/eveai_workers/processors/docx_processor.py b/eveai_workers/processors/docx_processor.py
new file mode 100644
index 0000000..68b3c6e
--- /dev/null
+++ b/eveai_workers/processors/docx_processor.py
@@ -0,0 +1,129 @@
+import docx
+import io
+from .base_processor import BaseProcessor
+from .processor_registry import ProcessorRegistry
+from common.extensions import minio_client
+
+
+class DocxProcessor(BaseProcessor):
+    def __init__(self, tenant, model_variables, document_version, catalog, processor):
+        super().__init__(tenant, model_variables, document_version, catalog, processor)
+        self.config = processor.configuration
+        self.extract_comments = self.config.get('extract_comments', False)
+        self.extract_headers_footers = self.config.get('extract_headers_footers', False)
+        self.preserve_formatting = self.config.get('preserve_formatting', True)
+        self.list_style = self.config.get('list_style', 'dash')
+        self.image_handling = self.config.get('image_handling', 'skip')
+        self.table_alignment = self.config.get('table_alignment', 'left')
+
+    def process(self):
+        try:
+            file_data = minio_client.download_document_file(
+                self.tenant.id,
+                self.document_version.bucket_name,
+                self.document_version.object_name,
+            )
+
+            doc = docx.Document(io.BytesIO(file_data))
+            markdown = self._convert_to_markdown(doc)
+            title = self._extract_title(doc)
+
+            self._save_markdown(markdown)
+            return markdown, title
+
+        except Exception as e:
+            self._log(f"Error processing DOCX: {str(e)}", level='error')
+            raise
+
+    def _convert_to_markdown(self, doc):
+        markdown_parts = []
+
+        if self.extract_headers_footers:
+            for section in doc.sections:
+                if section.header.paragraphs:
+                    markdown_parts.extend(self._process_paragraphs(section.header.paragraphs))
+
+        markdown_parts.extend(self._process_paragraphs(doc.paragraphs))
+
+        # Document.comments only exists in newer python-docx releases (the pin
+        # in requirements.txt may not provide it), so guard the attribute access
+        doc_comments = getattr(doc, 'comments', None)
+        if self.extract_comments and doc_comments:
+            markdown_parts.append("\n## Comments\n")
+            for comment in doc_comments:
+                markdown_parts.append(f"> {comment.text}\n")
+
+        return "\n".join(markdown_parts)
+
+    def _process_paragraphs(self, paragraphs):
+        markdown_parts = []
+        in_list = False
+
+        for para in paragraphs:
+            if not para.text.strip():
+                continue
+
+            style = para.style.name.lower()
+
+            if 'heading' in style:
+                level = int(style[-1]) if style[-1].isdigit() else 1
+                markdown_parts.append(f"{'#' * level} {para.text}\n")
+
+            elif para._p.pPr and para._p.pPr.numPr:  # List item
+                marker = self._get_list_marker()
+                markdown_parts.append(f"{marker} {para.text}\n")
+                in_list = True
+
+            else:
+                if in_list:
+                    markdown_parts.append("\n")
+                    in_list = False
+
+                text = para.text
+                if self.preserve_formatting:
+                    text = self._apply_formatting(para)
+
+                markdown_parts.append(f"{text}\n")
+
+        return markdown_parts
+
+    def _get_list_marker(self):
+        return {
+            'dash': '-',
+            'asterisk': '*',
+            'plus': '+'
+        }.get(self.list_style, '-')
+
+    def _apply_formatting(self, paragraph):
+        text = paragraph.text
+        if not text:
+            return ""
+
+        runs = paragraph.runs
+        formatted_parts = []
+
+        for run in runs:
+            part = run.text
+            if run.bold:
+                part = f"**{part}**"
+            if run.italic:
+                part = f"*{part}*"
+            if run.underline:
+                part = f"__{part}__"
+            formatted_parts.append(part)
+
+        return "".join(formatted_parts)
+
+    def _extract_title(self, doc):
+        if doc.paragraphs:
+            first_para = doc.paragraphs[0]
+            if 'heading' in first_para.style.name.lower():
+                return first_para.text.strip()
+
+        # Look for first Heading 1 in document
+        for para in doc.paragraphs:
+            if para.style.name.lower() == 'heading 1':
+                return para.text.strip()
+
+        return "Untitled Document"
+
+
+ProcessorRegistry.register("DOCX_PROCESSOR", DocxProcessor)
\ No newline at end of file
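A standalone round trip showing the paragraph/run traversal the processor relies on. python-docx builds the document in memory; the mini-converter mirrors `_process_paragraphs`/`_apply_formatting` in spirit, not line for line:

```python
import docx

doc = docx.Document()
doc.add_heading("Quarterly Report", level=1)
p = doc.add_paragraph()
p.add_run("Revenue was ")
p.add_run("up 12%").bold = True

lines = []
for para in doc.paragraphs:
    style = para.style.name.lower()          # e.g. 'heading 1'
    if 'heading' in style:
        level = int(style[-1]) if style[-1].isdigit() else 1
        lines.append(f"{'#' * level} {para.text}")
    elif para.text.strip():
        # run.bold is None when unset, so the truthiness test matches the processor
        lines.append("".join(f"**{r.text}**" if r.bold else r.text for r in para.runs))

print("\n".join(lines))
# # Quarterly Report
# Revenue was **up 12%**
```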
self._log_tuning("_parse_html", {"extracted_html": extracted_html, "title": title}) + self.log_tuning("_parse_html", {"extracted_html": extracted_html, "title": title}) return extracted_html, title def _generate_markdown_from_html(self, html_content): @@ -96,7 +96,7 @@ class HTMLProcessor(BaseProcessor): input_html = {"html": chunk} markdown_chunk = chain.invoke(input_html) markdown_chunks.append(markdown_chunk) - self._log_tuning("_generate_markdown_from_html", {"chunk": chunk, "markdown_chunk": markdown_chunk}) + self.log_tuning("_generate_markdown_from_html", {"chunk": chunk, "markdown_chunk": markdown_chunk}) markdown = "\n\n".join(markdown_chunks) self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}') diff --git a/eveai_workers/processors/markdown_processor.py b/eveai_workers/processors/markdown_processor.py new file mode 100644 index 0000000..d23ff6d --- /dev/null +++ b/eveai_workers/processors/markdown_processor.py @@ -0,0 +1,48 @@ +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +import re +from langchain_core.runnables import RunnablePassthrough + +from common.extensions import minio_client +from common.utils.model_utils import create_language_template +from .base_processor import BaseProcessor +from common.utils.business_event_context import current_event +from .processor_registry import ProcessorRegistry + + +def _find_first_h1(markdown: str) -> str: + # Look for # Header (allowing spaces after #) + match = re.search(r'^#\s+(.+)$', markdown, re.MULTILINE) + return match.group(1).strip() if match else "" + + +class MarkdownProcessor(BaseProcessor): + def __init__(self, tenant, model_variables, document_version, catalog, processor): + super().__init__(tenant, model_variables, document_version, catalog, processor) + + self.chunk_size = catalog.max_chunk_size + self.chunk_overlap = 0 + self.tuning = self.processor.tuning + + def process(self): + self._log("Starting Markdown processing") + try: + file_data = minio_client.download_document_file( + self.tenant.id, + self.document_version.bucket_name, + self.document_version.object_name, + ) + + markdown = file_data.decode('utf-8') + title = _find_first_h1(markdown) + + self._save_markdown(markdown) + self._log("Finished processing Markdown") + return markdown, title + except Exception as e: + self._log(f"Error processing Markdown: {str(e)}", level='error') + raise + + +ProcessorRegistry.register("MARKDOWN_PROCESSOR", MarkdownProcessor) diff --git a/eveai_workers/processors/pdf_processor.py b/eveai_workers/processors/pdf_processor.py index 91e4b2d..c4dcbe5 100644 --- a/eveai_workers/processors/pdf_processor.py +++ b/eveai_workers/processors/pdf_processor.py @@ -57,7 +57,7 @@ class PDFProcessor(BaseProcessor): 'figures': self._extract_figures(page, page_num, figure_counter), 'tables': self._extract_tables(page) } - self._log_tuning("_extract_content", {"page_num": page_num, "page_content": page_content}) + self.log_tuning("_extract_content", {"page_num": page_num, "page_content": page_content}) figure_counter += len(page_content['figures']) extracted_content.append(page_content) @@ -119,7 +119,7 @@ class PDFProcessor(BaseProcessor): markdown_table = self._table_to_markdown(table) if markdown_table: # Only add non-empty tables tables.append(markdown_table) - self._log_tuning("_extract_tables", {"markdown_table": markdown_table}) + self.log_tuning("_extract_tables", 
{"markdown_table": markdown_table}) except Exception as e: self._log(f"Error extracting tables from page: {str(e)}", level='error') return tables diff --git a/eveai_workers/processors/transcription_processor.py b/eveai_workers/processors/transcription_processor.py index 8db2c82..3e3959a 100644 --- a/eveai_workers/processors/transcription_processor.py +++ b/eveai_workers/processors/transcription_processor.py @@ -45,7 +45,7 @@ class TranscriptionBaseProcessor(BaseProcessor): return text_splitter.split_text(transcription) def _process_chunks(self, chunks): - self._log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)}) + self.log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)}) llm = self.model_variables.get_llm() template = self.model_variables.get_template('transcript') language_template = create_language_template(template, self.document_version.language) @@ -64,7 +64,7 @@ class TranscriptionBaseProcessor(BaseProcessor): } markdown = chain.invoke(input_transcript) markdown = self._clean_markdown(markdown) - self._log_tuning("_process_chunks", { + self.log_tuning("_process_chunks", { "Chunk Number": f"{i + 1} of {len(chunks)}", "Chunk": chunk, "Previous Chunk": previous_part, diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index e1d0026..2ac5166 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -1,3 +1,4 @@ +import re from datetime import datetime as dt, timezone as tz from celery import states @@ -23,6 +24,8 @@ from common.utils.business_event_context import current_event from config.type_defs.processor_types import PROCESSOR_TYPES from eveai_workers.processors.processor_registry import ProcessorRegistry +from common.utils.config_field_types import json_to_pattern_list + # Healthcheck task @current_celery.task(name='ping', queue='embeddings') @@ -99,9 +102,13 @@ def create_embeddings(tenant_id, document_version_id): processor=processor ) markdown, title = document_processor.process() + document_processor.log_tuning("Processor returned: ", { + 'markdown': markdown, + 'title': title + }) with current_event.create_span("Embedding"): - embed_markdown(tenant, model_variables, document_version, catalog, markdown, title) + embed_markdown(tenant, model_variables, document_version, catalog, document_processor, markdown, title) current_event.log("Finished Embedding Creation Task") @@ -129,16 +136,19 @@ def delete_embeddings_for_document_version(document_version): raise -def embed_markdown(tenant, model_variables, document_version, catalog, markdown, title): +def embed_markdown(tenant, model_variables, document_version, catalog, processor, markdown, title): # Create potential chunks - potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md") + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, processor, markdown) + processor.log_tuning("Potential Chunks: ", {'potential chunks': potential_chunks}) # Combine chunks for embedding - chunks = combine_chunks_for_markdown(potential_chunks, catalog.min_chunk_size, catalog.max_chunk_size) + chunks = combine_chunks_for_markdown(potential_chunks, catalog.min_chunk_size, catalog.max_chunk_size, processor) + processor.log_tuning("Chunks: ", {'chunks': chunks}) # Enrich chunks with current_event.create_span("Enrich Chunks"): enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks) + processor.log_tuning("Enriched Chunks: ", {'enriched_chunks': enriched_chunks}) # Create embeddings with 
current_event.create_span("Create Embeddings"): @@ -238,23 +248,17 @@ def embed_chunks(tenant, model_variables, document_version, chunks): return new_embeddings -def create_potential_chunks_for_markdown(tenant_id, document_version, input_file): +def create_potential_chunks_for_markdown(tenant_id, document_version, processor, markdown): try: current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}') - markdown_on = document_version.object_name.rsplit('.', 1)[0] + '.md' - - # Download the markdown file from MinIO - markdown_data = minio_client.download_document_file(tenant_id, - document_version.bucket_name, - markdown_on, - ) - markdown = markdown_data.decode('utf-8') + heading_level = processor.configuration.get('chunking_heading_level', 2) headers_to_split_on = [ - ("#", "Header 1"), - ("##", "Header 2"), + (f"{'#' * i}", f"Header {i}") for i in range(1, min(heading_level + 1, 7)) ] + processor.log_tuning('Headers to split on', {'header list: ': headers_to_split_on}) + markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) md_header_splits = markdown_splitter.split_text(markdown) potential_chunks = [doc.page_content for doc in md_header_splits] @@ -265,14 +269,61 @@ def create_potential_chunks_for_markdown(tenant_id, document_version, input_file raise -def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars): +def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processor): actual_chunks = [] current_chunk = "" current_length = 0 + def matches_chunking_pattern(text, patterns): + if not patterns: + return False + + # Get the first line of the text + first_line = text.split('\n', 1)[0].strip() + + # Check if it's a header at appropriate level + header_match = re.match(r'^(#{1,6})\s+(.+)$', first_line) + if not header_match: + return False + + # Get the heading level (number of #s) + header_level = len(header_match.group(1)) + # Get the header text + header_text = header_match.group(2) + + # Check if header matches any pattern + for pattern in patterns: + try: + processor.log_tuning('Pattern check: ', { + 'pattern: ': pattern, + 'text': header_text + }) + if re.search(pattern, header_text, re.IGNORECASE): + return True + except Exception as e: + current_app.logger.warning(f"Invalid regex pattern '{pattern}': {str(e)}") + continue + + return False + + chunking_patterns = json_to_pattern_list(processor.configuration.get('chunking_patterns', [])) + + processor.log_tuning(f'Chunking Patterns Extraction: ', { + 'Full Configuration': processor.configuration, + 'Chunking Patterns': chunking_patterns, + }) + for chunk in potential_chunks: chunk_length = len(chunk) + # Force new chunk if pattern matches + if chunking_patterns and matches_chunking_pattern(chunk, chunking_patterns): + if current_chunk and current_length >= min_chars: + actual_chunks.append(current_chunk) + current_chunk = chunk + current_length = chunk_length + continue + if current_length + chunk_length > max_chars: if current_length >= min_chars: actual_chunks.append(current_chunk) diff --git a/requirements.txt b/requirements.txt index 472c5a6..71c37c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -88,4 +88,5 @@ typing_extensions~=4.12.2 prometheus_flask_exporter~=0.23.1 prometheus_client~=0.20.0 babel~=2.16.0 -dogpile.cache~=1.3.3 \ No newline at end of file +dogpile.cache~=1.3.3 +python-docx~=1.1.2