from datetime import datetime as dt, timezone as tz from flask import current_app from sqlalchemy.exc import SQLAlchemyError from celery import states from celery.exceptions import Ignore import os # Unstructured commercial client imports from unstructured_client import UnstructuredClient from unstructured_client.models import shared from unstructured_client.models.errors import SDKError # OpenAI imports from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain.chains.summarize import load_summarize_chain from langchain.text_splitter import CharacterTextSplitter from langchain_core.exceptions import LangChainException from common.utils.database import Database from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI from common.models.user import Tenant from common.extensions import db from common.utils.celery_utils import current_celery from bs4 import BeautifulSoup @current_celery.task(name='create_embeddings', queue='embeddings') def create_embeddings(tenant_id, document_version_id): # Setup Remote Debugging only if PYCHARM_DEBUG=True if current_app.config['PYCHARM_DEBUG']: import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True) current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.') # Retrieve Tenant for which we are processing tenant = Tenant.query.get(tenant_id) if tenant is None: current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' f'Tenant not found') create_embeddings.update_state(state=states.FAILURE) raise Ignore() # Ensure we are working in the correct database schema Database(tenant_id).switch_schema() # Retrieve document version to process document_version = DocumentVersion.query.get(document_version_id) if document_version is None: current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' f'Document version {document_version_id} not found') create_embeddings.update_state(state=states.FAILURE) raise Ignore() db.session.add(document_version) # start processing document_version.processing = True document_version.processing_started_at = dt.now(tz.utc) try: db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving document version {document_version_id} to database ' f'for tenant {tenant_id} when starting creating of embeddings. ' f'error: {e}') create_embeddings.update_state(state=states.FAILURE) raise Ignore() match document_version.file_type: case 'pdf': process_pdf(tenant, document_version) case 'html': process_html(tenant, document_version) case _: current_app.logger.info(f'No functionality defined for file type {document_version.file_type} ' f'for tenant {tenant_id} ' f'while creating embeddings for document version {document_version_id}') create_embeddings.update_state(state=states.FAILURE) raise Ignore() @current_celery.task(name='ask_eve_ai', queue='llm_interactions') def ask_eve_ai(query): # Interaction logic with LLMs like GPT (Langchain API calls, etc.) pass def process_pdf(tenant, document_version): file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) if os.path.exists(file_path): with open(file_path, 'rb') as f: files = shared.Files(content=f.read(), file_name=document_version.file_name) req = shared.PartitionParameters( files=files, strategy='hi_res', hi_res_model_name='yolox', coordinates=True, extract_image_block_types=['Image', 'Table'], chunking_strategy='by_title', combine_under_n_chars=current_app.config.get('MIN_CHUNK_SIZE'), max_characters=current_app.config.get('MAX_CHUNK_SIZE'), ) else: current_app.logger.error(f'The physical file for document version {document_version.id} ' f'for tenant {tenant.id} ' f'at {file_path} does not exist') create_embeddings.update_state(state=states.FAILURE) raise Ignore() try: chunks = partition_doc_unstructured(tenant, document_version, req) except Exception as e: current_app.logger.error(f'Unable to create Embeddings for tenant {tenant.id} ' f'while processing PDF on document version {document_version.id} ' f'error: {e}') create_embeddings.update_state(state=states.FAILURE) raise Ignore() summary = summarize_chunk(tenant, document_version, chunks[0]) doc_lang = document_version.document_language doc_lang.system_context = f'Summary: {summary}\n' enriched_chunks = enrich_chunks(tenant, document_version, chunks) embeddings = embed_chunks(tenant, document_version, enriched_chunks) try: db.session.add(doc_lang) db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on PDF, document version {document_version.id}' f'error: {e}') db.session.rollback() raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') def process_html(tenant, document_version): # The tags to be considered can be dependent on the tenant html_tags = tenant.html_tags end_tags = tenant.html_end_tags included_elements = tenant.html_included_elements excluded_elements = tenant.html_excluded_elements file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) if os.path.exists(file_path): with open(file_path, 'rb') as f: html_content = f.read() else: current_app.logger.error(f'The physical file for document version {document_version.id} ' f'for tenant {tenant.id} ' f'at {file_path} does not exist') create_embeddings.update_state(state=states.FAILURE) raise Ignore() extracted_data, title = parse_html(html_content, html_tags, included_elements=included_elements, excluded_elements=excluded_elements) potential_chunks = create_potential_chunks(extracted_data, end_tags) chunks = combine_chunks(potential_chunks, current_app.config.get('MIN_CHUNK_SIZE'), current_app.config.get('MAX_CHUNK_SIZE') ) summary = summarize_chunk(tenant, document_version, chunks[0]) doc_lang = document_version.document_language doc_lang.system_context = (f'Title: {title}\n' f'Summary: {summary}\n') enriched_chunks = enrich_chunks(tenant, document_version, chunks) embeddings = embed_chunks(tenant, document_version, enriched_chunks) try: db.session.add(doc_lang) db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) document_version.processing = False db.session.add_all(embeddings) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' f'on HTML, document version {document_version.id}' f'error: {e}') raise current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') def enrich_chunks(tenant, document_version, chunks): doc_lang = document_version.document_language chunk_total_context = (f'Filename: {document_version.file_name}\n' f'{doc_lang.system_context}\n' f'User Context:\n{doc_lang.user_context}') enriched_chunks = [] initial_chunk = f'Filename: {document_version.file_name}\n User Context:\n{doc_lang.user_context}\n{chunks[0]}' enriched_chunks.append(initial_chunk) for chunk in chunks[1:]: enriched_chunk = f'{chunk_total_context}\n{chunk}' enriched_chunks.append(enriched_chunk) return enriched_chunks def summarize_chunk(tenant, document_version, chunk): llm_model = tenant.llm_model llm_provider = llm_model.split('.', 1)[0] llm_model = llm_model.split('.', 1)[1] summary_template = '' llm = None match llm_provider: case 'openai': api_key = current_app.config.get('OPENAI_API_KEY') llm = ChatOpenAI(api_key=api_key, temperature=0, model=llm_model) match llm_model: case 'gpt-4-turbo': summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE') case 'gpt-3.5-turbo': summary_template = current_app.config.get('GPT3_5_SUMMARY_TEMPLATE') case _: current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: Invalid llm model') create_embeddings.update_state(state=states.FAILURE) raise Ignore() case _: current_app.logger.error(f'Error summarizing initial chunk for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: Invalid llm provider') prompt = ChatPromptTemplate.from_template(summary_template) chain = load_summarize_chain(llm, chain_type='stuff', prompt=prompt) doc_creator = CharacterTextSplitter(chunk_size=current_app.config.get('MAX_CHUNK_SIZE') * 2, chunk_overlap=0) text_to_summarize = doc_creator.create_documents(chunk) try: summary = chain.run(text_to_summarize) except LangChainException as e: current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: {e}') raise return summary def partition_doc_unstructured(tenant, document_version, unstructured_request): # Initiate the connection to unstructured.io url = current_app.config.get('UNSTRUCTURED_FULL_URL') api_key = current_app.config.get('UNSTRUCTURED_API_KEY') unstructured_client = UnstructuredClient(server_url=url, api_key_auth=api_key) try: res = unstructured_client.general.partition(unstructured_request) chunks = [] for el in res.elements: match el['type']: case 'CompositeElement': chunks.append(el['text']) case 'Image': pass case 'Table': chunks.append(el['metadata']['text_as_html']) return chunks except SDKError as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while chuncking' f'error: {e}') raise def embed_chunks(tenant, document_version, chunks): embedding_provider = tenant.embedding_model.rsplit('.', 1)[0] embedding_model = tenant.embedding_model.rsplit('.', 1)[1] match embedding_provider: case 'openai': match embedding_model: case 'text-embedding-3-small': return embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks) case _: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: Invalid embedding model') create_embeddings.update_state(state=states.FAILURE) raise Ignore() case _: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} ' f'error: Invalid embedding provider') def embed_chunks_for_text_embedding_3_small(tenant, document_version, chunks): # Create embedding vectors using OpenAI api_key = current_app.config.get('OPENAI_API_KEY') embeddings_model = OpenAIEmbeddings(api_key=api_key, model='text-embedding-3-small') try: embeddings = embeddings_model.embed_documents(chunks) except LangChainException as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} ' f'on document version {document_version.id} while calling OpenAI API' f'error: {e}') raise # Add embeddings to the database new_embeddings = [] for chunk, embedding in zip(chunks, embeddings): new_embedding = EmbeddingSmallOpenAI() new_embedding.document_version = document_version new_embedding.active = True new_embedding.chunk = chunk new_embedding.embedding = embedding new_embeddings.append(new_embedding) return new_embeddings def embed_chunks_for_mistral_embed(tenant_id, document_version, chunks): pass def parse_html(html_content, tags, included_elements=None, excluded_elements=None): soup = BeautifulSoup(html_content, 'html.parser') extracted_content = [] if included_elements: elements_to_parse = soup.find_all(included_elements) else: elements_to_parse = [soup] # parse the entire document if no included_elements specified # Iterate through the found included elements for element in elements_to_parse: # Find all specified tags within each included element for sub_element in element.find_all(tags): if excluded_elements and sub_element.find_parent(excluded_elements): continue # Skip this sub_element if it's within any of the excluded_elements extracted_content.append((sub_element.name, sub_element.get_text(strip=True))) title = soup.find('title').get_text(strip=True) return extracted_content, title def create_potential_chunks(extracted_data, end_tags): potential_chunks = [] current_chunk = [] for tag, text in extracted_data: formatted_text = f"- {text}" if tag == 'li' else f"{text}\n" if current_chunk and tag in end_tags and current_chunk[-1][0] in end_tags: # Consecutive li and p elements stay together current_chunk.append((tag, formatted_text)) else: # End the current chunk if the last element was an end tag if current_chunk and current_chunk[-1][0] in end_tags: potential_chunks.append(current_chunk) current_chunk = [] current_chunk.append((tag, formatted_text)) # Add the last chunk if current_chunk: potential_chunks.append(current_chunk) return potential_chunks def combine_chunks(potential_chunks, min_chars, max_chars): actual_chunks = [] current_chunk = "" current_length = 0 for chunk in potential_chunks: chunk_content = ''.join(text for _, text in chunk) chunk_length = len(chunk_content) if current_length + chunk_length > max_chars: if current_length >= min_chars: actual_chunks.append(current_chunk) current_chunk = chunk_content current_length = chunk_length else: # If the combined chunk is still less than max_chars, keep adding current_chunk += chunk_content current_length += chunk_length else: current_chunk += chunk_content current_length += chunk_length # Handle the last chunk if current_chunk and current_length >= min_chars: actual_chunks.append(current_chunk) return actual_chunks