293 lines
14 KiB
Python
293 lines
14 KiB
Python
from datetime import datetime as dt, timezone as tz
|
|
from flask import current_app, session
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
|
|
from langchain.globals import set_debug
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from celery import states
|
|
from celery.exceptions import Ignore
|
|
import os
|
|
|
|
# OpenAI imports
|
|
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain.chains.summarize import load_summarize_chain
|
|
from langchain.text_splitter import CharacterTextSplitter
|
|
from langchain_core.exceptions import LangChainException
|
|
|
|
from common.utils.database import Database
|
|
from common.models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI, Embedding
|
|
from common.models.user import Tenant
|
|
from common.models.interaction import ChatSession, Interaction, InteractionEmbedding
|
|
from common.extensions import db
|
|
from common.utils.celery_utils import current_celery
|
|
from common.utils.model_utils import select_model_variables, create_language_template, replace_variable_in_template
|
|
from common.langchain.eveai_retriever import EveAIRetriever
|
|
from common.langchain.eveai_history_retriever import EveAIHistoryRetriever
|
|
from common.utils.business_event import BusinessEvent
|
|
from common.utils.business_event_context import current_event
|
|
|
|
|
|
# Healthcheck task
|
|
@current_celery.task(name='ping', queue='llm_interactions')
def ping():
    """Health-check task: always answers with the string ``'pong'``."""
    reply = 'pong'
    return reply
|
|
|
|
|
|
def detail_question(question, language, model_variables, session_id):
    """Run *question* through the history chain and return the LLM's text output.

    The chain is: (history retriever + pass-through question) -> history
    prompt -> LLM -> string output parser.

    Args:
        question: The question, passed unchanged as the chain input.
        language: Language handed to ``create_language_template``.
        model_variables: Mapping that must provide ``'llm'``,
            ``'history_template'`` and ``'rag_context'``; it is also passed
            to the history retriever.
        session_id: Chat session identifier handed to the history retriever.

    Returns:
        The parsed string produced by the chain.

    Raises:
        LangChainException: Logged and re-raised when invoking the chain fails.
    """
    log = current_app.logger
    log.debug(f'Detail question: {question}')
    log.debug(f'model_variables: {model_variables}')
    log.debug(f'session_id: {session_id}')

    history_retriever = EveAIHistoryRetriever(model_variables=model_variables, session_id=session_id)
    model = model_variables['llm']

    # Localise the history template, then inject the tenant's context into it.
    localized_template = create_language_template(model_variables['history_template'], language)
    prompt_text = replace_variable_in_template(
        localized_template,
        "{tenant_context}",
        model_variables['rag_context'],
    )
    prompt = ChatPromptTemplate.from_template(prompt_text)
    inputs = RunnableParallel({"history": history_retriever, "question": RunnablePassthrough()})
    to_text = StrOutputParser()

    pipeline = inputs | prompt | model | to_text

    try:
        return pipeline.invoke(question)
    except LangChainException as e:
        log.error(f'Error detailing question: {e}')
        raise
