import os
from datetime import datetime as dt, timezone as tz
from urllib.parse import urlparse, unquote

import requests
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename

from common.extensions import db, minio_client
from common.models.document import Document, DocumentVersion
from common.utils.celery_utils import current_celery
from .eveai_exceptions import (
    EveAIInvalidLanguageException,
    EveAIDoubleURLException,
    EveAIUnsupportedFileType,
    EveAIYoutubeError,
)

# Seconds to wait for a remote server before giving up. Without a timeout,
# requests blocks indefinitely on an unresponsive host.
_REQUEST_TIMEOUT_SECONDS = 30

# Known Content-Type values and the file extension used for each of them.
_CONTENT_TYPE_EXTENSIONS = {
    'text/html': 'html',
    'application/pdf': 'pdf',
    'text/plain': 'txt',
    'application/msword': 'doc',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
    # Add more mappings as needed
}


def _get_field(form, key, default=''):
    """Return ``form[key]``, or ``default`` when the key is absent."""
    try:
        return form[key]
    except KeyError:
        return default


def create_document_stack(api_input, file, filename, extension, tenant_id):
    """Create a Document with its first DocumentVersion and upload the file.

    Args:
        api_input: Mapping with the optional keys 'name', 'valid_from', 'url',
            'language' (defaults to 'en') and 'user_context'.
        file: File content handed to ``upload_file_for_version``.
        filename: Name of the file; its stem is the fallback document name.
        extension: File type stored on the version.
        tenant_id: Tenant the document belongs to.

    Returns:
        Tuple ``(new_doc, new_doc_vers)``.

    Raises:
        EveAIInvalidLanguageException: If the language is empty.
        SQLAlchemyError: If committing the new rows fails (after rollback).
    """
    # Build both objects before touching the session, so that a validation
    # error cannot leave a pending Document behind for a later commit.
    new_doc = create_document(api_input, filename, tenant_id)
    new_doc_vers = create_version_for_document(
        new_doc,
        api_input.get('url', ''),
        api_input.get('language', 'en'),
        api_input.get('user_context', ''),
    )
    db.session.add(new_doc)
    db.session.add(new_doc_vers)

    try:
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error adding document for tenant {tenant_id}: {e}')
        db.session.rollback()
        raise

    # Log the version id: the message is labelled "Document Version".
    current_app.logger.info(f'Document added successfully for tenant {tenant_id}, '
                            f'Document Version {new_doc_vers.id}')

    # Upload file to storage
    upload_file_for_version(new_doc_vers, file, extension, tenant_id)

    return new_doc, new_doc_vers


def create_document(form, filename, tenant_id):
    """Build a new, not yet persisted Document.

    Args:
        form: Mapping that may hold 'name' and 'valid_from'.
        filename: Its stem (text before the last dot) is used as the document
            name when no name is supplied.
        tenant_id: Tenant the document belongs to.

    Returns:
        The new Document; it is not added to the database session here.
    """
    now = dt.now(tz.utc)
    new_doc = Document()

    name = _get_field(form, 'name')
    new_doc.name = name if name else filename.rsplit('.', 1)[0]

    # A missing or empty validity date means "valid from now on".
    valid_from = _get_field(form, 'valid_from', None)
    new_doc.valid_from = valid_from if valid_from else now

    new_doc.tenant_id = tenant_id
    set_logging_information(new_doc, now)

    return new_doc


def create_version_for_document(document, url, language, user_context):
    """Build a new, not yet persisted DocumentVersion for ``document``.

    Args:
        document: Document the version is attached to.
        url: Source URL; left unset when empty.
        language: Language of the version; required.
        user_context: Optional context; left unset when empty.

    Returns:
        The new DocumentVersion.

    Raises:
        EveAIInvalidLanguageException: If ``language`` is empty or None.
    """
    # Validate first so that nothing is constructed for invalid input.
    if not language:
        raise EveAIInvalidLanguageException('Language is required for document creation!')

    new_doc_vers = DocumentVersion()
    if url:
        new_doc_vers.url = url
    new_doc_vers.language = language
    if user_context:
        new_doc_vers.user_context = user_context

    new_doc_vers.document = document
    set_logging_information(new_doc_vers, dt.now(tz.utc))

    return new_doc_vers


def upload_file_for_version(doc_vers, file, extension, tenant_id):
    """Store the file for a document version and commit its file metadata.

    Sets file_type, file_name and file_location on ``doc_vers``, uploads the
    file through ``minio_client`` and commits the session.

    Raises:
        Exception: Any upload or commit error is logged and re-raised after
            the session has been rolled back.
    """
    doc_vers.file_type = extension
    doc_vers.file_name = doc_vers.calc_file_name()
    doc_vers.file_location = doc_vers.calc_file_location()

    # Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place.
    minio_client.create_tenant_bucket(tenant_id)

    try:
        minio_client.upload_document_file(
            tenant_id,
            doc_vers.doc_id,
            doc_vers.language,
            doc_vers.id,
            doc_vers.file_name,
            file,
        )
        db.session.commit()
        current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
                                f'document version {doc_vers.id} while uploading file.')
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(
            f'Error saving document to MinIO for tenant {tenant_id}: {e}')
        raise


def set_logging_information(obj, timestamp):
    """Stamp a new object: set both created_at and updated_at."""
    obj.created_at = timestamp
    obj.updated_at = timestamp


def update_logging_information(obj, timestamp):
    """Stamp a modified object: set updated_at only."""
    obj.updated_at = timestamp


