from datetime import datetime as dt, timezone as tz

from celery import states
from flask import current_app

# LangChain imports
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from sqlalchemy.exc import SQLAlchemyError

from common.extensions import db, minio_client
from common.models.document import DocumentVersion, Embedding
from common.models.user import Tenant
from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event
from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import select_model_variables, create_language_template
from eveai_workers.Processors.audio_processor import AudioProcessor
from eveai_workers.Processors.html_processor import HTMLProcessor
from eveai_workers.Processors.pdf_processor import PDFProcessor
from eveai_workers.Processors.srt_processor import SRTProcessor


# Healthcheck task
@current_celery.task(name='ping', queue='embeddings')
def ping():
    return 'pong'


@current_celery.task(name='create_embeddings', queue='embeddings')
def create_embeddings(tenant_id, document_version_id):
    # BusinessEvent creates a context, which is why it is used in a with block
    with BusinessEvent('Create Embeddings', tenant_id, document_version_id=document_version_id):
        current_app.logger.info(f'Creating embeddings for tenant {tenant_id} '
                                f'on document version {document_version_id}')
        current_event.log("Starting Embedding Creation Task")
        try:
            # Retrieve the tenant for which we are processing
            tenant = Tenant.query.get(tenant_id)
            if tenant is None:
                raise Exception(f'Tenant {tenant_id} not found')
            # Ensure we are working in the correct database schema
            Database(tenant_id).switch_schema()
            # Select the variables to work with, depending on tenant and model
            model_variables = select_model_variables(tenant)
            current_app.logger.debug(f'Model variables: {model_variables}')
            # Retrieve the document version to process
            document_version = DocumentVersion.query.get(document_version_id)
            if document_version is None:
                raise Exception(f'Document version {document_version_id} not found')
        except Exception as e:
            current_app.logger.error(f'Unable to prepare embedding creation '
                                     f'for document version {document_version_id} '
                                     f'for tenant {tenant_id}, '
                                     f'error: {e}')
            raise

        try:
            db.session.add(document_version)
            # Mark the document version as "processing started"
            document_version.processing = True
            document_version.processing_started_at = dt.now(tz.utc)
            document_version.processing_finished_at = None
            document_version.processing_error = None
            db.session.commit()
        except SQLAlchemyError as e:
            current_app.logger.error(f'Unable to save embedding status information '
                                     f'in document version {document_version_id} '
                                     f'for tenant {tenant_id}, '
                                     f'error: {e}')
            raise

        # Remove any embeddings left over from a previous run
        delete_embeddings_for_document_version(document_version)

        try:
            match document_version.file_type:
                case 'pdf':
                    process_pdf(tenant, model_variables, document_version)
                case 'html':
                    process_html(tenant, model_variables, document_version)
                case 'srt':
                    process_srt(tenant, model_variables, document_version)
                case 'mp4' | 'mp3' | 'ogg':
                    process_audio(tenant, model_variables, document_version)
                case _:
                    raise Exception(f'No functionality defined for file type {document_version.file_type} '
                                    f'for tenant {tenant_id} '
                                    f'while creating embeddings for document version {document_version_id}')
            current_event.log("Finished Embedding Creation Task")
        except Exception as e:
            current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
                                     f'on document version {document_version_id} '
                                     f'error: {e}')
            # Record the failure on the document version before re-raising
            document_version.processing = False
            document_version.processing_finished_at = dt.now(tz.utc)
            document_version.processing_error = str(e)[:255]
            db.session.commit()
            create_embeddings.update_state(state=states.FAILURE)
            raise
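
# A minimal dispatch sketch (assuming a configured Celery broker and an existing
# tenant and document version; the IDs below are illustrative only). The task is
# routed to the 'embeddings' queue defined on the decorator above:
#
#   create_embeddings.delay(tenant_id=1, document_version_id=42)
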

def delete_embeddings_for_document_version(document_version):
    embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all()
    for embedding in embeddings_to_delete:
        db.session.delete(embedding)
    try:
        db.session.commit()
        current_app.logger.info(f'Deleted embeddings for document version {document_version.id}')
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}, '
                                 f'error: {e}')
        raise


def process_pdf(tenant, model_variables, document_version):
    current_event.log("Starting PDF Processing")
    processor = PDFProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)
    current_event.log("Finished PDF Processing")


def process_html(tenant, model_variables, document_version):
    with current_event.create_span("HTML Processing"):
        processor = HTMLProcessor(tenant, model_variables, document_version)
        markdown, title = processor.process()
    # Process markdown and embed
    with current_event.create_span("Embedding"):
        embed_markdown(tenant, model_variables, document_version, markdown, title)


def process_audio(tenant, model_variables, document_version):
    current_event.log("Starting Audio Processing")
    processor = AudioProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)
    current_event.log("Finished Audio Processing")


def process_srt(tenant, model_variables, document_version):
    current_event.log("Starting SRT Processing")
    processor = SRTProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)
    current_event.log("Finished SRT Processing")


def embed_markdown(tenant, model_variables, document_version, markdown, title):
    current_event.log("Starting Embedding Markdown Processing")
    # Create potential chunks
    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version,
                                                            f"{document_version.id}.md")
    # Combine chunks for embedding
    chunks = combine_chunks_for_markdown(potential_chunks,
                                         model_variables['min_chunk_size'],
                                         model_variables['max_chunk_size'])
    # Enrich chunks
    enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks)
    # Create embeddings
    embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)

    # Update the document version and save the embeddings
    try:
        db.session.add(document_version)
        document_version.processing_finished_at = dt.now(tz.utc)
        document_version.processing = False
        db.session.add_all(embeddings)
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on document version {document_version.id}, '
                                 f'error: {e}')
        raise
    current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                            f'on document version {document_version.id} :-)')
    current_event.log("Finished Embedding Markdown Processing")
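
# Note on enrichment: each chunk is prefixed with document-level context
# (filename, user/system context and metadata, and title). Chunks after the
# first additionally get an LLM-generated summary of the opening chunk, so
# every embedded chunk carries enough standalone context for retrieval.
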

def enrich_chunks(tenant, model_variables, document_version, title, chunks):
    current_event.log("Starting Enriching Chunks Processing")
    current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    if not chunks:
        # Nothing to enrich
        return []
    summary = ''
    if len(chunks) > 1:
        summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
    chunk_total_context = (f'Filename: {document_version.file_name}\n'
                           f'User Context:\n{document_version.user_context}\n\n'
                           f'User Metadata:\n{document_version.user_metadata}\n\n'
                           f'Title: {title}\n'
                           f'Summary:\n{summary}\n'
                           f'System Context:\n{document_version.system_context}\n\n'
                           f'System Metadata:\n{document_version.system_metadata}\n\n'
                           )
    enriched_chunks = []
    # The first chunk gets the document context without the summary, since the
    # summary is derived from the first chunk itself
    initial_chunk = (f'Filename: {document_version.file_name}\n'
                     f'User Context:\n{document_version.user_context}\n\n'
                     f'User Metadata:\n{document_version.user_metadata}\n\n'
                     f'Title: {title}\n'
                     f'System Context:\n{document_version.system_context}\n\n'
                     f'System Metadata:\n{document_version.system_metadata}\n\n'
                     f'{chunks[0]}'
                     )
    enriched_chunks.append(initial_chunk)
    for chunk in chunks[1:]:
        enriched_chunk = f'{chunk_total_context}\n{chunk}'
        enriched_chunks.append(enriched_chunk)
    current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    current_event.log("Finished Enriching Chunks Processing")
    return enriched_chunks


def summarize_chunk(tenant, model_variables, document_version, chunk):
    current_event.log("Starting Summarizing Chunk Processing")
    current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    llm = model_variables['llm']
    template = model_variables['summary_template']
    language_template = create_language_template(template, document_version.language)
    summary_prompt = ChatPromptTemplate.from_template(language_template)
    setup = RunnablePassthrough()
    output_parser = StrOutputParser()
    chain = setup | summary_prompt | llm | output_parser
    try:
        current_app.logger.debug(f'Starting summarizing chunk for tenant {tenant.id} '
                                 f'on document version {document_version.id}')
        summary = chain.invoke({"text": chunk})
        current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
                                 f'on document version {document_version.id}.')
        current_event.log("Finished Summarizing Chunk Processing")
        return summary
    except LangChainException as e:
        current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
                                 f'on document version {document_version.id} '
                                 f'error: {e}')
        raise
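
# The chain above follows the LCEL composition pattern: RunnablePassthrough()
# forwards the {"text": chunk} input dict into the prompt template, the rendered
# prompt goes to the LLM, and StrOutputParser() extracts the completion as a
# plain string. A minimal standalone sketch (the template text is illustrative,
# and any chat model could stand in for model_variables['llm']):
#
#   prompt = ChatPromptTemplate.from_template("Summarize this text:\n{text}")
#   chain = RunnablePassthrough() | prompt | llm | StrOutputParser()
#   summary = chain.invoke({"text": "Some long passage..."})
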

def embed_chunks(tenant, model_variables, document_version, chunks):
    current_event.log("Starting Embedding Chunks Processing")
    current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    embedding_model = model_variables['embedding_model']
    try:
        embeddings = embedding_model.embed_documents(chunks)
        current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} '
                                 f'on document version {document_version.id}')
    except LangChainException as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
                                 f'on document version {document_version.id} while calling the OpenAI API, '
                                 f'error: {e}')
        raise

    # Build database records for the embeddings (not committed here; the caller
    # saves them together with the document version)
    new_embeddings = []
    for chunk, embedding in zip(chunks, embeddings):
        new_embedding = model_variables['embedding_db_model']()
        new_embedding.document_version = document_version
        new_embedding.active = True
        new_embedding.chunk = chunk
        new_embedding.embedding = embedding
        new_embeddings.append(new_embedding)
    current_app.logger.debug(f'Prepared {len(new_embeddings)} embedding records for tenant {tenant.id}')
    return new_embeddings


def log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse):
    if tenant.embed_tuning:
        current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}')
        current_app.embed_tuning_logger.debug(f'Included Elements: {included_elements}')
        current_app.embed_tuning_logger.debug(f'Excluded Elements: {excluded_elements}')
        current_app.embed_tuning_logger.debug(f'Excluded Classes: {excluded_classes}')
        current_app.embed_tuning_logger.debug(f'Found {len(elements_to_parse)} elements to parse')
        if elements_to_parse:
            current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}')


def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
    try:
        current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
        # Download the markdown file from MinIO
        markdown_data = minio_client.download_document_file(tenant_id,
                                                            document_version.doc_id,
                                                            document_version.language,
                                                            document_version.id,
                                                            input_file
                                                            )
        markdown = markdown_data.decode('utf-8')

        # Split on top-level and second-level markdown headers
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
        md_header_splits = markdown_splitter.split_text(markdown)
        potential_chunks = [doc.page_content for doc in md_header_splits]
        current_app.logger.debug(f'Created {len(potential_chunks)} potential chunks for tenant {tenant_id}')
        return potential_chunks
    except Exception as e:
        current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}')
        raise


def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
    actual_chunks = []
    current_chunk = ""
    current_length = 0

    for chunk in potential_chunks:
        chunk_length = len(chunk)
        if current_length + chunk_length > max_chars:
            if current_length >= min_chars:
                # The accumulated chunk is large enough; flush it and start a new one
                actual_chunks.append(current_chunk)
                current_chunk = chunk
                current_length = chunk_length
            else:
                # The accumulated chunk is still below min_chars, so keep growing it
                # even though this pushes it past max_chars
                current_chunk += f'\n{chunk}'
                current_length += chunk_length
        else:
            current_chunk += f'\n{chunk}'
            current_length += chunk_length

    # Handle the last chunk
    if current_chunk:
        actual_chunks.append(current_chunk)

    return actual_chunks
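
# Worked example for combine_chunks_for_markdown (values are illustrative):
# with min_chars=10, max_chars=25 and potential chunks of lengths [8, 8, 8, 20],
# the first three chunks are accumulated (8 + 8 + 8 = 24 <= 25); adding the
# fourth would exceed max_chars, and since 24 >= min_chars the accumulated
# chunk is flushed. The fourth chunk then starts a new accumulation and is
# returned as the second actual chunk.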