from datetime import datetime as dt, timezone as tz

from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename

from common.models.document import Document, DocumentVersion, Catalog, Processor
from common.extensions import db, minio_client
from common.utils.celery_utils import current_celery
from flask import current_app
import requests
from urllib.parse import urlparse, unquote, urlunparse, parse_qs
import os
from config.type_defs.processor_types import PROCESSOR_TYPES
from .config_field_types import normalize_json_field
from .eveai_exceptions import (EveAIInvalidLanguageException, EveAIDoubleURLException,
                               EveAIUnsupportedFileType, EveAIInvalidCatalog,
                               EveAIInvalidDocument, EveAIInvalidDocumentVersion,
                               EveAIException)
from .minio_utils import MIB_CONVERTOR
from ..models.user import Tenant
from common.utils.model_logging_utils import set_logging_information, update_logging_information
from common.services.entitlements import LicenseUsageServices


def get_file_size(file):
    try:
        # If file is a bytes object, or anything else that supports len()
        file_size = len(file)
    except TypeError:
        # If file is a FileStorage object, measure it by seeking to the end
        current_position = file.tell()
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(current_position)
    return file_size


def create_document_stack(api_input, file, filename, extension, tenant_id):
    # Precheck if we can add a document to the stack
    LicenseUsageServices.check_storage_and_embedding_quota(tenant_id,
                                                           get_file_size(file) / MIB_CONVERTOR)

    # Create the Document
    catalog_id = int(api_input.get('catalog_id'))
    catalog = Catalog.query.get(catalog_id)
    if not catalog:
        raise EveAIInvalidCatalog(tenant_id, catalog_id)

    new_doc = create_document(api_input, filename, catalog_id)
    db.session.add(new_doc)

    url = api_input.get('url', '')
    if url != '':
        url = cope_with_local_url(url)

    # Create the DocumentVersion
    new_doc_vers = create_version_for_document(new_doc, tenant_id, url,
                                               api_input.get('sub_file_type', ''),
                                               api_input.get('language', 'en'),
                                               api_input.get('user_context', ''),
                                               api_input.get('user_metadata'),
                                               api_input.get('catalog_properties'))
    db.session.add(new_doc_vers)

    try:
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error adding document for tenant {tenant_id}: {e}')
        db.session.rollback()
        raise

    current_app.logger.info(f'Document added successfully for tenant {tenant_id}, '
                            f'Document Version {new_doc_vers.id}')

    # Upload file to storage
    upload_file_for_version(new_doc_vers, file, extension, tenant_id)

    return new_doc, new_doc_vers


def create_document(form, filename, catalog_id):
    new_doc = Document()
    if form['name'] == '':
        new_doc.name = filename.rsplit('.', 1)[0]
    else:
        new_doc.name = form['name']
    if form['valid_from'] and form['valid_from'] != '':
        new_doc.valid_from = form['valid_from']
    else:
        new_doc.valid_from = dt.now(tz.utc)
    new_doc.catalog_id = catalog_id
    set_logging_information(new_doc, dt.now(tz.utc))
    return new_doc


def create_version_for_document(document, tenant_id, url, sub_file_type, language,
                                user_context, user_metadata, catalog_properties):
    new_doc_vers = DocumentVersion()
    if url != '':
        new_doc_vers.url = url
    if language == '':
        raise EveAIInvalidLanguageException('Language is required for document creation!')
    else:
        new_doc_vers.language = language
    if user_context != '':
        new_doc_vers.user_context = user_context
    if user_metadata != '' and user_metadata is not None:
        new_doc_vers.user_metadata = normalize_json_field(user_metadata, "user_metadata")
    if catalog_properties != '' and catalog_properties is not None:
        new_doc_vers.catalog_properties = normalize_json_field(catalog_properties, "catalog_properties")
    if sub_file_type != '':
        new_doc_vers.sub_file_type = sub_file_type
    new_doc_vers.document = document
    set_logging_information(new_doc_vers, dt.now(tz.utc))
    return new_doc_vers
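
# Illustrative usage of the create path (a minimal sketch, not part of the module's API;
# `uploaded_file` and the input dict are hypothetical, and an active Flask application
# context plus an existing catalog for the tenant are assumed):
#
#     api_input = {'catalog_id': '7', 'name': '', 'valid_from': '',
#                  'language': 'en', 'user_context': '', 'url': ''}
#     doc, version = create_document_stack(api_input, uploaded_file,
#                                          'manual.pdf', 'pdf', tenant_id=42)
#     task_id = start_embedding_task(42, version.id)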


def upload_file_for_version(doc_vers, file, extension, tenant_id):
    doc_vers.file_type = extension

    # Normally, the tenant bucket should exist. But let's be on the safe side in case a
    # migration took place.
    minio_client.create_tenant_bucket(tenant_id)
    try:
        bn, on, size = minio_client.upload_document_file(
            tenant_id,
            doc_vers.doc_id,
            doc_vers.language,
            doc_vers.id,
            f"{doc_vers.id}.{extension}",
            file
        )
        doc_vers.bucket_name = bn
        doc_vers.object_name = on
        doc_vers.file_size = size / MIB_CONVERTOR  # Convert bytes to MiB
        db.session.commit()
        current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
                                f'document version {doc_vers.id} while uploading file.')
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(
            f'Error saving document to MinIO for tenant {tenant_id}: {e}')
        raise


def get_extension_from_content_type(content_type):
    content_type_map = {
        'text/html': 'html',
        'application/pdf': 'pdf',
        'text/plain': 'txt',
        'application/msword': 'doc',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        # Add more mappings as needed
    }
    return content_type_map.get(content_type, 'html')  # Default to 'html' if unknown
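
# For example (callers are expected to pass the bare media type; parameters such as
# "; charset=utf-8" are stripped before lookup, as process_url does below):
#
#     get_extension_from_content_type('application/pdf')  # -> 'pdf'
#     get_extension_from_content_type('image/png')        # -> 'html' (fallback)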


def process_url(url, tenant_id):
    url = cope_with_local_url(url)
    response = requests.head(url, allow_redirects=True)
    content_type = response.headers.get('Content-Type', '').split(';')[0]

    # Determine file extension based on Content-Type
    extension = get_extension_from_content_type(content_type)

    # Generate filename
    parsed_url = urlparse(url)
    path = unquote(parsed_url.path)
    filename = os.path.basename(path)
    if not filename or '.' not in filename:
        # Use the last part of the path or a default name
        filename = path.strip('/').split('/')[-1] or 'document'
        filename = secure_filename(f"{filename}.{extension}")
    else:
        filename = secure_filename(filename)

    # Check if a document with this URL already exists
    existing_doc = DocumentVersion.query.filter_by(url=url).first()
    if existing_doc:
        raise EveAIDoubleURLException

    # Prepare the headers for a maximal chance of downloading the URL
    referer = get_referer_from_url(url)
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/115.0.0.0 Safari/537.36"
        ),
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;"
            "q=0.9,image/avif,image/webp,image/apng,*/*;"
            "q=0.8,application/signed-exchange;v=b3;q=0.7"
        ),
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "nl-BE,nl;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Referer": referer,
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-User": "?1",
    }

    # Download the content
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    file_content = response.content

    return file_content, filename, extension


def clean_url(url):
    tracking_params = {"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
                       "hsa_acc", "hsa_cam", "hsa_grp", "hsa_ad", "hsa_src", "hsa_tgt",
                       "hsa_kw", "hsa_mt", "hsa_net", "hsa_ver", "gad_source", "gbraid"}

    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)

    # Remove tracking params
    clean_params = {k: v for k, v in query_params.items() if k not in tracking_params}

    # Reconstruct the URL
    clean_query = "&".join(f"{k}={v[0]}" for k, v in clean_params.items()) if clean_params else ""
    cleaned_url = urlunparse(parsed_url._replace(query=clean_query))

    return cleaned_url
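
# A worked example (hypothetical URL; note that only the first value of a repeated
# query parameter survives the reconstruction above):
#
#     clean_url('https://example.com/page?id=3&utm_source=news&utm_campaign=spring')
#     # -> 'https://example.com/page?id=3'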


def start_embedding_task(tenant_id, doc_vers_id):
    task = current_celery.send_task('create_embeddings', args=[tenant_id, doc_vers_id],
                                    queue='embeddings')
    current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
                            f'Document Version {doc_vers_id}. '
                            f'Embedding creation task: {task.id}')
    return task.id


def get_filename_from_url(url):
    parsed_url = urlparse(url)
    path_parts = parsed_url.path.split('/')
    filename = path_parts[-1]
    if filename == '':
        filename = 'index'
    if not filename.endswith('.html'):
        filename += '.html'
    return filename


def get_documents_list(page, per_page):
    query = Document.query.order_by(desc(Document.created_at))
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return pagination


def edit_document(tenant_id, document_id, name, valid_from, valid_to):
    doc = Document.query.get(document_id)
    if not doc:
        raise EveAIInvalidDocument(tenant_id, document_id)

    if name:
        doc.name = name
    if valid_from:
        doc.valid_from = valid_from
    if valid_to:
        doc.valid_to = valid_to
    update_logging_information(doc, dt.now(tz.utc))

    try:
        db.session.add(doc)
        db.session.commit()
        return doc, None
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)


def edit_document_version(tenant_id, version_id, user_context, catalog_properties):
    doc_vers = DocumentVersion.query.get(version_id)
    if not doc_vers:
        raise EveAIInvalidDocumentVersion(tenant_id, version_id)

    doc_vers.user_context = user_context
    doc_vers.catalog_properties = normalize_json_field(catalog_properties, "catalog_properties")
    update_logging_information(doc_vers, dt.now(tz.utc))

    try:
        db.session.add(doc_vers)
        db.session.commit()
        return doc_vers, None
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)
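
# Illustrative use of the pagination helper (a sketch; `get_documents_list` returns a
# Flask-SQLAlchemy Pagination object, so `.items`, `.pages` and `.total` are available):
#
#     pagination = get_documents_list(page=1, per_page=20)
#     for doc in pagination.items:
#         print(doc.id, doc.name)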


def refresh_document_with_info(doc_id, tenant_id, api_input):
    doc = Document.query.get(doc_id)
    if not doc:
        raise EveAIInvalidDocument(tenant_id, doc_id)

    old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first()
    if not old_doc_vers.url:
        return None, "This document has no URL. Only documents with a URL can be refreshed."

    # Precheck if we have enough quota for the new version
    LicenseUsageServices.check_storage_and_embedding_quota(tenant_id, old_doc_vers.file_size)

    new_doc_vers = create_version_for_document(
        doc,
        tenant_id,
        old_doc_vers.url,
        old_doc_vers.sub_file_type,
        api_input.get('language', old_doc_vers.language),
        api_input.get('user_context', old_doc_vers.user_context),
        api_input.get('user_metadata', old_doc_vers.user_metadata),
        api_input.get('catalog_properties', old_doc_vers.catalog_properties),
    )
    set_logging_information(new_doc_vers, dt.now(tz.utc))

    try:
        db.session.add(new_doc_vers)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)

    url = cope_with_local_url(old_doc_vers.url)
    response = requests.head(url, allow_redirects=True)
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    extension = get_extension_from_content_type(content_type)

    response = requests.get(url)
    response.raise_for_status()
    file_content = response.content

    upload_file_for_version(new_doc_vers, file_content, extension, tenant_id)

    task = current_celery.send_task('create_embeddings', args=[tenant_id, new_doc_vers.id],
                                    queue='embeddings')
    current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} '
                            f'with task id: {task.id}.')

    return new_doc_vers, task.id


def refresh_document_with_content(doc_id: int, tenant_id: int, file_content: bytes, api_input: dict) -> tuple:
    """
    Refresh document with new content

    Args:
        doc_id: Document ID
        tenant_id: Tenant ID
        file_content: New file content
        api_input: Additional document information

    Returns:
        Tuple of (new_version, task_id)
    """
    doc = Document.query.get(doc_id)
    if not doc:
        raise EveAIInvalidDocument(tenant_id, doc_id)

    old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first()

    # Precheck if we have enough quota for the new version
    LicenseUsageServices.check_storage_and_embedding_quota(tenant_id,
                                                           get_file_size(file_content) / MIB_CONVERTOR)

    # Create new version with same file type as original
    extension = old_doc_vers.file_type

    new_doc_vers = create_version_for_document(
        doc,
        tenant_id,
        '',  # No URL for content-based updates
        old_doc_vers.sub_file_type,
        api_input.get('language', old_doc_vers.language),
        api_input.get('user_context', old_doc_vers.user_context),
        api_input.get('user_metadata', old_doc_vers.user_metadata),
        api_input.get('catalog_properties', old_doc_vers.catalog_properties),
    )

    try:
        db.session.add(new_doc_vers)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)

    # Upload new content
    upload_file_for_version(new_doc_vers, file_content, extension, tenant_id)

    # Start embedding task
    task = current_celery.send_task('create_embeddings', args=[tenant_id, new_doc_vers.id],
                                    queue='embeddings')
    current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} '
                            f'with task id: {task.id}.')

    return new_doc_vers, task.id


# Update the existing refresh_document function to use the new refresh_document_with_info
def refresh_document(doc_id, tenant_id):
    current_app.logger.info(f'Refreshing document {doc_id}')
    doc = Document.query.get_or_404(doc_id)
    old_doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first()

    api_input = {
        'language': old_doc_vers.language,
        'user_context': old_doc_vers.user_context,
        'user_metadata': old_doc_vers.user_metadata,
        'catalog_properties': old_doc_vers.catalog_properties,
    }

    return refresh_document_with_info(doc_id, tenant_id, api_input)
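
# Illustrative refresh call (a sketch; the document id and payload are hypothetical,
# and any key omitted from `api_input` falls back to the latest version's value):
#
#     new_version, task_id = refresh_document_with_content(
#         doc_id=12, tenant_id=42,
#         file_content=updated_pdf_bytes,   # hypothetical bytes payload
#         api_input={'language': 'en'},
#     )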


def cope_with_local_url(url):
    parsed_url = urlparse(url)
    # Check if this is an internal WordPress URL (TESTING) and rewrite it
    if parsed_url.netloc in [current_app.config['EXTERNAL_WORDPRESS_BASE_URL']]:
        parsed_url = parsed_url._replace(
            scheme=current_app.config['WORDPRESS_PROTOCOL'],
            netloc=f"{current_app.config['WORDPRESS_HOST']}:{current_app.config['WORDPRESS_PORT']}"
        )
        url = urlunparse(parsed_url)
    return url


def lookup_document(tenant_id: int, lookup_criteria: dict, metadata_type: str) -> tuple[Document, DocumentVersion]:
    """
    Look up a document using metadata criteria

    Args:
        tenant_id: ID of the tenant
        lookup_criteria: Dictionary of key-value pairs to match in metadata
        metadata_type: Which metadata to search in ('user_metadata' or 'system_metadata')

    Returns:
        Tuple of (Document, DocumentVersion) if found

    Raises:
        ValueError: If invalid metadata_type provided
        EveAIException: If lookup fails
    """
    if metadata_type not in ['user_metadata', 'system_metadata']:
        raise ValueError(f"Invalid metadata_type: {metadata_type}")

    try:
        # Query for the latest document version matching the criteria
        query = (db.session.query(Document, DocumentVersion)
                 .join(DocumentVersion)
                 .filter(Document.id == DocumentVersion.doc_id)
                 .order_by(DocumentVersion.id.desc()))

        # Add metadata filtering using PostgreSQL JSONB operators
        metadata_field = getattr(DocumentVersion, metadata_type)
        for key, value in lookup_criteria.items():
            query = query.filter(metadata_field[key].astext == str(value))

        # Get first result
        result = query.first()
        if not result:
            raise EveAIException(
                f"No document found matching criteria in {metadata_type}",
                status_code=404
            )

        return result

    except EveAIException:
        # Re-raise as-is so the 404 above is not swallowed by the generic handler below
        raise
    except SQLAlchemyError as e:
        current_app.logger.error(f'Database error during document lookup for tenant {tenant_id}: {e}')
        raise EveAIException(
            "Database error during document lookup",
            status_code=500
        )
    except Exception as e:
        current_app.logger.error(f'Error during document lookup for tenant {tenant_id}: {e}')
        raise EveAIException(
            "Error during document lookup",
            status_code=500
        )


def is_file_type_supported_by_catalog(catalog_id, file_type):
    processors = Processor.query.filter_by(catalog_id=catalog_id).filter_by(active=True).all()
    supported_file_types = []
    for processor in processors:
        processor_file_types = PROCESSOR_TYPES[processor.type]['file_types']
        file_types = [f.strip() for f in processor_file_types.split(",")]
        supported_file_types.extend(file_types)
    if file_type not in supported_file_types:
        raise EveAIUnsupportedFileType()


def get_referer_from_url(url):
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}/"
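
# Illustrative metadata lookup (a sketch; the criteria keys are hypothetical and must
# be present in the JSONB `user_metadata` column of the latest version to match):
#
#     doc, version = lookup_document(
#         tenant_id=42,
#         lookup_criteria={'source_system': 'crm', 'external_id': 'A-1001'},
#         metadata_type='user_metadata',
#     )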