|
|
|
|
|
|
@current_celery.task(name='ask_question', queue='llm_interactions')
def ask_question(tenant_id, question, language, session_id, user_timezone, room):
    """Answer a question for a tenant, using RAG first and optionally a plain LLM.

    Steps performed:
      1. Load the tenant and switch to its database schema.
      2. Find the chat session for ``session_id`` or create (and commit) one.
      3. Answer with ``answer_using_tenant_rag``.
      4. If that result flags ``insufficient_info`` and ``'LLM'`` is listed in
         the tenant's ``fallback_algorithms``, answer again with
         ``answer_using_llm``.

    Returns a result structured as follows:
        result = {
            'answer': 'Your answer here',
            'citations': ['http://example.com/citation1', 'http://example.com/citation2'],
            'insufficient_info': False,
            'algorithm': 'algorithm_name',
            'interaction_id': 'interaction_id_value',
            'room': 'room_value'
        }

    Raises:
        Exception: When the tenant does not exist. Any other exception raised
            while processing is logged and re-raised unchanged.
    """
    with BusinessEvent("Ask Question", tenant_id=tenant_id, chat_session_id=session_id):
        current_app.logger.info(f'ask_question: Received question for tenant {tenant_id}: {question}. Processing...')

        try:
            # Retrieve the tenant
            tenant = Tenant.query.get(tenant_id)
            if not tenant:
                raise Exception(f'Tenant {tenant_id} not found.')

            # Ensure we are working in the correct database schema
            Database(tenant_id).switch_schema()

            # Ensure we have a session to store history
            chat_session = ChatSession.query.filter_by(session_id=session_id).first()
            if not chat_session:
                try:
                    chat_session = ChatSession()
                    chat_session.session_id = session_id
                    chat_session.session_start = dt.now(tz.utc)
                    chat_session.timezone = user_timezone
                    db.session.add(chat_session)
                    db.session.commit()
                except SQLAlchemyError as e:
                    current_app.logger.error(f'ask_question: Error initializing chat session in database: {e}')
                    # A failed commit leaves the session unusable until it is rolled back.
                    db.session.rollback()
                    raise

            if tenant.rag_tuning:
                current_app.rag_tuning_logger.debug(f'Received question for tenant {tenant_id}:\n{question}. Processing...')
                current_app.rag_tuning_logger.debug(f'Tenant Information: \n{tenant.to_dict()}')
                current_app.rag_tuning_logger.debug('===================================================================')
                current_app.rag_tuning_logger.debug('===================================================================')

            with current_event.create_span("RAG Answer"):
                result, interaction = answer_using_tenant_rag(question, language, tenant, chat_session)
                result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name']
                result['interaction_id'] = interaction.id
                result['room'] = room  # Include the room in the result

            # Treat a missing flag or an unset fallback list as "no fallback"
            # instead of failing the task after an answer was already produced.
            fallback_algorithms = tenant.fallback_algorithms or []
            if result.get('insufficient_info') and 'LLM' in fallback_algorithms:
                with current_event.create_span("Fallback Algorithm LLM"):
                    result, interaction = answer_using_llm(question, language, tenant, chat_session)
                    result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['LLM']['name']
                    result['interaction_id'] = interaction.id
                    result['room'] = room  # Include the room in the result

            return result
        except Exception as e:
            current_app.logger.error(f'ask_question: Error processing question: {e}')
            raise
