diff --git a/common/langchain/llm_metrics_handler.py b/common/langchain/llm_metrics_handler.py index 8ccfe73..e9073b8 100644 --- a/common/langchain/llm_metrics_handler.py +++ b/common/langchain/llm_metrics_handler.py @@ -3,7 +3,6 @@ from langchain.callbacks.base import BaseCallbackHandler from typing import Dict, Any, List from langchain.schema import LLMResult from common.utils.business_event_context import current_event -from flask import current_app class LLMMetricsHandler(BaseCallbackHandler): diff --git a/common/langchain/persistent_llm_metrics_handler.py b/common/langchain/persistent_llm_metrics_handler.py new file mode 100644 index 0000000..ee11406 --- /dev/null +++ b/common/langchain/persistent_llm_metrics_handler.py @@ -0,0 +1,47 @@ +import time +from langchain.callbacks.base import BaseCallbackHandler +from typing import Dict, Any, List +from langchain.schema import LLMResult +from common.utils.business_event_context import current_event + + +class PersistentLLMMetricsHandler(BaseCallbackHandler): + """Metrics handler that allows metrics to be retrieved from within any call. In case metrics are required for other + purposes than business event logging.""" + + def __init__(self): + self.total_tokens: int = 0 + self.prompt_tokens: int = 0 + self.completion_tokens: int = 0 + self.start_time: float = 0 + self.end_time: float = 0 + self.total_time: float = 0 + + def reset(self): + self.total_tokens = 0 + self.prompt_tokens = 0 + self.completion_tokens = 0 + self.start_time = 0 + self.end_time = 0 + self.total_time = 0 + + def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None: + self.start_time = time.time() + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + self.end_time = time.time() + self.total_time = self.end_time - self.start_time + + usage = response.llm_output.get('token_usage', {}) + self.prompt_tokens += usage.get('prompt_tokens', 0) + self.completion_tokens += usage.get('completion_tokens', 0) + self.total_tokens = self.prompt_tokens + self.completion_tokens + + def get_metrics(self) -> Dict[str, int | float]: + return { + 'total_tokens': self.total_tokens, + 'prompt_tokens': self.prompt_tokens, + 'completion_tokens': self.completion_tokens, + 'time_elapsed': self.total_time, + 'interaction_type': 'LLM', + } diff --git a/common/models/document.py b/common/models/document.py index 81fa2e0..9b7a2d8 100644 --- a/common/models/document.py +++ b/common/models/document.py @@ -34,6 +34,7 @@ class Processor(db.Model): catalog_id = db.Column(db.Integer, db.ForeignKey('catalog.id'), nullable=True) type = db.Column(db.String(50), nullable=False) sub_file_type = db.Column(db.String(50), nullable=True) + active = db.Column(db.Boolean, nullable=True, default=True) # Tuning enablers tuning = db.Column(db.Boolean, nullable=True, default=False) diff --git a/common/models/user.py b/common/models/user.py index ca1a9ab..a29ff5d 100644 --- a/common/models/user.py +++ b/common/models/user.py @@ -331,8 +331,8 @@ class TranslationCache(db.Model): context = db.Column(db.Text, nullable=True) # Translation cost - input_tokens = db.Column(db.Integer, nullable=False) - output_tokens = db.Column(db.Integer, nullable=False) + prompt_tokens = db.Column(db.Integer, nullable=False) + completion_tokens = db.Column(db.Integer, nullable=False) # Tracking created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now()) diff --git a/common/services/utils/translation_services.py b/common/services/utils/translation_services.py new file 
mode 100644 index 0000000..d35fae0 --- /dev/null +++ b/common/services/utils/translation_services.py @@ -0,0 +1,43 @@ +import xxhash +import json + +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.runnables import RunnablePassthrough + +from common.langchain.persistent_llm_metrics_handler import PersistentLLMMetricsHandler +from common.utils.model_utils import get_template, replace_variable_in_template + +class TranslationService: + def __init__(self, tenant_id): + self.tenant_id = tenant_id + + def translate_text(self, text_to_translate: str, target_lang: str, source_lang: str = None, context: str = None) -> tuple[ + str, dict[str, int | float]]: + prompt_params = { + "text_to_translate": text_to_translate, + "target_lang": target_lang, + } + if context: + template, llm = get_template("translation_with_context") + prompt_params["context"] = context + else: + template, llm = get_template("translation_without_context") + + # Add a metrics handler to capture usage + + metrics_handler = PersistentLLMMetricsHandler() + existing_callbacks = llm.callbacks + llm.callbacks = existing_callbacks + [metrics_handler] + + translation_prompt = ChatPromptTemplate.from_template(template) + + setup = RunnablePassthrough() + + chain = (setup | translation_prompt | llm | StrOutputParser()) + + translation = chain.invoke(prompt_params) + + metrics = metrics_handler.get_metrics() + + return translation, metrics \ No newline at end of file diff --git a/common/utils/cache/translation_cache.py b/common/utils/cache/translation_cache.py new file mode 100644 index 0000000..5e1e409 --- /dev/null +++ b/common/utils/cache/translation_cache.py @@ -0,0 +1,156 @@ +import json +from typing import Dict, Any, Optional +from datetime import datetime as dt, timezone as tz + +import xxhash +from flask import current_app +from sqlalchemy import and_ +from sqlalchemy.inspection import inspect + +from common.utils.cache.base import CacheHandler, T +from common.extensions import db + +from common.models.user import TranslationCache +from common.services.utils.translation_services import TranslationService +from flask_security import current_user + + +class TranslationCacheHandler(CacheHandler[TranslationCache]): + """Handles caching of translations with fallback to database and external translation service""" + handler_name = 'translation_cache' + + def __init__(self, region): + super().__init__(region, 'translation') + self.configure_keys('hash_key') + + def _to_cache_data(self, instance: TranslationCache) -> Dict[str, Any]: + """Convert TranslationCache instance to cache data using SQLAlchemy inspection""" + if not instance: + return {} + + mapper = inspect(TranslationCache) + data = {} + + for column in mapper.columns: + value = getattr(instance, column.name) + + # Handle date serialization + if isinstance(value, dt): + data[column.name] = value.isoformat() + else: + data[column.name] = value + + return data + + def _from_cache_data(self, data: Dict[str, Any], **kwargs) -> TranslationCache: + if not data: + return None + + # Create a new TranslationCache instance + translation = TranslationCache() + mapper = inspect(TranslationCache) + + # Set all attributes dynamically + for column in mapper.columns: + if column.name in data: + value = data[column.name] + + # Handle date deserialization + if column.name.endswith('_date') and value: + if isinstance(value, str): + value = dt.fromisoformat(value).date() + + setattr(translation, column.name, 
value) + + return translation + + def _should_cache(self, value: TranslationCache) -> bool: + """Validate if the translation should be cached""" + return value is not None and value.cache_key is not None + + def get_translation(self, text: str, target_lang: str, source_lang:str=None, context: str=None) -> Optional[TranslationCache]: + """ + Get the translation for a text in a specific language + + Args: + text: The text to be translated + target_lang: The target language for the translation + source_lang: The source language of the text to be translated + context: Optional context for the translation + + Returns: + TranslationCache instance if found, None otherwise + """ + + def creator_func(text: str, target_lang: str, source_lang: str=None, context: str=None) -> Optional[TranslationCache]: + # Generate cache key based on inputs + cache_key = self._generate_cache_key(text, target_lang, source_lang, context) + + # Check if translation already exists in database + existing_translation = db.session.query(TranslationCache).filter_by(cache_key=cache_key).first() + + if existing_translation: + # Update last used timestamp + existing_translation.last_used_at = dt.now(tz=tz.utc) + db.session.commit() + return existing_translation + + # Translation not found in DB, need to create it + # Initialize translation service + translation_service = TranslationService(getattr(current_app, 'tenant_id', None)) + + # Get the translation and metrics + translated_text, metrics = translation_service.translate_text( + text_to_translate=text, + target_lang=target_lang, + source_lang=source_lang, + context=context + ) + + # Create new translation cache record + new_translation = TranslationCache( + cache_key=cache_key, + source_text=text, + translated_text=translated_text, + source_language=source_lang or 'auto', + target_language=target_lang, + context=context, + prompt_tokens=metrics.get('prompt_tokens', 0), + completion_tokens=metrics.get('completion_tokens', 0), + created_at=dt.now(tz=tz.utc), + created_by=getattr(current_user, 'id', None) if 'current_user' in globals() else None, + updated_at=dt.now(tz=tz.utc), + updated_by=getattr(current_user, 'id', None) if 'current_user' in globals() else None, + last_used_at=dt.now(tz=tz.utc) + ) + + # Save to database + db.session.add(new_translation) + db.session.commit() + + return new_translation + + return self.get(creator_func, text=text, target_lang=target_lang, source_lang=source_lang, context=context) + + def invalidate_tenant_translations(self, tenant_id: int): + """Invalidate cached translations for specific tenant""" + self.invalidate(tenant_id=tenant_id) + + def _generate_cache_key(self, text: str, target_lang: str, source_lang: str = None, context: str = None) -> str: + """Generate cache key for a translation""" + cache_data = { + "text": text.strip(), + "target_lang": target_lang.lower(), + "source_lang": source_lang.lower() if source_lang else None, + "context": context.strip() if context else None, + } + + cache_string = json.dumps(cache_data, sort_keys=True, ensure_ascii=False) + return xxhash.xxh64(cache_string.encode('utf-8')).hexdigest() + +def register_translation_cache_handlers(cache_manager) -> None: + """Register translation cache handlers with cache manager""" + cache_manager.register_handler( + TranslationCacheHandler, + 'eveai_model' # Use existing eveai_model region + ) diff --git a/common/utils/document_utils.py b/common/utils/document_utils.py index a07b910..79d02b1 100644 --- a/common/utils/document_utils.py +++ 
b/common/utils/document_utils.py @@ -3,7 +3,7 @@ from datetime import datetime as dt, timezone as tz from sqlalchemy import desc from sqlalchemy.exc import SQLAlchemyError from werkzeug.utils import secure_filename -from common.models.document import Document, DocumentVersion, Catalog +from common.models.document import Document, DocumentVersion, Catalog, Processor from common.extensions import db, minio_client from common.utils.celery_utils import current_celery from flask import current_app @@ -11,6 +11,7 @@ import requests from urllib.parse import urlparse, unquote, urlunparse, parse_qs import os +from config.type_defs.processor_types import PROCESSOR_TYPES from .config_field_types import normalize_json_field from .eveai_exceptions import (EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType, EveAIInvalidCatalog, EveAIInvalidDocument, EveAIInvalidDocumentVersion, EveAIException) @@ -469,3 +470,15 @@ def lookup_document(tenant_id: int, lookup_criteria: dict, metadata_type: str) - "Error during document lookup", status_code=500 ) + +def is_file_type_supported_by_catalog(catalog_id, file_type): + processors = Processor.query.filter_by(catalog_id=catalog_id).filter_by(active=True).all() + + supported_file_types = [] + for processor in processors: + processor_file_types = PROCESSOR_TYPES[processor.type]['file_types'] + file_types = [f.strip() for f in processor_file_types.split(",")] + supported_file_types.extend(file_types) + + if file_type not in supported_file_types: + raise EveAIUnsupportedFileType() \ No newline at end of file diff --git a/common/utils/eveai_exceptions.py b/common/utils/eveai_exceptions.py index ff6793b..f12f66c 100644 --- a/common/utils/eveai_exceptions.py +++ b/common/utils/eveai_exceptions.py @@ -34,7 +34,25 @@ class EveAIDoubleURLException(EveAIException): class EveAIUnsupportedFileType(EveAIException): """Raised when an invalid file type is provided""" - def __init__(self, message="Filetype is not supported", status_code=400, payload=None): + def __init__(self, message="Filetype is not supported by current active processors", status_code=400, payload=None): + super().__init__(message, status_code, payload) + + +class EveAINoProcessorFound(EveAIException): + """Raised when no processor is found for a given file type""" + + def __init__(self, catalog_id, file_type, file_subtype, status_code=400, payload=None): + message = f"No active processor found for catalog {catalog_id} with file type {file_type} and subtype {file_subtype}" + super().__init__(message, status_code, payload) + + +class EveAINoContentFound(EveAIException): + """Raised when no content is found for a given document""" + + def __init__(self, document_id, document_version_id, status_code=400, payload=None): + self.document_id = document_id + self.document_version_id = document_version_id + message = f"No content found while processing Document with ID {document_id} and version {document_version_id}." 
super().__init__(message, status_code, payload) diff --git a/common/utils/translation_utils.py b/common/utils/translation_utils.py deleted file mode 100644 index 67ddafe..0000000 --- a/common/utils/translation_utils.py +++ /dev/null @@ -1,21 +0,0 @@ -import xxhash -import json - -from common.utils.model_utils import get_template, replace_variable_in_template - - -def generate_cache_key(text: str, target_lang: str, source_lang: str = None, context: str = None) -> str: - cache_data = { - "text": text.strip(), - "target_lang": target_lang.lower(), - "source_lang": source_lang.lower() if source_lang else None, - "context": context.strip() if context else "" - } - - cache_string = json.dumps(cache_data, sort_keys=True, ensure_ascii=False) - return xxhash.xxh64(cache_string.encode('utf-8')).hexdigest() - -def translate_text(text: str, target_lang: str, source_lang: str = None, context: str = None) -> str: - if context: - prompt_text = get_template("translation_with_context") - prompt_text = replace_variable_in_template(prompt_text, "context", context) \ No newline at end of file diff --git a/config/processors/globals/AUTOMAGIC_HTML_PROCESSOR/1.0.0.yaml b/config/processors/globals/AUTOMAGIC_HTML_PROCESSOR/1.0.0.yaml new file mode 100644 index 0000000..a9c5d60 --- /dev/null +++ b/config/processors/globals/AUTOMAGIC_HTML_PROCESSOR/1.0.0.yaml @@ -0,0 +1,14 @@ +version: "1.0.0" +name: "HTML Processor" +file_types: "html" +description: "A processor for HTML files, driven by AI" +configuration: + custom_instructions: + name: "Custom Instructions" + description: "Some custom instruction to guide our AI agent in parsing your HTML file" + type: "text" + required: false +metadata: + author: "Josako" + date_added: "2025-06-25" + description: "A processor for HTML files, driven by AI" \ No newline at end of file diff --git a/config/prompts/globals/automagic_html_parse/1.0.0.yaml b/config/prompts/globals/automagic_html_parse/1.0.0.yaml new file mode 100644 index 0000000..68ddca7 --- /dev/null +++ b/config/prompts/globals/automagic_html_parse/1.0.0.yaml @@ -0,0 +1,30 @@ +version: "1.0.0" +content: | + You are a top administrative assistant specialized in transforming given HTML into markdown formatted files. The + generated files will be used to generate embeddings in a RAG-system. + + # Best practices are: + - Respect wordings and language(s) used in the HTML. + - The following items need to be considered: headings, paragraphs, listed items (numbered or not) and tables. Images can be neglected. + - Sub-headers can be used as lists. This is true when a header is followed by a series of sub-headers without content (paragraphs or listed items). Present those sub-headers as a list. + - Be careful of encoding of the text. Everything needs to be human readable. + + You only return relevant information, and filter out non-relevant information, such as: + - information found in menu bars, sidebars, footers or headers + - information in forms, buttons + + Process the file or text carefully, and take a stepped approach. The resulting markdown should be the result of the + processing of the complete input html file. Answer with the pure markdown, without any other text. + + {custom_instructions} + + HTML to be processed is in between triple backquotes. 
+ + ```{html}``` + +llm_model: "mistral.mistral-small-latest" +metadata: + author: "Josako" + date_added: "2025-06-25" + description: "An aid in transforming HTML-based inputs to markdown, fully automatic" + changes: "Initial version" \ No newline at end of file diff --git a/config/prompts/globals/translation_with_context/1.0.0.yaml b/config/prompts/globals/translation_with_context/1.0.0.yaml index 3fd5d8d..b7c5be5 100644 --- a/config/prompts/globals/translation_with_context/1.0.0.yaml +++ b/config/prompts/globals/translation_with_context/1.0.0.yaml @@ -7,7 +7,7 @@ content: > I only want you to return the translation. No explanation, no options. I need to be able to directly use your answer without further interpretation. If more than one option is available, present me with the most probable one. - +llm_model: "mistral.ministral-8b-latest" metadata: author: "Josako" date_added: "2025-06-23" diff --git a/config/prompts/globals/translation_without_context/1.0.0.yaml b/config/prompts/globals/translation_without_context/1.0.0.yaml index 1eece0b..08d2990 100644 --- a/config/prompts/globals/translation_without_context/1.0.0.yaml +++ b/config/prompts/globals/translation_without_context/1.0.0.yaml @@ -4,7 +4,7 @@ content: > I only want you to return the translation. No explanation, no options. I need to be able to directly use your answer without further interpretation. If more than one option is available, present me with the most probable one. - +llm_model: "mistral.ministral-8b-latest" metadata: author: "Josako" date_added: "2025-06-23" diff --git a/config/type_defs/processor_types.py b/config/type_defs/processor_types.py index 4ac0eac..c8cc479 100644 --- a/config/type_defs/processor_types.py +++ b/config/type_defs/processor_types.py @@ -24,5 +24,10 @@ PROCESSOR_TYPES = { "name": "DOCX Processor", "description": "A processor for DOCX files", "file_types": "docx", - } + }, + "AUTOMAGIC_HTML_PROCESSOR": { + "name": "AutoMagic HTML Processor", + "description": "A processor for HTML files, driven by AI", + "file_types": "html, htm", + }, } diff --git a/docker/compose_dev.yaml b/docker/compose_dev.yaml index 289bf3c..30f5c05 100644 --- a/docker/compose_dev.yaml +++ b/docker/compose_dev.yaml @@ -24,7 +24,7 @@ x-common-variables: &common-variables FLOWER_PASSWORD: 'Jungles' OPENAI_API_KEY: 'sk-proj-8R0jWzwjL7PeoPyMhJTZT3BlbkFJLb6HfRB2Hr9cEVFWEhU7' GROQ_API_KEY: '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' - MISTRAL_API_KEY: 'jGDc6fkCbt0iOC0jQsbuZhcjLWBPGc2b' + MISTRAL_API_KEY: '0f4ZiQ1kIpgIKTHX8d0a8GOD2vAgVqEn' ANTHROPIC_API_KEY: 'sk-ant-api03-c2TmkzbReeGhXBO5JxNH6BJNylRDonc9GmZd0eRbrvyekec2' JWT_SECRET_KEY: 'bsdMkmQ8ObfMD52yAFg4trrvjgjMhuIqg2fjDpD/JqvgY0ccCcmlsEnVFmR79WPiLKEA3i8a5zmejwLZKl4v9Q==' API_ENCRYPTION_KEY: 'xfF5369IsredSrlrYZqkM9ZNrfUASYYS6TCcAR9UKj4=' diff --git a/docker/compose_test.yaml b/docker/compose_test.yaml index 03a5f80..7aedc10 100644 --- a/docker/compose_test.yaml +++ b/docker/compose_test.yaml @@ -26,7 +26,7 @@ x-common-variables: &common-variables REDIS_PORT: '6379' FLOWER_USER: 'Felucia' FLOWER_PASSWORD: 'Jungles' - MISTRAL_API_KEY: 'Vkwgr67vUs6ScKmcFF2QVw7uHKgq0WEN' + MISTRAL_API_KEY: 'qunKSaeOkFfLteNiUO77RCsXXSLK65Ec' JWT_SECRET_KEY: '7e9c8b3a215f4d6e90712c5d8f3b97a60e482c15f39a7d68bcd45910ef23a784' API_ENCRYPTION_KEY: 'kJ7N9p3IstyRGkluYTryM8ZMnfUBSXWR3TCfDG9VLc4=' MINIO_ENDPOINT: minio:9000 diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index d3c8f0a..54bb883 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -201,8 +201,3 @@ def 
register_cache_handlers(app): register_specialist_cache_handlers(cache_manager) from common.utils.cache.license_cache import register_license_cache_handlers register_license_cache_handlers(cache_manager) - - - - - diff --git a/eveai_app/templates/document/document_versions.html b/eveai_app/templates/document/document_versions.html index 9ddc7f0..a29c563 100644 --- a/eveai_app/templates/document/document_versions.html +++ b/eveai_app/templates/document/document_versions.html @@ -4,13 +4,13 @@ {% block title %}Document Versions{% endblock %} {% block content_title %}Document Versions{% endblock %} -{% block content_description %}View Versions for {{ document }}{% endblock %} +{% block content_description %}View Versions for Document {{ document }}{% endblock %} {% block content_class %}
{% endblock %} {% block content %}
- {{ render_selectable_table(headers=["ID", "URL", "Object Name", "File Type", "Process.", "Proces. Start", "Proces. Finish", "Proces. Error"], rows=rows, selectable=True, id="versionsTable") }} + {{ render_selectable_table(headers=["ID", "File Type", "File Size", "Process.", "Proces. Start", "Proces. Finish", "Proces. Error"], rows=rows, selectable=True, id="versionsTable") }}
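Review note: the new `PersistentLLMMetricsHandler` introduced above is attached to an LLM's callback list so token usage and timing can be read back after any chain invocation; this is how `TranslationService.translate_text` obtains its metrics. A minimal usage sketch, assuming a LangChain chat model whose `llm_output` carries a `token_usage` dict (the model choice and prompt text below are illustrative only):

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_mistralai import ChatMistralAI  # illustrative model; any LangChain chat model works

from common.langchain.persistent_llm_metrics_handler import PersistentLLMMetricsHandler

# Attach the handler once; it keeps accumulating token counts until reset() is called.
metrics_handler = PersistentLLMMetricsHandler()
llm = ChatMistralAI(model="ministral-8b-latest", callbacks=[metrics_handler])

prompt = ChatPromptTemplate.from_template("Translate the following text to {target_lang}: {text}")
chain = prompt | llm | StrOutputParser()

translation = chain.invoke({"target_lang": "French", "text": "Good morning"})

# Metrics can now be read back outside of business event logging.
metrics = metrics_handler.get_metrics()
# -> {'total_tokens': ..., 'prompt_tokens': ..., 'completion_tokens': ...,
#     'time_elapsed': ..., 'interaction_type': 'LLM'}

metrics_handler.reset()  # start a fresh measurement window for the next call
```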
diff --git a/eveai_app/templates/document/documents.html b/eveai_app/templates/document/documents.html index 8dcc1dc..fa0c29f 100644 --- a/eveai_app/templates/document/documents.html +++ b/eveai_app/templates/document/documents.html @@ -4,14 +4,13 @@ {% block title %}Documents{% endblock %} {% block content_title %}Documents{% endblock %} -{% block content_description %}View Documents for Tenant{% endblock %} +{% block content_description %}View Documents for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content_class %}
{% endblock %} {% block content %} {% set filter_form %} - {{ render_filter_field('catalog_id', 'Catalog', filter_options['catalog_id'], filters.get('catalog_id', [])) }} {{ render_filter_field('validity', 'Validity', filter_options['validity'], filters.get('validity', [])) }} @@ -27,7 +26,6 @@ headers=[ {"text": "ID", "sort": "id"}, {"text": "Name", "sort": "name"}, - {"text": "Catalog", "sort": "catalog_name"}, {"text": "Valid From", "sort": "valid_from"}, {"text": "Valid To", "sort": "valid_to"} ], diff --git a/eveai_app/templates/document/edit_processor.html b/eveai_app/templates/document/edit_processor.html index 4118994..849e5af 100644 --- a/eveai_app/templates/document/edit_processor.html +++ b/eveai_app/templates/document/edit_processor.html @@ -4,7 +4,7 @@ {% block title %}Edit Processor{% endblock %} {% block content_title %}Edit Processor{% endblock %} -{% block content_description %}Edit a Processor (for a Catalog){% endblock %} +{% block content_description %}Edit Processor for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content %} diff --git a/eveai_app/templates/document/edit_retriever.html b/eveai_app/templates/document/edit_retriever.html index 3731c98..0bbee3b 100644 --- a/eveai_app/templates/document/edit_retriever.html +++ b/eveai_app/templates/document/edit_retriever.html @@ -4,7 +4,7 @@ {% block title %}Edit Retriever{% endblock %} {% block content_title %}Edit Retriever{% endblock %} -{% block content_description %}Edit a Retriever (for a Catalog){% endblock %} +{% block content_description %}Edit a Retriever for catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content %} diff --git a/eveai_app/templates/document/processor.html b/eveai_app/templates/document/processor.html index bc57f5c..b8f1137 100644 --- a/eveai_app/templates/document/processor.html +++ b/eveai_app/templates/document/processor.html @@ -4,7 +4,7 @@ {% block title %}Processor Registration{% endblock %} {% block content_title %}Register Processor{% endblock %} -{% block content_description %}Define a new processor (for a catalog){% endblock %} +{% block content_description %}Define a new processor for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content %} diff --git a/eveai_app/templates/document/processors.html b/eveai_app/templates/document/processors.html index a2dfeea..07555fa 100644 --- a/eveai_app/templates/document/processors.html +++ b/eveai_app/templates/document/processors.html @@ -4,13 +4,13 @@ {% block title %}Processors{% endblock %} {% block content_title %}Processors{% endblock %} -{% block content_description %}View Processors for Tenant{% endblock %} +{% block content_description %}View Processors for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content_class %}
{% endblock %} {% block content %}
- {{ render_selectable_table(headers=["Processor ID", "Name", "Type", "Catalog ID"], rows=rows, selectable=True, id="retrieversTable") }} + {{ render_selectable_table(headers=["Processor ID", "Name", "Type", "Active"], rows=rows, selectable=True, id="retrieversTable") }}
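Review note: the new `Active` column above surfaces the `Processor.active` flag added in `common/models/document.py`; only active processors determine which file types a catalog accepts. A minimal sketch of how the `is_file_type_supported_by_catalog` helper added in `common/utils/document_utils.py` is expected to be called from an upload path (the view wiring shown here is illustrative, not the actual `add_document` code):

```python
from flask import flash, session
from werkzeug.utils import secure_filename

from common.utils.document_utils import is_file_type_supported_by_catalog
from common.utils.eveai_exceptions import EveAIUnsupportedFileType


def validate_uploaded_filename(filename: str) -> str:
    """Return the file extension, or raise if no active processor of the session catalog supports it."""
    catalog_id = session.get('catalog_id')
    extension = secure_filename(filename).rsplit('.', 1)[1].lower()
    try:
        # Raises EveAIUnsupportedFileType when none of the catalog's active processors
        # lists this extension in PROCESSOR_TYPES[processor.type]['file_types'].
        is_file_type_supported_by_catalog(catalog_id, extension)
    except EveAIUnsupportedFileType:
        flash('Filetype is not supported by current active processors', 'warning')
        raise
    return extension
```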
diff --git a/eveai_app/templates/document/retriever.html b/eveai_app/templates/document/retriever.html index 9a90349..761b120 100644 --- a/eveai_app/templates/document/retriever.html +++ b/eveai_app/templates/document/retriever.html @@ -4,7 +4,7 @@ {% block title %}Retriever Registration{% endblock %} {% block content_title %}Register Retriever{% endblock %} -{% block content_description %}Define a new retriever (for a catalog){% endblock %} +{% block content_description %}Define a new retriever for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content %} diff --git a/eveai_app/templates/document/retrievers.html b/eveai_app/templates/document/retrievers.html index 73fadac..a8fe208 100644 --- a/eveai_app/templates/document/retrievers.html +++ b/eveai_app/templates/document/retrievers.html @@ -4,13 +4,13 @@ {% block title %}Retrievers{% endblock %} {% block content_title %}Retrievers{% endblock %} -{% block content_description %}View Retrievers for Tenant{% endblock %} +{% block content_description %}View Retrievers for Catalog {% if session.catalog_name %}{{ session.catalog_name }}{% else %}No Catalog{% endif %}{% endblock %} {% block content_class %}
{% endblock %} {% block content %}
- {{ render_selectable_table(headers=["Retriever ID", "Name", "Type", "Catalog ID"], rows=rows, selectable=True, id="retrieversTable") }} + {{ render_selectable_table(headers=["Retriever ID", "Name", "Type"], rows=rows, selectable=True, id="retrieversTable") }}
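Review note: for completeness, the translation cache added earlier in this diff (`common/utils/cache/translation_cache.py`) is the intended entry point for translations rather than calling `TranslationService` directly. A minimal sketch, assuming the registered handler is reachable through `cache_manager` (the exact accessor attribute is an assumption; only `handler_name = 'translation_cache'` is given in the diff):

```python
from common.extensions import cache_manager

# Hypothetical accessor: how CacheManager exposes registered handlers is not shown
# in this diff; 'translation_cache' is the handler_name declared on the handler class.
translation_cache = cache_manager.translation_cache

cached = translation_cache.get_translation(
    text="Welkom bij EveAI",
    target_lang="en",
    source_lang="nl",
    context="Greeting shown on the tenant landing page",
)
if cached is not None:
    # A TranslationCache row: translated text plus the token counts recorded
    # by PersistentLLMMetricsHandler when the translation was first produced.
    print(cached.translated_text, cached.prompt_tokens, cached.completion_tokens)
```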
diff --git a/eveai_app/views/document_forms.py b/eveai_app/views/document_forms.py index 8f01459..8c90a06 100644 --- a/eveai_app/views/document_forms.py +++ b/eveai_app/views/document_forms.py @@ -71,15 +71,6 @@ class ProcessorForm(FlaskForm): name = StringField('Name', validators=[DataRequired(), Length(max=50)]) description = TextAreaField('Description', validators=[Optional()]) - # Catalog for the Retriever - catalog = QuerySelectField( - 'Catalog ID', - query_factory=lambda: Catalog.query.all(), - allow_blank=True, - get_label='name', - validators=[DataRequired()], - ) - # Select Field for Catalog Type (Uses the CATALOG_TYPES defined in config) type = SelectField('Processor Type', validators=[DataRequired()]) @@ -89,6 +80,7 @@ class ProcessorForm(FlaskForm): default=2000) max_chunk_size = IntegerField('Maximum Chunk Size (3000)', validators=[NumberRange(min=0), Optional()], default=3000) + active = BooleanField('Active', default=True) tuning = BooleanField('Enable Embedding Tuning', default=False) # Metadata fields @@ -108,14 +100,6 @@ class EditProcessorForm(DynamicFormBase): name = StringField('Name', validators=[DataRequired(), Length(max=50)]) description = TextAreaField('Description', validators=[Optional()]) - # Catalog for the Retriever - catalog = QuerySelectField( - 'Catalog ID', - query_factory=lambda: Catalog.query.all(), - allow_blank=True, - get_label='name', - validators=[Optional()], - ) type = StringField('Processor Type', validators=[DataRequired()], render_kw={'readonly': True}) sub_file_type = StringField('Sub File Type', validators=[Optional(), Length(max=50)]) @@ -124,6 +108,7 @@ class EditProcessorForm(DynamicFormBase): default=2000) max_chunk_size = IntegerField('Maximum Chunk Size (3000)', validators=[NumberRange(min=0), Optional()], default=3000) + active = BooleanField('Active', default=True) tuning = BooleanField('Enable Embedding Tuning', default=False) # Metadata fields @@ -134,14 +119,7 @@ class EditProcessorForm(DynamicFormBase): class RetrieverForm(FlaskForm): name = StringField('Name', validators=[DataRequired(), Length(max=50)]) description = TextAreaField('Description', validators=[Optional()]) - # Catalog for the Retriever - catalog = QuerySelectField( - 'Catalog ID', - query_factory=lambda: Catalog.query.all(), - allow_blank=True, - get_label='name', - validators=[Optional()], - ) + # Select Field for Retriever Type (Uses the RETRIEVER_TYPES defined in config) type = SelectField('Retriever Type', validators=[DataRequired()]) tuning = BooleanField('Enable Tuning', default=False) @@ -160,14 +138,7 @@ class RetrieverForm(FlaskForm): class EditRetrieverForm(DynamicFormBase): name = StringField('Name', validators=[DataRequired(), Length(max=50)]) description = TextAreaField('Description', validators=[Optional()]) - # Catalog for the Retriever - catalog = QuerySelectField( - 'Catalog ID', - query_factory=lambda: Catalog.query.all(), - allow_blank=True, - get_label='name', - validators=[Optional()], - ) + # Select Field for Retriever Type (Uses the RETRIEVER_TYPES defined in config) type = StringField('Processor Type', validators=[DataRequired()], render_kw={'readonly': True}) tuning = BooleanField('Enable Tuning', default=False) diff --git a/eveai_app/views/document_list_view.py b/eveai_app/views/document_list_view.py index 5ad3490..c52513b 100644 --- a/eveai_app/views/document_list_view.py +++ b/eveai_app/views/document_list_view.py @@ -1,5 +1,5 @@ -from datetime import datetime -from flask import request, render_template, session +from datetime import 
datetime as dt, timezone as tz +from flask import request, render_template, session, current_app from sqlalchemy import desc, asc, or_, and_, cast, Integer from common.models.document import Document, Catalog from common.utils.filtered_list_view import FilteredListView @@ -7,31 +7,19 @@ from common.utils.view_assistants import prepare_table_for_macro class DocumentListView(FilteredListView): - allowed_filters = ['catalog_id', 'validity'] - allowed_sorts = ['id', 'name', 'catalog_name', 'valid_from', 'valid_to'] + allowed_filters = ['validity'] + allowed_sorts = ['id', 'name', 'valid_from', 'valid_to'] def get_query(self): - return Document.query.join(Catalog).add_columns( - Document.id, - Document.name, - Catalog.name.label('catalog_name'), - Document.valid_from, - Document.valid_to - ) + catalog_id = session.get('catalog_id') + current_app.logger.debug(f"Catalog ID: {catalog_id}") + return Document.query.filter_by(catalog_id=catalog_id) def apply_filters(self, query): filters = request.args.to_dict(flat=False) - if 'catalog_id' in filters: - catalog_ids = filters['catalog_id'] - if catalog_ids: - # Convert catalog_ids to a list of integers - catalog_ids = [int(cid) for cid in catalog_ids if cid.isdigit()] - if catalog_ids: - query = query.filter(Document.catalog_id.in_(catalog_ids)) - if 'validity' in filters: - now = datetime.utcnow().date() + now = dt.now(tz.utc).date() if 'valid' in filters['validity']: query = query.filter( and_( @@ -47,10 +35,7 @@ class DocumentListView(FilteredListView): sort_order = request.args.get('sort_order', 'asc') if sort_by in self.allowed_sorts: - if sort_by == 'catalog_name': - column = Catalog.name - else: - column = getattr(Document, sort_by) + column = getattr(Document, sort_by) if sort_order == 'asc': query = query.order_by(asc(column)) @@ -61,42 +46,39 @@ class DocumentListView(FilteredListView): def get(self): query = self.get_query() - query = self.apply_filters(query) - query = self.apply_sorting(query) + # query = self.apply_filters(query) + # query = self.apply_sorting(query) pagination = self.paginate(query) def format_date(date): - if isinstance(date, datetime): + if isinstance(date, dt): return date.strftime('%Y-%m-%d') elif isinstance(date, str): return date else: return '' + current_app.logger.debug(f"Items retrieved: {pagination.items}") rows = [ [ {'value': item.id, 'class': '', 'type': 'text'}, {'value': item.name, 'class': '', 'type': 'text'}, - {'value': item.catalog_name, 'class': '', 'type': 'text'}, {'value': format_date(item.valid_from), 'class': '', 'type': 'text'}, {'value': format_date(item.valid_to), 'class': '', 'type': 'text'} ] for item in pagination.items ] - catalogs = Catalog.query.all() - context = { 'rows': rows, 'pagination': pagination, 'filters': request.args.to_dict(flat=False), 'sort_by': request.args.get('sort_by', 'id'), 'sort_order': request.args.get('sort_order', 'asc'), - 'filter_options': self.get_filter_options(catalogs) + 'filter_options': self.get_filter_options() } return render_template(self.template, **context) - def get_filter_options(self, catalogs): + def get_filter_options(self): return { - 'catalog_id': [(str(cat.id), cat.name) for cat in catalogs], 'validity': [('valid', 'Valid'), ('all', 'All')] - } \ No newline at end of file + } diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index 2cc88ea..410cba2 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -16,7 +16,7 @@ from common.extensions import db, cache_manager, 
minio_client from common.models.interaction import Specialist, SpecialistRetriever from common.utils.document_utils import create_document_stack, start_embedding_task, process_url, \ edit_document, \ - edit_document_version, refresh_document, clean_url + edit_document_version, refresh_document, clean_url, is_file_type_supported_by_catalog from common.utils.dynamic_field_utils import create_default_config_from_type_config from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \ EveAIDoubleURLException, EveAIException @@ -110,7 +110,6 @@ def handle_catalog_selection(): current_app.logger.info(f'Setting session catalog to {catalog.name}') session['catalog_id'] = catalog_id session['catalog_name'] = catalog.name - current_app.logger.info(f'Finished setting session catalog to {catalog.name}') elif action == 'edit_catalog': return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=catalog_id)) @@ -157,7 +156,7 @@ def processor(): tenant_id = session.get('tenant').get('id') new_processor = Processor() form.populate_obj(new_processor) - new_processor.catalog_id = form.catalog.data.id + new_processor.catalog_id = session.get('catalog_id') processor_config = cache_manager.processors_config_cache.get_config(new_processor.type) new_processor.configuration = create_default_config_from_type_config( processor_config["configuration"]) @@ -204,9 +203,6 @@ def edit_processor(processor_id): form.populate_obj(processor) processor.configuration = form.get_dynamic_data('configuration') - # Update catalog relationship - processor.catalog_id = form.catalog.data.id if form.catalog.data else None - # Update logging information update_logging_information(processor, dt.now(tz.utc)) @@ -235,14 +231,19 @@ def processors(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) - query = Processor.query.order_by(Processor.id) + catalog_id = session.get('catalog_id', None) + if not catalog_id: + flash('You need to set a Session Catalog before adding Documents or URLs', 'warning') + return redirect(prefixed_url_for('document_bp.catalogs')) + + query = Processor.query.filter_by(catalog_id=catalog_id).order_by(Processor.id) pagination = query.paginate(page=page, per_page=per_page) the_processors = pagination.items # prepare table data rows = prepare_table_for_macro(the_processors, - [('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')]) + [('id', ''), ('name', ''), ('type', ''), ('active', '')]) # Render the catalogs in a template return render_template('document/processors.html', rows=rows, pagination=pagination) @@ -272,7 +273,7 @@ def retriever(): tenant_id = session.get('tenant').get('id') new_retriever = Retriever() form.populate_obj(new_retriever) - new_retriever.catalog_id = form.catalog.data.id + new_retriever.catalog_id = session.get('catalog_id') new_retriever.type_version = cache_manager.retrievers_version_tree_cache.get_latest_version( new_retriever.type) @@ -301,12 +302,6 @@ def edit_retriever(retriever_id): # Get the retriever or return 404 retriever = Retriever.query.get_or_404(retriever_id) - if retriever.catalog_id: - # If catalog_id is just an ID, fetch the Catalog object - retriever.catalog = Catalog.query.get(retriever.catalog_id) - else: - retriever.catalog = None - # Create form instance with the retriever form = EditRetrieverForm(request.form, obj=retriever) @@ -319,9 +314,6 @@ def edit_retriever(retriever_id): form.populate_obj(retriever) retriever.configuration = 
form.get_dynamic_data('configuration') - # Update catalog relationship - retriever.catalog_id = form.catalog.data.id if form.catalog.data else None - # Update logging information update_logging_information(retriever, dt.now(tz.utc)) @@ -350,14 +342,19 @@ def retrievers(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) - query = Retriever.query.order_by(Retriever.id) + catalog_id = session.get('catalog_id', None) + if not catalog_id: + flash('You need to set a Session Catalog before adding Documents or URLs', 'warning') + return redirect(prefixed_url_for('document_bp.catalogs')) + + query = Retriever.query.filter_by(catalog_id=catalog_id).order_by(Retriever.id) pagination = query.paginate(page=page, per_page=per_page) the_retrievers = pagination.items # prepare table data rows = prepare_table_for_macro(the_retrievers, - [('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')]) + [('id', ''), ('name', ''), ('type', '')]) # Render the catalogs in a template return render_template('document/retrievers.html', rows=rows, pagination=pagination) @@ -400,6 +397,8 @@ def add_document(): filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() + is_file_type_supported_by_catalog(catalog_id, extension) + catalog_properties = form.get_dynamic_data("tagging_fields") api_input = { @@ -451,6 +450,8 @@ def add_url(): file_content, filename, extension = process_url(url, tenant_id) + is_file_type_supported_by_catalog(catalog_id, extension) + catalog_properties = {} full_config = cache_manager.catalogs_config_cache.get_config(catalog.type) document_version_configurations = full_config['document_version_configurations'] @@ -489,6 +490,11 @@ def add_url(): @document_bp.route('/documents', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def documents(): + catalog_id = session.get('catalog_id', None) + if not catalog_id: + flash('You need to set a Session Catalog before adding Documents or URLs', 'warning') + return redirect(prefixed_url_for('document_bp.catalogs')) + view = DocumentListView(Document, 'document/documents.html', per_page=10) return view.get() @@ -609,7 +615,7 @@ def edit_document_version_view(document_version_id): @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def document_versions(document_id): doc = Document.query.get_or_404(document_id) - doc_desc = f'Document {doc.name}' + doc_desc = f'{doc.name}' page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) @@ -621,8 +627,7 @@ def document_versions(document_id): pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items - rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), - ('object_name', ''), ('file_type', ''), + rows = prepare_table_for_macro(doc_langs, [('id', ''), ('file_type', ''), ('file_size', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) diff --git a/eveai_app/views/dynamic_form_base.py b/eveai_app/views/dynamic_form_base.py index f30dfc1..cb135ec 100644 --- a/eveai_app/views/dynamic_form_base.py +++ b/eveai_app/views/dynamic_form_base.py @@ -328,6 +328,16 @@ class DynamicFormBase(FlaskForm): initial_data: Optional initial data for the fields """ current_app.logger.debug(f"Adding dynamic fields for collection {collection_name} with config: {config}") + + if isinstance(initial_data, str): + try: + initial_data = 
json.loads(initial_data) + except (json.JSONDecodeError, TypeError): + current_app.logger.error(f"Invalid JSON in initial_data: {initial_data}") + initial_data = {} + elif initial_data is None: + initial_data = {} + # Store the full configuration for later use in get_list_type_configs_js if not hasattr(self, '_full_configs'): self._full_configs = {} diff --git a/eveai_workers/processors/__init__.py b/eveai_workers/processors/__init__.py index adf3758..93e51d7 100644 --- a/eveai_workers/processors/__init__.py +++ b/eveai_workers/processors/__init__.py @@ -1,5 +1,5 @@ # Import all processor implementations to ensure registration -from . import audio_processor, html_processor, pdf_processor, markdown_processor, docx_processor +from . import audio_processor, html_processor, pdf_processor, markdown_processor, docx_processor, automagic_html_processor # List of all available processor implementations -__all__ = ['audio_processor', 'html_processor', 'pdf_processor', 'markdown_processor', 'docx_processor'] \ No newline at end of file +__all__ = ['audio_processor', 'html_processor', 'pdf_processor', 'markdown_processor', 'docx_processor', 'automagic_html_processor'] \ No newline at end of file diff --git a/eveai_workers/processors/automagic_html_processor.py b/eveai_workers/processors/automagic_html_processor.py new file mode 100644 index 0000000..f90b37e --- /dev/null +++ b/eveai_workers/processors/automagic_html_processor.py @@ -0,0 +1,65 @@ +import io +import pdfplumber +from flask import current_app +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +import re +from langchain_core.runnables import RunnablePassthrough + +from common.eveai_model.tracked_mistral_ocr_client import TrackedMistralOcrClient +from common.extensions import minio_client +from common.utils.model_utils import create_language_template, get_embedding_llm, get_template +from .base_processor import BaseProcessor +from common.utils.business_event_context import current_event +from .processor_registry import ProcessorRegistry + + +class AutomagicHTMLProcessor(BaseProcessor): + def __init__(self, tenant, document_version, catalog, processor): + super().__init__(tenant, document_version, catalog, processor) + + self.chunk_size = catalog.max_chunk_size + self.chunk_overlap = 0 + self.tuning = self.processor.tuning + + self.prompt_params = { + "custom_instructions": self.processor.configuration.get("custom_instructions", ""), + } + template, llm = get_template("automagic_html_parse") + + translation_prompt = ChatPromptTemplate.from_template(template) + setup = RunnablePassthrough() + output_parser = StrOutputParser() + self.chain = (setup | translation_prompt | llm | output_parser) + + + def process(self): + self._log("Starting Automagic HTML processing") + try: + # Get HTML-file data + file_data = minio_client.download_document_file( + self.tenant.id, + self.document_version.bucket_name, + self.document_version.object_name, + ) + + # Invoke HTML Processing Agent + self.prompt_params["html"] = file_data + with current_event.create_span("Markdown Generation"): + markdown = self.chain.invoke(self.prompt_params) + self._save_markdown(markdown) + + # Retrieve Title + match = re.search(r'^# (.+)', markdown, re.MULTILINE) + title = match.group(1).strip() if match else None + + self._log("Finished Automagic HTML Processing") + return markdown, title + except Exception as e: + self._log(f"Error automagically 
processing HTML: {str(e)}", level='error') + raise + + +# Register the processor +ProcessorRegistry.register("AUTOMAGIC_HTML_PROCESSOR", AutomagicHTMLProcessor) diff --git a/eveai_workers/processors/pdf_processor.py b/eveai_workers/processors/pdf_processor.py index 52eb25b..cc14997 100644 --- a/eveai_workers/processors/pdf_processor.py +++ b/eveai_workers/processors/pdf_processor.py @@ -44,185 +44,6 @@ class PDFProcessor(BaseProcessor): self._log(f"Error processing PDF: {str(e)}", level='error') raise - def _extract_content(self, file_data): - extracted_content = [] - with pdfplumber.open(io.BytesIO(file_data)) as pdf: - figure_counter = 1 - for page_num, page in enumerate(pdf.pages): - self._log(f"Extracting content from page {page_num + 1}") - page_content = { - 'text': page.extract_text(), - 'figures': self._extract_figures(page, page_num, figure_counter), - 'tables': self._extract_tables(page) - } - self.log_tuning("_extract_content", {"page_num": page_num, "page_content": page_content}) - figure_counter += len(page_content['figures']) - extracted_content.append(page_content) - - return extracted_content - - def _extract_figures(self, page, page_num, figure_counter): - figures = [] - # Omit figure processing for now! - # for img in page.images: - # try: - # # Try to get the bbox, use full page dimensions if not available - # bbox = img.get('bbox', (0, 0, page.width, page.height)) - # - # figure = { - # 'figure_number': figure_counter, - # 'filename': f"figure_{page_num + 1}_{figure_counter}.png", - # 'caption': self._find_figure_caption(page, bbox) - # } - # - # # Extract the figure as an image - # figure_image = page.within_bbox(bbox).to_image() - # - # # Save the figure using MinIO - # with io.BytesIO() as output: - # figure_image.save(output, format='PNG') - # output.seek(0) - # minio_client.upload_document_file( - # self.tenant.id, - # self.document_version.doc_id, - # self.document_version.language, - # self.document_version.id, - # figure['filename'], - # output.getvalue() - # ) - # - # figures.append(figure) - # figure_counter += 1 - # except Exception as e: - # self._log(f"Error processing figure on page {page_num + 1}: {str(e)}", level='error') - - return figures - - def _find_figure_caption(self, page, bbox): - try: - # Look for text below the figure - caption_bbox = (bbox[0], bbox[3], bbox[2], min(bbox[3] + 50, page.height)) - caption_text = page.crop(caption_bbox).extract_text() - if caption_text and caption_text.lower().startswith('figure'): - return caption_text - except Exception as e: - self._log(f"Error finding figure caption: {str(e)}", level='error') - return None - - def _extract_tables(self, page): - tables = [] - try: - for table in page.extract_tables(): - if table: - markdown_table = self._table_to_markdown(table) - if markdown_table: # Only add non-empty tables - tables.append(markdown_table) - self.log_tuning("_extract_tables", {"markdown_table": markdown_table}) - except Exception as e: - self._log(f"Error extracting tables from page: {str(e)}", level='error') - return tables - - def _table_to_markdown(self, table): - if not table or not table[0]: # Check if table is empty or first row is empty - return "" # Return empty string for empty tables - - def clean_cell(cell): - if cell is None: - return "" # Convert None to empty string - return str(cell).replace("|", "\\|") # Escape pipe characters and convert to string - - header = [clean_cell(cell) for cell in table[0]] - markdown = "| " + " | ".join(header) + " |\n" - markdown += "| " + " | ".join(["---"] * 
len(header)) + " |\n" - - for row in table[1:]: - cleaned_row = [clean_cell(cell) for cell in row] - markdown += "| " + " | ".join(cleaned_row) + " |\n" - - return markdown - - def _structure_content(self, extracted_content): - structured_content = "" - title = "Untitled Document" - current_heading_level = 0 - heading_pattern = re.compile(r'^(\d+(\.\d+)*\.?\s*)?(.+)$') - - def identify_heading(text): - match = heading_pattern.match(text.strip()) - if match: - numbering, _, content = match.groups() - if numbering: - level = numbering.count('.') + 1 - return level, f"{numbering}{content}" - else: - return 1, content # Assume it's a top-level heading if no numbering - return 0, text # Not a heading - - for page in extracted_content: - # Assume the title is on the first page - if page == extracted_content[0]: - lines = page.get('text', '').split('\n') - if lines: - title = lines[0].strip() # Use the first non-empty line as the title - - # Process text - paragraphs = page['text'].split('\n\n') - - for para in paragraphs: - lines = para.strip().split('\n') - if len(lines) == 1: # Potential heading - level, text = identify_heading(lines[0]) - if level > 0: - heading_marks = '#' * level - structured_content += f"\n\n{heading_marks} {text}\n\n" - if level == 1 and not title: - title = text # Use the first top-level heading as the title if not set - else: - structured_content += f"{para}\n\n" # Treat as normal paragraph - else: - structured_content += f"{para}\n\n" # Multi-line paragraph - - # Process figures - for figure in page.get('figures', []): - structured_content += f"\n\n![Figure {figure['figure_number']}]({figure['filename']})\n\n" - if figure['caption']: - structured_content += f"*Figure {figure['figure_number']}: {figure['caption']}*\n\n" - - # Add tables - if 'tables' in page: - for table in page['tables']: - structured_content += f"\n{table}\n" - - if self.tuning: - self._save_intermediate(structured_content, "structured_content.md") - - return structured_content, title - - def _split_content_for_llm(self, content): - text_splitter = RecursiveCharacterTextSplitter( - chunk_size=self.chunk_size, - chunk_overlap=self.chunk_overlap, - length_function=len, - separators=["\n\n", "\n", " ", ""] - ) - return text_splitter.split_text(content) - - def _process_chunks_with_llm(self, chunks): - template, llm = get_template('pdf_parse') - pdf_prompt = ChatPromptTemplate.from_template(template) - setup = RunnablePassthrough() - output_parser = StrOutputParser() - chain = setup | pdf_prompt | llm | output_parser - - markdown_chunks = [] - for chunk in chunks: - input = {"pdf_content": chunk} - result = chain.invoke(input) - result = self._clean_markdown(result) - markdown_chunks.append(result) - - return "\n\n".join(markdown_chunks) - # Register the processor ProcessorRegistry.register("PDF_PROCESSOR", PDFProcessor) diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 1fe7a62..f92d71d 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -11,6 +11,7 @@ from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from sqlalchemy import or_ from sqlalchemy.exc import SQLAlchemyError +import traceback from common.extensions import db, cache_manager from common.models.document import DocumentVersion, Embedding, Document, Processor, Catalog @@ -24,7 +25,8 @@ from common.utils.business_event_context import current_event from config.type_defs.processor_types import PROCESSOR_TYPES from eveai_workers.processors.processor_registry 
import ProcessorRegistry -from common.utils.eveai_exceptions import EveAIInvalidEmbeddingModel +from common.utils.eveai_exceptions import EveAIInvalidEmbeddingModel, EveAINoContentFound, EveAIUnsupportedFileType, \ + EveAINoProcessorFound from common.utils.config_field_types import json_to_pattern_list @@ -58,8 +60,8 @@ def create_embeddings(tenant_id, document_version_id): catalog = Catalog.query.get_or_404(catalog_id) # Define processor related information - processor_type, processor_class = ProcessorRegistry.get_processor_for_file_type(document_version.file_type) processor = get_processor_for_document(catalog_id, document_version.file_type, document_version.sub_file_type) + processor_class = ProcessorRegistry.get_processor_class(processor.type) except Exception as e: current_app.logger.error(f'Create Embeddings request received ' @@ -95,7 +97,7 @@ def create_embeddings(tenant_id, document_version_id): delete_embeddings_for_document_version(document_version) try: - with current_event.create_span(f"{processor_type} Processing"): + with current_event.create_span(f"{processor.type} Processing"): document_processor = processor_class( tenant=tenant, document_version=document_version, @@ -107,6 +109,8 @@ def create_embeddings(tenant_id, document_version_id): 'markdown': markdown, 'title': title }) + if not markdown or markdown.strip() == '': + raise EveAINoContentFound(document_version.doc_id, document_version.id) with current_event.create_span("Embedding"): embed_markdown(tenant, document_version, catalog, document_processor, markdown, title) @@ -114,9 +118,11 @@ def create_embeddings(tenant_id, document_version_id): current_event.log("Finished Embedding Creation Task") except Exception as e: + stacktrace = traceback.format_exc() current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' - f'on document version {document_version_id} ' - f'error: {e}') + f'on document version {document_version_id} ' + f'error: {e}\n' + f'Stacktrace: {stacktrace}') document_version.processing = False document_version.processing_finished_at = dt.now(tz.utc) document_version.processing_error = str(e)[:255] @@ -624,25 +630,9 @@ def get_processor_for_document(catalog_id: int, file_type: str, sub_file_type: s ValueError: If no matching processor is found """ try: + current_app.logger.debug(f"Getting processor for catalog {catalog_id}, file type {file_type}, file sub_type {sub_file_type} ") # Start with base query for catalog - query = Processor.query.filter_by(catalog_id=catalog_id) - - # Find processor type that handles this file type - matching_processor_type = None - for proc_type, config in PROCESSOR_TYPES.items(): - supported_types = config['file_types'] - if isinstance(supported_types, str): - supported_types = [t.strip() for t in supported_types.split(',')] - - if file_type in supported_types: - matching_processor_type = proc_type - break - - if not matching_processor_type: - raise ValueError(f"No processor type found for file type: {file_type}") - - # Add processor type condition - query = query.filter_by(type=matching_processor_type) + query = Processor.query.filter_by(catalog_id=catalog_id).filter_by(active=True) # If sub_file_type is provided, add that condition if sub_file_type: @@ -651,22 +641,44 @@ def get_processor_for_document(catalog_id: int, file_type: str, sub_file_type: s # If no sub_file_type, prefer processors without sub_file_type specification query = query.filter(or_(Processor.sub_file_type.is_(None), Processor.sub_file_type == '')) + + available_processors = query.all() - 
# Get the first matching processor - processor = query.first() + if not available_processors: + raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type) + available_processor_types = [processor.type for processor in available_processors] + current_app.logger.debug(f"Available processors for catalog {catalog_id}: {available_processor_types}") + + # Find processor type that handles this file type + matching_processor_type = None + for proc_type, config in PROCESSOR_TYPES.items(): + # Only process this type if it is available as a processor in the database + if proc_type in available_processor_types: + supported_types = config['file_types'] + if isinstance(supported_types, str): + supported_types = [t.strip() for t in supported_types.split(',')] + current_app.logger.debug(f"Supported types for processor type {proc_type}: {supported_types}") + + if file_type in supported_types: + matching_processor_type = proc_type + break + + current_app.logger.debug(f"Processor type found for catalog {catalog_id}, file type {file_type}: {matching_processor_type}") + if not matching_processor_type: + raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type) + else: + current_app.logger.debug(f"Processor type found for file type: {file_type}: {matching_processor_type}") + + processor = None + for proc in available_processors: + if proc.type == matching_processor_type: + processor = proc + break if not processor: - if sub_file_type: - raise ValueError( - f"No processor found for catalog {catalog_id} of type {matching_processor_type}, " - f"file type {file_type}, sub-type {sub_file_type}" - ) - else: - raise ValueError( - f"No processor found for catalog {catalog_id}, " - f"file type {file_type}" - ) + raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type) + current_app.logger.debug(f"Processor found for catalog {catalog_id}, file type {file_type}: {processor}") return processor except Exception as e: diff --git a/migrations/tenant/env.py b/migrations/tenant/env.py index 2837373..643c3af 100644 --- a/migrations/tenant/env.py +++ b/migrations/tenant/env.py @@ -72,7 +72,8 @@ def get_public_table_names(): # TODO: This function should include the necessary functionality to automatically retrieve table names return ['role', 'roles_users', 'tenant', 'user', 'tenant_domain','license_tier', 'license', 'license_usage', 'business_event_log', 'tenant_project', 'partner', 'partner_service', 'invoice', 'license_period', - 'license_change_log', 'partner_service_license_tier', 'payment', 'partner_tenant'] + 'license_change_log', 'partner_service_license_tier', 'payment', 'partner_tenant', 'tenant_make', + 'specialist_magic_link_tenant'] PUBLIC_TABLES = get_public_table_names() logger.info(f"Public tables: {PUBLIC_TABLES}") diff --git a/migrations/tenant/versions/b1647f31339a_add_active_flag_to_processor.py b/migrations/tenant/versions/b1647f31339a_add_active_flag_to_processor.py new file mode 100644 index 0000000..9124af7 --- /dev/null +++ b/migrations/tenant/versions/b1647f31339a_add_active_flag_to_processor.py @@ -0,0 +1,30 @@ +"""Add Active Flag to Processor + +Revision ID: b1647f31339a +Revises: 2b6ae6cc923e +Create Date: 2025-06-25 12:34:35.391516 + +""" +from alembic import op +import sqlalchemy as sa +import pgvector +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'b1647f31339a' +down_revision = '2b6ae6cc923e' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust!
### + op.add_column('processor', sa.Column('active', sa.Boolean(), nullable=True)) + op.execute("UPDATE processor SET active = true") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('processor', 'active') + # ### end Alembic commands ###
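Review note: after this migration, existing processors are backfilled with `active = true`, and the reworked `get_processor_for_document` in `eveai_workers/tasks.py` resolves processors from the catalog's active rows instead of the global type table alone. A condensed sketch of that selection logic for reference (simplified; logging and the `sub_file_type` filter are omitted):

```python
from typing import Optional

from config.type_defs.processor_types import PROCESSOR_TYPES


def pick_processor_type(available_processor_types: list[str], file_type: str) -> Optional[str]:
    """Return the first PROCESSOR_TYPES entry that is active for the catalog
    and declares support for the given file extension."""
    for proc_type, config in PROCESSOR_TYPES.items():
        if proc_type not in available_processor_types:
            continue
        supported = config['file_types']
        if isinstance(supported, str):
            supported = [t.strip() for t in supported.split(',')]
        if file_type in supported:
            return proc_type
    return None


# Dict order in PROCESSOR_TYPES decides which active processor type wins when
# several of a catalog's active processors support the same extension.
```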