- RAG Specialist fully implemented in the new style

- Selection Specialist (VA version) fully implemented
- Corrected TRAICIE_ROLE_DEFINITION_SPECIALIST and adapted it to the new style
- Removed debug logging statements
Author: Josako
Date: 2025-07-10 10:39:42 +02:00
Parent: 509ee95d81
Commit: 51fd16bcc6
40 changed files with 110 additions and 298 deletions

View File

@@ -4,4 +4,4 @@ from pydantic import BaseModel, Field
class QAOutput(BaseModel):
answer: bool = Field(None, description="True or False")
answer: bool = Field(None, description="Your answer, True or False")

View File

@@ -4,6 +4,6 @@ from pydantic import BaseModel, Field
class RAGOutput(BaseModel):
answer: str = Field(None, description="Answer to the questions asked")
answer: str = Field(None, description="Answer to the questions asked, in Markdown format.")
insufficient_info: bool = Field(None, description="An indication if there's insufficient information to answer")
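
For clarity, a hypothetical instance of the model above (re-declared here so the snippet stands alone; the example values are invented):

from pydantic import BaseModel, Field

class RAGOutput(BaseModel):
    answer: str = Field(None, description="Answer to the questions asked, in Markdown format.")
    insufficient_info: bool = Field(None, description="An indication if there's insufficient information to answer")

example = RAGOutput(
    answer="**Yes** - OAuth2 is supported; see the *Authentication* section.",
    insufficient_info=False,
)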

View File

@@ -108,6 +108,5 @@ def get_retriever_class(retriever_type: str, type_version: str):
module_path = f"eveai_chat_workers.retrievers.{partner}.{retriever_type}.{major_minor}"
else:
module_path = f"eveai_chat_workers.retrievers.globals.{retriever_type}.{major_minor}"
current_app.logger.debug(f"Importing retriever class from {module_path}")
module = importlib.import_module(module_path)
return module.RetrieverExecutor
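
For orientation, a minimal sketch of the version-aware module resolution this loader (and the specialist loader later in this commit) performs; resolve_executor and its parameters are illustrative names, not part of the codebase:

import importlib

def resolve_executor(kind: str, type_name: str, type_version: str, partner: str | None = None):
    """E.g. ("retrievers", "basic", "1.2.3", None) resolves
    eveai_chat_workers.retrievers.globals.basic.1_2 and returns its executor class."""
    major_minor = "_".join(type_version.split(".")[:2])
    scope = partner if partner else "globals"
    module = importlib.import_module(f"eveai_chat_workers.{kind}.{scope}.{type_name}.{major_minor}")
    # Retriever modules expose RetrieverExecutor; specialist modules expose SpecialistExecutor.
    return getattr(module, "RetrieverExecutor" if kind == "retrievers" else "SpecialistExecutor")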

View File

@@ -116,8 +116,8 @@ class RetrieverExecutor(BaseRetriever):
))
self.log_tuning('retrieve', {
"arguments": arguments.model_dump(),
"similarity_threshold": self.similarity_threshold,
"k": self.k,
"similarity_threshold": similarity_threshold,
"k": k,
"query": compiled_query,
"Raw Results": str(results),
"Processed Results": [r.model_dump() for r in processed_results],

View File

@@ -135,11 +135,9 @@ def get_specialist_class(specialist_type: str, type_version: str):
major_minor = '_'.join(type_version.split('.')[:2])
specialist_config = cache_manager.specialists_config_cache.get_config(specialist_type, type_version)
partner = specialist_config.get("partner", None)
current_app.logger.debug(f"Specialist partner for {specialist_type} {type_version} is {partner}")
if partner:
module_path = f"eveai_chat_workers.specialists.{partner}.{specialist_type}.{major_minor}"
else:
module_path = f"eveai_chat_workers.specialists.globals.{specialist_type}.{major_minor}"
current_app.logger.debug(f"Importing specialist class from {module_path}")
module = importlib.import_module(module_path)
return module.SpecialistExecutor

View File

@@ -40,7 +40,6 @@ class EveAICrewAIAgent(Agent):
Returns:
Output of the agent
"""
current_app.logger.debug(f"Task Execution {task.name} by {self.name}")
# with current_event.create_span(f"Task Execution {task.name} by {self.name}"):
self.specialist.log_tuning(f"EveAI Agent {self.name}, Task {task.name} Start", {})
self.specialist.update_progress("EveAI Agent Task Start",
@@ -134,11 +133,17 @@ class EveAICrewAIFlow(Flow):
return self.state
class Citation(BaseModel):
document_id: int
document_version_id: int
url: str
class EveAIFlowState(BaseModel):
"""Base class for all EveAI flow states"""
answer: Optional[str] = None
detailed_question: Optional[str] = None
question: Optional[str] = None
phase: Optional[str] = None
form_request: Optional[Dict[str, Any]] = None
citations: Optional[Dict[str, Any]] = None
citations: Optional[List[Citation]] = None
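
With citations now a typed list instead of a free-form dict, a flow state instance would look roughly like this (values invented, models taken from the hunk above):

state = EveAIFlowState(
    question="How do I install the agent?",
    answer="Follow the installation guide.",
    phase="rag",
    citations=[
        Citation(document_id=12, document_version_id=34, url="https://docs.example.com/install"),
    ],
)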

View File

@@ -78,14 +78,15 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
return "\n\n".join([
"\n\n".join([
f"HUMAN:\n"
f"{interaction.specialist_results['detailed_question']}"
if interaction.specialist_results.get('detailed_question') else "",
f"{interaction.specialist_arguments['question']}"
if interaction.specialist_arguments.get('question') else "",
f"{interaction.specialist_arguments.get('form_values')}"
if interaction.specialist_arguments.get('form_values') else "",
f"AI:\n{interaction.specialist_results['answer']}"
if interaction.specialist_results.get('answer') else ""
]).strip()
for interaction in self._cached_session.interactions
if interaction.specialist_arguments.get('question') != "Initialize"
])
def _add_task_agent(self, task_name: str, agent_name: str):
@@ -120,10 +121,9 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
self._state_result_relations[state_name] = result_name
def _config_default_state_result_relations(self):
for default_attribute_name in ['answer', 'detailed_question', 'form_request', 'phase', 'citations']:
for default_attribute_name in ['answer', 'form_request', 'phase', 'citations']:
self._add_state_result_relation(default_attribute_name)
@abstractmethod
def _config_state_result_relations(self):
"""Configure the state-result relations by adding state-result combinations. Use _add_state_result_relation()"""
@@ -150,6 +150,7 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
agent_goal = agent_config.get('goal', '').replace('{custom_goal}', agent.goal or '')
agent_goal = self._replace_system_variables(agent_goal)
agent_backstory = agent_config.get('backstory', '').replace('{custom_backstory}', agent.backstory or '')
agent_backstory = self._replace_system_variables(agent_backstory)
agent_full_model_name = agent_config.get('full_model_name', 'mistral.mistral-large-latest')
agent_temperature = agent_config.get('temperature', 0.3)
llm = get_crewai_llm(agent_full_model_name, agent_temperature)
@@ -183,12 +184,9 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"verbose": task.tuning
}
task_name = task.type.lower()
current_app.logger.debug(f"Task {task_name} is getting processed")
if task_name in self._task_pydantic_outputs:
task_kwargs["output_pydantic"] = self._task_pydantic_outputs[task_name]
current_app.logger.debug(f"Task {task_name} has an output pydantic: {self._task_pydantic_outputs[task_name]}")
if task_name in self._task_agents:
current_app.logger.debug(f"Task {task_name} has an agent: {self._task_agents[task_name]}")
task_kwargs["agent"] = self._agents[self._task_agents[task_name]]
# Instantiate the task with dynamic arguments
@@ -236,46 +234,6 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
The assets can be retrieved using their type name in lower case, e.g. rag_agent"""
raise NotImplementedError
def _detail_question(self, language: str, question: str) -> str:
"""Detail question based on conversation history"""
try:
with current_event.create_span("Specialist Detail Question"):
# Get LLM and template
template, llm = get_template("history", temperature=0.3)
language_template = create_language_template(template, language)
# Create prompt
history_prompt = ChatPromptTemplate.from_template(language_template)
# Create chain
chain = (
history_prompt |
llm |
StrOutputParser()
)
# Execute chain
detailed_question = chain.invoke({
"history": self.formatted_history,
"question": question
})
self.log_tuning("_detail_question", {
"cached_session_id": self._cached_session.session_id,
"cached_session.interactions": str(self._cached_session.interactions),
"original_question": question,
"history_used": self.formatted_history,
"detailed_question": detailed_question,
})
self.update_progress("Detail Question", {"name": self.type})
return detailed_question
except Exception as e:
current_app.logger.error(f"Error detailing question: {e}")
return question # Fallback to original question
def _retrieve_context(self, arguments: SpecialistArguments) -> tuple[str, list[dict[str, Any]]]:
with current_event.create_span("Specialist Retrieval"):
self.log_tuning("Starting context retrieval", {
@@ -283,12 +241,8 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"all arguments": arguments.model_dump(),
})
original_question = arguments.question
detailed_question = self._detail_question(arguments.language, original_question)
modified_arguments = arguments.model_copy(update={
"query": detailed_question,
"original_query": original_question
"query": arguments.question
})
@@ -361,11 +315,8 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
update_data = {}
state_dict = self.flow.state.model_dump()
current_app.logger.debug(f"Updating specialist results with state: {state_dict}")
for state_name, result_name in self._state_result_relations.items():
current_app.logger.debug(f"Try Updating {result_name} with {state_name}")
if state_name in state_dict and state_dict[state_name] is not None:
current_app.logger.debug(f"Updating {result_name} with {state_name} = {state_dict[state_name]}")
update_data[result_name] = state_dict[state_name]
return specialist_results.model_copy(update=update_data)
@@ -383,35 +334,22 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
# Initialize the standard state values
self.flow.state.answer = None
self.flow.state.detailed_question = None
self.flow.state.question = None
self.flow.state.form_request = None
self.flow.state.phase = None
self.flow.state.citations = []
@abstractmethod
def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult:
def execute(self, arguments: SpecialistArguments,
formatted_context: Optional[str], citations: Optional[list[dict[str, Any]]]) -> SpecialistResult:
raise NotImplementedError
def execute_specialist(self, arguments: SpecialistArguments) -> SpecialistResult:
current_app.logger.debug(f"Retrievers for this specialist: {self.retrievers}")
if self.retrievers:
# Detail the incoming query
if self._cached_session.interactions:
question = arguments.question
language = arguments.language
detailed_question = self._detail_question(language, question)
else:
detailed_question = arguments.question
modified_arguments = {
"question": detailed_question,
"original_question": arguments.question
}
detailed_arguments = arguments.model_copy(update=modified_arguments)
formatted_context, citations = self._retrieve_context(detailed_arguments)
result = self.execute(detailed_arguments, formatted_context, citations)
formatted_context = None
citations = None
result = self.execute(arguments, formatted_context, citations)
modified_result = {
"detailed_question": detailed_question,
"citations": citations,
}
intermediate_result = result.model_copy(update=modified_result)
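
As a standalone illustration of the history formatting this file now uses (HUMAN turns read from specialist_arguments instead of the removed detailed_question), here is a self-contained sketch; the Interaction dataclass is a stand-in for the cached-session interaction objects:

from dataclasses import dataclass, field

@dataclass
class Interaction:
    specialist_arguments: dict = field(default_factory=dict)
    specialist_results: dict = field(default_factory=dict)

def format_history(interactions: list[Interaction]) -> str:
    blocks = []
    for it in interactions:
        # The bootstrap "Initialize" interaction carries no conversational content.
        if it.specialist_arguments.get("question") == "Initialize":
            continue
        parts = []
        if it.specialist_arguments.get("question"):
            parts.append(f"HUMAN:\n{it.specialist_arguments['question']}")
        if it.specialist_arguments.get("form_values"):
            parts.append(f"{it.specialist_arguments.get('form_values')}")
        if it.specialist_results.get("answer"):
            parts.append(f"AI:\n{it.specialist_results['answer']}")
        blocks.append("\n\n".join(parts).strip())
    return "\n\n".join(blocks)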

View File

@@ -69,18 +69,12 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
current_app.logger.debug(f"Specialist Phase: {specialist_phase}")
match specialist_phase:
case "initial":
@@ -191,7 +185,6 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = RAGSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -1,6 +1,6 @@
import json
from os import wait
from typing import Optional, List
from typing import Optional, List, Dict, Any
from crewai.flow.flow import start, listen, and_
from flask import current_app
@@ -47,6 +47,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def _config_state_result_relations(self):
self._add_state_result_relation("rag_output")
self._add_state_result_relation("citations")
def _instantiate_specialist(self):
verbose = self.tuning
@@ -69,18 +70,12 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
current_app.logger.debug(f"Specialist Phase: {specialist_phase}")
match specialist_phase:
case "initial":
@@ -112,6 +107,8 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
INSUFFICIENT_INFORMATION_MESSAGE,
arguments.language)
formatted_context, citations = self._retrieve_context(arguments)
if formatted_context:
flow_inputs = {
"language": arguments.language,
@@ -128,16 +125,18 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
flow_results.rag_output.answer = insufficient_info_message
rag_output = flow_results.rag_output
else:
rag_output = RAGOutput(answer=insufficient_info_message, insufficient_info=True)
self.flow.state.rag_output = rag_output
self.flow.state.citations = citations
self.flow.state.answer = rag_output.answer
self.flow.state.phase = "rag"
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
return results
class RAGSpecialistInput(BaseModel):
language: Optional[str] = Field(None, alias="language")
@@ -156,6 +155,7 @@ class RAGFlowState(EveAIFlowState):
"""Flow state for RAG specialist that automatically updates from task outputs"""
input: Optional[RAGSpecialistInput] = None
rag_output: Optional[RAGOutput] = None
citations: Optional[List[Dict[str, Any]]] = None
class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
@@ -190,8 +190,6 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = RAGSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state
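
A minimal sketch of the insufficient-information fallback wired up in this file; RAGOutput is re-declared so the snippet runs on its own, and INSUFFICIENT_MSG stands in for the translated INSUFFICIENT_INFORMATION_MESSAGE:

from pydantic import BaseModel

class RAGOutput(BaseModel):
    answer: str | None = None
    insufficient_info: bool | None = None

INSUFFICIENT_MSG = "I do not have enough information to answer that question."

def finalize_rag_output(flow_rag_output: RAGOutput, formatted_context: str | None) -> RAGOutput:
    # No usable context retrieved: answer with the fallback message directly.
    if not formatted_context:
        return RAGOutput(answer=INSUFFICIENT_MSG, insufficient_info=True)
    # The crew ran but flagged insufficient information: overwrite its answer.
    if flow_rag_output.insufficient_info:
        flow_rag_output.answer = INSUFFICIENT_MSG
    return flow_rag_output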

View File

@@ -216,9 +216,7 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("In execute_rag")
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
current_app.logger.debug(f"Crew execution ended with output:\n{crew_output}")
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
@@ -277,11 +275,8 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
if self.state.spin:
additional_questions = additional_questions + self.state.spin.questions
inputs["additional_questions"] = additional_questions
current_app.logger.debug(f"Prepared Answers: \n{inputs['prepared_answers']}")
current_app.logger.debug(f"Additional Questions: \n{additional_questions}")
try:
crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs)
current_app.logger.debug(f"Consolidation output after crew execution:\n{crew_output}")
self.specialist_executor.log_tuning("RAG Consolidation Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
@@ -295,7 +290,6 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = SPINSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -1,4 +1,4 @@
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field, model_validator
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from common.extensions import cache_manager
@@ -103,10 +103,9 @@ class SpecialistResult(BaseModel):
# Structural optional fields available for all specialists
answer: Optional[str] = Field(None, description="Optional textual answer from the specialist")
detailed_question: Optional[str] = Field(None, description="Optional detailed question for the specialist")
form_request: Optional[Dict[str, Any]] = Field(None, description="Optional form definition to request user input")
phase: Optional[str] = Field(None, description="Phase of the specialist's workflow")
citations: Optional[Dict[str, Any]] = Field(None, description="Citations for the specialist's answer")
citations: Optional[List[Dict[str, Any]]] = Field(None, description="Citations for the specialist's answer")
@model_validator(mode='after')
def validate_required_results(self) -> 'SpecialistResult':
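
An assumed example of the citations payload after this widening (the keys mirror the Citation model introduced elsewhere in this commit; the values are invented):

citations = [
    {"document_id": 12, "document_version_id": 34, "url": "https://docs.example.com/install"},
]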

View File

@@ -71,11 +71,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie KO Criteria Interview Definition Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
@@ -104,13 +99,9 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
"Specialist is no Selection Specialist")
current_app.logger.debug(f"Specialist Competencies:\n"
f"{selection_specialist.configuration.get("competencies", [])}")
ko_competencies = []
for competency in selection_specialist.configuration.get("competencies", []):
if competency["is_knockout"] is True and competency["assess"] is True:
current_app.logger.debug(f"Assessable Knockout competency: {competency}")
ko_competencies.append({"title": competency["title"], "description": competency["description"]})
tone_of_voice = selection_specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
@@ -118,7 +109,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
(item for item in TONE_OF_VOICE if item["name"] == tone_of_voice),
None # fallback if not found
)
current_app.logger.debug(f"Selected tone of voice: {selected_tone_of_voice}")
tone_of_voice_context = f"{selected_tone_of_voice['description']}"
language_level = selection_specialist.configuration.get('language_level', 'Standard')
@@ -126,7 +116,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
(item for item in LANGUAGE_LEVEL if item["name"] == language_level),
None
)
current_app.logger.debug(f"Selected language level: {selected_language_level}")
language_level_context = (f"{selected_language_level['description']}, "
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
@@ -140,12 +129,9 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
current_app.logger.debug(f"Flow results: {flow_results}")
current_app.logger.debug(f"Flow state: {self.flow.state}")
new_type = "TRAICIE_KO_CRITERIA_QUESTIONS"
current_app.logger.debug(f"KO Criteria Questions:\n {self.flow.state.ko_questions}")
# Check whether we have a KOQuestions object or a list of KOQuestion objects
if hasattr(self.flow.state.ko_questions, 'to_json'):
# It is a KOQuestions object
@@ -161,8 +147,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
ko_questions_data = [q.model_dump() for q in self.flow.state.ko_questions]
json_str = json.dumps(ko_questions_data, ensure_ascii=False, indent=2)
current_app.logger.debug(f"KO Criteria Questions json style:\n {json_str}")
try:
asset = db.session.query(EveAIAsset).filter(
EveAIAsset.type == new_type,
@@ -281,21 +265,17 @@ class KOFlow(EveAICrewAIFlow[KOFlowState]):
async def execute_ko_def_definition(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("Run execute_ko_interview_definition")
crew_output = await self.ko_def_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copied to the flow state.
update = {}
for task in self.ko_def_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_ko_criteria_interview_definition_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.ko_questions = task.output.pydantic.ko_questions
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
self.state.phase = "personal_contact_data"
current_app.logger.debug(f"State after execute_ko_def_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_ko_def_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_ko_def Kickoff Error: {str(e)}")
@@ -303,9 +283,6 @@ class KOFlow(EveAICrewAIFlow[KOFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = KODefInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state
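
A self-contained sketch of the knockout-competency filter and tone-of-voice lookup used in this file; the TONE_OF_VOICE entry below is an invented placeholder:

TONE_OF_VOICE = [
    {"name": "Professional & Neutral", "description": "Business-like, neutral wording."},
]

def select_ko_competencies(configuration: dict) -> list[dict]:
    # Only competencies explicitly marked as assessable knockout criteria are kept.
    return [
        {"title": c["title"], "description": c["description"]}
        for c in configuration.get("competencies", [])
        if c.get("is_knockout") is True and c.get("assess") is True
    ]

def tone_of_voice_context(configuration: dict) -> str | None:
    tone_name = configuration.get("tone_of_voice", "Professional & Neutral")
    selected = next((item for item in TONE_OF_VOICE if item["name"] == tone_name), None)  # fallback if not found
    return selected["description"] if selected else None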

View File

@@ -143,8 +143,6 @@ class VacancyDefinitionFlow(EveAICrewAIFlow[VacancyDefFlowState]):
# update["criteria"] = task.output.pydantic.criteria
self.state.criteria = task.output.pydantic.criteria
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
current_app.logger.debug(f"State after execute_vac_def: {self.state}")
current_app.logger.debug(f"State dump after execute_vac_def: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_vac_def Kickoff Error: {str(e)}")
@@ -152,7 +150,6 @@ class VacancyDefinitionFlow(EveAICrewAIFlow[VacancyDefFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = VacancyDefinitionSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -168,20 +168,16 @@ class RoleDefinitionFlow(EveAICrewAIFlow[RoleDefFlowState]):
async def execute_role_definition (self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("In execute_role_definition")
crew_output = await self.role_definition_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copied to the flow state.
update = {}
for task in self.role_definition_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_get_competencies_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.competencies = task.output.pydantic.competencies
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
current_app.logger.debug(f"State after execute_role_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_role_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_role_definition Kickoff Error: {str(e)}")
@@ -189,9 +185,6 @@ class RoleDefinitionFlow(EveAICrewAIFlow[RoleDefFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = RoleDefinitionSpecialistInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -174,20 +174,16 @@ class RoleDefinitionFlow(EveAICrewAIFlow[RoleDefFlowState]):
async def execute_role_definition (self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("In execute_role_definition")
crew_output = await self.role_definition_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copied to the flow state.
update = {}
for task in self.role_definition_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_get_competencies_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.competencies = task.output.pydantic.competencies
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
current_app.logger.debug(f"State after execute_role_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_role_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_role_definition Kickoff Error: {str(e)}")
@@ -195,9 +191,6 @@ class RoleDefinitionFlow(EveAICrewAIFlow[RoleDefFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = RoleDefinitionSpecialistInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -61,6 +61,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
# Load the Tenant & set language
self.tenant = Tenant.query.get_or_404(tenant_id)
self.specialist_phase = "initial"
@property
def type(self) -> str:
@@ -106,19 +107,13 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
self.specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
self.specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
current_app.logger.debug(f"Specialist phase: {specialist_phase}")
match specialist_phase:
match self.specialist_phase:
case "initial":
results = self.execute_initial_state(arguments, formatted_context, citations)
case "start_selection_procedure":
@@ -149,16 +144,21 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
interaction_mode = arguments.interaction_mode
if not interaction_mode:
interaction_mode = "selection"
current_app.logger.debug(f"Interaction mode: {interaction_mode}")
welcome_message = self.specialist.configuration.get("welcome_message", "Welcome to our selection process.")
welcome_message = TranslationServices.translate(self.tenant_id, welcome_message, arguments.language)
if interaction_mode == "selection":
return self.execute_start_selection_procedure_state(arguments, formatted_context, citations,
welcome_message)
else: # We are in orientation mode, so we perform standard rag
return self.execute_rag_state(arguments, formatted_context, citations, welcome_message)
# We are in orientation mode, so we give a standard message, and move to rag state
start_selection_question = TranslationServices.translate(self.tenant_id, START_SELECTION_QUESTION,
arguments.language)
self.flow.state.answer = f"{welcome_message}\n\n{start_selection_question}"
self.flow.state.phase = "rag"
results = SelectionResult.create_for_type(self.type, self.type_version)
return results
def execute_start_selection_procedure_state(self, arguments: SpecialistArguments, formatted_context, citations,
start_message=None) -> SpecialistResult:
@@ -172,7 +172,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
ko_questions = self._get_ko_questions()
fields = {}
for ko_question in ko_questions.ko_questions:
current_app.logger.debug(f"KO Question: {ko_question}")
fields[ko_question.title] = {
"name": ko_question.title,
"description": ko_question.title,
@@ -213,11 +212,9 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
if not arguments.form_values:
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
"No form values returned")
current_app.logger.debug(f"Form values: {arguments.form_values}")
# Load the previous KO Questions
previous_ko_questions = self._get_ko_questions().ko_questions
current_app.logger.debug(f"Previous KO Questions: {previous_ko_questions}")
# Evaluate KO Criteria
evaluation = "positive"
@@ -355,39 +352,37 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations, welcome_message=None) \
def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist rag_state started", {})
start_selection_question = TranslationServices.translate(self.tenant_id, START_SELECTION_QUESTION,
arguments.language)
if welcome_message:
answer = f"{welcome_message}\n\n{start_selection_question}"
else:
answer = ""
rag_results = None
if arguments.question:
if HumanAnswerServices.check_additional_information(self.tenant_id,
START_SELECTION_QUESTION,
arguments.question,
arguments.language):
rag_results = self.execute_rag(arguments, formatted_context, citations)
self.flow.state.rag_output = rag_results.rag_output
answer = f"{answer}\n{rag_results.answer}"
rag_output = None
if HumanAnswerServices.check_affirmative_answer(self.tenant_id,
if HumanAnswerServices.check_additional_information(self.tenant_id,
START_SELECTION_QUESTION,
arguments.question,
arguments.language):
return self.execute_start_selection_procedure_state(arguments, formatted_context, citations, answer)
rag_output = self.execute_rag(arguments, formatted_context, citations)
self.flow.state.rag_output = rag_output
answer = rag_output.answer
else:
answer = ""
self.flow.state.answer = answer
self.flow.state.phase = "rag"
self.flow.state.form_request = None
if HumanAnswerServices.check_affirmative_answer(self.tenant_id,
START_SELECTION_QUESTION,
arguments.question,
arguments.language):
return self.execute_start_selection_procedure_state(arguments, formatted_context, citations, answer)
else:
self.flow.state.answer = f"{answer}\n\n{start_selection_question}"
self.flow.state.phase = "rag"
self.flow.state.form_request = None
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_rag(self, arguments: SpecialistArguments, formatted_context, citations) -> RAGOutput:
self.log_tuning("RAG Specialist execution started", {})
@@ -395,6 +390,9 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
insufficient_info_message = TranslationServices.translate(self.tenant_id,
INSUFFICIENT_INFORMATION_MESSAGE,
arguments.language)
formatted_context, citations = self._retrieve_context(arguments)
self.flow.state.citations = citations
if formatted_context:
flow_inputs = {
"language": arguments.language,
@@ -403,9 +401,11 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"history": self.formatted_history,
"name": self.specialist.configuration.get('name', ''),
}
rag_output = self.flow.kickoff(inputs=flow_inputs)
if rag_output.rag_output.insufficient_info:
rag_output.rag_output.answer = insufficient_info_message
flow_results = self.flow.kickoff(inputs=flow_inputs)
if flow_results.rag_output.insufficient_info:
flow_results.rag_output.answer = insufficient_info_message
rag_output = flow_results.rag_output
else:
rag_output = RAGOutput(answer=insufficient_info_message, insufficient_info=True)
@@ -418,8 +418,11 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
START_SELECTION_QUESTION,
arguments.question,
arguments.language):
results = self.execute_rag(arguments, formatted_context, citations)
return results
rag_output = self.execute_rag(arguments, formatted_context, citations)
self.flow.state.rag_output = rag_output
return rag_output
else:
return None
@@ -439,7 +442,6 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
ko_questions_data = minio_client.download_asset_file(self.tenant_id, ko_questions_asset.bucket_name,
ko_questions_asset.object_name)
ko_questions = KOQuestions.from_json(ko_questions_data)
current_app.logger.debug(f"KO Questions: {ko_questions}")
return ko_questions
@@ -470,8 +472,8 @@ class SelectionInput(BaseModel):
history: Optional[str] = Field(None, alias="history")
name: Optional[str] = Field(None, alias="name")
# Selection elements
region: str = Field(..., alias="region")
working_schedule: Optional[str] = Field(..., alias="working_schedule")
region: Optional[str] = Field(None, alias="region")
working_schedule: Optional[str] = Field(None, alias="working_schedule")
start_date: Optional[date] = Field(None, alias="vacancy_text")
interaction_mode: Optional[str] = Field(None, alias="interaction_mode")
tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
@@ -489,6 +491,7 @@ class SelectionFlowState(EveAIFlowState):
ko_criteria_answers: Optional[Dict[str, str]] = None
personal_contact_data: Optional[PersonalContactData] = None
contact_time: Optional[str] = None
citations: Optional[List[Dict[str, Any]]] = None
class SelectionResult(SpecialistResult):
@@ -530,7 +533,6 @@ class SelectionFlow(EveAICrewAIFlow[SelectionFlowState]):
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = SelectionInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state
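
To summarize the Selection Specialist's flow at a glance, a toy model of the phase dispatch it now keeps in self.specialist_phase (phases and routing mirror the hunks above; the handler strings are placeholders):

class PhaseMachine:
    def __init__(self, interactions: list[dict]):
        # The phase stored with the last interaction decides where the conversation resumes.
        self.phase = interactions[-1].get("phase", "initial") if interactions else "initial"

    def execute(self, question: str) -> str:
        match self.phase:
            case "initial":
                return f"initial: welcome message, then ask whether to start selection ({question!r})"
            case "start_selection_procedure":
                return "start_selection_procedure: build the KO-criteria form"
            case "rag":
                return "rag: answer from retrieved context, then re-offer the selection procedure"
            case _:
                return "unknown phase"

print(PhaseMachine([]).execute("What does the job involve?"))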