import io
import os
import subprocess
import tempfile
from datetime import datetime as dt, timezone as tz

import PyPDF2
from bs4 import BeautifulSoup
from celery import states
from flask import current_app
# LangChain imports
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import CharacterTextSplitter, MarkdownHeaderTextSplitter
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from pydub import AudioSegment
from pytube import YouTube
from sqlalchemy.exc import SQLAlchemyError

from common.extensions import db, minio_client
from common.models.document import DocumentVersion, Embedding
from common.models.user import Tenant
from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import select_model_variables, create_language_template


@current_celery.task(name='create_embeddings', queue='embeddings')
def create_embeddings(tenant_id, document_version_id):
    current_app.logger.info(f'Creating embeddings for tenant {tenant_id} '
                            f'on document version {document_version_id}.')
    try:
        # Retrieve the tenant we are processing for
        tenant = Tenant.query.get(tenant_id)
        if tenant is None:
            raise Exception(f'Tenant {tenant_id} not found')

        # Ensure we are working in the correct database schema
        Database(tenant_id).switch_schema()

        # Select the variables to work with, depending on tenant and model
        model_variables = select_model_variables(tenant)
        current_app.logger.debug(f'Model variables: {model_variables}')

        # Retrieve the document version to process
        document_version = DocumentVersion.query.get(document_version_id)
        if document_version is None:
            raise Exception(f'Document version {document_version_id} not found')
    except Exception as e:
        current_app.logger.error(f'Unable to start creating embeddings '
                                 f'for document version {document_version_id} '
                                 f'for tenant {tenant_id}, '
                                 f'error: {e}')
        raise

    try:
        db.session.add(document_version)
        # Mark the start of processing
        document_version.processing = True
        document_version.processing_started_at = dt.now(tz.utc)
        document_version.processing_finished_at = None
        document_version.processing_error = None
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to save embedding status information '
                                 f'in document version {document_version_id} '
                                 f'for tenant {tenant_id}, '
                                 f'error: {e}')
        raise

    delete_embeddings_for_document_version(document_version)

    try:
        match document_version.file_type:
            case 'pdf':
                process_pdf(tenant, model_variables, document_version)
            case 'html':
                process_html(tenant, model_variables, document_version)
            case 'youtube':
                process_youtube(tenant, model_variables, document_version)
            case _:
                raise Exception(f'No functionality defined for file type {document_version.file_type} '
                                f'for tenant {tenant_id} '
                                f'while creating embeddings for document version {document_version_id}')
    except Exception as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
                                 f'on document version {document_version_id} '
                                 f'error: {e}')
        document_version.processing = False
        document_version.processing_finished_at = dt.now(tz.utc)
        document_version.processing_error = str(e)[:255]
        db.session.commit()
        create_embeddings.update_state(state=states.FAILURE)
        raise


def process_pdf(tenant, model_variables, document_version):
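    """Create embeddings for a PDF document: extract its text, convert the
    text to Markdown, split it into chunks, enrich the chunks with context,
    and embed them."""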
    file_data = minio_client.download_document_file(tenant.id,
                                                    document_version.doc_id,
                                                    document_version.language,
                                                    document_version.id,
                                                    document_version.file_name)
    pdf_text = ''
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
    for page in pdf_reader.pages:
        pdf_text += page.extract_text()

    markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text)
    markdown_file_name = f'{document_version.id}.md'
    minio_client.upload_document_file(tenant.id,
                                      document_version.doc_id,
                                      document_version.language,
                                      document_version.id,
                                      markdown_file_name,
                                      markdown.encode())

    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
    chunks = combine_chunks_for_markdown(potential_chunks,
                                         model_variables['min_chunk_size'],
                                         model_variables['max_chunk_size'])

    if len(chunks) > 1:
        summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
        document_version.system_context = f'Summary: {summary}\n'
    else:
        document_version.system_context = ''

    enriched_chunks = enrich_chunks(tenant, document_version, chunks)
    embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)

    try:
        db.session.add(document_version)
        document_version.processing_finished_at = dt.now(tz.utc)
        document_version.processing = False
        db.session.add_all(embeddings)
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on PDF, document version {document_version.id} '
                                 f'error: {e}')
        raise

    current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                            f'on document version {document_version.id} :-)')


def delete_embeddings_for_document_version(document_version):
    embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all()
    for embedding in embeddings_to_delete:
        db.session.delete(embedding)
    try:
        db.session.commit()
        current_app.logger.info(f'Deleted embeddings for document version {document_version.id}')
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}, '
                                 f'error: {e}')
        raise


def process_html(tenant, model_variables, document_version):
    file_data = minio_client.download_document_file(tenant.id,
                                                    document_version.doc_id,
                                                    document_version.language,
                                                    document_version.id,
                                                    document_version.file_name)
    html_content = file_data.decode('utf-8')

    # The tags to be considered can depend on the tenant
    html_tags = model_variables['html_tags']
    html_included_elements = model_variables['html_included_elements']
    html_excluded_elements = model_variables['html_excluded_elements']
    extracted_html, title = parse_html(tenant,
                                       html_content,
                                       html_tags,
                                       included_elements=html_included_elements,
                                       excluded_elements=html_excluded_elements)
    extracted_file_name = f'{document_version.id}-extracted.html'
    minio_client.upload_document_file(tenant.id,
                                      document_version.doc_id,
                                      document_version.language,
                                      document_version.id,
                                      extracted_file_name,
                                      extracted_html.encode())

    markdown = generate_markdown_from_html(tenant, model_variables, document_version, extracted_html)
    markdown_file_name = f'{document_version.id}.md'
    minio_client.upload_document_file(tenant.id,
                                      document_version.doc_id,
                                      document_version.language,
                                      document_version.id,
                                      markdown_file_name,
                                      markdown.encode())

    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
    chunks = combine_chunks_for_markdown(potential_chunks,
                                         model_variables['min_chunk_size'],
                                         model_variables['max_chunk_size'])
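    # When the document yields more than one chunk, a summary of the first
    # chunk is stored as system context, so that later chunks also carry
    # document-level information.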
    if len(chunks) > 1:
        summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
        document_version.system_context = (f'Title: {title}\n'
                                           f'Summary: {summary}\n')
    else:
        document_version.system_context = f'Title: {title}\n'

    enriched_chunks = enrich_chunks(tenant, document_version, chunks, title)
    embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)

    try:
        db.session.add(document_version)
        document_version.processing_finished_at = dt.now(tz.utc)
        document_version.processing = False
        db.session.add_all(embeddings)
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on HTML, document version {document_version.id} '
                                 f'error: {e}')
        raise

    current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                            f'on document version {document_version.id} :-)')


def enrich_chunks(tenant, document_version, chunks, title=None):
    """Prefix each chunk with the file name, user context and document context.

    The title is optional: PDF and YouTube documents do not pass one (for
    YouTube it is already part of the system context)."""
    current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    current_app.logger.debug(f'Nr of chunks: {len(chunks)}')

    chunk_total_context = (f'Filename: {document_version.file_name}\n'
                           f'User Context:\n{document_version.user_context}\n\n'
                           f'{document_version.system_context}\n\n')

    enriched_chunks = []
    title_line = f'Title: {title}\n' if title else ''
    initial_chunk = (f'Filename: {document_version.file_name}\n'
                     f'User Context:\n{document_version.user_context}\n\n'
                     f'{title_line}'
                     f'{chunks[0]}')
    enriched_chunks.append(initial_chunk)
    for chunk in chunks[1:]:
        enriched_chunks.append(f'{chunk_total_context}\n{chunk}')

    current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    return enriched_chunks


def generate_markdown_from_html(tenant, model_variables, document_version, html_content):
    current_app.logger.debug(f'Generating Markdown from HTML for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    llm = model_variables['llm']
    template = model_variables['html_parse_template']
    parse_prompt = ChatPromptTemplate.from_template(template)
    setup = RunnablePassthrough()
    output_parser = StrOutputParser()
    chain = setup | parse_prompt | llm | output_parser
    input_html = {"html": html_content}
    markdown = chain.invoke(input_html)
    return markdown


def generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_content):
    current_app.logger.debug(f'Generating Markdown from PDF for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    llm = model_variables['llm']
    template = model_variables['pdf_parse_template']
    parse_prompt = ChatPromptTemplate.from_template(template)
    setup = RunnablePassthrough()
    output_parser = StrOutputParser()
    chain = setup | parse_prompt | llm | output_parser
    input_pdf = {"pdf_content": pdf_content}
    markdown = chain.invoke(input_pdf)
    return markdown


def summarize_chunk(tenant, model_variables, document_version, chunk):
    current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    llm = model_variables['llm']
    template = model_variables['summary_template']
    language_template = create_language_template(template, document_version.language)
    current_app.logger.debug(f'Language prompt: {language_template}')
    chain = load_summarize_chain(llm,
                                 chain_type='stuff',
                                 prompt=ChatPromptTemplate.from_template(language_template))
    doc_creator = CharacterTextSplitter(chunk_size=model_variables['max_chunk_size'] * 2,
                                        chunk_overlap=0)
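    # create_documents expects a list of texts; wrapping the single chunk in
    # a list keeps it from being iterated character by character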
    text_to_summarize = doc_creator.create_documents([chunk])
    try:
        # load_summarize_chain takes its documents under the 'input_documents'
        # key and returns the summary under 'output_text'
        result = chain.invoke({"input_documents": text_to_summarize})
        summary = result["output_text"]
        current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
                                 f'on document version {document_version.id}.')
        return summary
    except LangChainException as e:
        current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
                                 f'on document version {document_version.id} '
                                 f'error: {e}')
        raise


def embed_chunks(tenant, model_variables, document_version, chunks):
    current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} '
                             f'on document version {document_version.id}')
    embedding_model = model_variables['embedding_model']
    try:
        embeddings = embedding_model.embed_documents(chunks)
        current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} '
                                 f'on document version {document_version.id}')
    except LangChainException as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
                                 f'on document version {document_version.id} while calling the OpenAI API, '
                                 f'error: {e}')
        raise

    # Build the database rows for the new embeddings
    new_embeddings = []
    for chunk, embedding in zip(chunks, embeddings):
        new_embedding = model_variables['embedding_db_model']()
        new_embedding.document_version = document_version
        new_embedding.active = True
        new_embedding.chunk = chunk
        new_embedding.embedding = embedding
        new_embeddings.append(new_embedding)
    return new_embeddings


def parse_html(tenant, html_content, tags, included_elements=None, excluded_elements=None):
    soup = BeautifulSoup(html_content, 'html.parser')
    extracted_html = ''
    if included_elements:
        elements_to_parse = soup.find_all(included_elements)
    else:
        elements_to_parse = [soup]  # parse the entire document if no included_elements specified

    if tenant.embed_tuning:
        current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}')
        current_app.embed_tuning_logger.debug(f'Included Elements: {included_elements}')
        if included_elements:
            current_app.embed_tuning_logger.debug(f'Nr of Included Elements: {len(included_elements)}')
        current_app.embed_tuning_logger.debug(f'Excluded Elements: {excluded_elements}')
        current_app.embed_tuning_logger.debug(f'Found {len(elements_to_parse)} elements to parse')
        current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}')

    # Iterate through the found included elements
    for element in elements_to_parse:
        # Find all specified tags within each included element
        for sub_element in element.find_all(tags):
            if tenant.embed_tuning:
                current_app.embed_tuning_logger.debug(f'Found element: {sub_element.name}')
            if excluded_elements and sub_element.find_parent(excluded_elements):
                continue  # Skip this sub_element if it's within any of the excluded_elements
            extracted_html += f'<{sub_element.name}>{sub_element.get_text(strip=True)}\n'

    title_tag = soup.find('title')
    title = title_tag.get_text(strip=True) if title_tag else ''
    return extracted_html, title


def process_youtube(tenant, model_variables, document_version):
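    """Create embeddings for a YouTube video: download the audio track,
    compress it, transcribe it, annotate the transcript as Markdown, and
    finally chunk and embed the annotated transcript."""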
    download_file_name = f'{document_version.id}.mp4'
    compressed_file_name = f'{document_version.id}.mp3'
    transcription_file_name = f'{document_version.id}.txt'
    markdown_file_name = f'{document_version.id}.md'

    # Remove existing files (in case the file is being re-processed)
    for existing_file_name in (download_file_name, compressed_file_name,
                               transcription_file_name, markdown_file_name):
        minio_client.delete_document_file(tenant.id,
                                          document_version.doc_id,
                                          document_version.language,
                                          document_version.id,
                                          existing_file_name)

    _, title, description, author = download_youtube(document_version.url,
                                                     tenant.id,
                                                     document_version,
                                                     download_file_name)
    document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'

    compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
    transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
    annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)

    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
    actual_chunks = combine_chunks_for_markdown(potential_chunks,
                                                model_variables['min_chunk_size'],
                                                model_variables['max_chunk_size'])
    enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks)
    embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)

    try:
        db.session.add(document_version)
        document_version.processing_finished_at = dt.now(tz.utc)
        document_version.processing = False
        db.session.add_all(embeddings)
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on Youtube document version {document_version.id} '
                                 f'error: {e}')
        raise

    current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                            f'on Youtube document version {document_version.id} :-)')


def download_youtube(url, tenant_id, document_version, file_name):
    try:
        current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
        yt = YouTube(url)
        stream = yt.streams.get_audio_only()
        # pytube expects a target directory; downloading into a temporary
        # directory also makes sure the file is cleaned up afterwards
        with tempfile.TemporaryDirectory() as temp_dir:
            downloaded_path = stream.download(output_path=temp_dir, filename=file_name)
            with open(downloaded_path, 'rb') as f:
                file_data = f.read()
        minio_client.upload_document_file(tenant_id,
                                          document_version.doc_id,
                                          document_version.language,
                                          document_version.id,
                                          file_name,
                                          file_data)
        current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
        return file_name, yt.title, yt.description, yt.author
    except Exception as e:
        current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
        raise


def compress_audio(tenant_id, document_version, input_file, output_file):
    try:
        current_app.logger.info(f'Compressing audio for tenant: {tenant_id}')
        input_data = minio_client.download_document_file(tenant_id,
                                                         document_version.doc_id,
                                                         document_version.language,
                                                         document_version.id,
                                                         input_file)
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input:
            temp_input.write(input_data)
            temp_input.flush()
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output:
                # Re-encode to a 64 kbit/s MP3; '-y' lets ffmpeg overwrite the
                # already-created temporary output file
                result = subprocess.run(
                    ['ffmpeg', '-y', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name],
                    capture_output=True, text=True
                )
                if result.returncode != 0:
                    raise Exception(f"Compression failed: {result.stderr}")
                with open(temp_output.name, 'rb') as f:
                    compressed_data = f.read()
        # Clean up the temporary files, mirroring transcribe_audio below
        os.unlink(temp_input.name)
        os.unlink(temp_output.name)
        minio_client.upload_document_file(tenant_id,
                                          document_version.doc_id,
                                          document_version.language,
                                          document_version.id,
                                          output_file,
                                          compressed_data)
        current_app.logger.info(f'Compressed audio for tenant: {tenant_id}')
    except Exception as e:
        current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}')
        raise


def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables):
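    """Transcribe the compressed audio in ten-minute segments and upload the
    combined transcription to MinIO."""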
    try:
        current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}')
        client = model_variables['transcription_client']
        model = model_variables['transcription_model']

        # Download the audio file from MinIO
        audio_data = minio_client.download_document_file(tenant_id,
                                                         document_version.doc_id,
                                                         document_version.language,
                                                         document_version.id,
                                                         input_file)

        # Load the audio data into pydub
        audio = AudioSegment.from_mp3(io.BytesIO(audio_data))

        # Define the segment length (10 minutes, in milliseconds)
        segment_length = 10 * 60 * 1000
        transcriptions = []

        # Split the audio into segments and transcribe each one (pydub slicing
        # with a step yields consecutive segments of that length)
        for i, chunk in enumerate(audio[::segment_length]):
            current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}')
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
                chunk.export(temp_audio.name, format="mp3")
                with open(temp_audio.name, 'rb') as audio_segment:
                    transcription = client.audio.transcriptions.create(
                        file=audio_segment,
                        model=model,
                        language=document_version.language,
                        response_format='verbose_json',
                    )
                    transcriptions.append(transcription.text)
            os.unlink(temp_audio.name)  # Delete the temporary file

        # Combine all transcriptions
        full_transcription = " ".join(transcriptions)

        # Upload the full transcription to MinIO
        minio_client.upload_document_file(
            tenant_id,
            document_version.doc_id,
            document_version.language,
            document_version.id,
            output_file,
            full_transcription.encode('utf-8')
        )
        current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}')
    except Exception as e:
        current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}')
        raise


def annotate_transcription(tenant, document_version, input_file, output_file, model_variables):
    try:
        current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}')
        char_splitter = CharacterTextSplitter(separator='.',
                                              chunk_size=model_variables['annotation_chunk_length'],
                                              chunk_overlap=0)
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
        llm = model_variables['llm']
        template = model_variables['transcript_template']
        language_template = create_language_template(template, document_version.language)
        transcript_prompt = ChatPromptTemplate.from_template(language_template)
        setup = RunnablePassthrough()
        output_parser = StrOutputParser()

        # Download the transcription file from MinIO
        transcript_data = minio_client.download_document_file(tenant.id,
                                                              document_version.doc_id,
                                                              document_version.language,
                                                              document_version.id,
                                                              input_file)
        transcript = transcript_data.decode('utf-8')

        chain = setup | transcript_prompt | llm | output_parser
        chunks = char_splitter.split_text(transcript)
        all_markdown_chunks = []
        last_markdown_chunk = ''
        for chunk in chunks:
            current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}')
            # Prepend the previous (possibly incomplete) Markdown section so
            # the model can continue it instead of starting a new one
            full_input = last_markdown_chunk + '\n' + chunk
            if tenant.embed_tuning:
                current_app.embed_tuning_logger.debug(f'Annotating chunk: \n '
                                                      f'------------------\n'
                                                      f'{full_input}\n'
                                                      f'------------------\n')
            input_transcript = {'transcript': full_input}
            markdown = chain.invoke(input_transcript)
            # GPT-4o sometimes wraps its output in a ```markdown code fence
            if markdown.startswith("```markdown"):
                markdown = "\n".join(markdown.strip().split("\n")[1:-1])
            if tenant.embed_tuning:
                current_app.embed_tuning_logger.debug(f'Markdown Received: \n '
                                                      f'------------------\n'
                                                      f'{markdown}\n'
                                                      f'------------------\n')
            md_header_splits = markdown_splitter.split_text(markdown)
            markdown_chunks = [doc.page_content for doc in md_header_splits]
            # claude-3.5-sonnet returns introductory text before the first header
            if markdown_chunks and not markdown_chunks[0].startswith('#'):
                markdown_chunks.pop(0)
            # Hold back the last section: it may still be incomplete, so it is
            # fed into the next iteration and re-emitted there
            if markdown_chunks:
                last_markdown_chunk = markdown_chunks.pop()
            all_markdown_chunks += markdown_chunks
        all_markdown_chunks += [last_markdown_chunk]
        annotated_transcript = '\n'.join(all_markdown_chunks)

        # Upload the annotated transcript to MinIO
        minio_client.upload_document_file(
            tenant.id,
            document_version.doc_id,
            document_version.language,
            document_version.id,
            output_file,
            annotated_transcript.encode('utf-8')
        )
        current_app.logger.info(f'Annotated transcription for tenant {tenant.id}')
    except Exception as e:
        current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
        raise


def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
    try:
        current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
        # Download the Markdown file from MinIO
        markdown_data = minio_client.download_document_file(tenant_id,
                                                            document_version.doc_id,
                                                            document_version.language,
                                                            document_version.id,
                                                            input_file)
        markdown = markdown_data.decode('utf-8')
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
        ]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
        md_header_splits = markdown_splitter.split_text(markdown)
        potential_chunks = [doc.page_content for doc in md_header_splits]
        current_app.logger.debug(f'Created {len(potential_chunks)} potential chunks for tenant {tenant_id}')
        return potential_chunks
    except Exception as e:
        current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}')
        raise


def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
    actual_chunks = []
    current_chunk = ""
    current_length = 0
    for chunk in potential_chunks:
        chunk_length = len(chunk)
        if current_length + chunk_length > max_chars:
            if current_length >= min_chars:
                # The current chunk is large enough to stand on its own
                actual_chunks.append(current_chunk)
                current_chunk = chunk
                current_length = chunk_length
            else:
                # The current chunk is still below min_chars, so keep adding
                # even though this pushes it past max_chars
                current_chunk += f'\n{chunk}'
                current_length += chunk_length
        else:
            current_chunk += f'\n{chunk}'
            current_length += chunk_length
    # Handle the last chunk
    if current_chunk:
        actual_chunks.append(current_chunk)
    return actual_chunks
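
# Illustrative example (not executed): combine_chunks_for_markdown merges
# header-split sections until max_chars would be exceeded, but a chunk that is
# still below min_chars keeps absorbing sections first.
#
#   pieces = ['# Intro\nshort', '## Part 1\n' + 'a' * 200, '## Part 2\n' + 'b' * 200]
#   combine_chunks_for_markdown(pieces, min_chars=50, max_chars=250)
#   # -> ['\n# Intro\nshort\n## Part 1\naaa...', '## Part 2\nbbb...']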