Files
eveAI/eveai_workers/tasks.py

267 lines
11 KiB
Python

from datetime import datetime as dt, timezone as tz
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError
import os
# Unstructured commercial client imports
from unstructured_client import UnstructuredClient
from unstructured_client.models import shared
from unstructured_client.models.errors import SDKError
# OpenAI imports
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.exceptions import LangChainException
from common.utils.database import Database
from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
from common.extensions import db
from common.utils.celery_utils import current_celery
from bs4 import BeautifulSoup
@current_celery.task(name='create_embeddings', queue='embeddings')
def create_embeddings(tenant_id, document_version_id, default_embedding_model):
# Setup Remote Debugging only if PYCHARM_DEBUG=True
if current_app.config['PYCHARM_DEBUG']:
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True)
current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} '
f'with model {default_embedding_model}')
# Ensure we are working in the correct database schema
Database(tenant_id).switch_schema()
# Retrieve document version to process
document_version = DocumentVersion.query.get(document_version_id)
if document_version is None:
current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. '
f'Document version {document_version_id} not found')
return
db.session.add(document_version)
# start processing
document_version.processing = True
document_version.processing_started_at = dt.now(tz.utc)
try:
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error saving document version {document_version_id} to database '
f'for tenant {tenant_id} when creating embeddings. '
f'error: {e}')
return
embed_provider = default_embedding_model.rsplit('.', 1)[0]
embed_model = default_embedding_model.rsplit('.', 1)[1]
# define embedding variables
embedding_function = None
match (embed_provider, embed_model):
case ('openai', 'text-embedding-3-small'):
embedding_function = embed_chunks_for_text_embedding_3_small
case ('mistral', 'mistral.mistral-embed'):
embedding_function = embed_chunks_for_mistral_embed
match document_version.file_type:
case 'pdf':
process_pdf(tenant_id, document_version, embedding_function, default_embedding_model)
case 'html':
process_html(tenant_id, document_version, embedding_function, default_embedding_model)
case _:
current_app.logger.info(f'No functionality defined for file type {document_version.file_type} '
f'for tenant {tenant_id} '
f'while creating embeddings for document version {document_version_id}')
@current_celery.task(name='ask_eve_ai', queue='llm_interactions')
def ask_eve_ai(query):
    """Placeholder Celery task (queue 'llm_interactions') for LLM interactions.

    Intended to hold the interaction logic with LLMs like GPT (Langchain API
    calls, etc.). Not implemented yet: ``query`` is ignored and None is returned.
    """
    return None
def process_pdf(tenant_id, document_version, embedding_function, embedding_model):
    """Partition, enrich and embed a PDF document version, then persist the result.

    Args:
        tenant_id: Used in log messages and forwarded to the helper functions.
        document_version: Object providing ``file_location``, ``file_name`` and
            ``id``; its ``processing`` / ``processing_finished_at`` fields are
            updated on success.
        embedding_function: Callable ``(tenant_id, document_version, chunks)``
            returning the embedding objects to store.
        embedding_model: Model identifier, used for logging only.

    Raises:
        FileNotFoundError: The file is not present under ``UPLOAD_FOLDER``.
        SQLAlchemyError: Saving the embeddings failed (session is rolled back).
        Exception: Anything raised while partitioning, enriching or embedding
            is logged and re-raised unchanged.
    """
    file_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
                             document_version.file_location,
                             document_version.file_name)

    if not os.path.exists(file_path):
        missing_file_message = (f'The physical file for document version {document_version.id} '
                                f'at {file_path} does not exist')
        current_app.logger.error(missing_file_message)
        # A bare `raise` here has no active exception and would surface as
        # "RuntimeError: No active exception to reraise".
        raise FileNotFoundError(missing_file_message)

    # Only the read needs the file handle; release it before the remote calls
    with open(file_path, 'rb') as f:
        files = shared.Files(content=f.read(), file_name=document_version.file_name)

    req = shared.PartitionParameters(
        files=files,
        strategy='hi_res',
        hi_res_model_name='yolox',
        coordinates=True,
        extract_image_block_types=['Image', 'Table'],
        chunking_strategy='by_title',
        combine_under_n_chars=2000,
        max_characters=3000,
    )

    try:
        chunks = partition_doc_unstructured(tenant_id, document_version, req)
        enriched_chunk_docs = enrich_chunks(tenant_id, document_version, chunks)
        embeddings = embedding_function(tenant_id, document_version, enriched_chunk_docs)
    except Exception as e:
        current_app.logger.error(f'Unable to create Embeddings for tenant {tenant_id} '
                                 f'on document version {document_version.id} '
                                 f'with model {embedding_model} '
                                 f'error: {e}')
        raise

    # Save embeddings & processing information to the database
    db.session.add_all(embeddings)
    db.session.add(document_version)
    document_version.processing_finished_at = dt.now(tz.utc)
    document_version.processing = False
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant_id} '
                                 f'on document version {document_version.id} '
                                 f'error: {e}')
        db.session.rollback()
        raise

    current_app.logger.info(f'Embeddings created successfully for tenant {tenant_id} '
                            f'on document version {document_version.id} :-)')
