- New version of RAG_SPECIALIST and RAG_AGENT, including definition of conversation_purpose and response_depth.

Josako
2025-10-20 15:37:36 +02:00
parent 451f95fbc1
commit aab766fe5e
8 changed files with 429 additions and 9 deletions

View File

@@ -0,0 +1,30 @@
CHANNEL_ADAPTATION = [
{
"name": "Mobile Chat/Text",
"description": "Short, scannable messages. Uses bullet points, emojis, and concise language.",
"when_to_use": "Instant messaging, SMS, or chatbots."
},
{
"name": "Voice/Spoken",
"description": "Natural, conversational language. Includes pauses, intonation, and simple sentences.",
"when_to_use": "Voice assistants, phone calls, or voice-enabled apps."
},
{
"name": "Email/Formal Text",
"description": "Structured, polished, and professional. Uses paragraphs and clear formatting.",
"when_to_use": "Emails, reports, or official documentation."
},
{
"name": "Multimodal",
"description": "Combines text with visuals, buttons, or interactive elements for clarity and engagement.",
"when_to_use": "Websites, apps, or platforms supporting rich media."
}
]
def get_channel_adaptation_context(channel_adaptation: str) -> str:
    # Look up the channel adaptation entry by name.
    selected_channel_adaptation = next(
        (item for item in CHANNEL_ADAPTATION if item["name"] == channel_adaptation),
        None
    )
    # Return an empty context for unknown names instead of failing on None.
    if selected_channel_adaptation is None:
        return ""
    return selected_channel_adaptation["description"]
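
For illustration, a minimal usage sketch; the import path is an assumption modelled on the other definition modules (e.g. response_depth_v1_0) and is not confirmed by this commit:

# Hypothetical usage; the module path is assumed by analogy with the other definition modules.
from eveai_chat_workers.definitions.channel_adaptation.channel_adaptation_v1_0 import get_channel_adaptation_context

context = get_channel_adaptation_context("Voice/Spoken")
# -> "Natural, conversational language. Includes pauses, intonation, and simple sentences."
print(context)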

View File

@@ -0,0 +1,30 @@
CONVERSATION_PURPOSE = [
{
"name": "Informative",
"description": "Focus on sharing facts, explanations, or instructions.",
"when_to_use": "User seeks knowledge, clarification, or guidance."
},
{
"name": "Persuasive",
"description": "Aim to convince, motivate, or drive action. Highlights benefits and calls to action.",
"when_to_use": "Sales, marketing, or when encouraging the user to take a specific step."
},
{
"name": "Supportive",
"description": "Empathetic, solution-oriented, and reassuring. Prioritizes the user's needs and emotions.",
"when_to_use": "Customer support, healthcare, or emotionally sensitive situations."
},
{
"name": "Collaborative",
"description": "Encourages dialogue, brainstorming, and co-creation. Invites user input and ideas.",
"when_to_use": "Teamwork, creative processes, or open-ended discussions."
}
]
def get_conversation_purpose_context(conversation_purpose: str) -> str:
    # Look up the conversation purpose entry by name.
    selected_conversation_purpose = next(
        (item for item in CONVERSATION_PURPOSE if item["name"] == conversation_purpose),
        None
    )
    # Return an empty context for unknown names instead of failing on None.
    if selected_conversation_purpose is None:
        return ""
    return selected_conversation_purpose["description"]
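
A minimal usage sketch, using the import path that appears in the specialist file below:

from eveai_chat_workers.definitions.conversation_purpose.conversation_purpose_v1_0 import get_conversation_purpose_context

context = get_conversation_purpose_context("Supportive")
# -> "Empathetic, solution-oriented, and reassuring. Prioritizes the user's needs and emotions."
print(context)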

View File

@@ -0,0 +1,25 @@
RESPONSE_DEPTH = [
{
"name": "Concise",
"description": "Short, direct answers. No extra explanation or context.",
"when_to_use": "Quick queries, urgent requests, or when the user prefers brevity."
},
{
"name": "Balanced",
"description": "Clear answers with minimal context or next steps. Not too short, not too detailed.",
"when_to_use": "General interactions, FAQs, or when the user needs a mix of speed and clarity."
},
{
"name": "Detailed",
"description": "Comprehensive answers with background, examples, or step-by-step guidance.",
"when_to_use": "Complex topics, tutorials, or when the user requests in-depth information."
}
]
def get_response_depth_context(response_depth: str) -> str:
    # Look up the response depth entry by name.
    selected_response_depth = next(
        (item for item in RESPONSE_DEPTH if item["name"] == response_depth),
        None
    )
    # Return an empty context for unknown names instead of failing on None.
    if selected_response_depth is None:
        return ""
    return selected_response_depth["description"]
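
A minimal usage sketch, using the import path that appears in the specialist file below:

from eveai_chat_workers.definitions.response_depth.response_depth_v1_0 import get_response_depth_context

context = get_response_depth_context("Detailed")
# -> "Comprehensive answers with background, examples, or step-by-step guidance."
print(context)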

View File

@@ -0,0 +1,225 @@
import json
import random
from typing import Optional, List, Dict, Any

from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field

from common.services.utils.translation_services import TranslationServices
from common.utils.business_event_context import current_event
from eveai_chat_workers.definitions.conversation_purpose.conversation_purpose_v1_0 import \
    get_conversation_purpose_context
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import get_language_level_context
from eveai_chat_workers.definitions.response_depth.response_depth_v1_0 import get_response_depth_context
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import get_tone_of_voice_context
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
INSUFFICIENT_INFORMATION_MESSAGES = [
    "I'm afraid I don't have enough information to answer that properly. Feel free to ask something else!",
    "There isn't enough data available right now to give you a clear answer. You're welcome to rephrase or ask a different question.",
    "Sorry, I can't provide a complete answer based on the current information. Would you like to try asking something else?",
    "I don't have enough details to give you a confident answer. You can always ask another question if you'd like.",
    "Unfortunately, I can't answer that accurately with the information at hand. Please feel free to ask something else.",
    "That's a great question, but I currently lack the necessary information to respond properly. Want to ask something different?",
    "I wish I could help more, but the data I have isn't sufficient to answer this. You're welcome to explore other questions.",
    "There's not enough context for me to provide a good answer. Don't hesitate to ask another question if you'd like!",
    "I'm not able to give a definitive answer to that. Perhaps try a different question or angle?",
    "Thanks for your question. At the moment, I can't give a solid answer, but I'm here if you want to ask something else!"
]
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: RAG_SPECIALIST
type_version: 1.0
RAG Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.rag_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
@property
def type(self) -> str:
return "RAG_SPECIALIST"
@property
def type_version(self) -> str:
return "1.2"
def _config_task_agents(self):
self._add_task_agent("rag_task", "rag_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
def _config_state_result_relations(self):
self._add_state_result_relation("rag_output")
self._add_state_result_relation("citations")
def _instantiate_specialist(self):
verbose = self.tuning
rag_agents = [self.rag_agent]
rag_tasks = [self.rag_task]
self.rag_crew = EveAICrewAICrew(
self,
"Rag Crew",
agents=rag_agents,
tasks=rag_tasks,
verbose=verbose,
)
self.flow = RAGFlow(
self,
self.rag_crew,
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
        results = None
        match specialist_phase:
            case "initial":
                results = self.execute_initial_state(arguments, formatted_context, citations)
            case "rag":
                results = self.execute_rag_state(arguments, formatted_context, citations)
            case _:
                # Guard: treat an unrecognised phase as an ongoing RAG conversation rather than
                # returning None and failing on results.model_dump() below.
                results = self.execute_rag_state(arguments, formatted_context, citations)
        self.log_tuning("RAG Specialist execution ended", {"Results": results.model_dump()})
        return results
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist initial_state execution started", {})
welcome_message = self.specialist.configuration.get('welcome_message', 'Welcome! You can start asking questions')
welcome_message = TranslationServices.translate(self.tenant_id, welcome_message, arguments.language)
self.flow.state.answer = welcome_message
self.flow.state.phase = "rag"
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
return results
def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist rag_state execution started", {})
        # Pick one of the predefined fallback messages at random and translate it.
        insufficient_info_message = TranslationServices.translate(self.tenant_id,
                                                                  random.choice(INSUFFICIENT_INFORMATION_MESSAGES),
                                                                  arguments.language)
formatted_context, citations = self._retrieve_context(arguments)
self.flow.state.citations = citations
tone_of_voice = self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
tone_of_voice_context = get_tone_of_voice_context(tone_of_voice)
language_level = self.specialist.configuration.get('language_level', 'Standard')
language_level_context = get_language_level_context(language_level)
response_depth = self.specialist.configuration.get('response_depth', 'Balanced')
response_depth_context = get_response_depth_context(response_depth)
conversation_purpose = self.specialist.configuration.get('conversation_purpose', 'Informative')
conversation_purpose_context = get_conversation_purpose_context(conversation_purpose)
if formatted_context:
flow_inputs = {
"language": arguments.language,
"question": arguments.question,
"context": formatted_context,
"history": self.formatted_history,
"name": self.specialist.configuration.get('name', ''),
"welcome_message": self.specialist.configuration.get('welcome_message', ''),
"tone_of_voice": tone_of_voice,
"tone_of_voice_context": tone_of_voice_context,
"language_level": language_level,
"language_level_context": language_level_context,
"response_depth": response_depth,
"response_depth_context": response_depth_context,
"conversation_purpose": conversation_purpose,
"conversation_purpose_context": conversation_purpose_context,
}
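            # Hedged note: these keys are presumably the template variables interpolated into the
            # configured rag_agent / rag_task prompts (e.g. "{question}", "{conversation_purpose_context}")
            # once the flow hands them to the crew kickoff; the prompt templates themselves are not
            # part of this commit.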
flow_results = self.flow.kickoff(inputs=flow_inputs)
if flow_results.rag_output.insufficient_info:
flow_results.rag_output.answer = insufficient_info_message
rag_output = flow_results.rag_output
else:
rag_output = RAGOutput(answer=insufficient_info_message, insufficient_info=True)
self.flow.state.rag_output = rag_output
self.flow.state.citations = citations
self.flow.state.answer = rag_output.answer
self.flow.state.phase = "rag"
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
return results
class RAGSpecialistInput(BaseModel):
    language: Optional[str] = Field(None, alias="language")
    question: Optional[str] = Field(None, alias="question")
    context: Optional[str] = Field(None, alias="context")
    history: Optional[str] = Field(None, alias="history")
    name: Optional[str] = Field(None, alias="name")
    welcome_message: Optional[str] = Field(None, alias="welcome_message")
    # Styling inputs passed in flow_inputs; declared here so state.input.model_dump() keeps them
    # when the crew is kicked off.
    tone_of_voice: Optional[str] = None
    tone_of_voice_context: Optional[str] = None
    language_level: Optional[str] = None
    language_level_context: Optional[str] = None
    response_depth: Optional[str] = None
    response_depth_context: Optional[str] = None
    conversation_purpose: Optional[str] = None
    conversation_purpose_context: Optional[str] = None
class RAGSpecialistResult(SpecialistResult):
rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")
class RAGFlowState(EveAIFlowState):
"""Flow state for RAG specialist that automatically updates from task outputs"""
input: Optional[RAGSpecialistInput] = None
rag_output: Optional[RAGOutput] = None
citations: Optional[List[Dict[str, Any]]] = None
class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
rag_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "RAG Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.rag_crew = rag_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.rag_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
self.state.input = RAGSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state
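
To orient readers, a hedged driver sketch of the two-phase behaviour above; the constructor arguments and any SpecialistArguments fields beyond language and question are placeholders, not confirmed by this commit:

# Hypothetical driver; identifiers and SpecialistArguments fields beyond language/question
# are assumptions for illustration only.
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments

executor = SpecialistExecutor("tenant-1", "specialist-1", "session-1", "task-1")
arguments = SpecialistArguments(language="en", question="What does the warranty cover?")

# First call: no prior interactions, so execute() runs the "initial" phase, the flow state
# carries the translated welcome_message, and the phase advances to "rag".
executor.execute(arguments, formatted_context=None, citations=[])

# Subsequent calls: the "rag" phase retrieves context, resolves the tone_of_voice,
# language_level, response_depth and conversation_purpose contexts from the specialist
# configuration, and kicks off the RAG flow; rag_output.answer holds the final answer.
result = executor.execute(arguments, formatted_context=None, citations=[])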