import io import os from datetime import datetime as dt, timezone as tz from celery import states from flask import current_app # OpenAI imports from langchain.text_splitter import MarkdownHeaderTextSplitter from langchain_core.exceptions import LangChainException from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from sqlalchemy.exc import SQLAlchemyError from common.extensions import db, minio_client from common.models.document import DocumentVersion, Embedding from common.models.user import Tenant from common.utils.celery_utils import current_celery from common.utils.database import Database from common.utils.model_utils import select_model_variables, create_language_template from common.utils.os_utils import safe_remove, sync_folder from eveai_workers.Processors.audio_processor import AudioProcessor from eveai_workers.Processors.html_processor import HTMLProcessor from eveai_workers.Processors.pdf_processor import PDFProcessor from eveai_workers.Processors.srt_processor import SRTProcessor @current_celery.task(name='create_embeddings', queue='embeddings') def create_embeddings(tenant_id, document_version_id): current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.') try: # Retrieve Tenant for which we are processing tenant = Tenant.query.get(tenant_id) if tenant is None: raise Exception(f'Tenant {tenant_id} not found') # Ensure we are working in the correct database schema Database(tenant_id).switch_schema() # Select variables to work with depending on tenant and model model_variables = select_model_variables(tenant) current_app.logger.debug(f'Model variables: {model_variables}') # Retrieve document version to process document_version = DocumentVersion.query.get(document_version_id) if document_version is None: raise Exception(f'Document version {document_version_id} not found') except Exception as e: current_app.logger.error(f'Create Embeddings request received ' f'for non existing document version {document_version_id} ' f'for tenant {tenant_id}, ' f'error: {e}') raise try: db.session.add(document_version) # start processing document_version.processing = True document_version.processing_started_at = dt.now(tz.utc) document_version.processing_finished_at = None document_version.processing_error = None db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Unable to save Embedding status information ' f'in document version {document_version_id} ' f'for tenant {tenant_id}') raise delete_embeddings_for_document_version(document_version) try: match document_version.file_type: case 'pdf': process_pdf(tenant, model_variables, document_version) case 'html': process_html(tenant, model_variables, document_version) case 'srt': process_srt(tenant, model_variables, document_version) case 'mp4' | 'mp3' | 'ogg': process_audio(tenant, model_variables, document_version) case _: raise Exception(f'No functionality defined for file type {document_version.file_type} ' f'for tenant {tenant_id} ' f'while creating embeddings for document version {document_version_id}') except Exception as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' f'on document version {document_version_id} ' f'error: {e}') document_version.processing = False document_version.processing_finished_at = dt.now(tz.utc) document_version.processing_error = str(e)[:255] db.session.commit() create_embeddings.update_state(state=states.FAILURE) raise def delete_embeddings_for_document_version(document_version): embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all() for embedding in embeddings_to_delete: db.session.delete(embedding) try: db.session.commit() current_app.logger.info(f'Deleted embeddings for document version {document_version.id}') except SQLAlchemyError as e: current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}') raise def process_pdf(tenant, model_variables, document_version): processor = PDFProcessor(tenant, model_variables, document_version) markdown, title = processor.process() # Process markdown and embed embed_markdown(tenant, model_variables, document_version, markdown, title) def process_html(tenant, model_variables, document_version): processor = HTMLProcessor(tenant, model_variables, document_version) markdown, title = processor.process() # Process markdown and embed embed_markdown(tenant, model_variables, document_version, markdown, title) def process_audio(tenant, model_variables, document_version): processor = AudioProcessor(tenant, model_variables, document_version) markdown, title = processor.process() # Process markdown and embed embed_markdown(tenant, model_variables, document_version, markdown, title) def process_srt(tenant, model_variables, document_version): processor = SRTProcessor(tenant, model_variables, document_version) markdown, title = processor.process() # Process markdown and embed embed_markdown(tenant, model_variables, document_version, markdown, title) def embed_markdown(tenant, model_variables, document_version, markdown, title): # Create potential chunks potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md") # Combine chunks for embedding chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) # Enrich chunks enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks) # Create embeddings embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) # Update document version and save embeddings try: db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on HTML, document version {document_version.id}' f'error: {e}') raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') def enrich_chunks(tenant, model_variables, document_version, title, chunks): current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') summary = '' if len(chunks) > 1: summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) chunk_total_context = (f'Filename: {document_version.file_name}\n' f'User Context:\n{document_version.user_context}\n\n' f'Title: {title}\n' f'{summary}\n' f'{document_version.system_context}\n\n') enriched_chunks = [] initial_chunk = (f'Filename: {document_version.file_name}\n' f'User Context:\n{document_version.user_context}\n\n' f'Title: {title}\n' f'{chunks[0]}') enriched_chunks.append(initial_chunk) for chunk in chunks[1:]: enriched_chunk = f'{chunk_total_context}\n{chunk}' enriched_chunks.append(enriched_chunk) current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') return enriched_chunks def summarize_chunk(tenant, model_variables, document_version, chunk): current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}') llm = model_variables['llm'] template = model_variables['summary_template'] language_template = create_language_template(template, document_version.language) summary_prompt = ChatPromptTemplate.from_template(language_template) setup = RunnablePassthrough() output_parser = StrOutputParser() chain = setup | summary_prompt | llm | output_parser try: current_app.logger.debug(f'Starting summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}') summary = chain.invoke({"text": chunk}) current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}.') return summary except LangChainException as e: current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: {e}') raise def embed_chunks(tenant, model_variables, document_version, chunks): current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} ' f'on document version {document_version.id}') embedding_model = model_variables['embedding_model'] try: embeddings = embedding_model.embed_documents(chunks) current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} ' f'on document version {document_version.id}') except LangChainException as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while calling OpenAI API' f'error: {e}') raise # Add embeddings to the database new_embeddings = [] for chunk, embedding in zip(chunks, embeddings): new_embedding = model_variables['embedding_db_model']() new_embedding.document_version = document_version new_embedding.active = True new_embedding.chunk = chunk new_embedding.embedding = embedding new_embeddings.append(new_embedding) return new_embeddings def log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse): if tenant.embed_tuning: current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}') current_app.embed_tuning_logger.debug(f'Included Elements: {included_elements}') current_app.embed_tuning_logger.debug(f'Excluded Elements: {excluded_elements}') current_app.embed_tuning_logger.debug(f'Excluded Classes: {excluded_classes}') current_app.embed_tuning_logger.debug(f'Found {len(elements_to_parse)} elements to parse') current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}') # def process_youtube(tenant, model_variables, document_version): # download_file_name = f'{document_version.id}.mp4' # compressed_file_name = f'{document_version.id}.mp3' # transcription_file_name = f'{document_version.id}.txt' # markdown_file_name = f'{document_version.id}.md' # # # Remove existing files (in case of a re-processing of the file # minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, # document_version.id, download_file_name) # minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, # document_version.id, compressed_file_name) # minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, # document_version.id, transcription_file_name) # minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, # document_version.id, markdown_file_name) # # of, title, description, author = download_youtube(document_version.url, tenant.id, document_version, # download_file_name) # document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}' # compress_audio(tenant.id, document_version, download_file_name, compressed_file_name) # transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables) # annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables) # # potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) # actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], # model_variables['max_chunk_size']) # # enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks) # embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) # # try: # db.session.add(document_version) # document_version.processing_finished_at = dt.now(tz.utc) # document_version.processing = False # db.session.add_all(embeddings) # db.session.commit() # except SQLAlchemyError as e: # current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' # f'on Youtube document version {document_version.id}' # f'error: {e}') # raise # # current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' # f'on Youtube document version {document_version.id} :-)') # # # def download_youtube(url, tenant_id, document_version, file_name): # try: # current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}') # yt = YouTube(url) # stream = yt.streams.get_audio_only() # # with tempfile.NamedTemporaryFile(delete=False) as temp_file: # stream.download(output_path=temp_file.name) # with open(temp_file.name, 'rb') as f: # file_data = f.read() # # minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, # document_version.id, # file_name, file_data) # # current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}') # return file_name, yt.title, yt.description, yt.author # except Exception as e: # current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}') # raise # # # def compress_audio(tenant_id, document_version, input_file, output_file): # try: # current_app.logger.info(f'Compressing audio for tenant: {tenant_id}') # # input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, # document_version.id, input_file) # # with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input: # temp_input.write(input_data) # temp_input.flush() # # with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output: # result = subprocess.run( # ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name], # capture_output=True, # text=True # ) # # if result.returncode != 0: # raise Exception(f"Compression failed: {result.stderr}") # # with open(temp_output.name, 'rb') as f: # compressed_data = f.read() # # minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, # document_version.id, # output_file, compressed_data) # # current_app.logger.info(f'Compressed audio for tenant: {tenant_id}') # except Exception as e: # current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}') # raise # # # def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables): # try: # current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}') # client = model_variables['transcription_client'] # model = model_variables['transcription_model'] # # # Download the audio file from MinIO # audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, # document_version.id, input_file) # # # Load the audio data into pydub # audio = AudioSegment.from_mp3(io.BytesIO(audio_data)) # # # Define segment length (e.g., 10 minutes) # segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds # # transcriptions = [] # # # Split audio into segments and transcribe each # for i, chunk in enumerate(audio[::segment_length]): # current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}') # # with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio: # chunk.export(temp_audio.name, format="mp3") # # with open(temp_audio.name, 'rb') as audio_segment: # transcription = client.audio.transcriptions.create( # file=audio_segment, # model=model, # language=document_version.language, # response_format='verbose_json', # ) # # transcriptions.append(transcription.text) # # os.unlink(temp_audio.name) # Delete the temporary file # # # Combine all transcriptions # full_transcription = " ".join(transcriptions) # # # Upload the full transcription to MinIO # minio_client.upload_document_file( # tenant_id, # document_version.doc_id, # document_version.language, # document_version.id, # output_file, # full_transcription.encode('utf-8') # ) # # current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}') # except Exception as e: # current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}') # raise # # # def annotate_transcription(tenant, document_version, input_file, output_file, model_variables): # try: # current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}') # # char_splitter = CharacterTextSplitter(separator='.', # chunk_size=model_variables['annotation_chunk_length'], # chunk_overlap=0) # # headers_to_split_on = [ # ("#", "Header 1"), # ("##", "Header 2"), # ] # markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) # # llm = model_variables['llm'] # template = model_variables['transcript_template'] # language_template = create_language_template(template, document_version.language) # transcript_prompt = ChatPromptTemplate.from_template(language_template) # setup = RunnablePassthrough() # output_parser = StrOutputParser() # # # Download the transcription file from MinIO # transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id, # document_version.language, document_version.id, # input_file) # transcript = transcript_data.decode('utf-8') # # chain = setup | transcript_prompt | llm | output_parser # # chunks = char_splitter.split_text(transcript) # all_markdown_chunks = [] # last_markdown_chunk = '' # for chunk in chunks: # current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}') # full_input = last_markdown_chunk + '\n' + chunk # if tenant.embed_tuning: # current_app.embed_tuning_logger.debug(f'Annotating chunk: \n ' # f'------------------\n' # f'{full_input}\n' # f'------------------\n') # input_transcript = {'transcript': full_input} # markdown = chain.invoke(input_transcript) # # GPT-4o returns some kind of content description: ```markdown ``` # if markdown.startswith("```markdown"): # markdown = "\n".join(markdown.strip().split("\n")[1:-1]) # if tenant.embed_tuning: # current_app.embed_tuning_logger.debug(f'Markdown Received: \n ' # f'------------------\n' # f'{markdown}\n' # f'------------------\n') # md_header_splits = markdown_splitter.split_text(markdown) # markdown_chunks = [doc.page_content for doc in md_header_splits] # # claude-3.5-sonnet returns introductory text # if not markdown_chunks[0].startswith('#'): # markdown_chunks.pop(0) # last_markdown_chunk = markdown_chunks[-1] # last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:]) # markdown_chunks.pop() # all_markdown_chunks += markdown_chunks # # all_markdown_chunks += [last_markdown_chunk] # # annotated_transcript = '\n'.join(all_markdown_chunks) # # # Upload the annotated transcript to MinIO # minio_client.upload_document_file( # tenant.id, # document_version.doc_id, # document_version.language, # document_version.id, # output_file, # annotated_transcript.encode('utf-8') # ) # # current_app.logger.info(f'Annotated transcription for tenant {tenant.id}') # except Exception as e: # current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}') # raise def create_potential_chunks_for_markdown(tenant_id, document_version, input_file): try: current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}') # Download the markdown file from MinIO markdown_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, document_version.id, input_file ) markdown = markdown_data.decode('utf-8') headers_to_split_on = [ ("#", "Header 1"), ("##", "Header 2"), ] markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) md_header_splits = markdown_splitter.split_text(markdown) potential_chunks = [doc.page_content for doc in md_header_splits] current_app.logger.debug(f'Created {len(potential_chunks)} potential chunks for tenant {tenant_id}') return potential_chunks except Exception as e: current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}') raise def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars): actual_chunks = [] current_chunk = "" current_length = 0 for chunk in potential_chunks: chunk_length = len(chunk) if current_length + chunk_length > max_chars: if current_length >= min_chars: actual_chunks.append(current_chunk) current_chunk = chunk current_length = chunk_length else: # If the combined chunk is still less than max_chars, keep adding current_chunk += f'\n{chunk}' current_length += chunk_length else: current_chunk += f'\n{chunk}' current_length += chunk_length # Handle the last chunk if current_chunk and current_length >= 0: actual_chunks.append(current_chunk) return actual_chunks