def process_html(tenant_id, document_version, embedding_function, default_embedding_model):
    """Read the HTML file that belongs to a document version.

    Work in progress: the file content is read but nothing is done with it
    yet, and a missing file is silently ignored. ``tenant_id``,
    ``embedding_function`` and ``default_embedding_model`` are currently unused.

    Returns:
        None.
    """
    upload_root = current_app.config['UPLOAD_FOLDER']
    html_path = os.path.join(upload_root, document_version.file_location, document_version.file_name)

    # Nothing to do when the file is not on disk
    if not os.path.exists(html_path):
        return

    with open(html_path, 'rb') as html_file:
        html_content = html_file.read()
def enrich_chunks(tenant_id, document_version, chunks):
    """Prefix every chunk with document-level context.

    The first chunk is summarised with OpenAI; the summary is stored on the
    document language's ``system_context`` and committed. The first chunk is
    then prefixed with the file name and user context, every following chunk
    with the document name, user context and the summary.

    Args:
        tenant_id: Used in log messages only.
        document_version: Provides ``document_language``, ``file_name`` and ``id``.
        chunks: Non-empty sequence of text chunks, in document order.

    Returns:
        list[str]: The enriched chunks, same length and order as ``chunks``.

    Raises:
        ValueError: ``chunks`` is empty.
        LangChainException: Creating the summary failed (logged, re-raised).
        SQLAlchemyError: Saving the summary failed (session is rolled back).
    """
    if not chunks:
        # Fail with a clear message before spending an LLM call; indexing
        # chunks[0] would otherwise raise a bare IndexError.
        raise ValueError(f'No chunks to enrich for tenant {tenant_id} '
                         f'on document version {document_version.id}')

    # We're adding filename and a summary of the first chunk to all the chunks to create global context
    # using openAI to summarise
    api_key = current_app.config.get('OPENAI_API_KEY')
    # TODO: model selection to be adapted to model approach
    llm = ChatOpenAI(api_key=api_key, temperature=0, model='gpt-4-turbo')
    summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE')
    prompt = ChatPromptTemplate.from_template(summary_template)
    chain = load_summarize_chain(llm, chain_type='stuff', prompt=prompt)

    doc_creator = CharacterTextSplitter(chunk_size=9000, chunk_overlap=0)
    # create_documents expects a list of texts; a bare string would be
    # iterated character by character.
    text_to_summarize = doc_creator.create_documents([chunks[0]])

    try:
        summary = chain.run(text_to_summarize)
    except LangChainException as e:
        current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant_id} '
                                 f'on document version {document_version.id} '
                                 f'error: {e}')
        raise

    doc_lang = document_version.document_language
    db.session.add(doc_lang)
    doc_lang.system_context = f'Summary:\n {summary}'
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving summary to DocumentLanguage {doc_lang.id} '
                                 f'while enriching chunks for tenant {tenant_id} '
                                 f'on document version {document_version.id} '
                                 f'error: {e}')
        db.session.rollback()
        raise

    # The newline after the user context keeps it from running straight into
    # the 'System Context:' label.
    chunk_global_context = (f'Filename: {doc_lang.document.name}\n'
                            f'User Context:\n{doc_lang.user_context}\n'
                            f'System Context:\n{summary}')

    # The first chunk is the source of the summary, so it only gets the
    # filename and user context.
    initial_chunk = f'Filename: {document_version.file_name}\n User Context:\n{doc_lang.user_context}\n{chunks[0]}'
    enriched_chunks = [initial_chunk]
    enriched_chunks.extend(f'{chunk_global_context}\n{chunk}' for chunk in chunks[1:])

    return enriched_chunks
