Merge branch 'feature/Improvement_of_RAG_Specialist' into develop

Josako committed on 2025-10-21 14:00:20 +02:00
12 changed files with 440 additions and 19 deletions

View File

@@ -140,21 +140,17 @@ def enforce_tenant_consent_ui():
     """Check if the user has consented to the terms of service"""
     path = getattr(request, 'path', '') or ''
     if path.startswith('/healthz') or path.startswith('/_healthz'):
-        current_app.logger.debug(f'Health check request, bypassing consent guard: {path}')
         return None
     if not current_user.is_authenticated:
-        current_app.logger.debug('Not authenticated, bypassing consent guard')
         return None
     endpoint = request.endpoint or ''
     if is_exempt_endpoint(endpoint) or request.method == 'OPTIONS':
-        current_app.logger.debug(f'Endpoint exempt from consent guard: {endpoint}')
         return None
     # Global bypass: Super User and Partner Admin always allowed
     if current_user.has_roles('Super User') or current_user.has_roles('Partner Admin'):
-        current_app.logger.debug('Global bypass: Super User or Partner Admin')
         return None
     tenant_id = getattr(current_user, 'tenant_id', None)
@@ -176,16 +172,13 @@ def enforce_tenant_consent_ui():
         status = ConsentStatus.NOT_CONSENTED
     if status == ConsentStatus.CONSENTED:
-        current_app.logger.debug('User has consented')
         return None
     if status == ConsentStatus.NOT_CONSENTED:
-        current_app.logger.debug('User has not consented')
         if current_user.has_roles('Tenant Admin'):
             return redirect(prefixed_url_for('user_bp.tenant_consent', for_redirect=True))
         return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
     if status == ConsentStatus.RENEWAL_REQUIRED:
-        current_app.logger.debug('Consent renewal required')
         if current_user.has_roles('Tenant Admin'):
             flash(
                 "You need to renew your consent to our DPA or T&Cs. Failing to do so in time will stop you from accessing our services.",

View File

@@ -1,4 +1,4 @@
-version: "1.0.0"
+version: "1.1.0"
 name: "Rag Agent"
 role: >
   {tenant_name} Spokesperson. {custom_role}
@@ -7,7 +7,7 @@ goal: >
   of the current conversation.
   {custom_goal}
 backstory: >
-  You are the primary contact for {tenant_name}. You are known by {name}, and can be addressed by this name, or you. You are
+  You are the primary contact for {tenant_name}. You are known by {name}, and can be addressed by this name, or 'you'. You are
   a very good communicator, and adapt to the style used by the human asking for information (e.g. formal or informal).
   You always stay correct and polite, whatever happens. And you ensure no discriminating language is used.
   You are perfectly multilingual in all known languages, and do your best to answer questions in {language}, whatever

View File

@@ -0,0 +1,28 @@
version: "1.2.0"
name: "Rag Agent"
role: >
  {tenant_name}'s Spokesperson. {custom_role}
goal: >
  You get questions by a human correspondent, and give answers based on a given context, taking into account the history
  of the current conversation.
  {custom_goal}
backstory: >
  You are the primary contact for {tenant_name}. You are known by {name}, and can be addressed by this name, or 'you'.
  You are a very good communicator.
  We want you to answer using the following Tone of Voice: {tone_of_voice} - {tone_of_voice_context}.
  And we want you to answer using the following Language Level: {language_level} - {language_level_context}.
  We want your answers to be {response_depth} - {response_depth_context}.
  We want your answers to be {conversation_purpose} - {conversation_purpose_context}.
  You can change Tone of Voice and Language Level if required by the person you are talking to.
  You always stay correct and polite, whatever happens. And you ensure no discriminating language is used.
  You are perfectly multilingual in all known languages, and do your best to answer questions in {language}, whatever
  language the context provided to you is in. You are participating in a conversation, not writing e.g. an email. Do not
  include a salutation or closing greeting in your answer.
  {custom_backstory}
full_model_name: "mistral.mistral-medium-latest"
temperature: 0.4
metadata:
  author: "Josako"
  date_added: "2025-01-08"
  description: "An Agent that does RAG based on a user's question, RAG content & history"
  changes: "Initial version"

View File

@@ -11,7 +11,7 @@ fields:
   email:
     name: "Email"
     type: "str"
-    description: "Your Name"
+    description: "Your Email"
     required: true
   phone:
     name: "Phone Number"

View File

@@ -0,0 +1,77 @@
version: "1.2.0"
name: "RAG Specialist"
framework: "crewai"
chat: true
configuration:
  name:
    name: "name"
    type: "str"
    description: "The name the specialist is called upon."
    required: true
  tone_of_voice:
    name: "Tone of Voice"
    description: "The tone of voice the specialist uses to communicate"
    type: "enum"
    allowed_values: [ "Professional & Neutral", "Warm & Empathetic", "Energetic & Enthusiastic", "Accessible & Informal", "Expert & Trustworthy", "No-nonsense & Goal-driven" ]
    default: "Professional & Neutral"
    required: true
  language_level:
    name: "Language Level"
    description: "Language level to be used when communicating, relating to CEFR levels"
    type: "enum"
    allowed_values: [ "Basic", "Standard", "Professional" ]
    default: "Standard"
    required: true
  response_depth:
    name: "Response Depth"
    description: "Response depth to be used when communicating"
    type: "enum"
    allowed_values: [ "Concise", "Balanced", "Detailed" ]
    default: "Balanced"
    required: true
  conversation_purpose:
    name: "Conversation Purpose"
    description: "Purpose of the conversation, resulting in communication style"
    type: "enum"
    allowed_values: [ "Informative", "Persuasive", "Supportive", "Collaborative" ]
    default: "Informative"
    required: true
  welcome_message:
    name: "Welcome Message"
    type: "string"
    description: "Welcome Message to be given to the end user"
    required: false
arguments:
  language:
    name: "Language"
    type: "str"
    description: "Language code to be used for receiving questions and giving answers"
    required: true
results:
  rag_output:
    answer:
      name: "answer"
      type: "str"
      description: "Answer to the query"
      required: true
    citations:
      name: "citations"
      type: "List[str]"
      description: "List of citations"
      required: false
    insufficient_info:
      name: "insufficient_info"
      type: "bool"
      description: "Whether or not there is insufficient information to answer the query"
      required: true
agents:
  - type: "RAG_AGENT"
    version: "1.2"
tasks:
  - type: "RAG_TASK"
    version: "1.1"
metadata:
  author: "Josako"
  date_added: "2025-01-08"
  changes: "Initial version"
  description: "A Specialist that performs Q&A activities"
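
The enum values configured above are resolved into the {tone_of_voice_context}, {language_level_context}, {response_depth_context} and {conversation_purpose_context} placeholders of Rag Agent 1.2.0 by the lookup helpers added in this commit. A minimal sketch of that resolution step, assuming a plain configuration dict; build_prompt_inputs is illustrative only and not part of the commit:

# Illustrative sketch: resolve configured enum values into the *_context strings
# that the agent backstory template interpolates.
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import get_tone_of_voice_context
from eveai_chat_workers.definitions.response_depth.response_depth_v1_0 import get_response_depth_context

def build_prompt_inputs(configuration: dict) -> dict:
    # Hypothetical helper: map specialist configuration to template inputs,
    # falling back to the defaults declared in the specialist definition.
    tone_of_voice = configuration.get("tone_of_voice", "Professional & Neutral")
    response_depth = configuration.get("response_depth", "Balanced")
    return {
        "tone_of_voice": tone_of_voice,
        "tone_of_voice_context": get_tone_of_voice_context(tone_of_voice),
        "response_depth": response_depth,
        "response_depth_context": get_response_depth_context(response_depth),
    }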

View File

@@ -3,24 +3,29 @@ name: "RAG Task"
 task_description: >
   Answer the following question (in between triple £):
-  £££{question}£££
+  £££
+  {question}
+  £££
   Base your answer on the following context (in between triple $):
-  $$${context}$$$
+  $$$
+  {context}
+  $$$
   Take into account the following history of the conversation (in between triple €):
-  €€€{history}€€€
+  €€€
+  {history}
+  €€€
   The HUMAN parts indicate the interactions by the end user, the AI parts are your interactions.
   Best Practices are:
-  - Answer the provided question as precisely and directly as you can, combining elements of the provided context.
-  - Always focus your answer on the actual HUMAN question.
-  - Try not to repeat your answers (preceded by AI), unless absolutely necessary.
-  - Focus your answer on the question at hand.
+  - Answer the provided question, combining elements of the provided context.
+  - Always focus your answer on the actual question.
+  - Try not to repeat your historic answers, unless absolutely necessary.
   - Always be friendly and helpful for the end user.
   {custom_description}
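
Putting each delimiter on its own line means the interpolated value sits between the markers as a block rather than inline. A minimal sketch of the rendered shape, using plain str.format substitution for illustration; the actual placeholder rendering is handled by the framework:

# Illustrative sketch: how the delimited question block reads after interpolation.
template = (
    "Answer the following question (in between triple £):\n"
    "£££\n"
    "{question}\n"
    "£££\n"
)
print(template.format(question="What are your opening hours?"))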

View File

@@ -113,7 +113,6 @@ def create_app(config_file=None):
     # Register global consent guard via extension
     @app.before_request
     def enforce_tenant_consent():
-        app.logger.debug("Enforcing tenant consent")
         return enforce_tenant_consent_ui()

     # @app.before_request

View File

@@ -155,7 +155,7 @@ class EditRetrieverForm(DynamicFormBase):
     description = TextAreaField('Description', validators=[Optional()])
     # Select Field for Retriever Type (Uses the RETRIEVER_TYPES defined in config)
-    type = StringField('Processor Type', validators=[DataRequired()], render_kw={'readonly': True})
+    type = StringField('Retriever Type', validators=[DataRequired()], render_kw={'readonly': True})
     type_version = StringField('Retriever Type Version', validators=[DataRequired()], render_kw={'readonly': True})
     tuning = BooleanField('Enable Tuning', default=False)

View File

@@ -0,0 +1,30 @@
CHANNEL_ADAPTATION = [
    {
        "name": "Mobile Chat/Text",
        "description": "Short, scannable messages. Uses bullet points, emojis, and concise language.",
        "when_to_use": "Instant messaging, SMS, or chatbots."
    },
    {
        "name": "Voice/Spoken",
        "description": "Natural, conversational language. Includes pauses, intonation, and simple sentences.",
        "when_to_use": "Voice assistants, phone calls, or voice-enabled apps."
    },
    {
        "name": "Email/Formal Text",
        "description": "Structured, polished, and professional. Uses paragraphs and clear formatting.",
        "when_to_use": "Emails, reports, or official documentation."
    },
    {
        "name": "Multimodal",
        "description": "Combines text with visuals, buttons, or interactive elements for clarity and engagement.",
        "when_to_use": "Websites, apps, or platforms supporting rich media."
    }
]


def get_channel_adaptation_context(channel_adaptation: str) -> str:
    selected_channel_adaptation = next(
        (item for item in CHANNEL_ADAPTATION if item["name"] == channel_adaptation),
        None
    )
    channel_adaptation_context = f"{selected_channel_adaptation['description']}"
    return channel_adaptation_context

View File

@@ -0,0 +1,30 @@
CONVERSATION_PURPOSE = [
    {
        "name": "Informative",
        "description": "Focus on sharing facts, explanations, or instructions.",
        "when_to_use": "User seeks knowledge, clarification, or guidance."
    },
    {
        "name": "Persuasive",
        "description": "Aim to convince, motivate, or drive action. Highlights benefits and calls to action.",
        "when_to_use": "Sales, marketing, or when encouraging the user to take a specific step."
    },
    {
        "name": "Supportive",
        "description": "Empathetic, solution-oriented, and reassuring. Prioritizes the user's needs and emotions.",
        "when_to_use": "Customer support, healthcare, or emotionally sensitive situations."
    },
    {
        "name": "Collaborative",
        "description": "Encourages dialogue, brainstorming, and co-creation. Invites user input and ideas.",
        "when_to_use": "Teamwork, creative processes, or open-ended discussions."
    }
]


def get_conversation_purpose_context(conversation_purpose: str) -> str:
    selected_conversation_purpose = next(
        (item for item in CONVERSATION_PURPOSE if item["name"] == conversation_purpose),
        None
    )
    conversation_purpose_context = f"{selected_conversation_purpose['description']}"
    return conversation_purpose_context

View File

@@ -0,0 +1,25 @@
RESPONSE_DEPTH = [
    {
        "name": "Concise",
        "description": "Short, direct answers. No extra explanation or context.",
        "when_to_use": "Quick queries, urgent requests, or when the user prefers brevity."
    },
    {
        "name": "Balanced",
        "description": "Clear answers with minimal context or next steps. Not too short, not too detailed.",
        "when_to_use": "General interactions, FAQs, or when the user needs a mix of speed and clarity."
    },
    {
        "name": "Detailed",
        "description": "Comprehensive answers with background, examples, or step-by-step guidance.",
        "when_to_use": "Complex topics, tutorials, or when the user requests in-depth information."
    }
]


def get_response_depth_context(response_depth: str) -> str:
    selected_response_depth = next(
        (item for item in RESPONSE_DEPTH if item["name"] == response_depth),
        None
    )
    response_depth_context = f"{selected_response_depth['description']}"
    return response_depth_context
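
These definition modules share the same next(..., None) lookup; if a name is ever passed that does not appear in the list, the selected item is None and the description access would raise a TypeError. A defensive variant of the same pattern, shown only as an illustration; lookup_context is not part of the commit:

# Illustrative sketch: the shared lookup with a fallback for unknown names.
from typing import Dict, List


def lookup_context(definitions: List[Dict[str, str]], name: str, default: str = "") -> str:
    selected = next((item for item in definitions if item["name"] == name), None)
    return selected["description"] if selected else default

In practice the specialist configuration constrains these values to the allowed enum entries, so the fallback should rarely trigger.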

View File

@@ -0,0 +1,234 @@
import json
import random
from typing import Optional, List, Dict, Any

from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field

from common.services.utils.translation_services import TranslationServices
from common.utils.business_event_context import current_event
from eveai_chat_workers.definitions.conversation_purpose.conversation_purpose_v1_0 import \
    get_conversation_purpose_context
from eveai_chat_workers.definitions.language_level.language_level_v1_0 import get_language_level_context
from eveai_chat_workers.definitions.response_depth.response_depth_v1_0 import get_response_depth_context
from eveai_chat_workers.definitions.tone_of_voice.tone_of_voice_v1_0 import get_tone_of_voice_context
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState

INSUFFICIENT_INFORMATION_MESSAGES = [
    "I'm afraid I don't have enough information to answer that properly. Feel free to ask something else!",
    "There isn't enough data available right now to give you a clear answer. You're welcome to rephrase or ask a different question.",
    "Sorry, I can't provide a complete answer based on the current information. Would you like to try asking something else?",
    "I don't have enough details to give you a confident answer. You can always ask another question if you'd like.",
    "Unfortunately, I can't answer that accurately with the information at hand. Please feel free to ask something else.",
    "That's a great question, but I currently lack the necessary information to respond properly. Want to ask something different?",
    "I wish I could help more, but the data I have isn't sufficient to answer this. You're welcome to explore other questions.",
    "There's not enough context for me to provide a good answer. Don't hesitate to ask another question if you'd like!",
    "I'm not able to give a definitive answer to that. Perhaps try a different question or angle?",
    "Thanks for your question. At the moment, I can't give a solid answer, but I'm here if you want to ask something else!"
]


class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
    """
    type: RAG_SPECIALIST
    type_version: 1.2
    RAG Specialist Executor class
    """

    def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
        self.rag_crew = None
        super().__init__(tenant_id, specialist_id, session_id, task_id)

    @property
    def type(self) -> str:
        return "RAG_SPECIALIST"

    @property
    def type_version(self) -> str:
        return "1.2"

    def _config_task_agents(self):
        self._add_task_agent("rag_task", "rag_agent")

    def _config_pydantic_outputs(self):
        self._add_pydantic_output("rag_task", RAGOutput, "rag_output")

    def _config_state_result_relations(self):
        self._add_state_result_relation("rag_output")
        self._add_state_result_relation("citations")

    def _instantiate_specialist(self):
        verbose = self.tuning

        rag_agents = [self.rag_agent]
        rag_tasks = [self.rag_task]

        self.rag_crew = EveAICrewAICrew(
            self,
            "Rag Crew",
            agents=rag_agents,
            tasks=rag_tasks,
            verbose=verbose,
        )

        self.flow = RAGFlow(
            self,
            self.rag_crew,
        )

    def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
        self.log_tuning("RAG Specialist execution started", {})

        if not self._cached_session.interactions:
            specialist_phase = "initial"
        else:
            specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')

        results = None
        match specialist_phase:
            case "initial":
                results = self.execute_initial_state(arguments, formatted_context, citations)
            case "rag":
                results = self.execute_rag_state(arguments, formatted_context, citations)

        self.log_tuning("RAG Specialist execution ended", {"Results": results.model_dump()})
        return results

    def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
        self.log_tuning("RAG Specialist initial_state execution started", {})

        welcome_message = self.specialist.configuration.get('welcome_message', 'Welcome! You can start asking questions')
        welcome_message = TranslationServices.translate(self.tenant_id, welcome_message, arguments.language)

        self.flow.state.answer = welcome_message
        self.flow.state.phase = "rag"

        results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
        return results

    def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
        self.log_tuning("RAG Specialist rag_state execution started", {})

        insufficient_info_message = TranslationServices.translate(self.tenant_id,
                                                                   random.choice(INSUFFICIENT_INFORMATION_MESSAGES),
                                                                   arguments.language)

        formatted_context, citations = self._retrieve_context(arguments)
        self.flow.state.citations = citations

        tone_of_voice = self.specialist.configuration.get('tone_of_voice', 'Professional & Neutral')
        tone_of_voice_context = get_tone_of_voice_context(tone_of_voice)
        language_level = self.specialist.configuration.get('language_level', 'Standard')
        language_level_context = get_language_level_context(language_level)
        response_depth = self.specialist.configuration.get('response_depth', 'Balanced')
        response_depth_context = get_response_depth_context(response_depth)
        conversation_purpose = self.specialist.configuration.get('conversation_purpose', 'Informative')
        conversation_purpose_context = get_conversation_purpose_context(conversation_purpose)

        if formatted_context:
            flow_inputs = {
                "language": arguments.language,
                "question": arguments.question,
                "context": formatted_context,
                "history": self.formatted_history,
                "name": self.specialist.configuration.get('name', ''),
                "welcome_message": self.specialist.configuration.get('welcome_message', ''),
                "tone_of_voice": tone_of_voice,
                "tone_of_voice_context": tone_of_voice_context,
                "language_level": language_level,
                "language_level_context": language_level_context,
                "response_depth": response_depth,
                "response_depth_context": response_depth_context,
                "conversation_purpose": conversation_purpose,
                "conversation_purpose_context": conversation_purpose_context,
            }

            flow_results = self.flow.kickoff(inputs=flow_inputs)

            if flow_results.rag_output.insufficient_info:
                flow_results.rag_output.answer = insufficient_info_message
            rag_output = flow_results.rag_output
        else:
            rag_output = RAGOutput(answer=insufficient_info_message, insufficient_info=True)

        self.flow.state.rag_output = rag_output
        self.flow.state.citations = citations
        self.flow.state.answer = rag_output.answer
        self.flow.state.phase = "rag"

        results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
        return results


class RAGSpecialistInput(BaseModel):
    language: Optional[str] = Field(None, alias="language")
    question: Optional[str] = Field(None, alias="question")
    context: Optional[str] = Field(None, alias="context")
    history: Optional[str] = Field(None, alias="history")
    name: Optional[str] = Field(None, alias="name")
    welcome_message: Optional[str] = Field(None, alias="welcome_message")
    tone_of_voice: Optional[str] = Field(None, alias="tone_of_voice")
    tone_of_voice_context: Optional[str] = Field(None, alias="tone_of_voice_context")
    language_level: Optional[str] = Field(None, alias="language_level")
    language_level_context: Optional[str] = Field(None, alias="language_level_context")
    response_depth: Optional[str] = Field(None, alias="response_depth")
    response_depth_context: Optional[str] = Field(None, alias="response_depth_context")
    conversation_purpose: Optional[str] = Field(None, alias="conversation_purpose")
    conversation_purpose_context: Optional[str] = Field(None, alias="conversation_purpose_context")


class RAGSpecialistResult(SpecialistResult):
    rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")


class RAGFlowState(EveAIFlowState):
    """Flow state for RAG specialist that automatically updates from task outputs"""
    input: Optional[RAGSpecialistInput] = None
    rag_output: Optional[RAGOutput] = None
    citations: Optional[List[Dict[str, Any]]] = None


class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
    def __init__(self,
                 specialist_executor: CrewAIBaseSpecialistExecutor,
                 rag_crew: EveAICrewAICrew,
                 **kwargs):
        super().__init__(specialist_executor, "RAG Specialist Flow", **kwargs)
        self.specialist_executor = specialist_executor
        self.rag_crew = rag_crew
        self.exception_raised = False

    @start()
    def process_inputs(self):
        return ""

    @listen(process_inputs)
    async def execute_rag(self):
        inputs = self.state.input.model_dump()
        try:
            crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
            self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())

            output_pydantic = crew_output.pydantic
            if not output_pydantic:
                raw_json = json.loads(crew_output.raw)
                output_pydantic = RAGOutput.model_validate(raw_json)

            self.state.rag_output = output_pydantic
            return crew_output
        except Exception as e:
            current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
            self.exception_raised = True
            raise e

    async def kickoff_async(self, inputs=None):
        self.state.input = RAGSpecialistInput.model_validate(inputs)
        result = await super().kickoff_async(inputs)
        return self.state
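
When the crew result carries no typed pydantic payload, execute_rag falls back to validating the raw JSON string into RAGOutput. A small standalone sketch of that fallback step, with a stand-in model and a made-up payload for illustration:

# Illustrative sketch: the raw-JSON fallback from execute_rag, in isolation.
import json
from typing import List, Optional
from pydantic import BaseModel


class RAGOutputSketch(BaseModel):  # stand-in for the real RAGOutput model
    answer: str
    citations: Optional[List[str]] = None
    insufficient_info: bool = False


raw = '{"answer": "We are open Monday to Friday, 9:00-17:00.", "insufficient_info": false}'
output = RAGOutputSketch.model_validate(json.loads(raw))
print(output.answer, output.insufficient_info)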