eveAI/eveai_workers/tasks.py
Josako 6cf660e622 - Adding a Tenant Type
- Allow filtering on Tenant Types & searching for parts of Tenant names
- Implement health checks
- Start Prometheus monitoring (needs to be finalized)
- Refine audio_processor and srt_processor to reduce duplicate code and support larger files
- Introduce repopack so LLMs can reason about the code
2024-09-13 15:43:40 +02:00


import io
import os
from datetime import datetime as dt, timezone as tz
from celery import states
from flask import current_app
# LangChain imports
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from sqlalchemy.exc import SQLAlchemyError
from common.extensions import db, minio_client
from common.models.document import DocumentVersion, Embedding
from common.models.user import Tenant
from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import select_model_variables, create_language_template
from common.utils.os_utils import safe_remove, sync_folder
from eveai_workers.Processors.audio_processor import AudioProcessor
from eveai_workers.Processors.html_processor import HTMLProcessor
from eveai_workers.Processors.pdf_processor import PDFProcessor
from eveai_workers.Processors.srt_processor import SRTProcessor


# Healthcheck task
@current_celery.task(name='ping', queue='embeddings')
def ping():
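    """Liveness probe for the embeddings worker; health checks expect 'pong'."""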
return 'pong'


@current_celery.task(name='create_embeddings', queue='embeddings')
def create_embeddings(tenant_id, document_version_id):
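    """Celery entry point: create embeddings for one document version.

    Looks up the tenant and document version, switches to the tenant's
    database schema, marks the version as processing, deletes any previous
    embeddings, and dispatches to the processor matching the file type.
    """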
current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.')
try:
# Retrieve Tenant for which we are processing
tenant = Tenant.query.get(tenant_id)
if tenant is None:
raise Exception(f'Tenant {tenant_id} not found')
# Ensure we are working in the correct database schema
Database(tenant_id).switch_schema()
# Select variables to work with depending on tenant and model
model_variables = select_model_variables(tenant)
current_app.logger.debug(f'Model variables: {model_variables}')
# Retrieve document version to process
document_version = DocumentVersion.query.get(document_version_id)
if document_version is None:
raise Exception(f'Document version {document_version_id} not found')
    except Exception as e:
        current_app.logger.error(f'Unable to start embedding creation '
                                 f'for document version {document_version_id} '
                                 f'of tenant {tenant_id}, '
                                 f'error: {e}')
        raise
try:
db.session.add(document_version)
# start processing
document_version.processing = True
document_version.processing_started_at = dt.now(tz.utc)
document_version.processing_finished_at = None
document_version.processing_error = None
db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to save embedding status information '
                                 f'in document version {document_version_id} '
                                 f'for tenant {tenant_id}, '
                                 f'error: {e}')
        raise
delete_embeddings_for_document_version(document_version)
try:
match document_version.file_type:
case 'pdf':
process_pdf(tenant, model_variables, document_version)
case 'html':
process_html(tenant, model_variables, document_version)
case 'srt':
process_srt(tenant, model_variables, document_version)
case 'mp4' | 'mp3' | 'ogg':
process_audio(tenant, model_variables, document_version)
case _:
raise Exception(f'No functionality defined for file type {document_version.file_type} '
f'for tenant {tenant_id} '
f'while creating embeddings for document version {document_version_id}')
except Exception as e:
current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
f'on document version {document_version_id} '
f'error: {e}')
document_version.processing = False
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing_error = str(e)[:255]
db.session.commit()
create_embeddings.update_state(state=states.FAILURE)
raise


def delete_embeddings_for_document_version(document_version):
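    """Delete all stored embeddings for a document version (run before re-processing)."""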
embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all()
for embedding in embeddings_to_delete:
db.session.delete(embedding)
try:
db.session.commit()
current_app.logger.info(f'Deleted embeddings for document version {document_version.id}')
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}, '
                                 f'error: {e}')
        raise
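

# Thin wrappers around the Processor classes: each converts its file type to
# markdown plus a title, which embed_markdown() then chunks and embeds.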
def process_pdf(tenant, model_variables, document_version):
    processor = PDFProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)


def process_html(tenant, model_variables, document_version):
    processor = HTMLProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)


def process_audio(tenant, model_variables, document_version):
    processor = AudioProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)


def process_srt(tenant, model_variables, document_version):
    processor = SRTProcessor(tenant, model_variables, document_version)
    markdown, title = processor.process()
    # Process markdown and embed
    embed_markdown(tenant, model_variables, document_version, markdown, title)


def embed_markdown(tenant, model_variables, document_version, markdown, title):
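    """Chunk, enrich, embed, and persist the markdown produced by a processor.

    Note: the processors upload the generated markdown to MinIO, so chunking
    re-reads '<document_version.id>.md' from there; the `markdown` argument
    itself is not used directly.
    """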
# Create potential chunks
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
# Combine chunks for embedding
chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
model_variables['max_chunk_size'])
# Enrich chunks
enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks)
# Create embeddings
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
# Update document version and save embeddings
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing = False
db.session.add_all(embeddings)
db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on document version {document_version.id}, '
                                 f'error: {e}')
        raise
current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
f'on document version {document_version.id} :-)')


def enrich_chunks(tenant, model_variables, document_version, title, chunks):
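    """Prefix each chunk with document context (filename, user/system context
    and metadata, title). Chunks after the first also receive an LLM-generated
    summary of the first chunk, so they carry the document's opening context.
    """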
current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
    if not chunks:
        return []
    summary = ''
    if len(chunks) > 1:
        summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
chunk_total_context = (f'Filename: {document_version.file_name}\n'
f'User Context:\n{document_version.user_context}\n\n'
f'User Metadata:\n{document_version.user_metadata}\n\n'
f'Title: {title}\n'
f'Summary:\n{summary}\n'
f'System Context:\n{document_version.system_context}\n\n'
f'System Metadata:\n{document_version.system_metadata}\n\n'
)
enriched_chunks = []
initial_chunk = (f'Filename: {document_version.file_name}\n'
f'User Context:\n{document_version.user_context}\n\n'
f'User Metadata:\n{document_version.user_metadata}\n\n'
f'Title: {title}\n'
f'System Context:\n{document_version.system_context}\n\n'
f'System Metadata:\n{document_version.system_metadata}\n\n'
f'{chunks[0]}'
)
enriched_chunks.append(initial_chunk)
for chunk in chunks[1:]:
enriched_chunk = f'{chunk_total_context}\n{chunk}'
enriched_chunks.append(enriched_chunk)
current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
return enriched_chunks


def summarize_chunk(tenant, model_variables, document_version, chunk):
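    """Summarize one chunk with the tenant's LLM, using the language-specific summary template."""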
current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}')
llm = model_variables['llm']
template = model_variables['summary_template']
language_template = create_language_template(template, document_version.language)
summary_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | summary_prompt | llm | output_parser
try:
current_app.logger.debug(f'Starting summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}')
summary = chain.invoke({"text": chunk})
current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}.')
return summary
except LangChainException as e:
current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: {e}')
raise


def embed_chunks(tenant, model_variables, document_version, chunks):
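    """Embed the enriched chunks and return unsaved Embedding ORM objects; the caller commits them."""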
current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
embedding_model = model_variables['embedding_model']
try:
embeddings = embedding_model.embed_documents(chunks)
current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
    except LangChainException as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
                                 f'on document version {document_version.id} while calling the OpenAI API, '
                                 f'error: {e}')
raise
# Add embeddings to the database
new_embeddings = []
for chunk, embedding in zip(chunks, embeddings):
new_embedding = model_variables['embedding_db_model']()
new_embedding.document_version = document_version
new_embedding.active = True
new_embedding.chunk = chunk
new_embedding.embedding = embedding
new_embeddings.append(new_embedding)
return new_embeddings


def log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse):
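    """Write parsing details to the embed-tuning logger when the tenant has embed_tuning enabled."""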
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}')
current_app.embed_tuning_logger.debug(f'Included Elements: {included_elements}')
current_app.embed_tuning_logger.debug(f'Excluded Elements: {excluded_elements}')
current_app.embed_tuning_logger.debug(f'Excluded Classes: {excluded_classes}')
        current_app.embed_tuning_logger.debug(f'Found {len(elements_to_parse)} elements to parse')
        if elements_to_parse:
            current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}')
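

# The commented-out YouTube pipeline below appears to have been superseded by
# the Processor classes above; it is kept verbatim for reference.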
# def process_youtube(tenant, model_variables, document_version):
# download_file_name = f'{document_version.id}.mp4'
# compressed_file_name = f'{document_version.id}.mp3'
# transcription_file_name = f'{document_version.id}.txt'
# markdown_file_name = f'{document_version.id}.md'
#
#     # Remove existing files (in case the file is re-processed)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, download_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, compressed_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, transcription_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, markdown_file_name)
#
# of, title, description, author = download_youtube(document_version.url, tenant.id, document_version,
# download_file_name)
# document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'
# compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
# transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
# annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)
#
# potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
# actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
# model_variables['max_chunk_size'])
#
#     enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, actual_chunks)
# embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
#
# try:
# db.session.add(document_version)
# document_version.processing_finished_at = dt.now(tz.utc)
# document_version.processing = False
# db.session.add_all(embeddings)
# db.session.commit()
# except SQLAlchemyError as e:
# current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
# f'on Youtube document version {document_version.id}'
# f'error: {e}')
# raise
#
# current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
# f'on Youtube document version {document_version.id} :-)')
#
#
# def download_youtube(url, tenant_id, document_version, file_name):
# try:
# current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
# yt = YouTube(url)
# stream = yt.streams.get_audio_only()
#
# with tempfile.NamedTemporaryFile(delete=False) as temp_file:
# stream.download(output_path=temp_file.name)
# with open(temp_file.name, 'rb') as f:
# file_data = f.read()
#
# minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id,
# file_name, file_data)
#
# current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
# return file_name, yt.title, yt.description, yt.author
# except Exception as e:
# current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
# raise
#
#
# def compress_audio(tenant_id, document_version, input_file, output_file):
# try:
# current_app.logger.info(f'Compressing audio for tenant: {tenant_id}')
#
# input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id, input_file)
#
# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input:
# temp_input.write(input_data)
# temp_input.flush()
#
# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output:
# result = subprocess.run(
# ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name],
# capture_output=True,
# text=True
# )
#
# if result.returncode != 0:
# raise Exception(f"Compression failed: {result.stderr}")
#
# with open(temp_output.name, 'rb') as f:
# compressed_data = f.read()
#
# minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id,
# output_file, compressed_data)
#
# current_app.logger.info(f'Compressed audio for tenant: {tenant_id}')
# except Exception as e:
# current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}')
# raise
#
#
# def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables):
# try:
# current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}')
# client = model_variables['transcription_client']
# model = model_variables['transcription_model']
#
# # Download the audio file from MinIO
# audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id, input_file)
#
# # Load the audio data into pydub
# audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
#
# # Define segment length (e.g., 10 minutes)
# segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds
#
# transcriptions = []
#
# # Split audio into segments and transcribe each
# for i, chunk in enumerate(audio[::segment_length]):
# current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}')
#
# with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
# chunk.export(temp_audio.name, format="mp3")
#
# with open(temp_audio.name, 'rb') as audio_segment:
# transcription = client.audio.transcriptions.create(
# file=audio_segment,
# model=model,
# language=document_version.language,
# response_format='verbose_json',
# )
#
# transcriptions.append(transcription.text)
#
# os.unlink(temp_audio.name) # Delete the temporary file
#
# # Combine all transcriptions
# full_transcription = " ".join(transcriptions)
#
# # Upload the full transcription to MinIO
# minio_client.upload_document_file(
# tenant_id,
# document_version.doc_id,
# document_version.language,
# document_version.id,
# output_file,
# full_transcription.encode('utf-8')
# )
#
# current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}')
# except Exception as e:
# current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}')
# raise
#
#
# def annotate_transcription(tenant, document_version, input_file, output_file, model_variables):
# try:
# current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}')
#
# char_splitter = CharacterTextSplitter(separator='.',
# chunk_size=model_variables['annotation_chunk_length'],
# chunk_overlap=0)
#
# headers_to_split_on = [
# ("#", "Header 1"),
# ("##", "Header 2"),
# ]
# markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
#
# llm = model_variables['llm']
# template = model_variables['transcript_template']
# language_template = create_language_template(template, document_version.language)
# transcript_prompt = ChatPromptTemplate.from_template(language_template)
# setup = RunnablePassthrough()
# output_parser = StrOutputParser()
#
# # Download the transcription file from MinIO
# transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id,
# document_version.language, document_version.id,
# input_file)
# transcript = transcript_data.decode('utf-8')
#
# chain = setup | transcript_prompt | llm | output_parser
#
# chunks = char_splitter.split_text(transcript)
# all_markdown_chunks = []
# last_markdown_chunk = ''
# for chunk in chunks:
# current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}')
# full_input = last_markdown_chunk + '\n' + chunk
# if tenant.embed_tuning:
# current_app.embed_tuning_logger.debug(f'Annotating chunk: \n '
# f'------------------\n'
# f'{full_input}\n'
# f'------------------\n')
# input_transcript = {'transcript': full_input}
# markdown = chain.invoke(input_transcript)
# # GPT-4o returns some kind of content description: ```markdown <text> ```
# if markdown.startswith("```markdown"):
# markdown = "\n".join(markdown.strip().split("\n")[1:-1])
# if tenant.embed_tuning:
# current_app.embed_tuning_logger.debug(f'Markdown Received: \n '
# f'------------------\n'
# f'{markdown}\n'
# f'------------------\n')
# md_header_splits = markdown_splitter.split_text(markdown)
# markdown_chunks = [doc.page_content for doc in md_header_splits]
# # claude-3.5-sonnet returns introductory text
# if not markdown_chunks[0].startswith('#'):
# markdown_chunks.pop(0)
# last_markdown_chunk = markdown_chunks[-1]
# last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:])
# markdown_chunks.pop()
# all_markdown_chunks += markdown_chunks
#
# all_markdown_chunks += [last_markdown_chunk]
#
# annotated_transcript = '\n'.join(all_markdown_chunks)
#
# # Upload the annotated transcript to MinIO
# minio_client.upload_document_file(
# tenant.id,
# document_version.doc_id,
# document_version.language,
# document_version.id,
# output_file,
# annotated_transcript.encode('utf-8')
# )
#
# current_app.logger.info(f'Annotated transcription for tenant {tenant.id}')
# except Exception as e:
# current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
# raise


def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
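    """Split the markdown stored in MinIO into candidate chunks at '#' and '##' headers."""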
try:
current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
# Download the markdown file from MinIO
markdown_data = minio_client.download_document_file(tenant_id,
document_version.doc_id,
document_version.language,
document_version.id,
input_file
)
markdown = markdown_data.decode('utf-8')
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
md_header_splits = markdown_splitter.split_text(markdown)
potential_chunks = [doc.page_content for doc in md_header_splits]
current_app.logger.debug(f'Created {len(potential_chunks)} potential chunks for tenant {tenant_id}')
return potential_chunks
except Exception as e:
current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}')
raise


def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
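    """Greedily merge candidate chunks into chunks of roughly min_chars..max_chars.

    A chunk is flushed once adding the next candidate would exceed max_chars,
    but only if it has already reached min_chars. For example, with
    min_chars=10 and max_chars=40, candidates of length 30 and 30 become two
    chunks, while 5 and 40 are merged because the first alone is below
    min_chars.
    """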
actual_chunks = []
current_chunk = ""
current_length = 0
    for chunk in potential_chunks:
        chunk_length = len(chunk)
        if current_length + chunk_length > max_chars:
            if current_length >= min_chars:
                # Flush the current chunk and start a new one
                actual_chunks.append(current_chunk)
                current_chunk = chunk
                current_length = chunk_length
            else:
                # Current chunk is still below min_chars, so keep adding
                # even though this pushes it past max_chars
                current_chunk = f'{current_chunk}\n{chunk}' if current_chunk else chunk
                current_length += chunk_length
        else:
            current_chunk = f'{current_chunk}\n{chunk}' if current_chunk else chunk
            current_length += chunk_length
    # Append whatever remains as the final chunk
    if current_chunk:
        actual_chunks.append(current_chunk)
    return actual_chunks