- Full implementation of Traicie Selection Specialist - VA version

- Improvements to CrewAI specialists and Specialists in general
- Addition of reusable components to check or get answers to questions from the full Human Message - HumanAnswerServices
This commit is contained in:
Josako
2025-07-06 20:01:30 +02:00
parent 50773fe602
commit 33b5742d2f
20 changed files with 609 additions and 281 deletions

View File

@@ -1,65 +0,0 @@
from flask import current_app, session
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event
from common.utils.model_utils import get_template
from eveai_chat_workers.outputs.globals.q_a_output.q_a_output_v1_0 import QAOutput
class AnswerCheckServices:
    """LLM-backed services that interpret a human answer to a question."""

    @staticmethod
    def check_affirmative_answer(question: str, answer: str, language_iso: str) -> bool:
        """Return True if `answer` affirmatively answers `question`."""
        return AnswerCheckServices._check_answer(question, answer, language_iso, "check_affirmative_answer",
                                                 "Check Affirmative Answer")

    @staticmethod
    def check_additional_information(question: str, answer: str, language_iso: str) -> bool:
        """Return True if `answer` contains information beyond what `question` asked for."""
        return AnswerCheckServices._check_answer(question, answer, language_iso, "check_additional_information",
                                                 "Check Additional Information")

    @staticmethod
    def _check_answer(question: str, answer: str, language_iso: str, template_name: str, span_name: str) -> bool:
        """Validate arguments, resolve the language, and run the check inside a business-event span.

        :raises ValueError: when the language is empty or unsupported, or question/answer is blank.
        """
        if language_iso.strip() == '':
            raise ValueError("Language cannot be empty")
        language = current_app.config.get('SUPPORTED_LANGUAGE_ISO639_1_LOOKUP').get(language_iso)
        if language is None:
            raise ValueError(f"Unsupported language: {language_iso}")
        if question.strip() == '':
            raise ValueError("Question cannot be empty")
        if answer.strip() == '':
            raise ValueError("Answer cannot be empty")
        tenant_id = session.get('tenant').get('id')
        if not current_event:
            # No business event active yet: start one before opening the span.
            # NOTE(review): assumes BusinessEvent installs itself as current_event — confirm.
            with BusinessEvent('Answer Check Service', tenant_id):
                with current_event.create_span(span_name):
                    return AnswerCheckServices._check_answer_logic(question, answer, language, template_name)
        else:
            # Bug fix: use the caller-supplied span name instead of the
            # hard-coded 'Check Affirmative Answer'.
            with current_event.create_span(span_name):
                return AnswerCheckServices._check_answer_logic(question, answer, language, template_name)

    @staticmethod
    def _check_answer_logic(question: str, answer: str, language: str, template_name: str) -> bool:
        """Invoke the LLM chain for `template_name` and return its boolean answer."""
        prompt_params = {
            'question': question,
            'answer': answer,
            'language': language,
        }
        template, llm = get_template(template_name)
        check_answer_prompt = ChatPromptTemplate.from_template(template)
        setup = RunnablePassthrough()
        structured_llm = llm.with_structured_output(QAOutput)
        chain = setup | check_answer_prompt | structured_llm
        raw_answer = chain.invoke(prompt_params)
        current_app.logger.debug(f"Raw answer: {raw_answer}")
        return raw_answer.answer

View File

@@ -0,0 +1,108 @@
from flask import current_app, session
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event
from common.utils.model_utils import get_template
from eveai_chat_workers.outputs.globals.a2q_output.q_a_output_v1_0 import A2QOutput
from eveai_chat_workers.outputs.globals.q_a_output.q_a_output_v1_0 import QAOutput
class HumanAnswerServices:
    """LLM-backed services that interpret a full human message.

    Offers boolean checks (affirmative answer, additional information) and
    extraction (answer to a specific question) on a free-form human answer.
    """

    @staticmethod
    def check_affirmative_answer(tenant_id: int, question: str, answer: str, language_iso: str) -> bool:
        """Return True if `answer` affirmatively answers `question`."""
        return HumanAnswerServices._check_answer(tenant_id, question, answer, language_iso,
                                                 "check_affirmative_answer", "Check Affirmative Answer")

    @staticmethod
    def check_additional_information(tenant_id: int, question: str, answer: str, language_iso: str) -> bool:
        """Return True if `answer` contains information beyond what `question` asked for."""
        return HumanAnswerServices._check_answer(tenant_id, question, answer, language_iso,
                                                 "check_additional_information", "Check Additional Information")

    @staticmethod
    def get_answer_to_question(tenant_id: int, question: str, answer: str, language_iso: str) -> str:
        """Extract the answer to `question` from the free-form text `answer`.

        Bug fix: both event branches now use the 'Get Answer To Question' span
        (previously the existing-event branch reused 'Check Affirmative Answer').
        """
        language = HumanAnswerServices._process_arguments(question, answer, language_iso)
        return HumanAnswerServices._run_in_span(
            tenant_id, "Get Answer To Question",
            lambda: HumanAnswerServices._run_chain(question, answer, language,
                                                   "get_answer_to_question", A2QOutput))

    @staticmethod
    def _check_answer(tenant_id: int, question: str, answer: str, language_iso: str, template_name: str,
                      span_name: str) -> bool:
        """Shared driver for the boolean answer checks."""
        language = HumanAnswerServices._process_arguments(question, answer, language_iso)
        return HumanAnswerServices._run_in_span(
            tenant_id, span_name,
            lambda: HumanAnswerServices._run_chain(question, answer, language, template_name, QAOutput))

    @staticmethod
    def _run_in_span(tenant_id: int, span_name: str, logic):
        """Run `logic` inside a span, creating a business event first if none is active."""
        if not current_event:
            # NOTE(review): assumes BusinessEvent installs itself as current_event — confirm.
            with BusinessEvent('Answer Check Service', tenant_id):
                with current_event.create_span(span_name):
                    return logic()
        else:
            with current_event.create_span(span_name):
                return logic()

    @staticmethod
    def _run_chain(question: str, answer: str, language: str, template_name: str, output_schema):
        """Build and invoke the LLM chain for `template_name`; return the structured `answer` field.

        Consolidates the previously duplicated `_check_answer_logic` and
        `_get_answer_to_question_logic` bodies, which differed only in schema.
        """
        prompt_params = {
            'question': question,
            'answer': answer,
            'language': language,
        }
        template, llm = get_template(template_name)
        prompt = ChatPromptTemplate.from_template(template)
        structured_llm = llm.with_structured_output(output_schema)
        chain = RunnablePassthrough() | prompt | structured_llm
        raw_answer = chain.invoke(prompt_params)
        current_app.logger.debug(f"Raw answer: {raw_answer}")
        return raw_answer.answer

    @staticmethod
    def _process_arguments(question, answer, language_iso: str) -> str:
        """Validate inputs and resolve the ISO 639-1 code to a language name.

        :raises ValueError: on empty/unsupported language or blank question/answer.
        """
        if language_iso.strip() == '':
            raise ValueError("Language cannot be empty")
        language = current_app.config.get('SUPPORTED_LANGUAGE_ISO639_1_LOOKUP').get(language_iso)
        if language is None:
            raise ValueError(f"Unsupported language: {language_iso}")
        if question.strip() == '':
            raise ValueError("Question cannot be empty")
        if answer.strip() == '':
            raise ValueError("Answer cannot be empty")
        return language

