from datetime import datetime as dt, timezone as tz from flask import current_app import os # Unstructured commercial client imports from unstructured_client import UnstructuredClient from unstructured_client.models import shared from unstructured_client.models.errors import SDKError # OpenAI imports from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain.chains.summarize import load_summarize_chain from langchain.text_splitter import CharacterTextSplitter from langchain_core.exceptions import LangChainException from common.utils.database import Database from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI from common.extensions import db from common.utils.celery_utils import current_celery @current_celery.task(name='create_embeddings', queue='embeddings') def create_embeddings(tenant_id, document_version_id, default_embedding_model): # Setup Remote Debugging only if PYCHARM_DEBUG=True if current_app.config['PYCHARM_DEBUG']: import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=50170, stdoutToServer=True, stderrToServer=True) current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} ' f'with model {default_embedding_model}') # Ensure we are working in the correct database schema Database(tenant_id).switch_schema() # Retrieve document version to process document_version = DocumentVersion.query.get(document_version_id) if document_version is None: current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' f'Document version {document_version_id} not found') return db.session.add(document_version) # start processing document_version.processing = True document_version.processing_started_at = dt.now(tz.utc) db.session.commit() embed_provider = default_embedding_model.rsplit('.', 1)[0] embed_model = default_embedding_model.rsplit('.', 1)[1] # define embedding variables match (embed_provider, embed_model): case ('openai', 'text-embedding-3-small'): embedding_function = embed_chunks_for_text_embedding_3_small case ('mistral', 'mistral.mistral-embed'): embedding_function = embed_chunks_for_mistral_embed match document_version.file_type: case 'pdf': file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], document_version.file_location, document_version.file_name) if os.path.exists(file_path): with open(file_path, 'rb') as f: files = shared.Files(content=f.read(), file_name=document_version.file_name) req = shared.PartitionParameters( files=files, strategy='hi_res', hi_res_model_name='yolox', coordinates=True, extract_image_block_types=['Image', 'Table'], chunking_strategy='by_title', combine_under_n_chars=2000, max_characters=3000, ) try: chunks = partition_doc_unstructured(tenant_id, document_version, req) enriched_chunk_docs = enrich_chunks(tenant_id, document_version, chunks) embedding_function(tenant_id, document_version, enriched_chunk_docs) except Exception as e: current_app.logger.error(f'Unable to create Embeddings for tenant {tenant_id} ' f'on document version {document_version.id} ' f'with model {default_embedding_model} ' f'error: {e}') return else: # file exists current_app.logger.error(f'The physical file for document version {document_version_id} ' f'at {file_path} does not exist') return @current_celery.task(name='ask_eve_ai', queue='llm_interactions') def ask_eve_ai(query): # Interaction logic with LLMs like GPT (Langchain API calls, etc.) pass def enrich_chunks(tenant_id, document_version, chunks): # We're adding filename and a summary of the first chunk to all the chunks to create global context # using openAI to summarise api_key = current_app.config.get('OPENAI_API_KEY') # TODO: model selection to be adapted to model approach llm = ChatOpenAI(api_key=api_key, temperature=0, model='gpt-4-turbo') summary_template = current_app.config.get('GPT4_SUMMARY_TEMPLATE') prompt = ChatPromptTemplate.from_template(summary_template) chain = load_summarize_chain(llm, chain_type='stuff', prompt=prompt) doc_creator = CharacterTextSplitter(chunk_size=9000, chunk_overlap=0) text_to_summarize = doc_creator.create_documents(chunks[0]['text']) try: summary = chain.run(text_to_summarize) chunk_global_context = f'Filename: {document_version.file_name}\nSummary:\n {summary}' enriched_chunks = [] for chunk in chunks[1:]: enriched_chunk_raw = f'{chunk_global_context}\n{chunk}' enriched_chunk_doc = doc_creator.create_documents([enriched_chunk_raw]) enriched_chunks.append(enriched_chunk_doc) return enriched_chunks except LangChainException as e: current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant_id} ' f'on document version {document_version.id} ' f'error: {e}') raise def partition_doc_unstructured(tenant_id, document_version, unstructured_request): # Initiate the connection to unstructured.io url = current_app.config.get('UNSTRUCTURED_FULL_URL') api_key = current_app.config.get('UNSTRUCTURED_API_KEY') unstructured_client = UnstructuredClient(server_url=url, api_key_auth=api_key) try: res = unstructured_client.general.partition(unstructured_request) chunks = [] for el in res.elements: match el['type']: case 'Composite_element': chunks.append(el['text']) case 'Image': pass case 'Table': chunks.append(el['metadata']['text_as_html']) return chunks except SDKError as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' f'on document version {document_version.id} while chuncking' f'error: {e}') raise def embed_chunks_for_text_embedding_3_small(tenant_id, document_version, chunks): # Create embedding vectors using OpenAI api_key = current_app.config.get('OPENAI_API_KEY') embeddings_model = OpenAIEmbeddings(api_key=api_key, model='text-embedding-3-small') try: embeddings = embeddings_model.embed_documents(chunks) except LangChainException as e: current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} ' f'on document version {document_version.id} while calling OpenAI API' f'error: {e}') raise for chunk, embedding in zip(chunks, embeddings): new_embedding = EmbeddingSmallOpenAI() # TODO: continue here return embeddings def embed_chunks_for_mistral_embed(tenant_id, document_version, chunks): pass