|
|
|
|
|
|
def answer_using_tenant_rag(question, language, tenant, chat_session):
    """Answer *question* with the tenant's RAG chain and persist the interaction.

    The question is first rewritten by ``detail_question``; the detailed
    question is then sent through retriever -> RAG prompt -> LLM. When
    ``model_variables['cited_answer_cls']`` is set, the LLM is asked for
    structured output and the cited embedding ids are resolved to document
    URLs; otherwise the plain string answer is used and no citations are
    reported.

    Args:
        question: The original user question.
        language: Language used to localise the prompt templates.
        tenant: Tenant object; ``id``, ``rag_tuning`` and ``to_dict()`` are used,
            and it is passed to ``select_model_variables``.
        chat_session: Chat session the interaction belongs to; its
            ``session_end`` is updated.

    Returns:
        A ``(result, interaction)`` tuple. ``result`` is a dict with at least
        ``'answer'``, ``'citations'`` (list of document URLs) and
        ``'insufficient_info'``; ``interaction`` is the committed
        ``Interaction``.

    Raises:
        SQLAlchemyError: Logged and re-raised (after a rollback) when saving
            the interaction fails.
    """
    separator = '-------------------------------------------------------------------'

    new_interaction = Interaction()
    new_interaction.question = question
    new_interaction.language = language
    new_interaction.timezone = chat_session.timezone
    new_interaction.appreciation = None
    new_interaction.chat_session_id = chat_session.id
    new_interaction.question_at = dt.now(tz.utc)
    new_interaction.algorithm_used = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name']

    # Select variables to work with depending on tenant model
    model_variables = select_model_variables(tenant)
    tenant_info = tenant.to_dict()

    # Langchain debugging if required
    # set_debug(True)

    with current_event.create_span("Detail Question"):
        detailed_question = detail_question(question, language, model_variables, chat_session.session_id)
        current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}')
        if tenant.rag_tuning:
            # Log the rewritten question here; the original one is logged by the caller.
            current_app.rag_tuning_logger.debug(f'Detailed Question for tenant {tenant.id}:\n{detailed_question}.')
            current_app.rag_tuning_logger.debug(separator)
        new_interaction.detailed_question = detailed_question
        new_interaction.detailed_question_at = dt.now(tz.utc)

    with current_event.create_span("Generate Answer using RAG"):
        retriever = EveAIRetriever(model_variables, tenant_info)
        llm = model_variables['llm']
        template = model_variables['rag_template']
        language_template = create_language_template(template, language)
        full_template = replace_variable_in_template(language_template, "{tenant_context}", model_variables['rag_context'])
        rag_prompt = ChatPromptTemplate.from_template(full_template)
        setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
        if tenant.rag_tuning:
            current_app.rag_tuning_logger.debug(f'Full prompt for tenant {tenant.id}:\n{full_template}.')
            current_app.rag_tuning_logger.debug(separator)

        new_interaction_embeddings = []
        if not model_variables['cited_answer_cls']:  # The model doesn't support structured feedback
            chain = setup_and_retrieval | rag_prompt | llm | StrOutputParser()

            # Invoke the chain with the actual question
            answer = chain.invoke(detailed_question)
            new_interaction.answer = answer
            result = {
                'answer': answer,
                'citations': [],
                'insufficient_info': False
            }
        else:  # The model supports structured feedback
            structured_llm = llm.with_structured_output(model_variables['cited_answer_cls'])
            chain = setup_and_retrieval | rag_prompt | structured_llm

            result = chain.invoke(detailed_question).dict()
            # Double quotes inside the f-strings keep this valid on Python < 3.12.
            current_app.logger.debug(f'ask_question: result answer: {result["answer"]}')
            current_app.logger.debug(f'ask_question: result citations: {result["citations"]}')
            current_app.logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}')
            if tenant.rag_tuning:
                current_app.rag_tuning_logger.debug(f'ask_question: result answer: {result["answer"]}')
                current_app.rag_tuning_logger.debug(f'ask_question: result citations: {result["citations"]}')
                current_app.rag_tuning_logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}')
                current_app.rag_tuning_logger.debug(separator)
            new_interaction.answer = result['answer']

            # Citations come from the LLM, so tolerate missing or non-numeric
            # ids instead of failing the whole task on a single bad value.
            given_embedding_ids = []
            for emb_id in result['citations'] or []:
                try:
                    given_embedding_ids.append(int(emb_id))
                except (TypeError, ValueError):
                    current_app.logger.warning(f'ask_question: Ignoring invalid citation id: {emb_id!r}')

            # Keep only the embedding ids that actually exist; skip the query
            # entirely when nothing was cited.
            if given_embedding_ids:
                embeddings = (
                    db.session.query(Embedding)
                    .filter(Embedding.id.in_(given_embedding_ids))
                    .all()
                )
            else:
                embeddings = []

            # De-duplicate URLs while keeping first-seen order.
            urls = list(dict.fromkeys(emb.document_version.url for emb in embeddings))
            if tenant.rag_tuning:
                current_app.rag_tuning_logger.debug(f'Referenced documents for answer for tenant {tenant.id}:\n')
                current_app.rag_tuning_logger.debug(f'{urls}')
                current_app.rag_tuning_logger.debug(separator)

            for emb in embeddings:
                new_interaction_embedding = InteractionEmbedding(embedding_id=emb.id)
                new_interaction_embedding.interaction = new_interaction
                new_interaction_embeddings.append(new_interaction_embedding)

            # Callers receive document URLs rather than raw embedding ids.
            result['citations'] = urls

        # Disable langchain debugging if set above.
        # set_debug(False)

        new_interaction.answer_at = dt.now(tz.utc)
        chat_session.session_end = dt.now(tz.utc)

        try:
            db.session.add(chat_session)
            db.session.add(new_interaction)
            db.session.add_all(new_interaction_embeddings)
            db.session.commit()
            return result, new_interaction
        except SQLAlchemyError as e:
            current_app.logger.error(f'ask_question: Error saving interaction to database: {e}')
            # A failed commit leaves the session unusable until it is rolled back.
            db.session.rollback()
            raise