View File

@@ -0,0 +1,22 @@
version: "1.0.0"
name: "Rag Agent"
role: >
{tenant_name} Spokesperson. {custom_role}
goal: >
You get questions by a human correspondent, and give answers based on a given context, taking into account the history
of the current conversation. {custom_goal}
backstory: >
You are the primary contact for {tenant_name}. You are known by {name}, and can be addressed by this name, or you. You are
a very good communicator, and adapt to the style used by the human asking for information (e.g. formal or informal).
You always stay correct and polite, whatever happens. And you ensure no discriminating language is used.
You are perfectly multilingual in all known languages, and do your best to answer questions in {language}, whatever
language the context provided to you is in. You are participating in a conversation, not writing e.g. an email. Do not
include a salutation or closing greeting in your answer.
{custom_backstory}
full_model_name: "mistral.mistral-small-latest"
temperature: 0.3
metadata:
author: "Josako"
date_added: "2025-01-08"
description: "An Agent that does RAG based on a user's question, RAG content & history"
changes: "Initial version"

View File

@@ -0,0 +1,25 @@
version: "1.0.1"
name: "Traicie Recruiter"
role: >
You are an Expert Recruiter working for {tenant_name}, known as {name}. You can be addressed as {name}
{custom_role}
goal: >
As an expert recruiter, you identify, attract, and secure top talent by building genuine relationships, deeply
understanding business needs, and ensuring optimal alignment between candidate potential and organizational goals,
while championing diversity, culture fit, and long-term retention.
{custom_goal}
backstory: >
You started your career in a high-pressure agency setting, where you quickly learned the art of fast-paced hiring and
relationship building. Over the years, you moved in-house, partnering closely with business leaders to shape
recruitment strategies that go beyond filling roles—you focus on finding the right people to drive growth and culture.
With a strong grasp of both tech and non-tech profiles, you've adapted to changing trends, from remote work to
AI-driven sourcing. You're more than a recruiter—you're a trusted advisor, a brand ambassador, and a connector of
people and purpose.
{custom_backstory}
full_model_name: "mistral.magistral-medium-latest"
temperature: 0.3
metadata:
author: "Josako"
date_added: "2025-07-03"
description: "Traicie Recruiter Agent"
changes: "Ensure recruiter can be addressed by a name"

View File

@@ -1,9 +1,13 @@
version: "1.0.0"
content: >
Check if additional information or questions are available in the answer (answer in {language}), additional to the
following question:
Check if additional information or questions are available in the following answer (answer in between triple
backquotes):
```{answer}```
in addition to answers to the following question (in between triple backquotes):
"{question}"
```{question}```
Answer with True or False, without additional information.
llm_model: "mistral.mistral-medium-latest"

View File

@@ -1,8 +1,12 @@
version: "1.0.0"
content: >
Determine if there is an affirmative answer on the following question in the provided answer (answer in {language}):
Determine if there is an affirmative answer on the following question (in between triple backquotes):
{question}
```{question}```
in the provided answer (in between triple backquotes):
```{answer}```
Answer with True or False, without additional information.
llm_model: "mistral.mistral-medium-latest"

View File

@@ -0,0 +1,16 @@
version: "1.0.0"
content: >
Provide us with the answer to the following question (in between triple backquotes) from the text provided to you:
```{question}```
Reply using the exact wording and in the same language. If no answer can be found, reply with "No answer provided"
Text provided to you:
```{answer}```
llm_model: "mistral.mistral-medium-latest"
metadata:
author: "Josako"
date_added: "2025-06-23"
description: "An assistant that extracts the answer to a question from a provided text."
changes: "Initial version"

View File

@@ -14,7 +14,7 @@ content: >
I only want you to return the translation. No explanation, no options. I need to be able to directly use your answer
without further interpretation. If more than one option is available, present me with the most probable one.
llm_model: "mistral.ministral-8b-latest"
llm_model: "mistral.mistral-medium-latest"
metadata:
author: "Josako"
date_added: "2025-06-23"

View File

@@ -11,7 +11,7 @@ content: >
I only want you to return the translation. No explanation, no options. I need to be able to directly use your answer
without further interpretation. If more than one option is available, present me with the most probable one.
llm_model: "mistral.ministral-8b-latest"
llm_model: "mistral.mistral-medium-latest"
metadata:
author: "Josako"
date_added: "2025-06-23"

View File

@@ -45,11 +45,6 @@ configuration:
description: "Introductory text given by the specialist - but translated according to Tone of Voice, Language Level and Starting Language"
type: "text"
required: false
closing_message:
name: "Closing Message"
description: "Closing message given by the specialist - but translated according to Tone of Voice, Language Level and Starting Language"
type: "text"
required: false
competency_details:
title:
name: "Title"
@@ -98,8 +93,8 @@ arguments:
name: "Interaction Mode"
type: "enum"
description: "The interaction mode the specialist will start working in."
allowed_values: ["Job Application", "Seduction"]
default: "Job Application"
allowed_values: ["orientation", "seduction"]
default: "orientation"
required: true
results:
competencies:
@@ -108,17 +103,13 @@ results:
description: "List of vacancy competencies and their descriptions"
required: false
agents:
- type: "TRAICIE_RECRUITER_AGENT"
version: "1.0"
- type: "RAG_AGENT"
version: "1.0"
version: "1.1"
tasks:
- type: "TRAICIE_KO_CRITERIA_INTERVIEW_DEFINITION_TASK"
version: "1.0"
- type: "RAG_TASK"
version: "1.0"
version: "1.1"
metadata:
author: "Josako"
date_added: "2025-06-30"
changes: "Add 'RAG' to the selection specialist"
date_added: "2025-07-03"
changes: "Update for a Full Virtual Assistant Experience"
description: "Assistant to assist in candidate selection"

