- Re-introduced detail_question to crewai specialists

This commit is contained in:
Josako
2025-03-10 15:49:21 +01:00
parent 4b43f96afe
commit 6465e4f358
6 changed files with 105 additions and 26 deletions

View File

@@ -1,10 +1,10 @@
version: "1.0.0"
content: |
You are a helpful assistant that details a question based on a previous context,
in such a way that the question is understandable without the previous context.
The context is a conversation history, with the HUMAN asking questions, the AI answering questions.
The history is delimited between triple backquotes.
You answer by stating the question in {language}.
You are a helpful assistant that details a question based on a conversation history, in such a way that the detailed
question is understandable without that history. The conversation is a consequence of questions and context provided
by the HUMAN, and the AI (you) answering back, in chronological order. The most recent (i.e. last) elements are the
most important when detailing the question.
You answer by stating the detailed question in {language}.
History:
```{history}```
Question to be detailed:

View File

@@ -58,9 +58,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
self.rag_crew,
)
def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
formatted_context, citations = self.retrieve_context(arguments)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
flow_inputs = {

View File

@@ -116,9 +116,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
self.rag_consolidation_crew
)
def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
formatted_context, citations = self.retrieve_context(arguments)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("SPIN Specialist execution started", {})
flow_inputs = {
@@ -161,8 +159,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
return results
# TODO: metrics
class SPINSpecialistInput(BaseModel):
language: Optional[str] = Field(None, alias="language")

View File

@@ -94,7 +94,7 @@ class BaseSpecialistExecutor(ABC):
self.ept.send_update(self.task_id, processing_type, data)
@abstractmethod
def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult:
"""Execute the specialist's logic"""
pass

View File

@@ -3,10 +3,12 @@ from typing import Dict, Any, Optional, Type, TypeVar, List, Tuple
from crewai.flow.flow import FlowState
from flask import current_app
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from common.models.interaction import Specialist
from common.utils.business_event_context import current_event
from common.utils.model_utils import get_model_variables, get_crewai_llm
from common.utils.model_utils import get_model_variables, get_crewai_llm, create_language_template
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAIAgent, EveAICrewAITask
from crewai.tools import BaseTool
@@ -20,7 +22,7 @@ from common.utils.cache.crewai_configuration import (
ProcessedAgentConfig, ProcessedTaskConfig, ProcessedToolConfig,
SpecialistProcessedConfig
)
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult
T = TypeVar('T') # For generic type hints
@@ -67,18 +69,19 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
# Format history for the prompt
self._formatted_history = "\n\n".join([
f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n"
f"AI:\n{interaction.specialist_results.get('answer')}"
f"AI:\n{interaction.specialist_results.get('rag_output').get('answer')}"
for interaction in self._cached_session.interactions
])
@property
def formatted_history(self) -> str:
formatted_history = "\n\n".join([
f"HUMAN:\n{interaction.specialist_results.get('query')}\n\n"
if not self._formatted_history:
self._formatted_history = "\n\n".join([
f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n"
f"AI:\n{interaction.specialist_results.get('rag_output').get('answer', '')}"
for interaction in self._cached_session.interactions
])
return formatted_history
return self._formatted_history
def _add_task_agent(self, task_name: str, agent_name: str):
self._task_agents[task_name.lower()] = agent_name
@@ -201,15 +204,68 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"""Instantiate a crew (or flow) to set up the complete specialist, using the assets (agents, tasks, tools).
The assets can be retrieved using their type name in lower case, e.g. rag_agent"""
def retrieve_context(self, arguments: SpecialistArguments) -> Tuple[str, List[int]]:
def _detail_question(self, language: str, question: str) -> str:
"""Detail question based on conversation history"""
try:
with current_event.create_span("Specialist Detail Question"):
# Get LLM and template
llm = self.model_variables.get_llm(temperature=0.3)
template = cache_manager.prompts_config_cache.get_config('history').get('content', '')
current_app.logger.debug(f"History Template: {template}")
language_template = create_language_template(template, language)
current_app.logger.debug(f"History Language Template: {template}")
# Create prompt
history_prompt = ChatPromptTemplate.from_template(language_template)
# Create chain
chain = (
history_prompt |
llm |
StrOutputParser()
)
# Execute chain
current_app.logger.debug(f"Formatted History: {self.formatted_history}")
detailed_question = chain.invoke({
"history": self.formatted_history,
"question": question
})
self.log_tuning("_detail_question", {
"cached_session_id": self._cached_session.session_id,
"cached_session.interactions": str(self._cached_session.interactions),
"original_question": question,
"history_used": self.formatted_history,
"detailed_question": detailed_question,
})
self.update_progress("Detail Question", {"name": self.type})
return detailed_question
except Exception as e:
current_app.logger.error(f"Error detailing question: {e}")
return question # Fallback to original question
def _retrieve_context(self, arguments: SpecialistArguments) -> Tuple[str, List[int]]:
with current_event.create_span("Specialist Retrieval"):
self.log_tuning("Starting context retrieval", {
"num_retrievers": len(self.retrievers),
"all arguments": arguments.model_dump(),
})
current_app.logger.debug(f"Retrieving context from arguments: {arguments}")
original_query = arguments.query
detailed_query = self._detail_question(arguments.language, original_query)
modified_arguments = arguments.model_copy(update={
"query": detailed_query,
"original_query": original_query
})
# Get retriever-specific arguments
retriever_arguments = arguments.retriever_arguments
retriever_arguments = modified_arguments.retriever_arguments
# Collect context from all retrievers
all_context = []
@@ -233,6 +289,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
retriever_args = RetrieverArguments(**current_retriever_args)
# Each retriever gets its own specific arguments
current_app.logger.debug(f"Retrieving context {retriever_id} with arguments {retriever_args}")
retriever_result = retriever.retrieve(retriever_args)
all_context.extend(retriever_result)
@@ -266,3 +323,31 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"Citations": citations})
return formatted_context, citations
@abstractmethod
def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult:
pass
def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult:
# Detail the incoming query
if self._cached_session.interactions:
query = arguments.query
language = arguments.language
detailed_query = self._detail_question(language, query)
else:
detailed_query = arguments.query
modified_arguments = {
"query": detailed_query,
"original_query": arguments.query
}
detailed_arguments = arguments.model_copy(update=modified_arguments)
formatted_context, citations = self._retrieve_context(detailed_arguments)
result = self.execute(detailed_arguments, formatted_context, citations)
modified_result = {
"detailed_query": detailed_query,
}
final_result = result.model_copy(update=modified_result)
return final_result

View File

@@ -290,7 +290,7 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict
)
# Execute specialist
result = specialist_instance.execute(complete_arguments)
result = specialist_instance.execute_specialist(complete_arguments)
# Update interaction record
new_interaction.specialist_results = result.model_dump(mode='json') # Store complete result