def partition_doc_unstructured(tenant_id, document_version, unstructured_request):
    """Partition a document through the Unstructured API and return its text chunks.

    'CompositeElement' elements contribute their text, 'Table' elements their
    ``text_as_html`` metadata; all other element types (including 'Image')
    are skipped.

    Args:
        tenant_id: Used in log messages only.
        document_version: Used in log messages only (``id``).
        unstructured_request: Request object handed to ``general.partition``.

    Returns:
        list: One entry per retained element, in response order.

    Raises:
        SDKError: Logged and re-raised when the Unstructured call fails.
    """
    # Initiate the connection to unstructured.io
    client = UnstructuredClient(
        server_url=current_app.config.get('UNSTRUCTURED_FULL_URL'),
        api_key_auth=current_app.config.get('UNSTRUCTURED_API_KEY'),
    )

    try:
        response = client.general.partition(unstructured_request)
        collected = []
        for element in response.elements:
            element_type = element['type']
            if element_type == 'CompositeElement':
                collected.append(element['text'])
            elif element_type == 'Table':
                collected.append(element['metadata']['text_as_html'])
            # any other type, e.g. 'Image', is intentionally not collected
        return collected
    except SDKError as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
                                 f'on document version {document_version.id} while chuncking'
                                 f'error: {e}')
        raise
def embed_chunks_for_text_embedding_3_small(tenant_id, document_version, chunks):
    """Embed chunks with OpenAI 'text-embedding-3-small' and wrap them in model objects.

    Args:
        tenant_id: Used in log messages only.
        document_version: Assigned to every created embedding object.
        chunks: Texts to embed.

    Returns:
        list[EmbeddingSmallOpenAI]: One active embedding object per chunk, in
        chunk order. The objects are built here but not committed.

    Raises:
        LangChainException: Logged and re-raised when the embedding call fails.
    """
    # Create embedding vectors using OpenAI
    openai_embedder = OpenAIEmbeddings(api_key=current_app.config.get('OPENAI_API_KEY'),
                                       model='text-embedding-3-small')
    try:
        vectors = openai_embedder.embed_documents(chunks)
    except LangChainException as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
                                 f'on document version {document_version.id} while calling OpenAI API'
                                 f'error: {e}')
        raise

    def _as_embedding_row(text, vector):
        # Build one embedding object tying a chunk to its vector
        row = EmbeddingSmallOpenAI()
        row.document_version = document_version
        row.active = True
        row.chunk = text
        row.embedding = vector
        return row

    return [_as_embedding_row(text, vector) for text, vector in zip(chunks, vectors)]
def embed_chunks_for_mistral_embed(tenant_id, document_version, chunks):
    """Placeholder for creating embeddings with the Mistral embedding model.

    Not implemented yet. Raising makes the failure explicit: silently
    returning None would hand None to the code that stores the embeddings.

    Args:
        tenant_id: Unused for now.
        document_version: Unused for now.
        chunks: Unused for now.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError('Embedding with the mistral-embed model is not implemented yet')
def parse_html(html_content, included_elements=None, excluded_elements=None, tags=None):
    """Extract the text of selected tags from an HTML document.

    Args:
        html_content: HTML markup (str or bytes) handed to BeautifulSoup.
        included_elements: Tag name or list of tag names to restrict the
            search to. When empty/None the whole document is searched.
        excluded_elements: Tag name or list of tag names; any matching tag
            that has one of these as an ancestor is skipped.
        tags: Tag name or list of tag names whose text is extracted. When
            empty/None a default set of headings, paragraphs and list items
            is used.

    Returns:
        list[tuple[str, str]]: ``(tag_name, stripped_text)`` pairs in
        document order.
    """
    # `tags` used to be an undefined name here (NameError on every call); it is
    # now an optional trailing parameter so existing callers keep working.
    search_tags = tags if tags else ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li']

    soup = BeautifulSoup(html_content, 'html.parser')
    extracted_content = []

    if included_elements:
        elements_to_parse = soup.find_all(included_elements)
    else:
        elements_to_parse = [soup]  # parse the entire document if no included_elements specified

    # Iterate through the found included elements
    for element in elements_to_parse:
        # Find all specified tags within each included element
        for sub_element in element.find_all(search_tags):
            if excluded_elements and sub_element.find_parent(excluded_elements):
                continue  # Skip this sub_element if it's within any of the excluded_elements
            extracted_content.append((sub_element.name, sub_element.get_text(strip=True)))

    return extracted_content