|
|
|
|
|
|
def answer_using_llm(question, language, tenant, chat_session):
    """Answer *question* with the plain LLM (no retrieval) and persist the interaction.

    The question is first rewritten by ``detail_question``; the detailed
    question is then sent through the ``'encyclopedia_template'`` prompt and
    the ``'llm_no_rag'`` model from the tenant's model variables.

    Args:
        question: The original user question.
        language: Language used to localise the prompt template.
        tenant: Tenant object passed to ``select_model_variables``.
        chat_session: Chat session the interaction belongs to; its
            ``session_end`` is updated.

    Returns:
        A ``(result, interaction)`` tuple. ``result`` is a dict with
        ``'answer'``, ``'citations'`` (always empty here) and
        ``'insufficient_info'`` (always ``False`` here); ``interaction`` is
        the committed ``Interaction``.

    Raises:
        SQLAlchemyError: Logged and re-raised (after a rollback) when saving
            the interaction fails.
    """
    new_interaction = Interaction()
    new_interaction.question = question
    new_interaction.language = language
    new_interaction.timezone = chat_session.timezone
    new_interaction.appreciation = None
    new_interaction.chat_session_id = chat_session.id
    new_interaction.question_at = dt.now(tz.utc)
    new_interaction.algorithm_used = current_app.config['INTERACTION_ALGORITHMS']['LLM']['name']

    # Select variables to work with depending on tenant model
    model_variables = select_model_variables(tenant)

    # Langchain debugging if required
    # set_debug(True)

    with current_event.create_span("Detail Question"):
        detailed_question = detail_question(question, language, model_variables, chat_session.session_id)
        current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}')
        new_interaction.detailed_question = detailed_question
        new_interaction.detailed_question_at = dt.now(tz.utc)

    with current_event.create_span("Detail Answer using LLM"):
        # No retriever is built here: this path answers from the model alone.
        llm = model_variables['llm_no_rag']
        template = model_variables['encyclopedia_template']
        language_template = create_language_template(template, language)
        llm_prompt = ChatPromptTemplate.from_template(language_template)

        chain = RunnablePassthrough() | llm_prompt | llm | StrOutputParser()

        # Invoke the chain with the actual question
        answer = chain.invoke({"question": detailed_question})
        new_interaction.answer = answer
        result = {
            'answer': answer,
            'citations': [],
            'insufficient_info': False
        }

        # Disable langchain debugging if set above.
        # set_debug(False)

        new_interaction.answer_at = dt.now(tz.utc)
        chat_session.session_end = dt.now(tz.utc)

        try:
            db.session.add(chat_session)
            db.session.add(new_interaction)
            db.session.commit()
            return result, new_interaction
        except SQLAlchemyError as e:
            current_app.logger.error(f'ask_question: Error saving interaction to database: {e}')
            # A failed commit leaves the session unusable until it is rolled back.
            db.session.rollback()
            raise
|
|
|
|
|
|
def tasks_ping():
    """Return the fixed string ``'pong'``."""
    reply = 'pong'
    return reply
|