Improvements to Document Interface and correcting embedding workers

This commit is contained in:
Josako
2024-06-04 14:59:38 +02:00
parent c660c35de4
commit 61e1372dc8
15 changed files with 486 additions and 246 deletions

View File

@@ -23,6 +23,9 @@ def create_app(config_file=None):
from . import tasks
app.logger.info("EveAI Worker Server Started Successfully")
app.logger.info("-------------------------------------------------------------------------------------------------")
return app, celery

View File

@@ -11,17 +11,17 @@ from unstructured_client.models import shared
from unstructured_client.models.errors import SDKError
# OpenAI imports
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.exceptions import LangChainException
from common.utils.database import Database
from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI
from common.models.document import DocumentVersion
from common.models.user import Tenant
from common.extensions import db
from common.utils.celery_utils import current_celery
from common.utils.model_utils import select_model_variables
from bs4 import BeautifulSoup
@@ -35,59 +35,68 @@ def create_embeddings(tenant_id, document_version_id):
current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.')
# Retrieve Tenant for which we are processing
tenant = Tenant.query.get(tenant_id)
if tenant is None:
current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. '
f'Tenant not found')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
# Ensure we are working in the correct database schema
Database(tenant_id).switch_schema()
# Retrieve document version to process
document_version = DocumentVersion.query.get(document_version_id)
if document_version is None:
current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. '
f'Document version {document_version_id} not found')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
db.session.add(document_version)
# start processing
document_version.processing = True
document_version.processing_started_at = dt.now(tz.utc)
try:
# Retrieve Tenant for which we are processing
tenant = Tenant.query.get(tenant_id)
if tenant is None:
raise Exception(f'Tenant {tenant_id} not found')
# Ensure we are working in the correct database schema
Database(tenant_id).switch_schema()
# Select variables to work with depending on tenant and model
model_variables = select_model_variables(tenant)
# Retrieve document version to process
document_version = DocumentVersion.query.get(document_version_id)
if document_version is None:
raise Exception(f'Document version {document_version_id} not found')
except Exception as e:
current_app.logger.error(f'Create Embeddings request received '
f'for non existing document version {document_version_id} '
f'for tenant {tenant_id}, '
f'error: {e}')
raise
try:
db.session.add(document_version)
# start processing
document_version.processing = True
document_version.processing_started_at = dt.now(tz.utc)
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error saving document version {document_version_id} to database '
f'for tenant {tenant_id} when starting creating of embeddings. '
current_app.logger.error(f'Unable to save Embedding status information '
f'in document version {document_version_id} '
f'for tenant {tenant_id}')
raise
try:
match document_version.file_type:
case 'pdf':
process_pdf(tenant, model_variables, document_version)
case 'html':
process_html(tenant, model_variables, document_version)
case _:
raise Exception(f'No functionality defined for file type {document_version.file_type} '
f'for tenant {tenant_id} '
f'while creating embeddings for document version {document_version_id}')
except Exception as e:
current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
f'on document version {document_version_id} '
f'error: {e}')
document_version.processing = False
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing_error = str(e)[:255]
db.session.commit()
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
match document_version.file_type:
case 'pdf':
process_pdf(tenant, document_version)
case 'html':
process_html(tenant, document_version)
case _:
current_app.logger.info(f'No functionality defined for file type {document_version.file_type} '
f'for tenant {tenant_id} '
f'while creating embeddings for document version {document_version_id}')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
raise
@current_celery.task(name='ask_eve_ai', queue='llm_interactions')
def ask_eve_ai(query):
# Interaction logic with LLMs like GPT (Langchain API calls, etc.)
pass
def process_pdf(tenant, document_version):
def process_pdf(tenant, model_variables, document_version):
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
document_version.file_location,
document_version.file_name)
@@ -101,15 +110,15 @@ def process_pdf(tenant, document_version):
coordinates=True,
extract_image_block_types=['Image', 'Table'],
chunking_strategy='by_title',
combine_under_n_chars=current_app.config.get('MIN_CHUNK_SIZE'),
max_characters=current_app.config.get('MAX_CHUNK_SIZE'),
combine_under_n_chars=model_variables['min_chunk_size'],
max_characters=model_variables['max_chunk_size'],
)
else:
current_app.logger.error(f'The physical file for document version {document_version.id} '
f'for tenant {tenant.id} '
f'at {file_path} does not exist')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
raise
try:
chunks = partition_doc_unstructured(tenant, document_version, req)
@@ -118,13 +127,13 @@ def process_pdf(tenant, document_version):
f'while processing PDF on document version {document_version.id} '
f'error: {e}')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
raise
summary = summarize_chunk(tenant, document_version, chunks[0])
summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
doc_lang = document_version.document_language
doc_lang.system_context = f'Summary: {summary}\n'
enriched_chunks = enrich_chunks(tenant, document_version, chunks)
embeddings = embed_chunks(tenant, document_version, enriched_chunks)
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
try:
db.session.add(doc_lang)
@@ -138,13 +147,14 @@ def process_pdf(tenant, document_version):
f'on PDF, document version {document_version.id}'
f'error: {e}')
db.session.rollback()
create_embeddings.update_state(state=states.FAILURE)
raise
current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
f'on document version {document_version.id} :-)')
def process_html(tenant, document_version):
def process_html(tenant, model_variables, document_version):
# The tags to be considered can be dependent on the tenant
html_tags = tenant.html_tags
end_tags = tenant.html_end_tags
@@ -163,22 +173,22 @@ def process_html(tenant, document_version):
f'for tenant {tenant.id} '
f'at {file_path} does not exist')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
raise
extracted_data, title = parse_html(html_content, html_tags, included_elements=included_elements,
excluded_elements=excluded_elements)
potential_chunks = create_potential_chunks(extracted_data, end_tags)
chunks = combine_chunks(potential_chunks,
current_app.config.get('MIN_CHUNK_SIZE'),
current_app.config.get('MAX_CHUNK_SIZE')
model_variables['min_chunk_size'],
model_variables['max_chunk_size']
)
summary = summarize_chunk(tenant, document_version, chunks[0])
summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
doc_lang = document_version.document_language
doc_lang.system_context = (f'Title: {title}\n'
f'Summary: {summary}\n')
enriched_chunks = enrich_chunks(tenant, document_version, chunks)
embeddings = embed_chunks(tenant, document_version, enriched_chunks)
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
try:
db.session.add(doc_lang)
@@ -198,6 +208,8 @@ def process_html(tenant, document_version):
def enrich_chunks(tenant, document_version, chunks):
current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
doc_lang = document_version.document_language
chunk_total_context = (f'Filename: {document_version.file_name}\n'
f'{doc_lang.system_context}\n'
@@ -209,54 +221,36 @@ def enrich_chunks(tenant, document_version, chunks):
enriched_chunk = f'{chunk_total_context}\n{chunk}'
enriched_chunks.append(enriched_chunk)
current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
return enriched_chunks
def summarize_chunk(tenant, document_version, chunk):
llm_model = tenant.llm_model
llm_provider = llm_model.split('.', 1)[0]
llm_model = llm_model.split('.', 1)[1]
summary_template = ''
llm = None
match llm_provider:
case 'openai':
api_key = current_app.config.get('OPENAI_API_KEY')
llm = ChatOpenAI(api_key=api_key, temperature=0, model=llm_model)
match llm_model:
case 'gpt-4-turbo':
summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE')
case 'gpt-3.5-turbo':
summary_template = current_app.config.get('GPT3_5_SUMMARY_TEMPLATE')
case _:
current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: Invalid llm model')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
case _:
current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: Invalid llm provider')
prompt = ChatPromptTemplate.from_template(summary_template)
def summarize_chunk(tenant, model_variables, document_version, chunk):
current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}')
llm = model_variables['llm']
prompt = model_variables['summary_prompt']
chain = load_summarize_chain(llm, chain_type='stuff', prompt=prompt)
doc_creator = CharacterTextSplitter(chunk_size=current_app.config.get('MAX_CHUNK_SIZE') * 2, chunk_overlap=0)
doc_creator = CharacterTextSplitter(chunk_size=model_variables['max_chunk_size'] * 2, chunk_overlap=0)
text_to_summarize = doc_creator.create_documents(chunk)
try:
summary = chain.run(text_to_summarize)
current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}.')
return summary
except LangChainException as e:
current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: {e}')
raise
return summary
def partition_doc_unstructured(tenant, document_version, unstructured_request):
current_app.logger.debug(f'Partitioning document version {document_version.id} for tenant {tenant.id}')
# Initiate the connection to unstructured.io
url = current_app.config.get('UNSTRUCTURED_FULL_URL')
api_key = current_app.config.get('UNSTRUCTURED_API_KEY')
@@ -273,6 +267,7 @@ def partition_doc_unstructured(tenant, document_version, unstructured_request):
pass
case 'Table':
chunks.append(el['metadata']['text_as_html'])
current_app.logger.debug(f'Finished partioning document version {document_version.id} for tenant {tenant.id}')
return chunks
except SDKError as e:
current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
@@ -281,33 +276,15 @@ def partition_doc_unstructured(tenant, document_version, unstructured_request):
raise
def embed_chunks(tenant, document_version, chunks):
embedding_provider = tenant.embedding_model.rsplit('.', 1)[0]
embedding_model = tenant.embedding_model.rsplit('.', 1)[1]
def embed_chunks(tenant, model_variables, document_version, chunks):
current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
embedding_model = model_variables['embedding_model']
match embedding_provider:
case 'openai':
match embedding_model:
case 'text-embedding-3-small':
return embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks)
case _:
current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: Invalid embedding model')
create_embeddings.update_state(state=states.FAILURE)
raise Ignore()
case _:
current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: Invalid embedding provider')
def embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks):
# Create embedding vectors using OpenAI
api_key = current_app.config.get('OPENAI_API_KEY')
embeddings_model = OpenAIEmbeddings(api_key=api_key, model='text-embedding-3-small')
try:
embeddings = embeddings_model.embed_documents(chunks)
embeddings = embedding_model.embed_documents(chunks)
current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
except LangChainException as e:
current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
f'on document version {document_version.id} while calling OpenAI API'
@@ -317,7 +294,7 @@ def embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks):
# Add embeddings to the database
new_embeddings = []
for chunk, embedding in zip(chunks, embeddings):
new_embedding = EmbeddingSmallOpenAI()
new_embedding = model_variables['embedding_db_model']()
new_embedding.document_version = document_version
new_embedding.active = True
new_embedding.chunk = chunk
@@ -327,10 +304,6 @@ def embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks):
return new_embeddings
def embed_chunks_for_mistral_embed(tenant_id, document_version, chunks):
pass
def parse_html(html_content, tags, included_elements=None, excluded_elements=None):
soup = BeautifulSoup(html_content, 'html.parser')
extracted_content = []