View File

@@ -0,0 +1,23 @@
version: "1.0.0"
name: "RAG Task"
task_description: >
Answer the question based on the following context, and taking into account the history of the discussion. Try not to
repeat answers already given in the recent history, unless confirmation is required or repetition is essential to
give a coherent answer.
{custom_description}
Use the following {language} in your communication, and cite the sources used at the end of the full conversation.
If the question cannot be answered using the given context, answer "I have insufficient information to answer this
question."
Context (in between triple backquotes):
```{context}```
History (in between triple backquotes):
```{history}```
Question (in between triple backquotes):
```{question}```
expected_output: >
metadata:
author: "Josako"
date_added: "2025-01-08"
description: "A Task that gives RAG-based answers"
changes: "Initial version"

View File

@@ -38,7 +38,7 @@
</div>
</div>
<div class="interaction-question">
{{ specialist_results.detailed_query if specialist_results and specialist_results.detailed_query else specialist_arguments.query }}
{{ specialist_results.detailed_question if specialist_results and specialist_results.detailed_question else specialist_arguments.question }}
</div>
</div>
</button>

View File

@@ -0,0 +1,7 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class A2QOutput(BaseModel):
    """Structured LLM output carrying the answer extracted from a human message."""

    # Optional to match the declared default of None: a plain `str` field with
    # a None default would fail pydantic validation when the field is omitted.
    answer: Optional[str] = Field(None, description="Answer to the question asked")

View File

@@ -136,4 +136,9 @@ class EveAICrewAIFlow(Flow):
class EveAIFlowState(BaseModel):
"""Base class for all EveAI flow states"""
pass
answer: Optional[str] = None
detailed_question: Optional[str] = None
question: Optional[str] = None
phase: Optional[str] = None
form_request: Optional[Dict[str, Any]] = None
citations: Optional[Dict[str, Any]] = None

View File

