- Full Traicie Selection Specialist Flow implemented

- Added Specialist basics for handling phases and automatically transferring data between state and output
- Added QR-code generation for Magic Links
Josako
2025-06-23 11:46:56 +02:00
parent 5b2c04501c
commit 7b87880045
10 changed files with 272 additions and 33 deletions
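
The QR-code generation for Magic Links is part of this commit but not of the hunks shown below. A minimal sketch of such a helper, assuming the qrcode package is available (the function name and URL parameter are illustrative, not the actual implementation):

import io

import qrcode  # assumed third-party dependency


def generate_magic_link_qr(magic_link_url: str) -> bytes:
    """Render a Magic Link URL as a PNG QR code and return the raw image bytes."""
    image = qrcode.make(magic_link_url)  # build the QR code image for the link
    buffer = io.BytesIO()
    image.save(buffer)  # qrcode's PIL image defaults to PNG when saving to a stream
    return buffer.getvalue()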

View File

@@ -50,15 +50,20 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
self._task_pydantic_outputs: Dict[str, Type[BaseModel]] = {}
self._task_state_names: Dict[str, str] = {}
# Processed configurations
# State-Result relations (for adding / restoring information to / from history)
self._state_result_relations: Dict[str, str] = {}
# Process configurations
self._config = cache_manager.crewai_processed_config_cache.get_specialist_config(tenant_id, specialist_id)
self._config_task_agents()
self._config_pydantic_outputs()
self._instantiate_crew_assets()
self._instantiate_specialist()
self._config_state_result_relations()
# Retrieve history
self._cached_session = cache_manager.chat_session_cache.get_cached_session(self.session_id)
self._restore_state_from_history()
# Format history for the prompt
self._formatted_history = self._generate_formatted_history()
@@ -106,6 +111,19 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"""Configure the task pydantic outputs by adding task-output combinations. Use _add_pydantic_output()"""
raise NotImplementedError
def _add_state_result_relation(self, state_name: str, result_name: str = None):
"""Add a state-result relation to the specialist. This is used to add information to the history
If result_name is None, the state name is used as the result name. (default behavior)
"""
if not result_name:
result_name = state_name
self._state_result_relations[state_name] = result_name
@abstractmethod
def _config_state_result_relations(self):
"""Configure the state-result relations by adding state-result combinations. Use _add_state_result_relation()"""
raise NotImplementedError
@property
def task_pydantic_outputs(self):
return self._task_pydantic_outputs
@@ -330,6 +348,27 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
return formatted_context, citations
def _update_specialist_results(self, specialist_results: SpecialistResult) -> SpecialistResult:
"""Update the specialist results with the latest state information"""
update_data = {}
state_dict = self.flow.state.model_dump()
for state_name, result_name in self._state_result_relations.items():
if state_name in state_dict and state_dict[state_name] is not None:
update_data[result_name] = state_dict[state_name]
return specialist_results.model_copy(update=update_data)
def _restore_state_from_history(self):
"""Restore the state from the history"""
if not self._cached_session.interactions:
return
last_interaction = self._cached_session.interactions[-1]
if not last_interaction.specialist_results:
return
for state_name, result_name in self._state_result_relations.items():
if result_name in last_interaction.specialist_results:
setattr(self.flow.state, state_name, last_interaction.specialist_results[result_name])
@abstractmethod
def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult:
raise NotImplementedError
@@ -356,8 +395,10 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"detailed_query": detailed_query,
"citations": citations,
}
final_result = result.model_copy(update=modified_result)
intermediate_result = result.model_copy(update=modified_result)
else:
final_result = self.execute(arguments, "", [])
intermediate_result = self.execute(arguments, "", [])
final_result = self._update_specialist_results(intermediate_result)
return final_result
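
Taken together, these hooks let a concrete executor declare which flow-state fields are copied into the SpecialistResult after execution and written back onto the flow state on the next turn. A minimal sketch of the wiring, assuming a flow state that has a ko_criteria_scores field and a contact field (names are illustrative):

class ExampleSpecialistExecutor(CrewAIBaseSpecialistExecutor):
    def _config_state_result_relations(self):
        # state field and result key share the same name
        self._add_state_result_relation("ko_criteria_scores")
        # state field "contact" is stored under the result key "personal_contact_data"
        self._add_state_result_relation("contact", "personal_contact_data")

# After execute(), _update_specialist_results() copies every non-None related state
# field into the returned SpecialistResult; on the next interaction,
# _restore_state_from_history() reads those keys from the last interaction's
# specialist_results and sets them back onto self.flow.state.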

View File

@@ -22,6 +22,7 @@ from common.services.interaction.specialist_services import SpecialistServices
from common.extensions import cache_manager
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import LANGUAGE_LEVEL
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import TONE_OF_VOICE
from common.utils.eveai_exceptions import EveAISpecialistExecutionError
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
@@ -53,6 +54,13 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def _config_pydantic_outputs(self):
self._add_pydantic_output("traicie_ko_criteria_interview_definition_task", KOQuestions, "ko_questions")
def _config_state_result_relations(self):
self._add_state_result_relation("ko_criteria_questions")
self._add_state_result_relation("ko_criteria_scores")
self._add_state_result_relation("competency_questions")
self._add_state_result_relation("competency_scores")
self._add_state_result_relation("personal_contact_data")
def _instantiate_specialist(self):
verbose = self.tuning
@@ -89,16 +97,14 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
match specialist_phase:
case "initial":
results = self.execute_initial_state(arguments, formatted_context, citations)
case "ko_questions":
contact_form = cache_manager.specialist_forms_config_cache.get_config("PERSONAL_CONTACT_FORM", "1.0")
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de antwoorden op de KO criteria verwerkt. Je bent een geschikte kandidaat. Kan je je contactegevens doorgeven?",
form_request=contact_form,
phase="personal_contact_data")
case "ko_question_evaluation":
results = self.execute_ko_question_evaluation(arguments, formatted_context, citations)
case "personal_contact_data":
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de contactgegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
results = self.execute_personal_contact_data(arguments, formatted_context, citations)
case "no_valid_candidate":
results = self.execute_no_valid_candidate(arguments, formatted_context, citations)
case "candidate_selected":
results = self.execute_candidate_selected(arguments, formatted_context, citations)
self.log_tuning(f"Traicie Selection Specialist execution ended", {"Results": results.model_dump() if results else "No info"})
@@ -108,18 +114,30 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist initial_state_execution started", {})
knockout_competencies = [
{
"title": c["title"],
"description": c["description"]
}
for c in self.specialist.configuration.get("competencies", [])
if c.get("is_knockout") is True
]
current_app.logger.debug(f"Specialist Competencies:\n{self.specialist.configuration.get("competencies", [])}")
# Convert the TONE_OF_VOICE and LANGUAGE_LEVEL lists to strings usable by the LLM
tone_of_voice_str = "\n\n".join([f"Name: {item['name']}\nDescription: {item['description']}\nWhen to use: {item['when_to_use']}" for item in TONE_OF_VOICE])
language_level_str = "\n\n".join([f"Name: {item['name']}\nDescription: {item['description']}\nCEFR level: {item['cefr_level']}\nIdeal Target Audience: {item['ideal_audience']}" for item in LANGUAGE_LEVEL])
ko_competencies = []
for competency in self.specialist.configuration.get("competencies", []):
if competency["is_knockout"] is True and competency["assess"] is True:
current_app.logger.debug(f"Assessable Knockout competency: {competency}")
ko_competencies.append({"title": competency["title"], "description": competency["description"]})
tone_of_voice = self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
selected_tone_of_voice = next(
(item for item in TONE_OF_VOICE if item["name"] == tone_of_voice),
None  # fallback if not found
)
current_app.logger.debug(f"Selected tone of voice: {selected_tone_of_voice}")
tone_of_voice_context = f"{selected_tone_of_voice['description']}"
language_level = self.specialist.configuration.get('language_level', 'Standard')
selected_language_level = next(
(item for item in LANGUAGE_LEVEL if item["name"] == language_level),
None
)
current_app.logger.debug(f"Selected language level: {selected_language_level}")
language_level_context = (f"{selected_language_level['description']}, "
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
flow_inputs = {
"region": arguments.region,
@@ -127,11 +145,11 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"start_date": arguments.start_date,
"language": arguments.language,
"interaction_mode": arguments.interaction_mode,
'tone_of_voice': self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral'),
'tone_of_voice_context': tone_of_voice_str,
'language_level': self.specialist.configuration.get('language_level', 'Standard'),
'language_level_context': language_level_str,
'ko_criteria': knockout_competencies,
'tone_of_voice': tone_of_voice,
'tone_of_voice_context': tone_of_voice_context,
'language_level': language_level,
'language_level_context': language_level_context,
'ko_criteria': ko_competencies,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
@@ -162,10 +180,69 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We starten met een aantal KO Criteria vragen",
form_request=ko_form,
phase="ko_questions")
phase="ko_question_evaluation")
return results
def execute_ko_question_evaluation(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist ko_question_evaluation started", {})
# Check if the form has been returned (it should)
if not arguments.form_values:
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id, "No form values returned")
current_app.logger.debug(f"Form values: {arguments.form_values}")
# Load the previous KO Questions
previous_ko_questions = self.flow.state.ko_criteria_questions
current_app.logger.debug(f"Previous KO Questions: {previous_ko_questions}")
# Evaluate KO Criteria
evaluation = "positive"
for criterium, answer in arguments.form_values.items():
for qa in previous_ko_questions:
if qa.get("title") == criterium:
if qa.get("answer_positive") != answer:
evaluation = "negative"
break
if evaluation == "negative":
break
if evaluation == "negative":
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de antwoorden op de KO criteria verwerkt. Je voldoet jammer genoeg niet aan de minimale vereisten voor deze job.",
form_request=None,
phase="no_valid_candidate")
else:
# All KO answers matched the expected positive answers; ask for the candidate's contact details
contact_form = cache_manager.specialist_forms_config_cache.get_config("PERSONAL_CONTACT_FORM", "1.0")
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de antwoorden op de KO criteria verwerkt. Je bent een geschikte kandidaat. Kan je je contactegevens doorgeven?",
form_request=contact_form,
phase="personal_contact_data")
return results
def execute_personal_contact_data(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist personal_contact_data started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de contactgegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
return results
def execute_no_valid_candidate(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist no_valid_candidate started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"Je voldoet jammer genoeg niet aan de minimale vereisten voor deze job. Maar solliciteer gerust voor één van onze andere jobs.",
phase="no_valid_candidate")
def execute_candidate_selected(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist candidate_selected started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben je contactgegegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
return results
class SelectionInput(BaseModel):
region: str = Field(..., alias="region")
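
For context on how flow_inputs reaches the flow state: in CrewAI, a Flow parameterised with a Pydantic state model populates matching state fields from the kickoff inputs, which is what lets the executor read fields such as ko_criteria_questions back from self.flow.state afterwards. A minimal, standalone sketch under that assumption (SelectionState and its fields are illustrative, not the project's actual model):

from crewai.flow.flow import Flow, start
from pydantic import BaseModel, Field


class SelectionState(BaseModel):
    region: str = ""
    ko_criteria_questions: list = Field(default_factory=list)


class SelectionFlow(Flow[SelectionState]):
    @start()
    def begin(self):
        # kickoff(inputs={...}) has already populated the matching state fields
        return self.state.region


flow = SelectionFlow()
flow.kickoff(inputs={"region": "Antwerpen"})
print(flow.state.region)  # -> "Antwerpen"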