From 6465e4f358de969f8cea9698f92f4f1b7a5e7bc9 Mon Sep 17 00:00:00 2001 From: Josako Date: Mon, 10 Mar 2025 15:49:21 +0100 Subject: [PATCH] - Re-introduced detail_question to crewai specialists --- config/prompts/history/1.0.0.yaml | 10 +- .../specialists/RAG_SPECIALIST/1_0.py | 4 +- .../specialists/SPIN_SPECIALIST/1_0.py | 6 +- .../specialists/base_specialist.py | 2 +- .../specialists/crewai_base_specialist.py | 107 ++++++++++++++++-- eveai_chat_workers/tasks.py | 2 +- 6 files changed, 105 insertions(+), 26 deletions(-) diff --git a/config/prompts/history/1.0.0.yaml b/config/prompts/history/1.0.0.yaml index 06862fc..3e77e53 100644 --- a/config/prompts/history/1.0.0.yaml +++ b/config/prompts/history/1.0.0.yaml @@ -1,10 +1,10 @@ version: "1.0.0" content: | - You are a helpful assistant that details a question based on a previous context, - in such a way that the question is understandable without the previous context. - The context is a conversation history, with the HUMAN asking questions, the AI answering questions. - The history is delimited between triple backquotes. - You answer by stating the question in {language}. + You are a helpful assistant that details a question based on a conversation history, in such a way that the detailed + question is understandable without that history. The conversation is a sequence of questions and context provided + by the HUMAN, and the AI (you) answering back, in chronological order. The most recent (i.e. last) elements are the + most important when detailing the question. + You answer by stating the detailed question in {language}. 
History: ```{history}``` Question to be detailed: diff --git a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py index 5ac065b..d5df0b8 100644 --- a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py @@ -58,9 +58,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor): self.rag_crew, ) - def execute(self, arguments: SpecialistArguments) -> SpecialistResult: - formatted_context, citations = self.retrieve_context(arguments) - + def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult: self.log_tuning("RAG Specialist execution started", {}) flow_inputs = { diff --git a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py index 9a2f0db..28e6371 100644 --- a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py @@ -116,9 +116,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor): self.rag_consolidation_crew ) - def execute(self, arguments: SpecialistArguments) -> SpecialistResult: - formatted_context, citations = self.retrieve_context(arguments) - + def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult: self.log_tuning("SPIN Specialist execution started", {}) flow_inputs = { @@ -161,8 +159,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor): return results - # TODO: metrics - class SPINSpecialistInput(BaseModel): language: Optional[str] = Field(None, alias="language") diff --git a/eveai_chat_workers/specialists/base_specialist.py b/eveai_chat_workers/specialists/base_specialist.py index 383af75..c046230 100644 --- a/eveai_chat_workers/specialists/base_specialist.py +++ b/eveai_chat_workers/specialists/base_specialist.py @@ -94,7 +94,7 @@ class BaseSpecialistExecutor(ABC): self.ept.send_update(self.task_id, processing_type, data) 
@abstractmethod - def execute(self, arguments: SpecialistArguments) -> SpecialistResult: + def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult: """Execute the specialist's logic""" pass diff --git a/eveai_chat_workers/specialists/crewai_base_specialist.py b/eveai_chat_workers/specialists/crewai_base_specialist.py index b28e8b7..c4aaa10 100644 --- a/eveai_chat_workers/specialists/crewai_base_specialist.py +++ b/eveai_chat_workers/specialists/crewai_base_specialist.py @@ -3,10 +3,12 @@ from typing import Dict, Any, Optional, Type, TypeVar, List, Tuple from crewai.flow.flow import FlowState from flask import current_app +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate from common.models.interaction import Specialist from common.utils.business_event_context import current_event -from common.utils.model_utils import get_model_variables, get_crewai_llm +from common.utils.model_utils import get_model_variables, get_crewai_llm, create_language_template from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAIAgent, EveAICrewAITask from crewai.tools import BaseTool @@ -20,7 +22,7 @@ from common.utils.cache.crewai_configuration import ( ProcessedAgentConfig, ProcessedTaskConfig, ProcessedToolConfig, SpecialistProcessedConfig ) -from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments +from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult T = TypeVar('T') # For generic type hints @@ -67,18 +69,19 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): # Format history for the prompt self._formatted_history = "\n\n".join([ f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n" - f"AI:\n{interaction.specialist_results.get('answer')}" + 
f"AI:\n{interaction.specialist_results.get('rag_output').get('answer')}" for interaction in self._cached_session.interactions ]) @property def formatted_history(self) -> str: - formatted_history = "\n\n".join([ - f"HUMAN:\n{interaction.specialist_results.get('query')}\n\n" - f"AI:\n{interaction.specialist_results.get('rag_output').get('answer', '')}" - for interaction in self._cached_session.interactions - ]) - return formatted_history + if not self._formatted_history: + self._formatted_history = "\n\n".join([ + f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n" + f"AI:\n{interaction.specialist_results.get('rag_output').get('answer', '')}" + for interaction in self._cached_session.interactions + ]) + return self._formatted_history def _add_task_agent(self, task_name: str, agent_name: str): self._task_agents[task_name.lower()] = agent_name @@ -201,15 +204,68 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): """Instantiate a crew (or flow) to set up the complete specialist, using the assets (agents, tasks, tools). The assets can be retrieved using their type name in lower case, e.g. 
rag_agent""" - def retrieve_context(self, arguments: SpecialistArguments) -> Tuple[str, List[int]]: + def _detail_question(self, language: str, question: str) -> str: + """Detail question based on conversation history""" + try: + with current_event.create_span("Specialist Detail Question"): + # Get LLM and template + llm = self.model_variables.get_llm(temperature=0.3) + template = cache_manager.prompts_config_cache.get_config('history').get('content', '') + current_app.logger.debug(f"History Template: {template}") + language_template = create_language_template(template, language) + current_app.logger.debug(f"History Language Template: {template}") + + # Create prompt + history_prompt = ChatPromptTemplate.from_template(language_template) + + # Create chain + chain = ( + history_prompt | + llm | + StrOutputParser() + ) + + # Execute chain + current_app.logger.debug(f"Formatted History: {self.formatted_history}") + detailed_question = chain.invoke({ + "history": self.formatted_history, + "question": question + }) + + self.log_tuning("_detail_question", { + "cached_session_id": self._cached_session.session_id, + "cached_session.interactions": str(self._cached_session.interactions), + "original_question": question, + "history_used": self.formatted_history, + "detailed_question": detailed_question, + }) + + self.update_progress("Detail Question", {"name": self.type}) + + return detailed_question + + except Exception as e: + current_app.logger.error(f"Error detailing question: {e}") + return question # Fallback to original question + + def _retrieve_context(self, arguments: SpecialistArguments) -> Tuple[str, List[int]]: with current_event.create_span("Specialist Retrieval"): self.log_tuning("Starting context retrieval", { "num_retrievers": len(self.retrievers), "all arguments": arguments.model_dump(), }) + current_app.logger.debug(f"Retrieving context from arguments: {arguments}") + + original_query = arguments.query + detailed_query = 
self._detail_question(arguments.language, original_query) + + modified_arguments = arguments.model_copy(update={ + "query": detailed_query, + "original_query": original_query + }) # Get retriever-specific arguments - retriever_arguments = arguments.retriever_arguments + retriever_arguments = modified_arguments.retriever_arguments # Collect context from all retrievers all_context = [] @@ -233,6 +289,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): retriever_args = RetrieverArguments(**current_retriever_args) # Each retriever gets its own specific arguments + current_app.logger.debug(f"Retrieving context {retriever_id} with arguments {retriever_args}") retriever_result = retriever.retrieve(retriever_args) all_context.extend(retriever_result) @@ -266,3 +323,31 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor): "Citations": citations}) return formatted_context, citations + + @abstractmethod + def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult: + pass + + def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult: + # Detail the incoming query + if self._cached_session.interactions: + query = arguments.query + language = arguments.language + detailed_query = self._detail_question(language, query) + else: + detailed_query = arguments.query + + modified_arguments = { + "query": detailed_query, + "original_query": arguments.query + } + detailed_arguments = arguments.model_copy(update=modified_arguments) + formatted_context, citations = self._retrieve_context(detailed_arguments) + result = self.execute(detailed_arguments, formatted_context, citations) + + modified_result = { + "detailed_query": detailed_query, + } + final_result = result.model_copy(update=modified_result) + + return final_result diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index d3742cd..0ce79d4 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py 
@@ -290,7 +290,7 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict ) # Execute specialist - result = specialist_instance.execute(complete_arguments) + result = specialist_instance.execute_specialist(complete_arguments) # Update interaction record new_interaction.specialist_results = result.model_dump(mode='json') # Store complete result