- Introduction of TRACIE_KO_INTERVIEW_DEFINITION_SPECIALIST

- Re-introduction of EveAIAsset
- Make translation services resistent for situation with and without current_event defined.
- Ensure first question is asked in eveai_chat_client
- Start of version 1.4.0 of TRAICIE_SELECTION_SPECIALIST
This commit is contained in:
Josako
2025-07-02 16:58:43 +02:00
parent fbc9f44ac8
commit 51d029d960
34 changed files with 1292 additions and 302 deletions

View File

@@ -0,0 +1,308 @@
from datetime import datetime as dt, timezone as tz
from typing import Optional, List, Dict
import json
import yaml
from crewai.flow.flow import start, listen
from flask import current_app
from pydantic import BaseModel, Field
from common.extensions import db, minio_client
from common.models.interaction import Specialist, EveAIAsset
from common.utils.eveai_exceptions import EveAISpecialistExecutionError
from common.utils.model_logging_utils import set_logging_information
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import LANGUAGE_LEVEL
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import TONE_OF_VOICE
from eveai_chat_workers.outputs.traicie.knockout_questions.knockout_questions_v1_0 import KOQuestions, KOQuestion
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: TRAICIE_KO_INTERVIEW_DEFINITION_SPECIALIST
type_version: 1.0
Traicie Selection Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.ko_def_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
@property
def type(self) -> str:
return "TRAICIE_KO_INTERVIEW_DEFINITION_SPECIALIST"
@property
def type_version(self) -> str:
return "1.0"
def _config_task_agents(self):
self._add_task_agent("traicie_ko_criteria_interview_definition_task", "traicie_recruiter_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("traicie_ko_criteria_interview_definition_task", KOQuestions, "ko_questions")
def _config_state_result_relations(self):
self._add_state_result_relation("ko_questions")
def _instantiate_specialist(self):
verbose = self.tuning
ko_def_agents = [self.traicie_recruiter_agent]
ko_def_tasks = [self.traicie_ko_criteria_interview_definition_task]
self.ko_def_crew = EveAICrewAICrew(
self,
"KO Criteria Interview Definition Crew",
agents=ko_def_agents,
tasks=ko_def_tasks,
verbose=verbose,
)
self.flow = KOFlow(
self,
self.ko_def_crew
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie KO Criteria Interview Definition Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
match specialist_phase:
case "initial":
results = self.execute_initial_state(arguments, formatted_context, citations)
self.log_tuning(f"Traicie KO Criteria Interview Definition Specialist execution ended",
{"Results": results.model_dump() if results else "No info"})
return results
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie KO Criteria Interview Definition Specialist initial_state_execution started", {})
selection_specialist = Specialist.query.get(arguments.specialist_id)
if not selection_specialist:
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
"No selection specialist found")
if selection_specialist.type != "TRAICIE_SELECTION_SPECIALIST":
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
"Specialist is no Selection Specialist")
current_app.logger.debug(f"Specialist Competencies:\n"
f"{selection_specialist.configuration.get("competencies", [])}")
ko_competencies = []
for competency in selection_specialist.configuration.get("competencies", []):
if competency["is_knockout"] is True and competency["assess"] is True:
current_app.logger.debug(f"Assessable Knockout competency: {competency}")
ko_competencies.append({"title": competency["title"], "description": competency["description"]})
tone_of_voice = selection_specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
selected_tone_of_voice = next(
(item for item in TONE_OF_VOICE if item["name"] == tone_of_voice),
None # fallback indien niet gevonden
)
current_app.logger.debug(f"Selected tone of voice: {selected_tone_of_voice}")
tone_of_voice_context = f"{selected_tone_of_voice["description"]}"
language_level = selection_specialist.configuration.get('language_level', 'Standard')
selected_language_level = next(
(item for item in LANGUAGE_LEVEL if item["name"] == language_level),
None
)
current_app.logger.debug(f"Selected language level: {selected_language_level}")
language_level_context = (f"{selected_language_level['description']}, "
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
flow_inputs = {
'tone_of_voice': tone_of_voice,
'tone_of_voice_context': tone_of_voice_context,
'language_level': language_level,
'language_level_context': language_level_context,
'ko_criteria': ko_competencies,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
current_app.logger.debug(f"Flow results: {flow_results}")
current_app.logger.debug(f"Flow state: {self.flow.state}")
new_type = "TRAICIE_KO_CRITERIA_QUESTIONS"
current_app.logger.debug(f"KO Criteria Questions:\n {self.flow.state.ko_questions}")
# Controleer of we een KOQuestions object hebben of een lijst van KOQuestion objecten
if hasattr(self.flow.state.ko_questions, 'to_json'):
# Het is een KOQuestions object
json_str = self.flow.state.ko_questions.to_json()
elif isinstance(self.flow.state.ko_questions, list):
# Het is een lijst van KOQuestion objecten
# Maak een KOQuestions object en gebruik to_json daarop
ko_questions_obj = KOQuestions.from_question_list(self.flow.state.ko_questions)
json_str = ko_questions_obj.to_json()
else:
# Fallback voor het geval het een onverwacht type is
current_app.logger.warning(f"Unexpected type for ko_questions: {type(self.flow.state.ko_questions)}")
ko_questions_data = [q.model_dump() for q in self.flow.state.ko_questions]
json_str = json.dumps(ko_questions_data, ensure_ascii=False, indent=2)
current_app.logger.debug(f"KO Criteria Questions json style:\n {json_str}")
try:
asset = db.session.query(EveAIAsset).filter(
EveAIAsset.type == new_type,
EveAIAsset.type_version == "1.0.0",
EveAIAsset.configuration.is_not(None),
EveAIAsset.configuration.has_key('specialist_id'),
EveAIAsset.configuration['specialist_id'].astext.cast(db.Integer) == selection_specialist.id
).first()
except (ValueError, TypeError) as e:
current_app.logger.warning(f"Error casting specialist_id in asset configuration: {str(e)}")
asset = None
if not asset:
asset = EveAIAsset(
name=f"KO Criteria Form for specialist {selection_specialist.id}",
type=new_type,
type_version="1.0.0",
system_metadata={
"Creator Specialist Type": self.type,
"Creator Specialist Type Version": self.type_version,
"Creator Specialist ID": self.specialist_id
},
configuration={
"specialist_id": selection_specialist.id,
},
)
set_logging_information(asset, dt.now(tz=tz.utc))
asset.last_used_at = asset.created_at
else:
asset.last_used_at = dt.now(tz=tz.utc)
try:
# Stap 1: Asset aanmaken maar nog niet committen
db.session.add(asset)
db.session.flush() # Geeft ons het ID zonder te committen
# Stap 2: Upload naar MinIO (kan falen zonder database impact)
bucket_name, object_name, file_size = minio_client.upload_asset_file(
tenant_id=self.tenant_id,
asset_id=asset.id,
asset_type=new_type,
file_type="json",
file_data=json_str
)
# Stap 3: Storage metadata toevoegen
asset.bucket_name = bucket_name
asset.object_name = object_name
asset.file_size = file_size
asset.file_type = "json"
# Stap 4: Token usage toevoegen
asset.prompt_tokens = self.ko_def_crew.usage_metrics.prompt_tokens
asset.completion_tokens = self.ko_def_crew.usage_metrics.completion_tokens
# Alles in één keer committen
db.session.commit()
except Exception as e:
current_app.logger.error(f"Error creating asset: {str(e)}")
db.session.rollback()
# Probeer MinIO cleanup als upload is gelukt maar database commit faalde
try:
if 'bucket_name' in locals() and 'object_name' in locals():
minio_client.delete_object(bucket_name, object_name)
except:
pass # Log maar ga door met originele exception
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
f"Failed to create asset: {str(e)}")
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"asset {asset.id} created for specialist {selection_specialist.id}",
phase="finished",
asset_id=asset.id,
)
return results
class KODefInput(BaseModel):
tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
language_level: Optional[str] = Field(None, alias="language_level")
language_level_context: Optional[str] = Field(None, alias="language_level_context")
ko_criteria: Optional[List[Dict[str, str]]] = Field(None, alias="ko_criteria")
class KODefResult(SpecialistResult):
asset_id: Optional[int] = Field(None, alias="asset_id")
class KOFlowState(EveAIFlowState):
"""Flow state for Traicie Role Definition specialist that automatically updates from task outputs"""
input: Optional[KODefInput] = None
ko_questions: Optional[List[KOQuestion]] = Field(None, alias="ko_questions")
phase: Optional[str] = Field(None, alias="phase")
class KOFlow(EveAICrewAIFlow[KOFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
ko_def_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "Traicie KO Interview Definiton Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.ko_def_crew = ko_def_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_ko_def_definition(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("Run execute_ko_interview_definition")
crew_output = await self.ko_def_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copies to the flow state.
update = {}
for task in self.ko_def_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_ko_criteria_interview_definition_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.ko_questions = task.output.pydantic.ko_questions
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
self.state.phase = "personal_contact_data"
current_app.logger.debug(f"State after execute_ko_def_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_ko_def_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_ko_def Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = KODefInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -181,8 +181,8 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
answer = f"Let's start our selection process by asking you a few important questions."
if arguments.language != 'en':
TranslationServices.translate_config(ko_form, "fields", arguments.language)
TranslationServices.translate(answer, arguments.language)
TranslationServices.translate_config(self.tenant_id, ko_form, "fields", arguments.language)
TranslationServices.translate(self.tenant_id, answer, arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
@@ -234,7 +234,8 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
if arguments.language != 'nl':
answer = TranslationServices.translate(answer, arguments.language)
if arguments.language != 'en':
contact_form = TranslationServices.translate_config(contact_form, "fields", arguments.language)
contact_form = TranslationServices.translate_config(self.tenant_id, contact_form, "fields",
arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=contact_form,

View File

@@ -0,0 +1,370 @@
import asyncio
import json
from os import wait
from typing import Optional, List, Dict, Any
from datetime import date
from time import sleep
from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field, EmailStr
from sqlalchemy.exc import SQLAlchemyError
from common.extensions import db
from common.models.user import Tenant
from common.models.interaction import Specialist
from common.services.utils.translation_services import TranslationServices
from eveai_chat_workers.outputs.globals.basic_types.list_item import ListItem
from eveai_chat_workers.outputs.traicie.knockout_questions.knockout_questions_v1_0 import KOQuestions, KOQuestion
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.traicie.competencies.competencies_v1_1 import Competencies
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from common.services.interaction.specialist_services import SpecialistServices
from common.extensions import cache_manager
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import LANGUAGE_LEVEL
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import TONE_OF_VOICE
from common.utils.eveai_exceptions import EveAISpecialistExecutionError
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: TRAICIE_SELECTION_SPECIALIST
type_version: 1.1
Traicie Selection Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.role_definition_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
# Load the Tenant & set language
self.tenant = Tenant.query.get_or_404(tenant_id)
@property
def type(self) -> str:
return "TRAICIE_SELECTION_SPECIALIST"
@property
def type_version(self) -> str:
return "1.3"
def _config_task_agents(self):
self._add_task_agent("traicie_ko_criteria_interview_definition_task", "traicie_recruiter_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("traicie_ko_criteria_interview_definition_task", KOQuestions, "ko_questions")
def _config_state_result_relations(self):
self._add_state_result_relation("ko_criteria_questions")
self._add_state_result_relation("ko_criteria_scores")
self._add_state_result_relation("competency_questions")
self._add_state_result_relation("competency_scores")
self._add_state_result_relation("personal_contact_data")
def _instantiate_specialist(self):
verbose = self.tuning
ko_def_agents = [self.traicie_recruiter_agent]
ko_def_tasks = [self.traicie_ko_criteria_interview_definition_task]
self.ko_def_crew = EveAICrewAICrew(
self,
"KO Criteria Interview Definition Crew",
agents=ko_def_agents,
tasks=ko_def_tasks,
verbose=verbose,
)
self.flow = SelectionFlow(
self,
self.ko_def_crew
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist execution started", {})
current_app.logger.debug(f"Arguments: {arguments.model_dump()}")
current_app.logger.debug(f"Formatted Context: {formatted_context}")
current_app.logger.debug(f"Formatted History: {self._formatted_history}")
current_app.logger.debug(f"Cached Chat Session: {self._cached_session}")
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
match specialist_phase:
case "initial":
results = self.execute_initial_state(arguments, formatted_context, citations)
case "ko_question_evaluation":
results = self.execute_ko_question_evaluation(arguments, formatted_context, citations)
case "personal_contact_data":
results = self.execute_personal_contact_data(arguments, formatted_context, citations)
case "no_valid_candidate":
results = self.execute_no_valid_candidate(arguments, formatted_context, citations)
case "candidate_selected":
results = self.execute_candidate_selected(arguments, formatted_context, citations)
self.log_tuning(f"Traicie Selection Specialist execution ended", {"Results": results.model_dump() if results else "No info"})
return results
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist initial_state execution started", {})
current_app.logger.debug(f"Specialist Competencies:\n{self.specialist.configuration.get("competencies", [])}")
ko_competencies = []
for competency in self.specialist.configuration.get("competencies", []):
if competency["is_knockout"] is True and competency["assess"] is True:
current_app.logger.debug(f"Assessable Knockout competency: {competency}")
ko_competencies.append({"title: ": competency["title"], "description": competency["description"]})
tone_of_voice = self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
selected_tone_of_voice = next(
(item for item in TONE_OF_VOICE if item["name"] == tone_of_voice),
None # fallback indien niet gevonden
)
current_app.logger.debug(f"Selected tone of voice: {selected_tone_of_voice}")
tone_of_voice_context = f"{selected_tone_of_voice["description"]}"
language_level = self.specialist.configuration.get('language_level', 'Standard')
selected_language_level = next(
(item for item in LANGUAGE_LEVEL if item["name"] == language_level),
None
)
current_app.logger.debug(f"Selected language level: {selected_language_level}")
language_level_context = (f"{selected_language_level['description']}, "
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
flow_inputs = {
"region": arguments.region,
"working_schedule": arguments.working_schedule,
"start_date": arguments.start_date,
"language": arguments.language,
"interaction_mode": arguments.interaction_mode,
'tone_of_voice': tone_of_voice,
'tone_of_voice_context': tone_of_voice_context,
'language_level': language_level,
'language_level_context': language_level_context,
'ko_criteria': ko_competencies,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
current_app.logger.debug(f"Flow results: {flow_results}")
current_app.logger.debug(f"Flow state: {self.flow.state}")
fields = {}
for ko_question in self.flow.state.ko_criteria_questions:
fields[ko_question.title] = {
"name": ko_question.title,
"description": ko_question.title,
"context": ko_question.question,
"type": "options",
"required": True,
"allowed_values": [ko_question.answer_positive, ko_question.answer_negative]
}
ko_form = {
"type": "KO_CRITERIA_FORM",
"version": "1.0.0",
"name": "Starter Questions",
"icon": "verified",
"fields": fields,
}
answer = f"Let's start our selection process by asking you a few important questions."
if arguments.language != 'en':
TranslationServices.translate_config(self.tenant_id, ko_form, "fields", arguments.language)
TranslationServices.translate(self.tenant_id, answer, arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=ko_form,
phase="ko_question_evaluation")
return results
def execute_ko_question_evaluation(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist ko_question_evaluation started", {})
# Check if the form has been returned (it should)
if not arguments.form_values:
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id, "No form values returned")
current_app.logger.debug(f"Form values: {arguments.form_values}")
# Load the previous KO Questions
previous_ko_questions = self.flow.state.ko_criteria_questions
current_app.logger.debug(f"Previous KO Questions: {previous_ko_questions}")
# Evaluate KO Criteria
evaluation = "positive"
for criterium, answer in arguments.form_values.items():
for qa in previous_ko_questions:
if qa.get("title") == criterium:
if qa.get("answer_positive") != answer:
evaluation = "negative"
break
if evaluation == "negative":
break
if evaluation == "negative":
answer = (f"We hebben de antwoorden op onze eerste vragen verwerkt. Je voldoet jammer genoeg niet aan de "
f"minimale vereisten voor deze job.")
if arguments.language != 'nl':
answer = TranslationServices.translate(answer, arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=None,
phase="no_valid_candidate")
else:
answer = (f"We hebben de antwoorden op de KO criteria verwerkt. Je bent een geschikte kandidaat. "
f"Ben je bereid je contactgegevens door te geven, zodat we je kunnen contacteren voor een verder "
f"gesprek?")
# Check if answers to questions are positive
contact_form = cache_manager.specialist_forms_config_cache.get_config("PERSONAL_CONTACT_FORM", "1.0")
if arguments.language != 'nl':
answer = TranslationServices.translate(answer, arguments.language)
if arguments.language != 'en':
contact_form = TranslationServices.translate_config(self.tenant_id, contact_form, "fields", arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=contact_form,
phase="personal_contact_data")
return results
def execute_personal_contact_data(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist personal_contact_data started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de contactgegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
return results
def execute_no_valid_candidate(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist no_valid_candidate started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"Je voldoet jammer genoeg niet aan de minimale vereisten voor deze job. Maar solliciteer gerust voor één van onze andere jobs.",
phase="no_valid_candidate")
def execute_candidate_selected(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist candidate_selected started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben je contactgegegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
return results
class SelectionInput(BaseModel):
region: str = Field(..., alias="region")
working_schedule: Optional[str] = Field(..., alias="working_schedule")
start_date: Optional[date] = Field(None, alias="vacancy_text")
language: Optional[str] = Field(None, alias="language")
interaction_mode: Optional[str] = Field(None, alias="interaction_mode")
tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
language_level: Optional[str] = Field(None, alias="language_level")
language_level_context: Optional[str] = Field(None, alias="language_level_context")
ko_criteria: Optional[List[Dict[str, str]]] = Field(None, alias="ko_criteria")
question: Optional[str] = Field(None, alias="question")
field_values: Optional[Dict[str, Any]] = Field(None, alias="field_values")
class SelectionKOCriteriumScore(BaseModel):
criterium: Optional[str] = Field(None, alias="criterium")
answer: Optional[str] = Field(None, alias="answer")
score: Optional[int] = Field(None, alias="score")
class SelectionCompetencyScore(BaseModel):
competency: Optional[str] = Field(None, alias="competency")
answer: Optional[str] = Field(None, alias="answer")
score: Optional[int] = Field(None, alias="score")
class PersonalContactData(BaseModel):
name: str = Field(..., description="Your name", alias="name")
email: EmailStr = Field(..., description="Your Name", alias="email")
phone: str = Field(..., description="Your Phone Number", alias="phone")
address: Optional[str] = Field(None, description="Your Address", alias="address")
zip: Optional[str] = Field(None, description="Postal Code", alias="zip")
city: Optional[str] = Field(None, description="City", alias="city")
country: Optional[str] = Field(None, description="Country", alias="country")
consent: bool = Field(..., description="Consent", alias="consent")
class SelectionResult(SpecialistResult):
ko_criteria_questions: Optional[List[ListItem]] = Field(None, alias="ko_criteria_questions")
ko_criteria_scores: Optional[List[SelectionKOCriteriumScore]] = Field(None, alias="ko_criteria_scores")
competency_questions: Optional[List[ListItem]] = Field(None, alias="competency_questions")
competency_scores: Optional[List[SelectionCompetencyScore]] = Field(None, alias="competency_scores")
personal_contact_data: Optional[PersonalContactData] = Field(None, alias="personal_contact_data")
class SelectionFlowState(EveAIFlowState):
"""Flow state for Traicie Role Definition specialist that automatically updates from task outputs"""
input: Optional[SelectionInput] = None
ko_criteria_questions: Optional[List[KOQuestion]] = Field(None, alias="ko_criteria_questions")
ko_criteria_scores: Optional[List[SelectionKOCriteriumScore]] = Field(None, alias="ko_criteria_scores")
competency_questions: Optional[List[ListItem]] = Field(None, alias="competency_questions")
competency_scores: Optional[List[SelectionCompetencyScore]] = Field(None, alias="competency_scores")
personal_contact_data: Optional[PersonalContactData] = Field(None, alias="personal_contact_data")
phase: Optional[str] = Field(None, alias="phase")
interaction_mode: Optional[str] = Field(None, alias="mode")
class SelectionFlow(EveAICrewAIFlow[SelectionFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
ko_def_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "Traicie Role Definition Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.ko_def_crew = ko_def_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_ko_def_definition(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("execute_ko_interview_definition")
crew_output = await self.ko_def_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copies to the flow state.
update = {}
for task in self.ko_def_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_ko_criteria_interview_definition_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.ko_criteria_questions = task.output.pydantic.ko_questions
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
self.state.phase = "personal_contact_data"
current_app.logger.debug(f"State after execute_ko_def_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_ko_def_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_ko_def Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = SelectionInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state