def get_extension_from_content_type(content_type):
    """Map a Content-Type header value to a file extension.

    Parameters after ';' (such as a charset), surrounding whitespace and
    letter case are ignored. Unknown or missing types map to 'html'.
    """
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    return _CONTENT_TYPE_EXTENSIONS.get(media_type, 'html')  # Default to 'html' if unknown


def process_url(url, tenant_id, timeout=_REQUEST_TIMEOUT_SECONDS):
    """Download a URL and derive a filename and extension for it.

    Args:
        url: Address to fetch.
        tenant_id: Not used in this function; kept for the callers.
        timeout: Seconds passed to requests as the timeout of each request.

    Returns:
        Tuple ``(file_content, filename, extension)``.

    Raises:
        EveAIDoubleURLException: If a DocumentVersion with this URL exists.
        requests.RequestException: On timeouts, connection errors or an error
            status on the download.
    """
    # Check for duplicates first: no reason to contact the remote server
    # for a URL that is going to be rejected anyway.
    existing_doc = DocumentVersion.query.filter_by(url=url).first()
    if existing_doc:
        raise EveAIDoubleURLException

    # Determine file extension based on Content-Type
    head_response = requests.head(url, allow_redirects=True, timeout=timeout)
    extension = get_extension_from_content_type(head_response.headers.get('Content-Type', ''))

    # Generate filename
    path = unquote(urlparse(url).path)
    filename = os.path.basename(path)

    if not filename or '.' not in filename:
        # Use the last part of the path or a default name
        filename = path.strip('/').split('/')[-1] or 'document'
        filename = secure_filename(f"{filename}.{extension}")
    else:
        filename = secure_filename(filename)

    # Download the content
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    return response.content, filename, extension


def process_multiple_urls(urls, tenant_id, api_input):
    """Create a document and start an embedding task for each URL.

    A failure on one URL is logged and reported in the result list; it does
    not stop the remaining URLs from being processed.

    Returns:
        List with one dict per URL. Successful entries hold 'url',
        'document_id', 'document_version_id', 'task_id' and 'status';
        failed entries hold 'url', 'status' and 'message'.
    """
    base_name = api_input.get('name')
    results = []
    for url in urls:
        try:
            file_content, filename, extension = process_url(url, tenant_id)

            url_input = api_input.copy()
            url_input.update({
                'url': url,
                'name': f"{base_name}-{filename}" if base_name else filename,
            })

            new_doc, new_doc_vers = create_document_stack(url_input, file_content, filename, extension, tenant_id)
            task_id = start_embedding_task(tenant_id, new_doc_vers.id)

            results.append({
                'url': url,
                'document_id': new_doc.id,
                'document_version_id': new_doc_vers.id,
                'task_id': task_id,
                'status': 'success',
            })
        except Exception as e:
            # Deliberately broad: one bad URL must not abort the whole batch.
            current_app.logger.error(f"Error processing URL {url}: {str(e)}")
            results.append({
                'url': url,
                'status': 'error',
                'message': str(e),
            })
    return results


def prepare_youtube_document(url, tenant_id, api_input):
    """Create and commit a placeholder Document/DocumentVersion for a video.

    No file is uploaded here; the version gets the file type 'youtube'.

    Returns:
        Tuple ``(new_doc, new_doc_vers)``.

    Raises:
        EveAIYoutubeError: On any failure; the original error is chained as
            the cause and the session is rolled back.
    """
    try:
        filename = "placeholder.youtube"
        extension = 'youtube'

        new_doc = create_document(api_input, filename, tenant_id)
        new_doc_vers = create_version_for_document(
            new_doc,
            url,
            _get_field(api_input, 'language'),
            _get_field(api_input, 'user_context'),
        )

        # NOTE(review): unlike upload_file_for_version, these run before the
        # commit - confirm calc_file_name()/calc_file_location() do not need
        # database-generated ids.
        new_doc_vers.file_type = extension
        new_doc_vers.file_name = new_doc_vers.calc_file_name()
        new_doc_vers.file_location = new_doc_vers.calc_file_location()

        db.session.add(new_doc)
        db.session.add(new_doc_vers)
        db.session.commit()

        return new_doc, new_doc_vers
    except Exception as e:
        # Leave the session usable for the caller, as the other helpers do.
        db.session.rollback()
        raise EveAIYoutubeError(f"Error preparing YouTube document: {str(e)}") from e


def start_embedding_task(tenant_id, doc_vers_id):
    """Send the 'create_embeddings' task to the 'embeddings' queue.

    Returns:
        The id of the submitted task.
    """
    task = current_celery.send_task(
        'create_embeddings',
        queue='embeddings',
        args=[tenant_id, doc_vers_id],
    )
    current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
                            f'Document Version {doc_vers_id}. '
                            f'Embedding creation task: {task.id}')
    return task.id


def validate_file_type(extension):
    """Check ``extension`` against the SUPPORTED_FILE_TYPES app setting.

    Raises:
        EveAIUnsupportedFileType: If the extension is not in the setting.
    """
    supported = current_app.config['SUPPORTED_FILE_TYPES']
    current_app.logger.debug(f'Validating file type {extension}')
    current_app.logger.debug(f'Supported file types: {supported}')
    if extension not in supported:
        raise EveAIUnsupportedFileType(f"Filetype {extension} is currently not supported. "
                                       f"Supported filetypes: {', '.join(supported)}")