import os from datetime import datetime as dt, timezone as tz import gevent from bs4 import BeautifulSoup import html from celery import states from flask import current_app # OpenAI imports from langchain.chains.summarize import load_summarize_chain from langchain.text_splitter import CharacterTextSplitter, MarkdownHeaderTextSplitter from langchain_core.exceptions import LangChainException from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from sqlalchemy.exc import SQLAlchemyError from pytube import YouTube import PyPDF2 from common.extensions import db from common.models.document import DocumentVersion, Embedding from common.models.user import Tenant from common.utils.celery_utils import current_celery from common.utils.database import Database from common.utils.model_utils import select_model_variables, create_language_template @current_celery.task(name='create_embeddings', queue='embeddings') def create_embeddings(tenant_id, document_version_id): # Setup Remote Debugging only if PYCHARM_DEBUG=True if current_app.config['PYCHARM_DEBUG']: import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True) current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.') try: # Retrieve Tenant for which we are processing tenant = Tenant.query.get(tenant_id) if tenant is None: raise Exception(f'Tenant {tenant_id} not found') # Ensure we are working in the correct database schema Database(tenant_id).switch_schema() # Select variables to work with depending on tenant and model model_variables = select_model_variables(tenant) # Retrieve document version to process document_version = DocumentVersion.query.get(document_version_id) if document_version is None: raise Exception(f'Document version {document_version_id} not found') except Exception as e: current_app.logger.error(f'Create Embeddings request received ' f'for non existing document version {document_version_id} ' f'for tenant {tenant_id}, ' f'error: {e}') raise try: db.session.add(document_version) # start processing document_version.processing = True document_version.processing_started_at = dt.now(tz.utc) document_version.processing_finished_at = None document_version.processing_error = None db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Unable to save Embedding status information ' f'in document version {document_version_id} ' f'for tenant {tenant_id}') raise delete_embeddings_for_document_version(document_version) try: match document_version.file_type: case 'pdf': process_pdf(tenant, model_variables, document_version) case 'html': process_html(tenant, model_variables, document_version) case 'youtube': process_youtube(tenant, model_variables, document_version) case _: raise Exception(f'No functionality defined for file type {document_version.file_type} ' f'for tenant {tenant_id} ' f'while creating embeddings for document version {document_version_id}') except Exception as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' f'on document version {document_version_id} ' f'error: {e}') document_version.processing = False document_version.processing_finished_at = dt.now(tz.utc) document_version.processing_error = str(e)[:255] db.session.commit() create_embeddings.update_state(state=states.FAILURE) raise def process_pdf(tenant, model_variables, document_version): base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location) file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) if os.path.exists(file_path): pdf_text = '' # Function to extract text from PDF and return as string with open(file_path, 'rb') as file: reader = PyPDF2.PdfReader(file) for page_num in range(len(reader.pages)): page = reader.pages[page_num] pdf_text += page.extract_text() else: current_app.logger.error(f'The physical file for document version {document_version.id} ' f'for tenant {tenant.id} ' f'at {file_path} does not exist') create_embeddings.update_state(state=states.FAILURE) raise markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text) markdown_file_name = f'{document_version.id}.md' output_file = os.path.join(base_path, markdown_file_name) with open(output_file, 'w') as f: f.write(markdown) potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) if len(chunks) > 1: summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) document_version.system_context = f'Summary: {summary}\n' else: document_version.system_context = '' enriched_chunks = enrich_chunks(tenant, document_version, chunks) embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) try: db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on HTML, document version {document_version.id}' f'error: {e}') raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') def delete_embeddings_for_document_version(document_version): embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all() for embedding in embeddings_to_delete: db.session.delete(embedding) try: db.session.commit() current_app.logger.info(f'Deleted embeddings for document version {document_version.id}') except SQLAlchemyError as e: current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}') raise def process_html(tenant, model_variables, document_version): # The tags to be considered can be dependent on the tenant html_tags = model_variables['html_tags'] html_end_tags = model_variables['html_end_tags'] html_included_elements = model_variables['html_included_elements'] html_excluded_elements = model_variables['html_excluded_elements'] base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location) file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) if os.path.exists(file_path): with open(file_path, 'rb') as f: html_content = f.read() else: current_app.logger.error(f'The physical file for document version {document_version.id} ' f'for tenant {tenant.id} ' f'at {file_path} does not exist') create_embeddings.update_state(state=states.FAILURE) raise extracted_html, title = parse_html(html_content, html_tags, included_elements=html_included_elements, excluded_elements=html_excluded_elements) extracted_file_name = f'{document_version.id}-extracted.html' output_file = os.path.join(base_path, extracted_file_name) with open(output_file, 'w') as f: f.write(extracted_html) markdown = generate_markdown_from_html(tenant, model_variables, document_version, extracted_html) markdown_file_name = f'{document_version.id}.md' output_file = os.path.join(base_path, markdown_file_name) with open(output_file, 'w') as f: f.write(markdown) potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) if len(chunks) > 1: summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) document_version.system_context = (f'Title: {title}\n' f'Summary: {summary}\n') else: document_version.system_context = (f'Title: {title}\n') enriched_chunks = enrich_chunks(tenant, document_version, chunks) embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) try: db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on HTML, document version {document_version.id}' f'error: {e}') raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') def enrich_chunks(tenant, document_version, chunks): current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') current_app.logger.debug(f'Nr of chunks: {len(chunks)}') chunk_total_context = (f'Filename: {document_version.file_name}\n' f'User Context:{document_version.user_context}\n' f'{document_version.system_context}\n\n') enriched_chunks = [] initial_chunk = (f'Filename: {document_version.file_name}\n' f'User Context:\n{document_version.user_context}\n\n' f'{chunks[0]}') enriched_chunks.append(initial_chunk) for chunk in chunks[1:]: enriched_chunk = f'{chunk_total_context}\n{chunk}' enriched_chunks.append(enriched_chunk) current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') return enriched_chunks def generate_markdown_from_html(tenant, model_variables, document_version, html_content): current_app.logger.debug(f'Generating Markdown from HTML for tenant {tenant.id} ' f'on document version {document_version.id}') llm = model_variables['llm'] template = model_variables['html_parse_template'] parse_prompt = ChatPromptTemplate.from_template(template) setup = RunnablePassthrough() output_parser = StrOutputParser() chain = setup | parse_prompt | llm | output_parser input_html = {"html": html_content} markdown = chain.invoke(input_html) return markdown def generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_content): current_app.logger.debug(f'Generating Markdown from PDF for tenant {tenant.id} ' f'on document version {document_version.id}') llm = model_variables['llm'] template = model_variables['pdf_parse_template'] parse_prompt = ChatPromptTemplate.from_template(template) setup = RunnablePassthrough() output_parser = StrOutputParser() chain = setup | parse_prompt | llm | output_parser input_pdf = {"pdf_content": pdf_content} markdown = chain.invoke(input_pdf) return markdown def summarize_chunk(tenant, model_variables, document_version, chunk): current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}') llm = model_variables['llm'] template = model_variables['summary_template'] language_template = create_language_template(template, document_version.language) current_app.logger.debug(f'Language prompt: {language_template}') chain = load_summarize_chain(llm, chain_type='stuff', prompt=ChatPromptTemplate.from_template(language_template)) doc_creator = CharacterTextSplitter(chunk_size=model_variables['max_chunk_size'] * 2, chunk_overlap=0) text_to_summarize = doc_creator.create_documents(chunk) try: summary = chain.run(text_to_summarize) current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}.') return summary except LangChainException as e: current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: {e}') raise def embed_chunks(tenant, model_variables, document_version, chunks): current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} ' f'on document version {document_version.id}') embedding_model = model_variables['embedding_model'] try: embeddings = embedding_model.embed_documents(chunks) current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} ' f'on document version {document_version.id}') except LangChainException as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while calling OpenAI API' f'error: {e}') raise # Add embeddings to the database new_embeddings = [] for chunk, embedding in zip(chunks, embeddings): new_embedding = model_variables['embedding_db_model']() new_embedding.document_version = document_version new_embedding.active = True new_embedding.chunk = chunk new_embedding.embedding = embedding new_embeddings.append(new_embedding) return new_embeddings def parse_html(html_content, tags, included_elements=None, excluded_elements=None): soup = BeautifulSoup(html_content, 'html.parser') extracted_html = '' if included_elements: elements_to_parse = soup.find_all(included_elements) else: elements_to_parse = [soup] # parse the entire document if no included_elements specified current_app.embed_tuning_logger.debug(f'Included Elements: {included_elements}') current_app.embed_tuning_logger.debug(f'Included Elements: {len(included_elements)}') current_app.embed_tuning_logger.debug(f'Excluded Elements: {excluded_elements}') current_app.embed_tuning_logger.debug(f'Found {len(elements_to_parse)} elements to parse') # Iterate through the found included elements for element in elements_to_parse: # Find all specified tags within each included element for sub_element in element.find_all(tags): if excluded_elements and sub_element.find_parent(excluded_elements): continue # Skip this sub_element if it's within any of the excluded_elements sub_content = html.unescape(sub_element.get_text(strip=False)) extracted_html += f'<{sub_element.name}>{sub_element.get_text(strip=True)}\n' title = soup.find('title').get_text(strip=True) return extracted_html, title def process_youtube(tenant, model_variables, document_version): base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location) download_file_name = f'{document_version.id}.mp4' compressed_file_name = f'{document_version.id}.mp3' transcription_file_name = f'{document_version.id}.txt' markdown_file_name = f'{document_version.id}.md' of, title, description, author = download_youtube(document_version.url, base_path, download_file_name, tenant) document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}' compress_audio(base_path, download_file_name, compressed_file_name, tenant) transcribe_audio(base_path, compressed_file_name, transcription_file_name, document_version.language, tenant, model_variables) annotate_transcription(base_path, transcription_file_name, markdown_file_name, tenant, model_variables) potential_chunks = create_potential_chunks_for_markdown(base_path, markdown_file_name, tenant) actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks) embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) try: db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on Youtube document version {document_version.id}' f'error: {e}') raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on Youtube document version {document_version.id} :-)') def download_youtube(url, file_location, file_name, tenant): try: current_app.logger.info(f'Downloading YouTube video: {url} on location {file_location} for tenant: {tenant.id}') yt = YouTube(url) stream = yt.streams.get_audio_only() output_file = stream.download(output_path=file_location, filename=file_name) current_app.logger.info(f'Downloaded YouTube video: {url} on location {file_location} for tenant: {tenant.id}') return output_file, yt.title, yt.description, yt.author except Exception as e: current_app.logger.error(f'Error downloading YouTube video: {url} on location {file_location} for ' f'tenant: {tenant.id} with error: {e}') raise def compress_audio(file_location, input_file, output_file, tenant): try: current_app.logger.info(f'Compressing audio on {file_location} for tenant: {tenant.id}') result = os.popen(f'scripts/compress.sh -d {file_location} -i {input_file} -o {output_file}') output_file_path = os.path.join(file_location, output_file) count = 0 while not os.path.exists(output_file_path) and count < 10: gevent.sleep(1) current_app.logger.debug(f'Waiting for {output_file_path} to be created... Count: {count}') count += 1 current_app.logger.info(f'Compressed audio for {file_location} for tenant: {tenant.id}') return result except Exception as e: current_app.logger.error(f'Error compressing audio on {file_location} for tenant: {tenant.id} with error: {e}') raise def transcribe_audio(file_location, input_file, output_file, language, tenant, model_variables): try: current_app.logger.info(f'Transcribing audio on {file_location} for tenant: {tenant.id}') client = model_variables['transcription_client'] model = model_variables['transcription_model'] input_file_path = os.path.join(file_location, input_file) output_file_path = os.path.join(file_location, output_file) count = 0 while not os.path.exists(input_file_path) and count < 10: gevent.sleep(1) current_app.logger.debug(f'Waiting for {input_file_path} to exist... Count: {count}') count += 1 with open(input_file_path, 'rb') as audio_file: transcription = client.audio.transcriptions.create( file=audio_file, model=model, language=language, response_format='verbose_json', ) with open(output_file_path, 'w') as transcript_file: transcript_file.write(transcription.text) current_app.logger.info(f'Transcribed audio for {file_location} for tenant: {tenant.id}') except Exception as e: current_app.logger.error(f'Error transcribing audio for {file_location} for tenant: {tenant.id}, ' f'with error: {e}') raise def annotate_transcription(file_location, input_file, output_file, tenant, model_variables): try: current_app.logger.debug(f'Annotating transcription on {file_location} for tenant {tenant.id}') llm = model_variables['llm'] template = model_variables['transcript_template'] transcript_prompt = ChatPromptTemplate.from_template(template) setup = RunnablePassthrough() output_parser = StrOutputParser() transcript = '' with open(os.path.join(file_location, input_file), 'r') as f: transcript = f.read() chain = setup | transcript_prompt | llm | output_parser input_transcript = {"transcript": transcript} annotated_transcript = chain.invoke(input_transcript) with open(os.path.join(file_location, output_file), 'w') as f: f.write(annotated_transcript) current_app.logger.info(f'Annotated transcription for {file_location} for tenant {tenant.id}') except Exception as e: current_app.logger.error(f'Error annotating transcription for {file_location} for tenant {tenant.id}, ' f'with error: {e}') raise def create_potential_chunks_for_markdown(base_path, input_file, tenant): current_app.logger.info(f'Creating potential chunks for {base_path} for tenant {tenant.id}') markdown = '' with open(os.path.join(base_path, input_file), 'r') as f: markdown = f.read() headers_to_split_on = [ ("#", "Header 1"), ("##", "Header 2"), # ("###", "Header 3"), ] markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) md_header_splits = markdown_splitter.split_text(markdown) potential_chunks = [doc.page_content for doc in md_header_splits] return potential_chunks def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars): actual_chunks = [] current_chunk = "" current_length = 0 for chunk in potential_chunks: chunk_length = len(chunk) if current_length + chunk_length > max_chars: if current_length >= min_chars: actual_chunks.append(current_chunk) current_chunk = chunk current_length = chunk_length else: # If the combined chunk is still less than max_chars, keep adding current_chunk += f'\n{chunk}' current_length += chunk_length else: current_chunk += f'\n{chunk}' current_length += chunk_length # Handle the last chunk if current_chunk and current_length >= 0: actual_chunks.append(current_chunk) return actual_chunks pass