diff --git a/common/utils/business_event.py b/common/utils/business_event.py index 0fbef46..e2bc9c2 100644 --- a/common/utils/business_event.py +++ b/common/utils/business_event.py @@ -46,7 +46,7 @@ class BusinessEvent: parent_span_id = self.span_id self.span_counter += 1 - new_span_id = f"{self.trace_id}-{self.span_counter}" + new_span_id = str(uuid.uuid4()) # Save the current span info self.spans.append((self.span_id, self.span_name, self.parent_span_id)) @@ -56,9 +56,12 @@ class BusinessEvent: self.span_name = span_name self.parent_span_id = parent_span_id + self.log(f"Starting span {span_name}") + try: yield finally: + self.log(f"Ending span {span_name}") # Restore the previous span info if self.spans: self.span_id, self.span_name, self.parent_span_id = self.spans.pop() @@ -103,7 +106,9 @@ class BusinessEvent: db.session.commit() def __enter__(self): + self.log(f'Starting Trace for {self.event_type}') return BusinessEventContext(self).__enter__() def __exit__(self, exc_type, exc_val, exc_tb): + self.log(f'Ending Trace for {self.event_type}') return BusinessEventContext(self).__exit__(exc_type, exc_val, exc_tb) diff --git a/common/utils/model_utils.py b/common/utils/model_utils.py index b964482..2d27f4f 100644 --- a/common/utils/model_utils.py +++ b/common/utils/model_utils.py @@ -9,10 +9,12 @@ from typing import List, Any, Iterator from collections.abc import MutableMapping from openai import OpenAI from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL +from portkey_ai.langchain.portkey_langchain_callback_handler import LangchainCallbackHandler from common.models.document import EmbeddingSmallOpenAI, EmbeddingLargeOpenAI from common.models.user import Tenant from config.model_config import MODEL_CONFIG +from common.utils.business_event_context import current_event class CitedAnswer(BaseModel): @@ -91,87 +93,115 @@ class ModelVariables(MutableMapping): @property def embedding_model(self): - if self._embedding_model is None: - environment = os.getenv('FLASK_ENV', 'development') - portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment} + portkey_metadata = self.get_portkey_metadata() - if self._variables['embedding_provider'] == 'openai': - portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), - provider='openai', - metadata=portkey_metadata) - api_key = os.getenv('OPENAI_API_KEY') - model = self._variables['embedding_model'] - self._embedding_model = OpenAIEmbeddings(api_key=api_key, - model=model, - base_url=PORTKEY_GATEWAY_URL, - default_headers=portkey_headers) - self._embedding_db_model = EmbeddingSmallOpenAI \ - if model == 'text-embedding-3-small' \ - else EmbeddingLargeOpenAI - else: - raise ValueError(f"Invalid embedding provider: {self._variables['embedding_provider']}") + portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), + provider=self._variables['embedding_provider'], + metadata=portkey_metadata) + api_key = os.getenv('OPENAI_API_KEY') + model = self._variables['embedding_model'] + self._embedding_model = OpenAIEmbeddings(api_key=api_key, + model=model, + base_url=PORTKEY_GATEWAY_URL, + default_headers=portkey_headers) + self._embedding_db_model = EmbeddingSmallOpenAI \ + if model == 'text-embedding-3-small' \ + else EmbeddingLargeOpenAI return self._embedding_model @property def llm(self): - if self._llm is None: - self._initialize_llm() + portkey_headers = self.get_portkey_headers_for_llm() + api_key = self.get_api_key_for_llm() + self._llm = ChatOpenAI(api_key=api_key, + model=self._variables['llm_model'], + 
temperature=self._variables['RAG_temperature'], + base_url=PORTKEY_GATEWAY_URL, + default_headers=portkey_headers) return self._llm @property def llm_no_rag(self): - if self._llm_no_rag is None: - self._initialize_llm() + portkey_headers = self.get_portkey_headers_for_llm() + api_key = self.get_api_key_for_llm() + self._llm_no_rag = ChatOpenAI(api_key=api_key, + model=self._variables['llm_model'], + temperature=self._variables['RAG_temperature'], + base_url=PORTKEY_GATEWAY_URL, + default_headers=portkey_headers) return self._llm_no_rag - def _initialize_llm(self): - environment = os.getenv('FLASK_ENV', 'development') - portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment} + def get_portkey_headers_for_llm(self): + portkey_metadata = self.get_portkey_metadata() + portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), + metadata=portkey_metadata, + provider=self._variables['llm_provider']) + return portkey_headers + def get_portkey_metadata(self): + environment = os.getenv('FLASK_ENV', 'development') + portkey_metadata = {'tenant_id': str(self.tenant.id), + 'environment': environment, + 'trace_id': current_event.trace_id, + 'span_id': current_event.span_id, + 'span_name': current_event.span_name, + 'parent_span_id': current_event.parent_span_id, + } + return portkey_metadata + + def get_api_key_for_llm(self): if self._variables['llm_provider'] == 'openai': - portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), - metadata=portkey_metadata, - provider='openai') api_key = os.getenv('OPENAI_API_KEY') - self._llm = ChatOpenAI(api_key=api_key, - model=self._variables['llm_model'], - temperature=self._variables['RAG_temperature'], - base_url=PORTKEY_GATEWAY_URL, - default_headers=portkey_headers) - self._llm_no_rag = ChatOpenAI(api_key=api_key, - model=self._variables['llm_model'], - temperature=self._variables['no_RAG_temperature'], - base_url=PORTKEY_GATEWAY_URL, - default_headers=portkey_headers) - self._variables['tool_calling_supported'] = self._variables['llm_model'] in ['gpt-4o', 'gpt-4o-mini'] - elif self._variables['llm_provider'] == 'anthropic': + else: # self._variables['llm_provider'] == 'anthropic' api_key = os.getenv('ANTHROPIC_API_KEY') - llm_model_ext = os.getenv('ANTHROPIC_LLM_VERSIONS', {}).get(self._variables['llm_model']) - self._llm = ChatAnthropic(api_key=api_key, - model=llm_model_ext, - temperature=self._variables['RAG_temperature']) - self._llm_no_rag = ChatAnthropic(api_key=api_key, - model=llm_model_ext, - temperature=self._variables['RAG_temperature']) - self._variables['tool_calling_supported'] = True - else: - raise ValueError(f"Invalid chat provider: {self._variables['llm_provider']}") + + return api_key + + # def _initialize_llm(self): + # + # + # if self._variables['llm_provider'] == 'openai': + # portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), + # metadata=portkey_metadata, + # provider='openai') + # + # self._llm = ChatOpenAI(api_key=api_key, + # model=self._variables['llm_model'], + # temperature=self._variables['RAG_temperature'], + # base_url=PORTKEY_GATEWAY_URL, + # default_headers=portkey_headers) + # self._llm_no_rag = ChatOpenAI(api_key=api_key, + # model=self._variables['llm_model'], + # temperature=self._variables['no_RAG_temperature'], + # base_url=PORTKEY_GATEWAY_URL, + # default_headers=portkey_headers) + # self._variables['tool_calling_supported'] = self._variables['llm_model'] in ['gpt-4o', 'gpt-4o-mini'] + # elif self._variables['llm_provider'] == 'anthropic': + # 
api_key = os.getenv('ANTHROPIC_API_KEY') + # llm_model_ext = os.getenv('ANTHROPIC_LLM_VERSIONS', {}).get(self._variables['llm_model']) + # self._llm = ChatAnthropic(api_key=api_key, + # model=llm_model_ext, + # temperature=self._variables['RAG_temperature']) + # self._llm_no_rag = ChatAnthropic(api_key=api_key, + # model=llm_model_ext, + # temperature=self._variables['RAG_temperature']) + # self._variables['tool_calling_supported'] = True + # else: + # raise ValueError(f"Invalid chat provider: {self._variables['llm_provider']}") @property def transcription_client(self): - if self._transcription_client is None: - environment = os.getenv('FLASK_ENV', 'development') - portkey_metadata = {'tenant_id': str(self.tenant.id), 'environment': environment} - portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), - metadata=portkey_metadata, - provider='openai') - api_key = os.getenv('OPENAI_API_KEY') - self._transcription_client = OpenAI(api_key=api_key, - base_url=PORTKEY_GATEWAY_URL, - default_headers=portkey_headers) - self._variables['transcription_model'] = 'whisper-1' - + environment = os.getenv('FLASK_ENV', 'development') + portkey_metadata = self.get_portkey_metadata() + portkey_headers = createHeaders(api_key=os.getenv('PORTKEY_API_KEY'), + metadata=portkey_metadata, + provider='openai') + api_key = os.getenv('OPENAI_API_KEY') + self._transcription_client = OpenAI(api_key=api_key, + base_url=PORTKEY_GATEWAY_URL, + default_headers=portkey_headers) + self._variables['transcription_model'] = 'whisper-1' return self._transcription_client @property diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index da9fd2f..ae8eb64 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -24,6 +24,8 @@ from common.utils.celery_utils import current_celery from common.utils.model_utils import select_model_variables, create_language_template, replace_variable_in_template from common.langchain.eveai_retriever import EveAIRetriever from common.langchain.eveai_history_retriever import EveAIHistoryRetriever +from common.utils.business_event import BusinessEvent +from common.utils.business_event_context import current_event # Healthcheck task @@ -65,53 +67,56 @@ def ask_question(tenant_id, question, language, session_id, user_timezone, room) 'interaction_id': 'interaction_id_value' } """ - current_app.logger.info(f'ask_question: Received question for tenant {tenant_id}: {question}. Processing...') + with BusinessEvent("Ask Question", tenant_id=tenant_id, session_id=session_id): + current_app.logger.info(f'ask_question: Received question for tenant {tenant_id}: {question}. 
Processing...') - try: - # Retrieve the tenant - tenant = Tenant.query.get(tenant_id) - if not tenant: - raise Exception(f'Tenant {tenant_id} not found.') + try: + # Retrieve the tenant + tenant = Tenant.query.get(tenant_id) + if not tenant: + raise Exception(f'Tenant {tenant_id} not found.') - # Ensure we are working in the correct database schema - Database(tenant_id).switch_schema() + # Ensure we are working in the correct database schema + Database(tenant_id).switch_schema() - # Ensure we have a session to story history - chat_session = ChatSession.query.filter_by(session_id=session_id).first() - if not chat_session: - try: - chat_session = ChatSession() - chat_session.session_id = session_id - chat_session.session_start = dt.now(tz.utc) - chat_session.timezone = user_timezone - db.session.add(chat_session) - db.session.commit() - except SQLAlchemyError as e: - current_app.logger.error(f'ask_question: Error initializing chat session in database: {e}') - raise + # Ensure we have a session to story history + chat_session = ChatSession.query.filter_by(session_id=session_id).first() + if not chat_session: + try: + chat_session = ChatSession() + chat_session.session_id = session_id + chat_session.session_start = dt.now(tz.utc) + chat_session.timezone = user_timezone + db.session.add(chat_session) + db.session.commit() + except SQLAlchemyError as e: + current_app.logger.error(f'ask_question: Error initializing chat session in database: {e}') + raise - if tenant.rag_tuning: - current_app.rag_tuning_logger.debug(f'Received question for tenant {tenant_id}:\n{question}. Processing...') - current_app.rag_tuning_logger.debug(f'Tenant Information: \n{tenant.to_dict()}') - current_app.rag_tuning_logger.debug(f'===================================================================') - current_app.rag_tuning_logger.debug(f'===================================================================') + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Received question for tenant {tenant_id}:\n{question}. 
Processing...') + current_app.rag_tuning_logger.debug(f'Tenant Information: \n{tenant.to_dict()}') + current_app.rag_tuning_logger.debug(f'===================================================================') + current_app.rag_tuning_logger.debug(f'===================================================================') - result, interaction = answer_using_tenant_rag(question, language, tenant, chat_session) - result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name'] - result['interaction_id'] = interaction.id - result['room'] = room # Include the room in the result - - if result['insufficient_info']: - if 'LLM' in tenant.fallback_algorithms: - result, interaction = answer_using_llm(question, language, tenant, chat_session) - result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['LLM']['name'] + with current_event.create_span("RAG Answer"): + result, interaction = answer_using_tenant_rag(question, language, tenant, chat_session) + result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['RAG_TENANT']['name'] result['interaction_id'] = interaction.id result['room'] = room # Include the room in the result - return result - except Exception as e: - current_app.logger.error(f'ask_question: Error processing question: {e}') - raise + if result['insufficient_info']: + if 'LLM' in tenant.fallback_algorithms: + with current_event.create_span("Fallback Algorithm LLM"): + result, interaction = answer_using_llm(question, language, tenant, chat_session) + result['algorithm'] = current_app.config['INTERACTION_ALGORITHMS']['LLM']['name'] + result['interaction_id'] = interaction.id + result['room'] = room # Include the room in the result + + return result + except Exception as e: + current_app.logger.error(f'ask_question: Error processing question: {e}') + raise def answer_using_tenant_rag(question, language, tenant, chat_session): @@ -131,92 +136,94 @@ def answer_using_tenant_rag(question, language, tenant, chat_session): # Langchain debugging if required # set_debug(True) - detailed_question = detail_question(question, language, model_variables, chat_session.session_id) - current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}') - if tenant.rag_tuning: - current_app.rag_tuning_logger.debug(f'Detailed Question for tenant {tenant.id}:\n{question}.') - current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') - new_interaction.detailed_question = detailed_question - new_interaction.detailed_question_at = dt.now(tz.utc) - - retriever = EveAIRetriever(model_variables, tenant_info) - llm = model_variables['llm'] - template = model_variables['rag_template'] - language_template = create_language_template(template, language) - full_template = replace_variable_in_template(language_template, "{tenant_context}", model_variables['rag_context']) - rag_prompt = ChatPromptTemplate.from_template(full_template) - setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()}) - if tenant.rag_tuning: - current_app.rag_tuning_logger.debug(f'Full prompt for tenant {tenant.id}:\n{full_template}.') - current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') - - new_interaction_embeddings = [] - if not model_variables['cited_answer_cls']: # The model doesn't support structured feedback - output_parser = StrOutputParser() - - chain = setup_and_retrieval | rag_prompt | llm | output_parser - - # Invoke 
the chain with the actual question - answer = chain.invoke(detailed_question) - new_interaction.answer = answer - result = { - 'answer': answer, - 'citations': [], - 'insufficient_info': False - } - - else: # The model supports structured feedback - structured_llm = llm.with_structured_output(model_variables['cited_answer_cls']) - - chain = setup_and_retrieval | rag_prompt | structured_llm - - result = chain.invoke(detailed_question).dict() - current_app.logger.debug(f'ask_question: result answer: {result['answer']}') - current_app.logger.debug(f'ask_question: result citations: {result["citations"]}') - current_app.logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + with current_event.create_span("Detail Question"): + detailed_question = detail_question(question, language, model_variables, chat_session.session_id) + current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}') if tenant.rag_tuning: - current_app.rag_tuning_logger.debug(f'ask_question: result answer: {result['answer']}') - current_app.rag_tuning_logger.debug(f'ask_question: result citations: {result["citations"]}') - current_app.rag_tuning_logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + current_app.rag_tuning_logger.debug(f'Detailed Question for tenant {tenant.id}:\n{question}.') current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') - new_interaction.answer = result['answer'] + new_interaction.detailed_question = detailed_question + new_interaction.detailed_question_at = dt.now(tz.utc) - # Filter out the existing Embedding IDs - given_embedding_ids = [int(emb_id) for emb_id in result['citations']] - embeddings = ( - db.session.query(Embedding) - .filter(Embedding.id.in_(given_embedding_ids)) - .all() - ) - existing_embedding_ids = [emb.id for emb in embeddings] - urls = list(set(emb.document_version.url for emb in embeddings)) + with current_event.create_span("Generate Answer using RAG"): + retriever = EveAIRetriever(model_variables, tenant_info) + llm = model_variables['llm'] + template = model_variables['rag_template'] + language_template = create_language_template(template, language) + full_template = replace_variable_in_template(language_template, "{tenant_context}", model_variables['rag_context']) + rag_prompt = ChatPromptTemplate.from_template(full_template) + setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()}) if tenant.rag_tuning: - current_app.rag_tuning_logger.debug(f'Referenced documents for answer for tenant {tenant.id}:\n') - current_app.rag_tuning_logger.debug(f'{urls}') + current_app.rag_tuning_logger.debug(f'Full prompt for tenant {tenant.id}:\n{full_template}.') current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') - for emb_id in existing_embedding_ids: - new_interaction_embedding = InteractionEmbedding(embedding_id=emb_id) - new_interaction_embedding.interaction = new_interaction - new_interaction_embeddings.append(new_interaction_embedding) + new_interaction_embeddings = [] + if not model_variables['cited_answer_cls']: # The model doesn't support structured feedback + output_parser = StrOutputParser() - result['citations'] = urls + chain = setup_and_retrieval | rag_prompt | llm | output_parser - # Disable langchain debugging if set above. 
- # set_debug(False) + # Invoke the chain with the actual question + answer = chain.invoke(detailed_question) + new_interaction.answer = answer + result = { + 'answer': answer, + 'citations': [], + 'insufficient_info': False + } - new_interaction.answer_at = dt.now(tz.utc) - chat_session.session_end = dt.now(tz.utc) + else: # The model supports structured feedback + structured_llm = llm.with_structured_output(model_variables['cited_answer_cls']) - try: - db.session.add(chat_session) - db.session.add(new_interaction) - db.session.add_all(new_interaction_embeddings) - db.session.commit() - return result, new_interaction - except SQLAlchemyError as e: - current_app.logger.error(f'ask_question: Error saving interaction to database: {e}') - raise + chain = setup_and_retrieval | rag_prompt | structured_llm + + result = chain.invoke(detailed_question).dict() + current_app.logger.debug(f'ask_question: result answer: {result['answer']}') + current_app.logger.debug(f'ask_question: result citations: {result["citations"]}') + current_app.logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'ask_question: result answer: {result['answer']}') + current_app.rag_tuning_logger.debug(f'ask_question: result citations: {result["citations"]}') + current_app.rag_tuning_logger.debug(f'ask_question: insufficient information: {result["insufficient_info"]}') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') + new_interaction.answer = result['answer'] + + # Filter out the existing Embedding IDs + given_embedding_ids = [int(emb_id) for emb_id in result['citations']] + embeddings = ( + db.session.query(Embedding) + .filter(Embedding.id.in_(given_embedding_ids)) + .all() + ) + existing_embedding_ids = [emb.id for emb in embeddings] + urls = list(set(emb.document_version.url for emb in embeddings)) + if tenant.rag_tuning: + current_app.rag_tuning_logger.debug(f'Referenced documents for answer for tenant {tenant.id}:\n') + current_app.rag_tuning_logger.debug(f'{urls}') + current_app.rag_tuning_logger.debug(f'-------------------------------------------------------------------') + + for emb_id in existing_embedding_ids: + new_interaction_embedding = InteractionEmbedding(embedding_id=emb_id) + new_interaction_embedding.interaction = new_interaction + new_interaction_embeddings.append(new_interaction_embedding) + + result['citations'] = urls + + # Disable langchain debugging if set above. 
+ # set_debug(False) + + new_interaction.answer_at = dt.now(tz.utc) + chat_session.session_end = dt.now(tz.utc) + + try: + db.session.add(chat_session) + db.session.add(new_interaction) + db.session.add_all(new_interaction_embeddings) + db.session.commit() + return result, new_interaction + except SQLAlchemyError as e: + current_app.logger.error(f'ask_question: Error saving interaction to database: {e}') + raise def answer_using_llm(question, language, tenant, chat_session): @@ -236,47 +243,49 @@ def answer_using_llm(question, language, tenant, chat_session): # Langchain debugging if required # set_debug(True) - detailed_question = detail_question(question, language, model_variables, chat_session.session_id) - current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}') - new_interaction.detailed_question = detailed_question - new_interaction.detailed_question_at = dt.now(tz.utc) + with current_event.create_span("Detail Question"): + detailed_question = detail_question(question, language, model_variables, chat_session.session_id) + current_app.logger.debug(f'Original question:\n {question}\n\nDetailed question: {detailed_question}') + new_interaction.detailed_question = detailed_question + new_interaction.detailed_question_at = dt.now(tz.utc) - retriever = EveAIRetriever(model_variables, tenant_info) - llm = model_variables['llm_no_rag'] - template = model_variables['encyclopedia_template'] - language_template = create_language_template(template, language) - rag_prompt = ChatPromptTemplate.from_template(language_template) - setup = RunnablePassthrough() - output_parser = StrOutputParser() + with current_event.create_span("Detail Answer using LLM"): + retriever = EveAIRetriever(model_variables, tenant_info) + llm = model_variables['llm_no_rag'] + template = model_variables['encyclopedia_template'] + language_template = create_language_template(template, language) + rag_prompt = ChatPromptTemplate.from_template(language_template) + setup = RunnablePassthrough() + output_parser = StrOutputParser() - new_interaction_embeddings = [] + new_interaction_embeddings = [] - chain = setup | rag_prompt | llm | output_parser - input_question = {"question": detailed_question} + chain = setup | rag_prompt | llm | output_parser + input_question = {"question": detailed_question} - # Invoke the chain with the actual question - answer = chain.invoke(input_question) - new_interaction.answer = answer - result = { - 'answer': answer, - 'citations': [], - 'insufficient_info': False - } + # Invoke the chain with the actual question + answer = chain.invoke(input_question) + new_interaction.answer = answer + result = { + 'answer': answer, + 'citations': [], + 'insufficient_info': False + } - # Disable langchain debugging if set above. - # set_debug(False) + # Disable langchain debugging if set above. 
+ # set_debug(False) - new_interaction.answer_at = dt.now(tz.utc) - chat_session.session_end = dt.now(tz.utc) + new_interaction.answer_at = dt.now(tz.utc) + chat_session.session_end = dt.now(tz.utc) - try: - db.session.add(chat_session) - db.session.add(new_interaction) - db.session.commit() - return result, new_interaction - except SQLAlchemyError as e: - current_app.logger.error(f'ask_question: Error saving interaction to database: {e}') - raise + try: + db.session.add(chat_session) + db.session.add(new_interaction) + db.session.commit() + return result, new_interaction + except SQLAlchemyError as e: + current_app.logger.error(f'ask_question: Error saving interaction to database: {e}') + raise def tasks_ping(): diff --git a/eveai_workers/Processors/audio_processor.py b/eveai_workers/Processors/audio_processor.py index d3c7e0d..6e08a45 100644 --- a/eveai_workers/Processors/audio_processor.py +++ b/eveai_workers/Processors/audio_processor.py @@ -7,6 +7,7 @@ from common.extensions import minio_client import subprocess from .transcription_processor import TranscriptionProcessor +from common.utils.business_event_context import current_event class AudioProcessor(TranscriptionProcessor): @@ -24,8 +25,13 @@ class AudioProcessor(TranscriptionProcessor): self.document_version.id, self.document_version.file_name ) - compressed_audio = self._compress_audio(file_data) - return self._transcribe_audio(compressed_audio) + + with current_event.create_span("Audio Processing"): + compressed_audio = self._compress_audio(file_data) + with current_event.create_span("Transcription Generation"): + transcription = self._transcribe_audio(compressed_audio) + + return transcription def _compress_audio(self, audio_data): self._log("Compressing audio") diff --git a/eveai_workers/Processors/html_processor.py b/eveai_workers/Processors/html_processor.py index acc307c..9538cb1 100644 --- a/eveai_workers/Processors/html_processor.py +++ b/eveai_workers/Processors/html_processor.py @@ -31,8 +31,10 @@ class HTMLProcessor(Processor): ) html_content = file_data.decode('utf-8') - extracted_html, title = self._parse_html(html_content) - markdown = self._generate_markdown_from_html(extracted_html) + with current_event.create_span("HTML Content Extraction"): + extracted_html, title = self._parse_html(html_content) + with current_event.create_span("Markdown Generation"): + markdown = self._generate_markdown_from_html(extracted_html) self._save_markdown(markdown) self._log("Finished processing HTML") diff --git a/eveai_workers/Processors/pdf_processor.py b/eveai_workers/Processors/pdf_processor.py index cc2e156..b8826e6 100644 --- a/eveai_workers/Processors/pdf_processor.py +++ b/eveai_workers/Processors/pdf_processor.py @@ -10,6 +10,7 @@ from langchain_core.runnables import RunnablePassthrough from common.extensions import minio_client from common.utils.model_utils import create_language_template from .processor import Processor +from common.utils.business_event_context import current_event class PDFProcessor(Processor): @@ -32,13 +33,14 @@ class PDFProcessor(Processor): self.document_version.file_name ) - extracted_content = self._extract_content(file_data) - structured_content, title = self._structure_content(extracted_content) + with current_event.create_span("PDF Extraction"): + extracted_content = self._extract_content(file_data) + structured_content, title = self._structure_content(extracted_content) - llm_chunks = self._split_content_for_llm(structured_content) - markdown = self._process_chunks_with_llm(llm_chunks) - - 
self._save_markdown(markdown) + with current_event.create_span("Markdown Generation"): + llm_chunks = self._split_content_for_llm(structured_content) + markdown = self._process_chunks_with_llm(llm_chunks) + self._save_markdown(markdown) self._log("Finished processing PDF") return markdown, title except Exception as e: diff --git a/eveai_workers/Processors/transcription_processor.py b/eveai_workers/Processors/transcription_processor.py index 09e3544..837c7a5 100644 --- a/eveai_workers/Processors/transcription_processor.py +++ b/eveai_workers/Processors/transcription_processor.py @@ -1,11 +1,13 @@ # transcription_processor.py -from common.utils.model_utils import create_language_template -from .processor import Processor from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough +from common.utils.model_utils import create_language_template +from .processor import Processor +from common.utils.business_event_context import current_event + class TranscriptionProcessor(Processor): def __init__(self, tenant, model_variables, document_version): @@ -16,12 +18,14 @@ class TranscriptionProcessor(Processor): def process(self): self._log("Starting Transcription processing") try: - transcription = self._get_transcription() - chunks = self._chunk_transcription(transcription) - markdown_chunks = self._process_chunks(chunks) - full_markdown = self._combine_markdown_chunks(markdown_chunks) - self._save_markdown(full_markdown) - self._log("Finished processing Transcription") + with current_event.create_span("Transcription Generation"): + transcription = self._get_transcription() + with current_event.create_span("Markdown Generation"): + chunks = self._chunk_transcription(transcription) + markdown_chunks = self._process_chunks(chunks) + full_markdown = self._combine_markdown_chunks(markdown_chunks) + self._save_markdown(full_markdown) + self._log("Finished processing Transcription") return full_markdown, self._extract_title_from_markdown(full_markdown) except Exception as e: self._log(f"Error processing Transcription: {str(e)}", level='error') diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index cd86fc2..ab551c1 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -39,8 +39,6 @@ def create_embeddings(tenant_id, document_version_id): # BusinessEvent creates a context, which is why we need to use it with a with block with BusinessEvent('Create Embeddings', tenant_id, document_version_id=document_version_id): current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}') - current_event.log("Starting Embedding Creation Task") - try: # Retrieve Tenant for which we are processing tenant = Tenant.query.get(tenant_id) @@ -125,13 +123,13 @@ def delete_embeddings_for_document_version(document_version): def process_pdf(tenant, model_variables, document_version): - current_event.log("Starting PDF Processing") - processor = PDFProcessor(tenant, model_variables, document_version) - markdown, title = processor.process() + with current_event.create_span("PDF Processing"): + processor = PDFProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() # Process markdown and embed - embed_markdown(tenant, model_variables, document_version, markdown, title) - current_event.log("Finished PDF Processing") + with 
current_event.create_span("Embedding"): + embed_markdown(tenant, model_variables, document_version, markdown, title) def process_html(tenant, model_variables, document_version): @@ -144,29 +142,27 @@ def process_html(tenant, model_variables, document_version): embed_markdown(tenant, model_variables, document_version, markdown, title) - def process_audio(tenant, model_variables, document_version): - current_event.log("Starting Audio Processing") - processor = AudioProcessor(tenant, model_variables, document_version) - markdown, title = processor.process() + with current_event.create_span("Audio Processing"): + processor = AudioProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() # Process markdown and embed - embed_markdown(tenant, model_variables, document_version, markdown, title) - current_event.log("Finished Audio Processing") + with current_event.create_span("Embedding"): + embed_markdown(tenant, model_variables, document_version, markdown, title) def process_srt(tenant, model_variables, document_version): - current_event.log("Starting SRT Processing") - processor = SRTProcessor(tenant, model_variables, document_version) - markdown, title = processor.process() + with current_event.create_span("SRT Processing"): + processor = SRTProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() # Process markdown and embed - embed_markdown(tenant, model_variables, document_version, markdown, title) - current_event.log("Finished SRT Processing") + with current_event.create_span("Embedding"): + embed_markdown(tenant, model_variables, document_version, markdown, title) def embed_markdown(tenant, model_variables, document_version, markdown, title): - current_event.log("Starting Embedding Markdown Processing") # Create potential chunks potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md") @@ -195,7 +191,6 @@ def embed_markdown(tenant, model_variables, document_version, markdown, title): current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' f'on document version {document_version.id} :-)') - current_event.log("Finished Embedding Markdown Processing") def enrich_chunks(tenant, model_variables, document_version, title, chunks): @@ -238,7 +233,7 @@ def enrich_chunks(tenant, model_variables, document_version, title, chunks): def summarize_chunk(tenant, model_variables, document_version, chunk): - current_event.log("Starting Summarizing Chunk Processing") + current_event.log("Starting Summarizing Chunk") current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}') llm = model_variables['llm'] @@ -256,7 +251,7 @@ def summarize_chunk(tenant, model_variables, document_version, chunk): summary = chain.invoke({"text": chunk}) current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}.') - current_event.log("Finished summarizing chunk for tenant ") + current_event.log("Finished Summarizing Chunk") return summary except LangChainException as e: current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} ' diff --git a/requirements.txt b/requirements.txt index 9fcec88..a03ad4d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,7 +63,7 @@ zxcvbn~=4.4.28 groq~=0.9.0 pydub~=0.25.1 argparse~=1.4.0 -portkey_ai~=1.8.2 +portkey_ai~=1.8.7 minio~=7.2.7 Werkzeug~=3.0.3 itsdangerous~=2.2.0
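
Note on the tracing change (illustrative, not part of the patch): the new get_portkey_metadata() threads the active BusinessEvent trace and span identifiers into the Portkey request headers, so every gateway call made inside a span can be joined back to the span that issued it. Below is a minimal sketch of that flow under stated assumptions: current_event is the context-local proxy from common.utils.business_event_context, ChatOpenAI comes from langchain_openai (its import is not visible in this diff), and the tenant id, session id, and model name are placeholders.

# sketch_portkey_span_metadata.py -- illustrative only, mirrors the usage shown in the diff
import os

from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
from langchain_openai import ChatOpenAI  # assumed import path; not shown in the diff

from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event


def build_traced_headers(tenant_id: str) -> dict:
    # Each Portkey request made inside a span carries the trace/span ids,
    # so gateway logs can be joined back to the BusinessEvent records.
    metadata = {
        'tenant_id': tenant_id,  # placeholder value
        'environment': os.getenv('FLASK_ENV', 'development'),
        'trace_id': current_event.trace_id,
        'span_id': current_event.span_id,
        'span_name': current_event.span_name,
        'parent_span_id': current_event.parent_span_id,
    }
    return createHeaders(api_key=os.getenv('PORTKEY_API_KEY'),
                         provider='openai',
                         metadata=metadata)


with BusinessEvent('Ask Question', tenant_id=42, session_id='demo-session'):
    with current_event.create_span('Generate Answer using RAG'):
        # Headers are built inside the span, so the span ids above are live here.
        llm = ChatOpenAI(api_key=os.getenv('OPENAI_API_KEY'),
                         model='gpt-4o-mini',  # placeholder model name
                         base_url=PORTKEY_GATEWAY_URL,
                         default_headers=build_traced_headers('42'))
        # llm.invoke(...) would now be attributable to this span in Portkey.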