From 6c2e99f4670d513e8c68791adad5f619a151b3c8 Mon Sep 17 00:00:00 2001 From: Josako Date: Mon, 13 May 2024 17:18:38 +0200 Subject: [PATCH] Realise processing of HTML and improve both HTML & PDF processing giving new tenant information. --- config/config.py | 3 + eveai_app/views/document_forms.py | 5 - eveai_app/views/document_views.py | 34 ++-- eveai_workers/tasks.py | 322 +++++++++++++++++++++--------- 4 files changed, 253 insertions(+), 111 deletions(-) diff --git a/config/config.py b/config/config.py index d731a3a..3bd71a8 100644 --- a/config/config.py +++ b/config/config.py @@ -59,6 +59,9 @@ class Config(object): GPT4_SUMMARY_TEMPLATE = """Write a concise summary of the text in the same language as the provided text. Text is delimited between triple backquotes. ```{text}```""" + GPT3_5_SUMMARY_TEMPLATE = """Write a concise summary of the text in the same language as the provided text. + Text is delimited between triple backquotes. + ```{text}```""" class DevConfig(Config): diff --git a/eveai_app/views/document_forms.py b/eveai_app/views/document_forms.py index 39fcf9d..572ef8e 100644 --- a/eveai_app/views/document_forms.py +++ b/eveai_app/views/document_forms.py @@ -13,7 +13,6 @@ class AddDocumentForm(FlaskForm): language = SelectField('Language', choices=[], validators=[Optional()]) user_context = TextAreaField('User Context', validators=[Optional()]) valid_from = DateField('Valid from', id='form-control datepicker', validators=[Optional()]) - doc_embedding_model = SelectField('Default Embedding Model', choices=[], validators=[DataRequired()]) submit = SubmitField('Submit') @@ -23,8 +22,6 @@ class AddDocumentForm(FlaskForm): session.get('tenant').get('allowed_languages')] self.language.data = session.get('default_language') - self.doc_embedding_model.data = session.get('embedding_model') - class AddURLForm(FlaskForm): url = URLField('URL', validators=[DataRequired(), URL()]) @@ -32,7 +29,6 @@ class AddURLForm(FlaskForm): language = SelectField('Language', choices=[], validators=[Optional()]) user_context = TextAreaField('User Context', validators=[Optional()]) valid_from = DateField('Valid from', id='form-control datepicker', validators=[Optional()]) - doc_embedding_model = SelectField('Embedding Model', choices=[], validators=[DataRequired()]) submit = SubmitField('Submit') @@ -41,4 +37,3 @@ class AddURLForm(FlaskForm): self.language.choices = [(language, language) for language in session.get('tenant').get('allowed_languages')] self.language.data = session.get('default_language') - self.doc_embedding_model.data = session.get('default_embedding_model') diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index b105957..e556cd0 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -38,7 +38,15 @@ def add_document(): filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() - create_document_stack(form, file, filename, extension) + new_doc, new_doc_lang, new_doc_vers = create_document_stack(form, file, filename, extension) + + task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ + session['tenant']['id'], + new_doc_vers.id, + ]) + current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' + f'Document Version {new_doc_vers.id}. ' + f'Embedding creation task: {task.id}') return redirect(url_for('document_bp.documents')) @@ -67,7 +75,16 @@ def add_url(): filename += '.html' extension = 'html' - create_document_stack(form, file, filename, extension) + new_doc, new_doc_lang, new_doc_vers = create_document_stack(form, file, filename, extension) + + task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ + session['tenant']['id'], + new_doc_vers.id, + ]) + current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' + f'Document Version {new_doc_vers.id}. ' + f'Embedding creation task: {task.id}') + return redirect(url_for('document_bp.documents')) return render_template('document/add_url.html', form=form) @@ -123,9 +140,7 @@ def create_document_stack(form, file, filename, extension): db.session.add(new_doc) db.session.add(new_doc_lang) db.session.add(new_doc_vers) - log_session_state(db.session, "Before first commit") db.session.commit() - log_session_state(db.session, "After first commit") except SQLAlchemyError as e: current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}') flash('Error adding document.', 'error') @@ -140,9 +155,7 @@ def create_document_stack(form, file, filename, extension): new_doc_lang = db.session.merge(new_doc_lang) new_doc_vers = db.session.merge(new_doc_vers) new_doc_lang.latest_version_id = new_doc_vers.id - log_session_state(db.session, "Before second commit") db.session.commit() - log_session_state(db.session, "After second commit") except SQLAlchemyError as e: current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}') flash('Error adding document.', 'error') @@ -160,15 +173,8 @@ def create_document_stack(form, file, filename, extension): f'Document Version {new_doc.id}') upload_file_for_version(new_doc_vers, file, extension) - task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ - session['tenant']['id'], - new_doc_vers.id, - session['default_embedding_model'], - ]) - current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' - f'Document Version {new_doc_vers.id}. ' - f'Embedding creation task: {task.id}') + return new_doc, new_doc_lang, new_doc_vers def log_session_state(session, msg=""): diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 9f04823..e823f4f 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -1,7 +1,8 @@ from datetime import datetime as dt, timezone as tz from flask import current_app from sqlalchemy.exc import SQLAlchemyError - +from celery import states +from celery.exceptions import Ignore import os # Unstructured commercial client imports @@ -18,6 +19,7 @@ from langchain_core.exceptions import LangChainException from common.utils.database import Database from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI +from common.models.user import Tenant from common.extensions import db from common.utils.celery_utils import current_celery @@ -25,14 +27,21 @@ from bs4 import BeautifulSoup @current_celery.task(name='create_embeddings', queue='embeddings') -def create_embeddings(tenant_id, document_version_id, default_embedding_model): +def create_embeddings(tenant_id, document_version_id): # Setup Remote Debugging only if PYCHARM_DEBUG=True if current_app.config['PYCHARM_DEBUG']: import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True) - current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} ' - f'with model {default_embedding_model}') + current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.') + + # Retrieve Tenant for which we are processing + tenant = Tenant.query.get(tenant_id) + if tenant is None: + current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' + f'Tenant not found') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() # Ensure we are working in the correct database schema Database(tenant_id).switch_schema() @@ -42,7 +51,9 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model): if document_version is None: current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' f'Document version {document_version_id} not found') - return + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() + db.session.add(document_version) # start processing @@ -52,29 +63,22 @@ def create_embeddings(tenant_id, document_version_id, default_embedding_model): db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving document version {document_version_id} to database ' - f'for tenant {tenant_id} when creating embeddings. ' + f'for tenant {tenant_id} when starting creating of embeddings. ' f'error: {e}') - return - - embed_provider = default_embedding_model.rsplit('.', 1)[0] - embed_model = default_embedding_model.rsplit('.', 1)[1] - # define embedding variables - embedding_function = None - match (embed_provider, embed_model): - case ('openai', 'text-embedding-3-small'): - embedding_function = embed_chunks_for_text_embedding_3_small - case ('mistral', 'mistral.mistral-embed'): - embedding_function = embed_chunks_for_mistral_embed + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() match document_version.file_type: case 'pdf': - process_pdf(tenant_id, document_version, embedding_function, default_embedding_model) + process_pdf(tenant, document_version) case 'html': - process_html(tenant_id, document_version, embedding_function, default_embedding_model) + process_html(tenant, document_version) case _: current_app.logger.info(f'No functionality defined for file type {document_version.file_type} ' f'for tenant {tenant_id} ' f'while creating embeddings for document version {document_version_id}') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() @current_celery.task(name='ask_eve_ai', queue='llm_interactions') @@ -83,7 +87,7 @@ def ask_eve_ai(query): pass -def process_pdf(tenant_id, document_version, embedding_function, embedding_model): +def process_pdf(tenant, document_version): file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) @@ -97,102 +101,162 @@ def process_pdf(tenant_id, document_version, embedding_function, embedding_model coordinates=True, extract_image_block_types=['Image', 'Table'], chunking_strategy='by_title', - combine_under_n_chars=2000, - max_characters=3000, + combine_under_n_chars=current_app.config.get('MIN_CHUNK_SIZE'), + max_characters=current_app.config.get('MAX_CHUNK_SIZE'), ) - try: - chunks = partition_doc_unstructured(tenant_id, document_version, req) - enriched_chunk_docs = enrich_chunks(tenant_id, document_version, chunks) - embeddings = embedding_function(tenant_id, document_version, enriched_chunk_docs) - except Exception as e: - current_app.logger.error(f'Unable to create Embeddings for tenant {tenant_id} ' - f'on document version {document_version.id} ' - f'with model {embedding_model} ' - f'error: {e}') - raise + else: + current_app.logger.error(f'The physical file for document version {document_version.id} ' + f'for tenant {tenant.id} ' + f'at {file_path} does not exist') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() - # Save embeddings & processing information to the database - db.session.add_all(embeddings) + try: + chunks = partition_doc_unstructured(tenant, document_version, req) + except Exception as e: + current_app.logger.error(f'Unable to create Embeddings for tenant {tenant.id} ' + f'while processing PDF on document version {document_version.id} ' + f'error: {e}') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() + + summary = summarize_chunk(tenant, document_version, chunks[0]) + doc_lang = document_version.document_language + doc_lang.system_context = f'Summary: {summary}\n' + enriched_chunks = enrich_chunks(tenant, document_version, chunks) + embeddings = embed_chunks(tenant, document_version, enriched_chunks) + + try: + db.session.add(doc_lang) db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False - - try: - db.session.commit() - except SQLAlchemyError as e: - current_app.logger.error(f'Error saving embedding information for tenant {tenant_id} ' - f'on document version {document_version.id}' - f'error: {e}') - db.session.rollback() - raise - - current_app.logger.info(f'Embeddings created successfully for tenant {tenant_id} ' - f'on document version {document_version.id} :-)') - else: # file exists - current_app.logger.error(f'The physical file for document version {document_version.id} ' - f'at {file_path} does not exist') + db.session.add_all(embeddings) + db.session.commit() + except SQLAlchemyError as e: + current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' + f'on PDF, document version {document_version.id}' + f'error: {e}') + db.session.rollback() raise + current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' + f'on document version {document_version.id} :-)') + + +def process_html(tenant, document_version): + # The tags to be considered can be dependent on the tenant + html_tags = tenant.html_tags + end_tags = tenant.html_end_tags + included_elements = tenant.html_included_elements + excluded_elements = tenant.html_excluded_elements -def process_html(tenant_id, document_version, embedding_function, default_embedding_model): file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) + if os.path.exists(file_path): with open(file_path, 'rb') as f: html_content = f.read() + else: + current_app.logger.error(f'The physical file for document version {document_version.id} ' + f'for tenant {tenant.id} ' + f'at {file_path} does not exist') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() + + extracted_data, title = parse_html(html_content, html_tags, included_elements=included_elements, + excluded_elements=excluded_elements) + potential_chunks = create_potential_chunks(extracted_data, end_tags) + chunks = combine_chunks(potential_chunks, + current_app.config.get('MIN_CHUNK_SIZE'), + current_app.config.get('MAX_CHUNK_SIZE') + ) + summary = summarize_chunk(tenant, document_version, chunks[0]) + doc_lang = document_version.document_language + doc_lang.system_context = (f'Title: {title}\n' + f'Summary: {summary}\n') + + enriched_chunks = enrich_chunks(tenant, document_version, chunks) + embeddings = embed_chunks(tenant, document_version, enriched_chunks) + + try: + db.session.add(doc_lang) + db.session.add(document_version) + document_version.processing_finished_at = dt.now(tz.utc) + document_version.processing = False + db.session.add_all(embeddings) + db.session.commit() + except SQLAlchemyError as e: + current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' + f'on HTML, document version {document_version.id}' + f'error: {e}') + raise + + current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' + f'on document version {document_version.id} :-)') +def enrich_chunks(tenant, document_version, chunks): + doc_lang = document_version.document_language + chunk_total_context = (f'Filename: {document_version.file_name}\n' + f'{doc_lang.system_context}\n' + f'User Context:\n{doc_lang.user_context}') + enriched_chunks = [] + initial_chunk = f'Filename: {document_version.file_name}\n User Context:\n{doc_lang.user_context}\n{chunks[0]}' + enriched_chunks.append(initial_chunk) + for chunk in chunks[1:]: + enriched_chunk = f'{chunk_total_context}\n{chunk}' + enriched_chunks.append(enriched_chunk) -def enrich_chunks(tenant_id, document_version, chunks): - # We're adding filename and a summary of the first chunk to all the chunks to create global context - # using openAI to summarise - api_key = current_app.config.get('OPENAI_API_KEY') - # TODO: model selection to be adapted to model approach - llm = ChatOpenAI(api_key=api_key, temperature=0, model='gpt-4-turbo') + return enriched_chunks + + +def summarize_chunk(tenant, document_version, chunk): + llm_model = tenant.llm_model + llm_provider = llm_model.split('.', 1)[0] + llm_model = llm_model.split('.', 1)[1] + + summary_template = '' + llm = None + match llm_provider: + case 'openai': + api_key = current_app.config.get('OPENAI_API_KEY') + llm = ChatOpenAI(api_key=api_key, temperature=0, model=llm_model) + match llm_model: + case 'gpt-4-turbo': + summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE') + case 'gpt-3.5-turbo': + summary_template = current_app.config.get('GPT3_5_SUMMARY_TEMPLATE') + case _: + current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} ' + f'on document version {document_version.id} ' + f'error: Invalid llm model') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() + case _: + current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} ' + f'on document version {document_version.id} ' + f'error: Invalid llm provider') - summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE') prompt = ChatPromptTemplate.from_template(summary_template) - chain = load_summarize_chain(llm, chain_type='stuff', prompt=prompt) - doc_creator = CharacterTextSplitter(chunk_size=9000, chunk_overlap=0) - text_to_summarize = doc_creator.create_documents(chunks[0]) + doc_creator = CharacterTextSplitter(chunk_size=current_app.config.get('MAX_CHUNK_SIZE') * 2, chunk_overlap=0) + text_to_summarize = doc_creator.create_documents(chunk) + try: summary = chain.run(text_to_summarize) - doc_lang = document_version.document_language - db.session.add(doc_lang) - doc_lang.system_context = f'Summary:\n {summary}' - try: - db.session.commit() - except SQLAlchemyError as e: - current_app.logger. error(f'Error saving summary to DocumentLanguage {doc_lang.id} ' - f'while enriching chunks for tenant {tenant_id} ' - f'on document version {document_version.id} ' - f'error: {e}') - db.session.rollback() - raise - - chunk_global_context = (f'Filename: {doc_lang.document.name}\n' - f'User Context:\n{doc_lang.user_context}' - f'System Context:\n{summary}') - enriched_chunks = [] - initial_chunk = f'Filename: {document_version.file_name}\n User Context:\n{doc_lang.user_context}\n{chunks[0]}' - enriched_chunks.append(initial_chunk) - for chunk in chunks[1:]: - enriched_chunk = f'{chunk_global_context}\n{chunk}' - enriched_chunks.append(enriched_chunk) - - return enriched_chunks - except LangChainException as e: - current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant_id} ' + current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: {e}') raise + return summary -def partition_doc_unstructured(tenant_id, document_version, unstructured_request): + +def partition_doc_unstructured(tenant, document_version, unstructured_request): # Initiate the connection to unstructured.io url = current_app.config.get('UNSTRUCTURED_FULL_URL') api_key = current_app.config.get('UNSTRUCTURED_API_KEY') @@ -211,20 +275,41 @@ def partition_doc_unstructured(tenant_id, document_version, unstructured_request chunks.append(el['metadata']['text_as_html']) return chunks except SDKError as e: - current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' + current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while chuncking' f'error: {e}') raise -def embed_chunks_for_text_embedding_3_small(tenant_id, document_version, chunks): +def embed_chunks(tenant, document_version, chunks): + embedding_provider = tenant.embedding_model.rsplit('.', 1)[0] + embedding_model = tenant.embedding_model.rsplit('.', 1)[1] + + match embedding_provider: + case 'openai': + match embedding_model: + case 'text-embedding-3-small': + return embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks) + case _: + current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' + f'on document version {document_version.id} ' + f'error: Invalid embedding model') + create_embeddings.update_state(state=states.FAILURE) + raise Ignore() + case _: + current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' + f'on document version {document_version.id} ' + f'error: Invalid embedding provider') + + +def embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks): # Create embedding vectors using OpenAI api_key = current_app.config.get('OPENAI_API_KEY') embeddings_model = OpenAIEmbeddings(api_key=api_key, model='text-embedding-3-small') try: embeddings = embeddings_model.embed_documents(chunks) except LangChainException as e: - current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' + current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while calling OpenAI API' f'error: {e}') raise @@ -246,7 +331,7 @@ def embed_chunks_for_mistral_embed(tenant_id, document_version, chunks): pass -def parse_html(html_content, included_elements=None, excluded_elements=None): +def parse_html(html_content, tags, included_elements=None, excluded_elements=None): soup = BeautifulSoup(html_content, 'html.parser') extracted_content = [] @@ -263,4 +348,57 @@ def parse_html(html_content, included_elements=None, excluded_elements=None): continue # Skip this sub_element if it's within any of the excluded_elements extracted_content.append((sub_element.name, sub_element.get_text(strip=True))) - return extracted_content + title = soup.find('title').get_text(strip=True) + + return extracted_content, title + + +def create_potential_chunks(extracted_data, end_tags): + potential_chunks = [] + current_chunk = [] + + for tag, text in extracted_data: + formatted_text = f"- {text}" if tag == 'li' else f"{text}\n" + if current_chunk and tag in end_tags and current_chunk[-1][0] in end_tags: + # Consecutive li and p elements stay together + current_chunk.append((tag, formatted_text)) + else: + # End the current chunk if the last element was an end tag + if current_chunk and current_chunk[-1][0] in end_tags: + potential_chunks.append(current_chunk) + current_chunk = [] + current_chunk.append((tag, formatted_text)) + + # Add the last chunk + if current_chunk: + potential_chunks.append(current_chunk) + return potential_chunks + + +def combine_chunks(potential_chunks, min_chars, max_chars): + actual_chunks = [] + current_chunk = "" + current_length = 0 + + for chunk in potential_chunks: + chunk_content = ''.join(text for _, text in chunk) + chunk_length = len(chunk_content) + + if current_length + chunk_length > max_chars: + if current_length >= min_chars: + actual_chunks.append(current_chunk) + current_chunk = chunk_content + current_length = chunk_length + else: + # If the combined chunk is still less than max_chars, keep adding + current_chunk += chunk_content + current_length += chunk_length + else: + current_chunk += chunk_content + current_length += chunk_length + + # Handle the last chunk + if current_chunk and current_length >= min_chars: + actual_chunks.append(current_chunk) + + return actual_chunks