@@ -78,8 +78,8 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
return "\n\n".join([
"\n\n".join([
f"HUMAN:\n"
f"{interaction.specialist_results['detailed_query']}"
if interaction.specialist_results.get('detailed_query') else "",
f"{interaction.specialist_results['detailed_question']}"
if interaction.specialist_results.get('detailed_question') else "",
f"{interaction.specialist_arguments.get('form_values')}"
if interaction.specialist_arguments.get('form_values') else "",
f"AI:\n{interaction.specialist_results['answer']}"
@@ -119,6 +119,11 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
result_name = state_name
self._state_result_relations[state_name] = result_name
def _config_default_state_result_relations(self):
for default_attribute_name in ['answer', 'detailed_question', 'form_request', 'phase', 'citations']:
self._add_state_result_relation(default_attribute_name)
@abstractmethod
def _config_state_result_relations(self):
"""Configure the state-result relations by adding state-result combinations. Use _add_state_result_relation()"""
@@ -278,14 +283,15 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
"all arguments": arguments.model_dump(),
})
original_query = arguments.query
detailed_query = self._detail_question(arguments.language, original_query)
original_question = arguments.question
detailed_question = self._detail_question(arguments.language, original_question)
modified_arguments = arguments.model_copy(update={
"query": detailed_query,
"original_query": original_query
"query": detailed_question,
"original_query": original_question
})
# Get retriever-specific arguments
retriever_arguments = modified_arguments.retriever_arguments
@@ -350,10 +356,16 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
def _update_specialist_results(self, specialist_results: SpecialistResult) -> SpecialistResult:
"""Update the specialist results with the latest state information"""
# Ensure default state result elements are passed
self._config_default_state_result_relations()
update_data = {}
state_dict = self.flow.state.model_dump()
current_app.logger.debug(f"Updating specialist results with state: {state_dict}")
for state_name, result_name in self._state_result_relations.items():
current_app.logger.debug(f"Try Updating {result_name} with {state_name}")
if state_name in state_dict and state_dict[state_name] is not None:
current_app.logger.debug(f"Updating {result_name} with {state_name} = {state_dict[state_name]}")
update_data[result_name] = state_dict[state_name]
return specialist_results.model_copy(update=update_data)
@@ -369,6 +381,13 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
if result_name in last_interaction.specialist_results:
setattr(self.flow.state, state_name, last_interaction.specialist_results[result_name])
# Initialize the standard state values
self.flow.state.answer = None
self.flow.state.detailed_question = None
self.flow.state.form_request = None
self.flow.state.phase = None
self.flow.state.citations = []
@abstractmethod
def execute(self, arguments: SpecialistArguments, formatted_context: str, citations: List[int]) -> SpecialistResult:
raise NotImplementedError
@@ -378,21 +397,21 @@ class CrewAIBaseSpecialistExecutor(BaseSpecialistExecutor):
if self.retrievers:
# Detail the incoming query
if self._cached_session.interactions:
query = arguments.query
question = arguments.question
language = arguments.language
detailed_query = self._detail_question(language, query)
detailed_question = self._detail_question(language, question)
else:
detailed_query = arguments.query
detailed_question = arguments.question
modified_arguments = {
"query": detailed_query,
"original_query": arguments.query
"question": detailed_question,
"original_question": arguments.question
}
detailed_arguments = arguments.model_copy(update=modified_arguments)
formatted_context, citations = self._retrieve_context(detailed_arguments)
result = self.execute(detailed_arguments, formatted_context, citations)
modified_result = {
"detailed_query": detailed_query,
"detailed_question": detailed_question,
"citations": citations,
}
intermediate_result = result.model_copy(update=modified_result)

View File

@@ -209,7 +209,7 @@ class SpecialistExecutor(BaseSpecialistExecutor):
result = SpecialistResult.create_for_type(
self.type,
self.type_version,
detailed_query=detailed_question,
detailed_question=detailed_question,
answer=raw_result.answer,
citations=[ctx.metadata.document_id for ctx in unique_contexts
if ctx.id in raw_result.citations],

View File

@@ -103,7 +103,7 @@ class SpecialistResult(BaseModel):
# Structural optional fields available for all specialists
answer: Optional[str] = Field(None, description="Optional textual answer from the specialist")
detailed_query: Optional[str] = Field(None, description="Optional detailed query for the specialist")
detailed_question: Optional[str] = Field(None, description="Optional detailed question for the specialist")
form_request: Optional[Dict[str, Any]] = Field(None, description="Optional form definition to request user input")
phase: Optional[str] = Field(None, description="Phase of the specialist's workflow")
citations: Optional[Dict[str, Any]] = Field(None, description="Citations for the specialist's answer")

View File

@@ -131,6 +131,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
flow_inputs = {
'name': "Evie",
'tone_of_voice': tone_of_voice,
'tone_of_voice_context': tone_of_voice_context,
'language_level': language_level,
@@ -243,6 +244,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
class KODefInput(BaseModel):
name: Optional[str] = Field(None, alias="name")
tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
language_level: Optional[str] = Field(None, alias="language_level")

View File

@@ -19,7 +19,7 @@ from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew,
from common.services.interaction.specialist_services import SpecialistServices
NEW_SPECIALIST_TYPE = "TRAICIE_SELECTION_SPECIALIST"
NEW_SPECIALIST_TYPE_VERSION = "1.3"
NEW_SPECIALIST_TYPE_VERSION = "1.4"
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):

View File

@@ -1,29 +1,50 @@
import asyncio
import json
from os import wait
from typing import Optional, List, Dict, Any
from datetime import date
from time import sleep
from crewai.flow.flow import start, listen, and_
from typing import Optional, List, Dict, Any
from crewai.flow.flow import start, listen
from flask import current_app
from pydantic import BaseModel, Field, EmailStr
from sqlalchemy.exc import SQLAlchemyError
from common.extensions import db
from common.extensions import cache_manager, db, minio_client
from common.models.interaction import EveAIAsset
from common.models.user import Tenant
from common.models.interaction import Specialist
from common.services.utils.human_answer_services import HumanAnswerServices
from common.services.utils.translation_services import TranslationServices
from eveai_chat_workers.outputs.globals.basic_types.list_item import ListItem
from eveai_chat_workers.outputs.traicie.knockout_questions.knockout_questions_v1_0 import KOQuestions, KOQuestion
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.traicie.competencies.competencies_v1_1 import Competencies
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from common.services.interaction.specialist_services import SpecialistServices
from common.extensions import cache_manager
from common.utils.eveai_exceptions import EveAISpecialistExecutionError
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import LANGUAGE_LEVEL
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import TONE_OF_VOICE
from common.utils.eveai_exceptions import EveAISpecialistExecutionError
from eveai_chat_workers.outputs.globals.basic_types.list_item import ListItem
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.outputs.traicie.knockout_questions.knockout_questions_v1_0 import KOQuestion, KOQuestions
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
# User-facing template messages for the selection flow; translated at runtime
# into the candidate's language before being shown.
INITIALISATION_MESSAGE = "Let's start the selection process by asking you a few important questions."
START_SELECTION_QUESTION = "Do you want to start the selection procedure?"
INSUFFICIENT_INFORMATION_MESSAGE = (
    "We do not have the necessary information to provide you with the requested answers. "
    "Please accept our apologies. You can ask other questions or proceed with the "
    "selection process.")
# Bug fix: the adjacent string literals below were missing separating spaces,
# which rendered as "donot" / "thisselection" / "data?if" in the final message.
KO_CRITERIA_NOT_MET_MESSAGE = ("Thank you for answering our questions! We processed your answers. Unfortunately, "
                               "you do not comply with the minimum requirements for this job. Therefore, we stop "
                               "this selection procedure")
KO_CRITERIA_MET_MESSAGE = "We processed your answers with a positive result."
RQC_MESSAGE = "You are well suited for this job."
CONTACT_DATA_QUESTION = ("Are you willing to provide us with your contact data, so we can contact you to continue "
                         "the selection process?")
NO_CONTACT_DATA_QUESTION = ("We are sorry to hear that. The only way to proceed with the selection process is "
                            "to provide us with your contact data. Do you want to provide us with your contact data? "
                            "If not, we thank you, and we'll end the selection process.")
CONTACT_DATA_PROCESSED_MESSAGE = "We successfully processed your contact data."
CONTACT_TIME_QUESTION = "When do you prefer us to contact you? Provide us with some preferred weekdays and times!"
NO_CONTACT_TIME_MESSAGE = ("We could not process your preferred contact time. Can you please provide us with your "
                           "preferred contact time?")
CONTACT_TIME_PROCESSED_MESSAGE = ("We successfully processed your preferred contact time. We will contact you as soon "
                                  "as possible.")
NO_FURTHER_QUESTIONS_MESSAGE = "We do not process further questions."
SUCCESSFUL_ENDING_MESSAGE = "Thank you for your application. We will contact you as soon as possible!"
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
@@ -47,37 +68,39 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
@property
def type_version(self) -> str:
return "1.3"
return "1.4"
def _config_task_agents(self):
self._add_task_agent("traicie_ko_criteria_interview_definition_task", "traicie_recruiter_agent")
self._add_task_agent("rag_task", "rag_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("traicie_ko_criteria_interview_definition_task", KOQuestions, "ko_questions")
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
def _config_state_result_relations(self):
self._add_state_result_relation("rag_output")
self._add_state_result_relation("ko_criteria_questions")
self._add_state_result_relation("ko_criteria_scores")
self._add_state_result_relation("ko_criteria_answers")
self._add_state_result_relation("competency_questions")
self._add_state_result_relation("competency_scores")
self._add_state_result_relation("personal_contact_data")
self._add_state_result_relation("contact_time")
def _instantiate_specialist(self):
verbose = self.tuning
ko_def_agents = [self.traicie_recruiter_agent]
ko_def_tasks = [self.traicie_ko_criteria_interview_definition_task]
self.ko_def_crew = EveAICrewAICrew(
rag_agents = [self.rag_agent]
rag_tasks = [self.rag_task]
self.rag_crew = EveAICrewAICrew(
self,
"KO Criteria Interview Definition Crew",
agents=ko_def_agents,
tasks=ko_def_tasks,
"Rag Crew",
agents=rag_agents,
tasks=rag_tasks,
verbose=verbose,
)
self.flow = SelectionFlow(
self,
self.ko_def_crew
self.rag_crew,
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
@@ -94,73 +117,62 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
results = None
current_app.logger.debug(f"Specialist phase: {specialist_phase}")
match specialist_phase:
case "initial":
results = self.execute_initial_state(arguments, formatted_context, citations)
case "start_selection_procedure":
results = self.execute_start_selection_procedure_state(arguments, formatted_context, citations)
case "rag":
results = self.execute_rag_state(arguments, formatted_context, citations)
case "ko_question_evaluation":
results = self.execute_ko_question_evaluation(arguments, formatted_context, citations)
case "personal_contact_data":
results = self.execute_personal_contact_data(arguments, formatted_context, citations)
case "personal_contact_data_preparation":
results = self.execute_personal_contact_data_preparation(arguments, formatted_context, citations)
case "personal_contact_data_processing":
results = self.execute_personal_contact_data_processing(arguments, formatted_context, citations)
case "contact_time_evaluation":
results = self.execute_contact_time_evaluation_state(arguments, formatted_context, citations)
case "no_valid_candidate":
results = self.execute_no_valid_candidate(arguments, formatted_context, citations)
results = self.execute_no_valid_candidate_state(arguments, formatted_context, citations)
case "candidate_selected":
results = self.execute_candidate_selected(arguments, formatted_context, citations)
results = self.execute_candidate_selected_state(arguments, formatted_context, citations)
self.log_tuning(f"Traicie Selection Specialist execution ended", {"Results": results.model_dump() if results else "No info"})
self.log_tuning(f"Traicie Selection Specialist execution ended",
{"Results": results.model_dump() if results else "No info"})
return results
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist initial_state execution started", {})
current_app.logger.debug(f"Specialist Competencies:\n{self.specialist.configuration.get("competencies", [])}")
interaction_mode = arguments.interaction_mode
if not interaction_mode:
interaction_mode = "selection"
current_app.logger.debug(f"Interaction mode: {interaction_mode}")
ko_competencies = []
for competency in self.specialist.configuration.get("competencies", []):
if competency["is_knockout"] is True and competency["assess"] is True:
current_app.logger.debug(f"Assessable Knockout competency: {competency}")
ko_competencies.append({"title: ": competency["title"], "description": competency["description"]})
welcome_message = self.specialist.configuration.get("welcome_message", "Welcome to our selection process.")
welcome_message = TranslationServices.translate(self.tenant_id, welcome_message, arguments.language)
tone_of_voice = self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
selected_tone_of_voice = next(
(item for item in TONE_OF_VOICE if item["name"] == tone_of_voice),
None # fallback indien niet gevonden
)
current_app.logger.debug(f"Selected tone of voice: {selected_tone_of_voice}")
tone_of_voice_context = f"{selected_tone_of_voice["description"]}"
if interaction_mode == "selection":
return self.execute_start_selection_procedure_state(arguments, formatted_context, citations,
welcome_message)
else: # We are in orientation mode, so we perform standard rag
return self.execute_rag_state(arguments, formatted_context, citations, welcome_message)
language_level = self.specialist.configuration.get('language_level', 'Standard')
selected_language_level = next(
(item for item in LANGUAGE_LEVEL if item["name"] == language_level),
None
)
current_app.logger.debug(f"Selected language level: {selected_language_level}")
language_level_context = (f"{selected_language_level['description']}, "
f"corresponding to CEFR level {selected_language_level['cefr_level']}")
def execute_start_selection_procedure_state(self, arguments: SpecialistArguments, formatted_context, citations,
start_message=None) -> SpecialistResult:
flow_inputs = {
"region": arguments.region,
"working_schedule": arguments.working_schedule,
"start_date": arguments.start_date,
"language": arguments.language,
"interaction_mode": arguments.interaction_mode,
'tone_of_voice': tone_of_voice,
'tone_of_voice_context': tone_of_voice_context,
'language_level': language_level,
'language_level_context': language_level_context,
'ko_criteria': ko_competencies,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
current_app.logger.debug(f"Flow results: {flow_results}")
current_app.logger.debug(f"Flow state: {self.flow.state}")
answer = ""
if start_message:
initialisation_message = TranslationServices.translate(self.tenant_id, INITIALISATION_MESSAGE,
arguments.language)
answer = f"{start_message}\n\n{initialisation_message}"
ko_questions = self._get_ko_questions()
fields = {}
for ko_question in self.flow.state.ko_criteria_questions:
for ko_question in ko_questions.ko_questions:
current_app.logger.debug(f"KO Question: {ko_question}")
fields[ko_question.title] = {
"name": ko_question.title,
"description": ko_question.title,
@@ -178,105 +190,259 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"fields": fields,
}
answer = f"Let's start our selection process by asking you a few important questions."
rag_answer = self._check_and_execute_rag(arguments, formatted_context, citations)
if rag_answer:
if answer:
answer = f"{answer}\n\n{rag_answer.answer}"
else:
answer = rag_answer.answer
if arguments.language != 'en':
TranslationServices.translate_config(self.tenant_id, ko_form, "fields", arguments.language)
TranslationServices.translate(self.tenant_id, answer, arguments.language)
self.flow.state.answer = answer
self.flow.state.phase = "ko_question_evaluation"
self.flow.state.form_request = ko_form
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=ko_form,
phase="ko_question_evaluation")
results = SelectionResult.create_for_type(self.type, self.type_version)
return results
def execute_ko_question_evaluation(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
def execute_ko_question_evaluation(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist ko_question_evaluation started", {})
# Check if the form has been returned (it should)
if not arguments.form_values:
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id, "No form values returned")
raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
"No form values returned")
current_app.logger.debug(f"Form values: {arguments.form_values}")
# Load the previous KO Questions
previous_ko_questions = self.flow.state.ko_criteria_questions
previous_ko_questions = self._get_ko_questions().ko_questions
current_app.logger.debug(f"Previous KO Questions: {previous_ko_questions}")
# Evaluate KO Criteria
evaluation = "positive"
for criterium, answer in arguments.form_values.items():
for qa in previous_ko_questions:
if qa.get("title") == criterium:
if qa.get("answer_positive") != answer:
if qa.title == criterium:
if qa.answer_positive != answer:
evaluation = "negative"
break
if evaluation == "negative":
break
self.flow.state.ko_criteria_answers = arguments.form_values
if evaluation == "negative":
answer = (f"We hebben de antwoorden op onze eerste vragen verwerkt. Je voldoet jammer genoeg niet aan de "
f"minimale vereisten voor deze job.")
if arguments.language != 'nl':
answer = TranslationServices.translate(answer, arguments.language)
answer = TranslationServices.translate(self.tenant_id, KO_CRITERIA_NOT_MET_MESSAGE, arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=None,
phase="no_valid_candidate")
self.flow.state.answer = answer
self.flow.state.phase = "no_valid_candidate"
results = SelectionResult.create_for_type(self.type, self.type_version)
else:
answer = (f"We hebben de antwoorden op de KO criteria verwerkt. Je bent een geschikte kandidaat. "
f"Ben je bereid je contactgegevens door te geven, zodat we je kunnen contacteren voor een verder "
f"gesprek?")
# Check if answers to questions are positive
answer = TranslationServices.translate(self.tenant_id, KO_CRITERIA_MET_MESSAGE, arguments.language)
rag_output = self._check_and_execute_rag(arguments, formatted_context, citations)
if rag_output:
answer = f"{answer}\n\n{rag_output.answer}"
answer = (f"{answer}\n\n"
f"{TranslationServices.translate(self.tenant_id, RQC_MESSAGE, arguments.language)} "
f"{TranslationServices.translate(self.tenant_id, CONTACT_DATA_QUESTION, arguments.language)}")
self.flow.state.answer = answer
self.flow.state.phase = "personal_contact_data_preparation"
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_personal_contact_data_preparation(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist personal_contact_data_preparation started", {})
if HumanAnswerServices.check_affirmative_answer(self.tenant_id, CONTACT_DATA_QUESTION,
arguments.question, arguments.language):
contact_form = cache_manager.specialist_forms_config_cache.get_config("PERSONAL_CONTACT_FORM", "1.0")
if arguments.language != 'nl':
answer = TranslationServices.translate(answer, arguments.language)
if arguments.language != 'en':
contact_form = TranslationServices.translate_config(self.tenant_id, contact_form, "fields", arguments.language)
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=answer,
form_request=contact_form,
phase="personal_contact_data")
contact_form = TranslationServices.translate_config(self.tenant_id, contact_form, "fields",
arguments.language)
rag_output = self._check_and_execute_rag(arguments, formatted_context, citations)
if rag_output:
answer = f"{rag_output.answer}"
else:
answer = ""
self.flow.state.answer = answer
self.flow.state.form_request = contact_form
self.flow.state.phase = "personal_contact_data_processing"
results = SelectionResult.create_for_type(self.type, self.type_version,)
else:
answer = TranslationServices.translate(self.tenant_id, NO_CONTACT_DATA_QUESTION, arguments.language)
self.flow.state.answer = answer
self.flow.state.phase = "personal_contact_data_preparation"
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_personal_contact_data(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist personal_contact_data started", {})
def execute_personal_contact_data_processing(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist personal_contact_data_processing started", {})
answer = (
f"{TranslationServices.translate(self.tenant_id, CONTACT_DATA_PROCESSED_MESSAGE, arguments.language)}\n"
f"{TranslationServices.translate(self.tenant_id, CONTACT_TIME_QUESTION, arguments.language)}")
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben de contactgegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
rag_output = self._check_and_execute_rag(arguments, formatted_context, citations)
if rag_output:
answer = f"{answer}\n\n{rag_output.answer}"
self.flow.state.answer = answer
self.flow.state.phase = "contact_time_evaluation"
self.flow.state.personal_contact_data = arguments.form_values
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_no_valid_candidate(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
def execute_contact_time_evaluation_state(self, arguments: SpecialistArguments, formatted_context, citations) \
        -> SpecialistResult:
    """Evaluate the candidate's reply to the preferred-contact-time question.

    If no usable answer was given, the question is effectively re-asked (the
    phase stays ``contact_time_evaluation``); otherwise the contact time is
    stored on the flow state and the flow advances to ``candidate_selected``.
    Any extra question in the same user message is answered via RAG and
    appended to the reply.
    """
    self.log_tuning("Traicie Selection Specialist contact_time_evaluation started", {})
    # Extract the contact-time answer from the free-text user message.
    contact_time_answer = HumanAnswerServices.get_answer_to_question(self.tenant_id, CONTACT_TIME_QUESTION,
                                                                     arguments.question, arguments.language)
    # Answer any additional question the candidate asked in the same message.
    rag_output = self._check_and_execute_rag(arguments, formatted_context, citations)
    if contact_time_answer == "No answer provided":  # sentinel string from HumanAnswerServices -- TODO confirm
        answer = TranslationServices.translate(self.tenant_id, NO_CONTACT_TIME_MESSAGE, arguments.language)
        if rag_output:
            answer = f"{answer}\n\n{rag_output.answer}"
        self.flow.state.answer = answer
        # Stay in this phase so the question is asked again next turn.
        self.flow.state.phase = "contact_time_evaluation"
        results = SelectionResult.create_for_type(self.type, self.type_version,)
    else:
        answer = TranslationServices.translate(self.tenant_id, CONTACT_TIME_PROCESSED_MESSAGE, arguments.language)
        if rag_output:
            answer = f"{answer}\n\n{rag_output.answer}"
        self.flow.state.answer = answer
        self.flow.state.phase = "candidate_selected"
        self.flow.state.contact_time = contact_time_answer
        results = SelectionResult.create_for_type(self.type, self.type_version,)
    return results
def execute_no_valid_candidate_state(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist no_valid_candidate started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"Je voldoet jammer genoeg niet aan de minimale vereisten voor deze job. Maar solliciteer gerust voor één van onze andere jobs.",
phase="no_valid_candidate")
answer = (f"{TranslationServices.translate(self.tenant_id, KO_CRITERIA_NOT_MET_MESSAGE, arguments.language)}\n"
f"{TranslationServices.translate(self.tenant_id, NO_FURTHER_QUESTIONS_MESSAGE, arguments.language)}\n")
def execute_candidate_selected(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Selection Specialist candidate_selected started", {})
results = SpecialistResult.create_for_type(self.type, self.type_version,
answer=f"We hebben je contactgegegevens verwerkt. We nemen zo snel mogelijk contact met je op.",
phase="candidate_selected")
self.flow.state.answer = answer
self.flow.state.phase = "no_valid_candidate"
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_candidate_selected_state(self, arguments: SpecialistArguments, formatted_context, citations) \
-> SpecialistResult:
self.log_tuning("Traicie Selection Specialist candidate_selected started", {})
answer = TranslationServices.translate(self.tenant_id, SUCCESSFUL_ENDING_MESSAGE, arguments.language)
class SelectionInput(BaseModel):
region: str = Field(..., alias="region")
working_schedule: Optional[str] = Field(..., alias="working_schedule")
start_date: Optional[date] = Field(None, alias="vacancy_text")
language: Optional[str] = Field(None, alias="language")
interaction_mode: Optional[str] = Field(None, alias="interaction_mode")
tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
language_level: Optional[str] = Field(None, alias="language_level")
language_level_context: Optional[str] = Field(None, alias="language_level_context")
ko_criteria: Optional[List[Dict[str, str]]] = Field(None, alias="ko_criteria")
question: Optional[str] = Field(None, alias="question")
field_values: Optional[Dict[str, Any]] = Field(None, alias="field_values")
self.flow.state.answer = answer
self.flow.state.phase = "candidate_selected"
results = SelectionResult.create_for_type(self.type, self.type_version,)
return results
def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations, welcome_message=None) \
        -> SpecialistResult:
    """Run the orientation (plain RAG) phase of the selection conversation.

    Optionally prefixes a welcome message plus the translated invitation to
    start the selection procedure. When the user's message carries extra
    information a RAG answer is appended; when the user answers the
    start-selection question affirmatively, control is handed over to
    ``execute_start_selection_procedure_state`` instead.
    """
    self.log_tuning("Traicie Selection Specialist rag_state started", {})
    start_selection_question = TranslationServices.translate(self.tenant_id, START_SELECTION_QUESTION,
                                                             arguments.language)
    if welcome_message:
        answer = f"{welcome_message}\n\n{start_selection_question}"
    else:
        answer = ""
    rag_results = None
    if arguments.question:
        # Only run RAG when the message contains information beyond a plain yes/no.
        if HumanAnswerServices.check_additional_information(self.tenant_id,
                                                            START_SELECTION_QUESTION,
                                                            arguments.question,
                                                            arguments.language):
            rag_results = self.execute_rag(arguments, formatted_context, citations)
            # NOTE(review): when execute_rag took its no-context branch it returns a
            # bare RAGOutput without a .rag_output attribute -- this line would raise.
            self.flow.state.rag_output = rag_results.rag_output
            answer = f"{answer}\n{rag_results.answer}"
        # An affirmative reply to the start-selection question switches phases.
        if HumanAnswerServices.check_affirmative_answer(self.tenant_id,
                                                        START_SELECTION_QUESTION,
                                                        arguments.question,
                                                        arguments.language):
            return self.execute_start_selection_procedure_state(arguments, formatted_context, citations, answer)
    self.flow.state.answer = answer
    self.flow.state.phase = "rag"
    self.flow.state.form_request = None
    results = SelectionResult.create_for_type(self.type, self.type_version,)
    return results
def execute_rag(self, arguments: SpecialistArguments, formatted_context, citations) -> RAGOutput:
    """Execute the RAG flow for the current question.

    When retrieved context is available the RAG flow is kicked off and its
    answer is replaced by the translated "insufficient information" message
    if the crew flagged the context as insufficient. Without context, a bare
    ``RAGOutput`` carrying that fallback message is returned directly.

    NOTE(review): the two branches return different shapes (flow state with a
    ``.rag_output`` attribute vs a plain ``RAGOutput``); callers appear to rely
    on ``.answer`` only in some paths -- verify before refactoring.
    """
    self.log_tuning("RAG Specialist execution started", {})
    insufficient_info_message = TranslationServices.translate(self.tenant_id,
                                                              INSUFFICIENT_INFORMATION_MESSAGE,
                                                              arguments.language)
    if formatted_context:
        flow_inputs = {
            "language": arguments.language,
            "question": arguments.question,
            "context": formatted_context,
            "history": self.formatted_history,
            "name": self.specialist.configuration.get('name', ''),
        }
        rag_output = self.flow.kickoff(inputs=flow_inputs)
        # Override the crew answer when it signalled insufficient context.
        if rag_output.rag_output.insufficient_info:
            rag_output.rag_output.answer = insufficient_info_message
    else:
        # No retrieved context at all: short-circuit with the fallback message.
        rag_output = RAGOutput(answer=insufficient_info_message,
                               insufficient_info=True)
    self.log_tuning(f"RAG Specialist execution ended", {"Results": rag_output.model_dump()})
    return rag_output
def _check_and_execute_rag(self, arguments: SpecialistArguments, formatted_context, citations) -> RAGOutput:
    """Run RAG only when the user's message carries information beyond the start-selection prompt.

    Returns the RAG result, or ``None`` when the message contained nothing
    worth answering.
    """
    has_extra_information = HumanAnswerServices.check_additional_information(
        self.tenant_id,
        START_SELECTION_QUESTION,
        arguments.question,
        arguments.language,
    )
    if not has_extra_information:
        return None
    return self.execute_rag(arguments, formatted_context, citations)
def _get_ko_questions(self) -> KOQuestions:
    """Load the knock-out (KO) criteria questions asset for this specialist.

    Queries the ``TRAICIE_KO_CRITERIA_QUESTIONS`` asset (type version 1.0.0)
    whose JSON configuration references this specialist, downloads the asset
    file from MinIO and parses it into a ``KOQuestions`` object.

    Raises:
        EveAISpecialistExecutionError: when no matching asset exists.
    """
    ko_questions_asset = db.session.query(EveAIAsset).filter(
        EveAIAsset.type == "TRAICIE_KO_CRITERIA_QUESTIONS",
        EveAIAsset.type_version == "1.0.0",
        EveAIAsset.configuration.is_not(None),
        # Only assets explicitly linked to this specialist via their JSON config.
        EveAIAsset.configuration.has_key('specialist_id'),
        EveAIAsset.configuration['specialist_id'].astext.cast(db.Integer) == self.specialist_id
    ).first()
    if not ko_questions_asset:
        raise EveAISpecialistExecutionError(self.tenant_id, self.specialist_id, self.session_id,
                                            "No KO criteria questions found")
    ko_questions_data = minio_client.download_asset_file(self.tenant_id, ko_questions_asset.bucket_name,
                                                         ko_questions_asset.object_name)
    ko_questions = KOQuestions.from_json(ko_questions_data)
    current_app.logger.debug(f"KO Questions: {ko_questions}")
    return ko_questions
class SelectionKOCriteriumScore(BaseModel):
@@ -285,12 +451,6 @@ class SelectionKOCriteriumScore(BaseModel):
score: Optional[int] = Field(None, alias="score")
class SelectionCompetencyScore(BaseModel):
competency: Optional[str] = Field(None, alias="competency")
answer: Optional[str] = Field(None, alias="answer")
score: Optional[int] = Field(None, alias="score")
class PersonalContactData(BaseModel):
name: str = Field(..., description="Your name", alias="name")
email: EmailStr = Field(..., description="Your Name", alias="email")
@@ -302,34 +462,51 @@ class PersonalContactData(BaseModel):
consent: bool = Field(..., description="Consent", alias="consent")
class SelectionResult(SpecialistResult):
ko_criteria_questions: Optional[List[ListItem]] = Field(None, alias="ko_criteria_questions")
ko_criteria_scores: Optional[List[SelectionKOCriteriumScore]] = Field(None, alias="ko_criteria_scores")
competency_questions: Optional[List[ListItem]] = Field(None, alias="competency_questions")
competency_scores: Optional[List[SelectionCompetencyScore]] = Field(None, alias="competency_scores")
personal_contact_data: Optional[PersonalContactData] = Field(None, alias="personal_contact_data")
class SelectionInput(BaseModel):
    """Validated input payload for the selection flow (RAG + selection fields)."""
    # RAG elements
    language: Optional[str] = Field(None, alias="language")
    # NOTE(review): alias is "query" but the flow inputs use the key "question";
    # unless populate_by_name is enabled this field will silently stay None. Verify.
    question: Optional[str] = Field(None, alias="query")
    context: Optional[str] = Field(None, alias="context")
    citations: Optional[List[int]] = Field(None, alias="citations")
    history: Optional[str] = Field(None, alias="history")
    name: Optional[str] = Field(None, alias="name")
    # Selection elements
    region: str = Field(..., alias="region")
    # NOTE(review): Optional type but required (...) -- callers must supply the
    # key, possibly with a None value. Confirm this is intentional.
    working_schedule: Optional[str] = Field(..., alias="working_schedule")
    # NOTE(review): alias "vacancy_text" for a date field looks like a
    # copy-paste error; flow inputs pass the key "start_date". Verify.
    start_date: Optional[date] = Field(None, alias="vacancy_text")
    interaction_mode: Optional[str] = Field(None, alias="interaction_mode")
    tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
    tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
    language_level: Optional[str] = Field(None, alias="language_level")
    language_level_context: Optional[str] = Field(None, alias="language_level_context")
    ko_criteria: Optional[List[Dict[str, str]]] = Field(None, alias="ko_criteria")
    field_values: Optional[Dict[str, Any]] = Field(None, alias="field_values")
class SelectionFlowState(EveAIFlowState):
    """Flow state for the Traicie Selection specialist, updated automatically from task outputs."""
    # Raw, validated flow input for the current turn.
    input: Optional[SelectionInput] = None
    # KO-criteria and competency artefacts produced/consumed across phases.
    ko_criteria_questions: Optional[List[KOQuestion]] = Field(None, alias="ko_criteria_questions")
    ko_criteria_scores: Optional[List[SelectionKOCriteriumScore]] = Field(None, alias="ko_criteria_scores")
    competency_questions: Optional[List[ListItem]] = Field(None, alias="competency_questions")
    competency_scores: Optional[List[SelectionCompetencyScore]] = Field(None, alias="competency_scores")
    # Latest RAG crew output, if a RAG pass ran this turn.
    rag_output: Optional[RAGOutput] = None
    # Candidate's answers to the KO-criteria form.
    ko_criteria_answers: Optional[Dict[str, str]] = None
    personal_contact_data: Optional[PersonalContactData] = None
    contact_time: Optional[str] = None
class SelectionResult(SpecialistResult):
    """Result returned by the selection specialist, enriched with selection-specific data."""
    rag_output: Optional[RAGOutput] = Field(None, alias="rag_output")
    ko_criteria_answers: Optional[Dict[str, str]] = Field(None, alias="ko_criteria_answers")
    personal_contact_data: Optional[PersonalContactData] = Field(None, alias="personal_contact_data")
    # Conversation phase the flow ended the turn in.
    phase: Optional[str] = Field(None, alias="phase")
    # NOTE(review): serialized under the alias "mode", unlike every other field
    # here which aliases to its own name. Confirm consumers expect "mode".
    interaction_mode: Optional[str] = Field(None, alias="mode")
    contact_time: Optional[str] = None
class SelectionFlow(EveAICrewAIFlow[SelectionFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
ko_def_crew: EveAICrewAICrew,
rag_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "Traicie Role Definition Specialist Flow", **kwargs)
super().__init__(specialist_executor, "Selection Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.ko_def_crew = ko_def_crew
self.rag_crew = rag_crew
self.exception_raised = False
@start()
@@ -337,34 +514,24 @@ class SelectionFlow(EveAICrewAIFlow[SelectionFlowState]):
return ""
@listen(process_inputs)
async def execute_ko_def_definition(self):
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("execute_ko_interview_definition")
crew_output = await self.ko_def_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copies to the flow state.
update = {}
for task in self.ko_def_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_ko_criteria_interview_definition_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.ko_criteria_questions = task.output.pydantic.ko_questions
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
self.state.phase = "personal_contact_data"
current_app.logger.debug(f"State after execute_ko_def_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_ko_def_definition: {self.state.model_dump()}")
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.rag_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_ko_def Kickoff Error: {str(e)}")
current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
    """Validate raw inputs into the typed flow state, run the flow, and return the final state.

    Args:
        inputs: Raw mapping of flow inputs; validated into ``SelectionInput``.

    Returns:
        The accumulated ``SelectionFlowState`` after the flow has run.
    """
    current_app.logger.debug(f"Async kickoff {self.name}")
    current_app.logger.debug(f"Inputs: {inputs}")
    self.state.input = SelectionInput.model_validate(inputs)
    current_app.logger.debug(f"State: {self.state}")
    # The superclass result is intentionally discarded (previously bound to an
    # unused local); callers consume the accumulated flow state instead.
    await super().kickoff_async(inputs)
    return self.state