- smaller changes to eveai.css to ensure background of selected buttons do not get all white and to ensure that the background of fiels in editable cells do not become white in a tabulator.

- The Role Definition Specialist now creates a new selection specialist upon completion
This commit is contained in:
Josako
2025-06-01 10:09:34 +02:00
parent 578981c745
commit 81e754317a
6 changed files with 460 additions and 9 deletions

View File

@@ -0,0 +1,197 @@
import asyncio
import json
from os import wait
from typing import Optional, List
from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field
from sqlalchemy.exc import SQLAlchemyError
from common.extensions import db
from common.models.user import Tenant
from common.models.interaction import Specialist
from eveai_chat_workers.outputs.globals.basic_types.list_item import ListItem
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.traicie.competencies.competencies_v1_1 import Competencies
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from common.services.interaction.specialist_services import SpecialistServices
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: TRAICIE_ROLE_DEFINITION_SPECIALIST
type_version: 1.0
Traicie Role Definition Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.role_definition_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
# Load the Tenant & set language
self.tenant = Tenant.query.get_or_404(tenant_id)
@property
def type(self) -> str:
return "TRAICIE_ROLE_DEFINITION_SPECIALIST"
@property
def type_version(self) -> str:
return "1.1"
def _config_task_agents(self):
self._add_task_agent("traicie_get_competencies_task", "traicie_hr_bp_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("traicie_get_competencies_task", Competencies, "competencies")
def _instantiate_specialist(self):
verbose = self.tuning
role_definition_agents = [self.traicie_hr_bp_agent]
role_definition_tasks = [self.traicie_get_competencies_task]
self.role_definition_crew = EveAICrewAICrew(
self,
"Role Definition Crew",
agents=role_definition_agents,
tasks=role_definition_tasks,
verbose=verbose,
)
self.flow = RoleDefinitionFlow(
self,
self.role_definition_crew
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("Traicie Role Definition Specialist execution started", {})
flow_inputs = {
"vacancy_text": arguments.vacancy_text,
"role_name": arguments.role_name,
'role_reference': arguments.role_reference,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
flow_state = self.flow.state
results = RoleDefinitionSpecialistResult.create_for_type(self.type, self.type_version)
if flow_state.competencies:
results.competencies = flow_state.competencies
self.create_selection_specialist(arguments, flow_state.competencies)
self.log_tuning(f"Traicie Role Definition Specialist execution ended", {"Results": results.model_dump()})
return results
def create_selection_specialist(self, arguments: SpecialistArguments, competencies: List[ListItem]):
"""This method creates a new TRAICIE_SELECTION_SPECIALIST specialist with the given competencies."""
current_app.logger.info(f"Creating selection with arguments: {arguments.model_dump()}")
selection_comptencies = []
for competency in competencies:
selection_competency = {
"title": competency.title,
"description": competency.description,
"assess": True,
"is_knockout": False,
}
selection_comptencies.append(selection_competency)
selection_config = {
"name": arguments.specialist_name,
"competencies": selection_comptencies,
"tone_of_voice": "Professional & Neutral",
"language_level": "Standard",
"role_reference": arguments.role_reference,
}
name = arguments.role_name
if len(name) > 50:
name = name[:47] + "..."
new_specialist = Specialist(
name=name,
description=f"Specialist for {arguments.role_name} role",
type="TRAICIE_SELECTION_SPECIALIST",
type_version="1.0",
tuning=False,
configuration=selection_config,
)
try:
db.session.add(new_specialist)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f"Error creating selection specialist: {str(e)}")
raise e
SpecialistServices.initialize_specialist(new_specialist.id, "TRAICIE_SELECTION_SPECIALIST", "1.0")
class RoleDefinitionSpecialistInput(BaseModel):
role_name: str = Field(..., alias="role_name")
role_reference: Optional[str] = Field(..., alias="role_reference")
vacancy_text: Optional[str] = Field(None, alias="vacancy_text")
class RoleDefinitionSpecialistResult(SpecialistResult):
competencies: Optional[List[ListItem]] = None
class RoleDefFlowState(EveAIFlowState):
"""Flow state for Traicie Role Definition specialist that automatically updates from task outputs"""
input: Optional[RoleDefinitionSpecialistInput] = None
competencies: Optional[List[ListItem]] = None
class RoleDefinitionFlow(EveAICrewAIFlow[RoleDefFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
role_definitiion_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "Traicie Role Definition Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.role_definition_crew = role_definitiion_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_role_definition (self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("In execute_role_definition")
crew_output = await self.role_definition_crew.kickoff_async(inputs=inputs)
# Unfortunately, crew_output will only contain the output of the latest task.
# As we will only take into account the flow state, we need to ensure both competencies and criteria
# are copies to the flow state.
update = {}
for task in self.role_definition_crew.tasks:
current_app.logger.debug(f"Task {task.name} output:\n{task.output}")
if task.name == "traicie_get_competencies_task":
# update["competencies"] = task.output.pydantic.competencies
self.state.competencies = task.output.pydantic.competencies
# crew_output.pydantic = crew_output.pydantic.model_copy(update=update)
current_app.logger.debug(f"State after execute_role_definition: {self.state}")
current_app.logger.debug(f"State dump after execute_role_definition: {self.state.model_dump()}")
return crew_output
except Exception as e:
current_app.logger.error(f"CREW execute_role_definition Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
current_app.logger.debug(f"Inputs: {inputs}")
self.state.input = RoleDefinitionSpecialistInput.model_validate(inputs)
current_app.logger.debug(f"State: {self.state}")
result = await super().kickoff_async(inputs)
return self.state