Business event tracing completed for both eveai_workers and eveai_chat_workers tasks

Josako
2024-09-27 10:53:42 +02:00
parent ee1b0f1cfa
commit d9cb00fcdc
9 changed files with 306 additions and 253 deletions
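
For orientation before the per-file diffs: a minimal sketch of the tracing pattern this commit completes. The task name and step helpers below are illustrative only; BusinessEvent, current_event and create_span are the objects changed in the diffs that follow.

from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event

def some_worker_task(tenant_id, payload):
    # Entering BusinessEvent opens a trace and exposes it as current_event.
    with BusinessEvent("Some Task", tenant_id=tenant_id):
        # Each create_span() opens a child span with its own span_id.
        with current_event.create_span("Step 1"):
            step_one(payload)   # illustrative helper, not from the repository
        with current_event.create_span("Step 2"):
            step_two(payload)   # illustrative helper, not from the repository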

View File

@@ -46,7 +46,7 @@ class BusinessEvent:
         parent_span_id = self.span_id
         self.span_counter += 1
-        new_span_id = f"{self.trace_id}-{self.span_counter}"
+        new_span_id = str(uuid.uuid4())
         # Save the current span info
         self.spans.append((self.span_id, self.span_name, self.parent_span_id))
@@ -56,9 +56,12 @@ class BusinessEvent:
         self.span_name = span_name
         self.parent_span_id = parent_span_id
+        self.log(f"Starting span {span_name}")
         try:
             yield
         finally:
+            self.log(f"Ending span {span_name}")
             # Restore the previous span info
             if self.spans:
                 self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
@@ -103,7 +106,9 @@ class BusinessEvent:
         db.session.commit()

     def __enter__(self):
+        self.log(f'Starting Trace for {self.event_type}')
         return BusinessEventContext(self).__enter__()

     def __exit__(self, exc_type, exc_val, exc_tb):
+        self.log(f'Ending Trace for {self.event_type}')
         return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb)
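
Read together, the hunks above suggest a create_span contextmanager along these lines (a sketch only: the lines between the hunks, in particular the assignment of new_span_id to self.span_id, are assumed rather than shown; uuid and contextlib.contextmanager are presumed imported at module level).

    @contextmanager
    def create_span(self, span_name):
        parent_span_id = self.span_id
        self.span_counter += 1
        new_span_id = str(uuid.uuid4())  # span ids are now uuids instead of f"{trace_id}-{counter}"
        # Save the current span info
        self.spans.append((self.span_id, self.span_name, self.parent_span_id))
        self.span_id = new_span_id       # assumed from context, not visible in the hunks
        self.span_name = span_name
        self.parent_span_id = parent_span_id
        self.log(f"Starting span {span_name}")
        try:
            yield
        finally:
            self.log(f"Ending span {span_name}")
            # Restore the previous span info
            if self.spans:
                self.span_id, self.span_name, self.parent_span_id = self.spans.pop()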

View File

@@ -9,10 +9,12 @@ from typing import List, Any, Iterator
 from collections.abc import MutableMapping
 from openai import OpenAI
 from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
+from portkey_ai.langchain.portkey_langchain_callback_handler import LangchainCallbackHandler
 from common.models.document import EmbeddingSmallOpenAI, EmbeddingLargeOpenAI
 from common.models.user import Tenant
 from config.model_config import MODEL_CONFIG
+from common.utils.business_event_context import current_event


 class CitedAnswer(BaseModel):
@@ -91,87 +93,115 @@ class ModelVariables(MutableMapping):
     @property
     def embedding_model(self):
-        if self._embedding_model is None:
-            environment = os.getenv('FLASK_ENV', 'development')
-            portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment}
-            if self._variables['embedding_provider'] == 'openai':
+        portkey_metadata = self.get_portkey_metadata()
         portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'),
-                                        provider='openai',
+                                        provider=self._variables['embedding_provider'],
                                         metadata=portkey_metadata)
         api_key = os.getenv('OPENAI_API_KEY')
         model = self._variables['embedding_model']
         self._embedding_model = OpenAIEmbeddings(api_key=api_key,
                                                  model=model,
                                                  base_url=PORTKEY_GATEWAY_URL,
                                                  default_headers=portkey_headers)
         self._embedding_db_model = EmbeddingSmallOpenAI \
             if model == 'text-embedding-3-small' \
             else EmbeddingLargeOpenAI
-            else:
-                raise ValueError(f"Invalid embedding provider: {self._variables['embedding_provider']}")
         return self._embedding_model

     @property
     def llm(self):
-        if self._llm is None:
-            self._initialize_llm()
+        portkey_headers = self.get_portkey_headers_for_llm()
+        api_key = self.get_api_key_for_llm()
+        self._llm = ChatOpenAI(api_key=api_key,
+                               model=self._variables['llm_model'],
+                               temperature=self._variables['RAG_temperature'],
+                               base_url=PORTKEY_GATEWAY_URL,
+                               default_headers=portkey_headers)
         return self._llm

     @property
     def llm_no_rag(self):
-        if self._llm_no_rag is None:
-            self._initialize_llm()
+        portkey_headers = self.get_portkey_headers_for_llm()
+        api_key = self.get_api_key_for_llm()
+        self._llm_no_rag = ChatOpenAI(api_key=api_key,
+                                      model=self._variables['llm_model'],
+                                      temperature=self._variables['RAG_temperature'],
+                                      base_url=PORTKEY_GATEWAY_URL,
+                                      default_headers=portkey_headers)
         return self._llm_no_rag

-    def _initialize_llm(self):
-        environment = os.getenv('FLASK_ENV', 'development')
-        portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment}
+    def get_portkey_headers_for_llm(self):
+        portkey_metadata = self.get_portkey_metadata()
+        portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'),
+                                        metadata=portkey_metadata,
+                                        provider=self._variables['llm_provider'])
+        return portkey_headers
+
+    def get_portkey_metadata(self):
+        environment = os.getenv('FLASK_ENV', 'development')
+        portkey_metadata = {'tenant_id': str(self.tenant.id),
+                            'environment': environment,
+                            'trace_id': current_event.trace_id,
+                            'span_id': current_event.span_id,
+                            'span_name': current_event.span_name,
+                            'parent_span_id': current_event.parent_span_id,
+                            }
+        return portkey_metadata
+
+    def get_api_key_for_llm(self):
         if self._variables['llm_provider'] == 'openai':
             api_key = os.getenv('OPENAI_API_KEY')
+        else:  # self._variables['llm_provider'] == 'anthropic'
             api_key = os.getenv('ANTHROPIC_API_KEY')
+        return api_key
+
+    # def _initialize_llm(self):
+    #
+    #
+    #     if self._variables['llm_provider'] == 'openai':
+    #         portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'),
+    #                                         metadata=portkey_metadata,
+    #                                         provider='openai')
+    #
+    #         self._llm = ChatOpenAI(api_key=api_key,
+    #                                model=self._variables['llm_model'],
+    #                                temperature=self._variables['RAG_temperature'],
+    #                                base_url=PORTKEY_GATEWAY_URL,
+    #                                default_headers=portkey_headers)
+    #         self._llm_no_rag = ChatOpenAI(api_key=api_key,
+    #                                       model=self._variables['llm_model'],
+    #                                       temperature=self._variables['no_RAG_temperature'],
+    #                                       base_url=PORTKEY_GATEWAY_URL,
+    #                                       default_headers=portkey_headers)
+    #         self._variables['tool_calling_supported'] = self._variables['llm_model'] in ['gpt-4o', 'gpt-4o-mini']
+    #     elif self._variables['llm_provider'] == 'anthropic':
+    #         api_key = os.getenv('ANTHROPIC_API_KEY')
+    #         llm_model_ext = os.getenv('ANTHROPIC_LLM_VERSIONS', {}).get(self._variables['llm_model'])
+    #         self._llm = ChatAnthropic(api_key=api_key,
+    #                                   model=llm_model_ext,
+    #                                   temperature=self._variables['RAG_temperature'])
+    #         self._llm_no_rag = ChatAnthropic(api_key=api_key,
+    #                                          model=llm_model_ext,
+    #                                          temperature=self._variables['RAG_temperature'])
+    #         self._variables['tool_calling_supported'] = True
+    #     else:
+    #         raise ValueError(f"Invalid chat provider: {self._variables['llm_provider']}")

     @property
     def transcription_client(self):
-        if self._transcription_client is None:
         environment = os.getenv('FLASK_ENV', 'development')
-        portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment}
+        portkey_metadata = self.get_portkey_metadata()
         portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'),
                                         metadata=portkey_metadata,
                                         provider='openai')
         api_key = os.getenv('OPENAI_API_KEY')
         self._transcription_client = OpenAI(api_key=api_key,
                                             base_url=PORTKEY_GATEWAY_URL,
                                             default_headers=portkey_headers)
         self._variables['transcription_model'] = 'whisper-1'
         return self._transcription_client

     @property
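
Note that embedding_model, llm, llm_no_rag and transcription_client no longer cache behind a lazy-initialization guard and now call get_portkey_metadata() on every access, so they read current_event at that moment. They are therefore expected to be used inside an active BusinessEvent, roughly like this (a sketch; the select_model_variables call signature is assumed from its use in the task modules):

with BusinessEvent("Ask Question", tenant_id=tenant.id, session_id=session_id):
    model_variables = select_model_variables(tenant)  # assumed signature
    with current_event.create_span("Generate Answer using RAG"):
        # The Portkey headers built here carry the trace_id/span_id of this span.
        llm = model_variables['llm']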

View File

@@ -24,6 +24,8 @@ from common.utils.celery_utils import current_celery
 from common.utils.model_utils import select_model_variables, create_language_template, replace_variable_in_template
 from common.langchain.eveai_retriever import EveAIRetriever
 from common.langchain.eveai_history_retriever import EveAIHistoryRetriever
+from common.utils.business_event import BusinessEvent
+from common.utils.business_event_context import current_event


 # Healthcheck task
@@ -65,53 +67,56 @@ def ask_question(tenant_id, question, language, session_id, user_timezone, room)
             'interaction_id': 'interaction_id_value'
         }
     """
+    with BusinessEvent("Ask Question", tenant_id=tenant_id, session_id=session_id):
         current_app.logger.info(f'ask_question: Received question for tenant {tenant_id}: {question}. Processing...')
         try:
             # Retrieve the tenant
             tenant = Tenant.query.get(tenant_id)
             if not tenant:
                 raise Exception(f'Tenant {tenant_id} not found.')

             # Ensure we are working in the correct database schema
             Database(tenant_id).switch_schema()

             # Ensure we have a session to story history
             chat_session = ChatSession.query.filter_by(session_id=session_id).first()
             if not chat_session:
                 try:
                     chat_session = ChatSession()
                     chat_session.session_id = session_id
                     chat_session.session_start = dt.now(tz.utc)
                     chat_session.timezone = user_timezone
                     db.session.add(chat_session)
                     db.session.commit()
                 except SQLAlchemyError as e:
                     current_app.logger.error(f'ask_question: Error initializing chat session in database: {e}')
                     raise

             if tenant.rag_tuning:
                 current_app.rag_tuning_logger.debug(f'Received question for tenant {tenant_id}:\n{question}. Processing...')
                 current_app.rag_tuning_logger.debug(f'Tenant Information: \n{tenant.to_dict()}')
                 current_app.rag_tuning_logger.debug(f'===================================================================')
                 current_app.rag_tuning_logger.debug(f'===================================================================')

+            with current_event.create_span("RAG Answer"):
                 result, interaction = answer_using_tenant_rag(question, language, tenant, chat_session)
                 result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name']
                 result['interaction_id'] = interaction.id
                 result['room'] = room  # Include the room in the result

             if result['insufficient_info']:
                 if 'LLM' in tenant.fallback_algorithms:
+                    with current_event.create_span("Fallback Algorithm LLM"):
                         result, interaction = answer_using_llm(question, language, tenant, chat_session)
                         result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['LLM']['name']
                         result['interaction_id'] = interaction.id
                         result['room'] = room  # Include the room in the result

             return result
         except Exception as e:
             current_app.logger.error(f'ask_question: Error processing question: {e}')
             raise


 def answer_using_tenant_rag(question, language, tenant, chat_session):
@@ -131,92 +136,94 @@ def answer_using_tenant_rag(question, language, tenant, chat_session):
     # Langchain debugging if required
     # set_debug(True)

+    with current_event.create_span("Detail Question"):
         detailed_question = detail_question(question, language, model_variables, chat_session.session_id)
         current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}')
         if tenant.rag_tuning:
             current_app.rag_tuning_logger.debug(f'Detailed Question for tenant {tenant.id}:\n{question}.')
             current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------')
         new_interaction.detailed_question = detailed_question
         new_interaction.detailed_question_at = dt.now(tz.utc)

+    with current_event.create_span("Generate Answer using RAG"):
         retriever = EveAIRetriever(model_variables, tenant_info)
         llm = model_variables['llm']
         template = model_variables['rag_template']
         language_template = create_language_template(template, language)
         full_template = replace_variable_in_template(language_template, "{tenant_context}", model_variables['rag_context'])
         rag_prompt = ChatPromptTemplate.from_template(full_template)
         setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})

         if tenant.rag_tuning:
             current_app.rag_tuning_logger.debug(f'Full prompt for tenant {tenant.id}:\n{full_template}.')
             current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------')

         new_interaction_embeddings = []

         if not model_variables['cited_answer_cls']:  # The model doesn't support structured feedback
             output_parser = StrOutputParser()
             chain = setup_and_retrieval | rag_prompt | llm | output_parser

             # Invoke the chain with the actual question
             answer = chain.invoke(detailed_question)
             new_interaction.answer = answer
             result = {
                 'answer': answer,
                 'citations': [],
                 'insufficient_info': False
             }
         else:  # The model supports structured feedback
             structured_llm = llm.with_structured_output(model_variables['cited_answer_cls'])
             chain = setup_and_retrieval | rag_prompt | structured_llm

             result = chain.invoke(detailed_question).dict()
             current_app.logger.debug(f'ask_question: result answer: {result['answer']}')
             current_app.logger.debug(f'ask_question: result citations: {result["citations"]}')
             current_app.logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}')
             if tenant.rag_tuning:
                 current_app.rag_tuning_logger.debug(f'ask_question: result answer: {result['answer']}')
                 current_app.rag_tuning_logger.debug(f'ask_question: result citations: {result["citations"]}')
                 current_app.rag_tuning_logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}')
                 current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------')
             new_interaction.answer = result['answer']

             # Filter out the existing Embedding IDs
             given_embedding_ids = [int(emb_id) for emb_id in result['citations']]
             embeddings = (
                 db.session.query(Embedding)
                 .filter(Embedding.id.in_(given_embedding_ids))
                 .all()
             )
             existing_embedding_ids = [emb.id for emb in embeddings]
             urls = list(set(emb.document_version.url for emb in embeddings))
             if tenant.rag_tuning:
                 current_app.rag_tuning_logger.debug(f'Referenced documents for answer for tenant {tenant.id}:\n')
                 current_app.rag_tuning_logger.debug(f'{urls}')
                 current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------')
             for emb_id in existing_embedding_ids:
                 new_interaction_embedding = InteractionEmbedding(embedding_id=emb_id)
                 new_interaction_embedding.interaction = new_interaction
                 new_interaction_embeddings.append(new_interaction_embedding)
             result['citations'] = urls

     # Disable langchain debugging if set above.
     # set_debug(False)

     new_interaction.answer_at = dt.now(tz.utc)
     chat_session.session_end = dt.now(tz.utc)
     try:
         db.session.add(chat_session)
         db.session.add(new_interaction)
         db.session.add_all(new_interaction_embeddings)
         db.session.commit()
         return result, new_interaction
     except SQLAlchemyError as e:
         current_app.logger.error(f'ask_question: Error saving interaction to database: {e}')
         raise


 def answer_using_llm(question, language, tenant, chat_session):
@@ -236,47 +243,49 @@ def answer_using_llm(question, language, tenant, chat_session):
     # Langchain debugging if required
     # set_debug(True)

+    with current_event.create_span("Detail Question"):
         detailed_question = detail_question(question, language, model_variables, chat_session.session_id)
         current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}')
         new_interaction.detailed_question = detailed_question
         new_interaction.detailed_question_at = dt.now(tz.utc)

+    with current_event.create_span("Detail Answer using LLM"):
         retriever = EveAIRetriever(model_variables, tenant_info)
         llm = model_variables['llm_no_rag']
         template = model_variables['encyclopedia_template']
         language_template = create_language_template(template, language)
         rag_prompt = ChatPromptTemplate.from_template(language_template)
         setup = RunnablePassthrough()
         output_parser = StrOutputParser()

         new_interaction_embeddings = []

         chain = setup | rag_prompt | llm | output_parser
         input_question = {"question": detailed_question}

         # Invoke the chain with the actual question
         answer = chain.invoke(input_question)
         new_interaction.answer = answer
         result = {
             'answer': answer,
             'citations': [],
             'insufficient_info': False
         }

     # Disable langchain debugging if set above.
     # set_debug(False)

     new_interaction.answer_at = dt.now(tz.utc)
     chat_session.session_end = dt.now(tz.utc)
     try:
         db.session.add(chat_session)
         db.session.add(new_interaction)
         db.session.commit()
         return result, new_interaction
     except SQLAlchemyError as e:
         current_app.logger.error(f'ask_question: Error saving interaction to database: {e}')
         raise


 def tasks_ping():

View File

@@ -7,6 +7,7 @@ from common.extensions import minio_client
 import subprocess
 from .transcription_processor import TranscriptionProcessor
+from common.utils.business_event_context import current_event


 class AudioProcessor(TranscriptionProcessor):
@@ -24,8 +25,13 @@ class AudioProcessor(TranscriptionProcessor):
             self.document_version.id,
             self.document_version.file_name
         )

-        compressed_audio = self._compress_audio(file_data)
-        return self._transcribe_audio(compressed_audio)
+        with current_event.create_span("Audio Processing"):
+            compressed_audio = self._compress_audio(file_data)
+        with current_event.create_span("Transcription Generation"):
+            transcription = self._transcribe_audio(compressed_audio)
+        return transcription

     def _compress_audio(self, audio_data):
         self._log("Compressing audio")

View File

@@ -31,8 +31,10 @@ class HTMLProcessor(Processor):
         )

         html_content = file_data.decode('utf-8')
+        with current_event.create_span("HTML Content Extraction"):
             extracted_html, title = self._parse_html(html_content)
+        with current_event.create_span("Markdown Generation"):
             markdown = self._generate_markdown_from_html(extracted_html)
         self._save_markdown(markdown)

         self._log("Finished processing HTML")

View File

@@ -10,6 +10,7 @@ from langchain_core.runnables import RunnablePassthrough
 from common.extensions import minio_client
 from common.utils.model_utils import create_language_template
 from .processor import Processor
+from common.utils.business_event_context import current_event


 class PDFProcessor(Processor):
@@ -32,13 +33,14 @@ class PDFProcessor(Processor):
                 self.document_version.file_name
             )

+            with current_event.create_span("PDF Extraction"):
                 extracted_content = self._extract_content(file_data)
                 structured_content, title = self._structure_content(extracted_content)

+            with current_event.create_span("Markdown Generation"):
                 llm_chunks = self._split_content_for_llm(structured_content)
                 markdown = self._process_chunks_with_llm(llm_chunks)

             self._save_markdown(markdown)
             self._log("Finished processing PDF")
             return markdown, title
         except Exception as e:

View File

@@ -1,11 +1,13 @@
 # transcription_processor.py
-from common.utils.model_utils import create_language_template
-from .processor import Processor
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from langchain_core.output_parsers import StrOutputParser
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.runnables import RunnablePassthrough
+from common.utils.model_utils import create_language_template
+from .processor import Processor
+from common.utils.business_event_context import current_event


 class TranscriptionProcessor(Processor):
     def __init__(self, tenant, model_variables, document_version):
@@ -16,12 +18,14 @@ class TranscriptionProcessor(Processor):
     def process(self):
         self._log("Starting Transcription processing")
         try:
+            with current_event.create_span("Transcription Generation"):
                 transcription = self._get_transcription()
+            with current_event.create_span("Markdown Generation"):
                 chunks = self._chunk_transcription(transcription)
                 markdown_chunks = self._process_chunks(chunks)
                 full_markdown = self._combine_markdown_chunks(markdown_chunks)
                 self._save_markdown(full_markdown)
             self._log("Finished processing Transcription")
             return full_markdown, self._extract_title_from_markdown(full_markdown)
         except Exception as e:
             self._log(f"Error processing Transcription: {str(e)}", level='error')

View File

@@ -39,8 +39,6 @@ def create_embeddings(tenant_id, document_version_id):
     # BusinessEvent creates a context, which is why we need to use it with a with block
     with BusinessEvent('Create Embeddings', tenant_id, document_version_id=document_version_id):
         current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}')
-        current_event.log("Starting Embedding Creation Task")
-
         try:
             # Retrieve Tenant for which we are processing
             tenant = Tenant.query.get(tenant_id)
@@ -125,13 +123,13 @@ def delete_embeddings_for_document_version(document_version):
 def process_pdf(tenant, model_variables, document_version):
-    current_event.log("Starting PDF Processing")
+    with current_event.create_span("PDF Processing"):
         processor = PDFProcessor(tenant, model_variables, document_version)
         markdown, title = processor.process()

     # Process markdown and embed
+    with current_event.create_span("Embedding"):
         embed_markdown(tenant, model_variables, document_version, markdown, title)
-    current_event.log("Finished PDF Processing")


 def process_html(tenant, model_variables, document_version):
@@ -144,29 +142,27 @@ def process_html(tenant, model_variables, document_version):
     embed_markdown(tenant, model_variables, document_version, markdown, title)


 def process_audio(tenant, model_variables, document_version):
-    current_event.log("Starting Audio Processing")
+    with current_event.create_span("Audio Processing"):
         processor = AudioProcessor(tenant, model_variables, document_version)
         markdown, title = processor.process()

     # Process markdown and embed
+    with current_event.create_span("Embedding"):
         embed_markdown(tenant, model_variables, document_version, markdown, title)
-    current_event.log("Finished Audio Processing")


 def process_srt(tenant, model_variables, document_version):
-    current_event.log("Starting SRT Processing")
+    with current_event.create_span("SRT Processing"):
         processor = SRTProcessor(tenant, model_variables, document_version)
         markdown, title = processor.process()

     # Process markdown and embed
+    with current_event.create_span("Embedding"):
         embed_markdown(tenant, model_variables, document_version, markdown, title)
-    current_event.log("Finished SRT Processing")


 def embed_markdown(tenant, model_variables, document_version, markdown, title):
-    current_event.log("Starting Embedding Markdown Processing")
-
     # Create potential chunks
     potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
@@ -195,7 +191,6 @@ def embed_markdown(tenant, model_variables, document_version, markdown, title):
     current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                             f'on document version {document_version.id} :-)')
-    current_event.log("Finished Embedding Markdown Processing")


 def enrich_chunks(tenant, model_variables, document_version, title, chunks):
@@ -238,7 +233,7 @@ def enrich_chunks(tenant, model_variables, document_version, title, chunks):
 def summarize_chunk(tenant, model_variables, document_version, chunk):
-    current_event.log("Starting Summarizing Chunk Processing")
+    current_event.log("Starting Summarizing Chunk")
     current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
                              f'on document version {document_version.id}')
     llm = model_variables['llm']
@@ -256,7 +251,7 @@ def summarize_chunk(tenant, model_variables, document_version, chunk):
         summary = chain.invoke({"text": chunk})
         current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
                                  f'on document version {document_version.id}.')
-        current_event.log("Finished summarizing chunk for tenant ")
+        current_event.log("Finished Summarizing Chunk")
         return summary
     except LangChainException as e:
         current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
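
Taken together with the processor changes in the earlier files, a PDF handled by create_embeddings should now produce a span hierarchy roughly like the one below under its trace (span names are taken from the diffs; the nesting is inferred from the with-blocks):

Create Embeddings (trace)
    PDF Processing
        PDF Extraction
        Markdown Generation
    Embedding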

View File

@@ -63,7 +63,7 @@ zxcvbn~=4.4.28
 groq~=0.9.0
 pydub~=0.25.1
 argparse~=1.4.0
-portkey_ai~=1.8.2
+portkey_ai~=1.8.7
 minio~=7.2.7
 Werkzeug~=3.0.3
 itsdangerous~=2.2.0