- Introduction of the PARTNER_RAG retriever and PARTNER_RAG_SPECIALIST, with a linked Agent and Task, to support documentation inquiries in the management app (eveai_app); a minimal usage sketch follows this list

- Addition of a tenant_partner_services view to show partner services from the viewpoint of a tenant
- Addition of domain model diagrams
- Addition of license_periods views and form
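
A minimal usage sketch of the new specialist (tenant, specialist, session, and task identifiers are hypothetical, and SpecialistArguments is assumed to accept the language and question fields the executor reads; the constructor and execute signatures come from the diff below):

    # Hypothetical identifiers; a sketch, not the shipped wiring.
    executor = SpecialistExecutor(
        tenant_id=1,            # hypothetical tenant
        specialist_id=42,       # hypothetical RAG_SPECIALIST configuration
        session_id="sess-1",
        task_id="task-1",
    )
    arguments = SpecialistArguments(language="en", question="Which partner services are active?")
    result = executor.execute(arguments, formatted_context=None, citations=[])
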
This commit is contained in:
Josako
2025-07-16 21:24:08 +02:00
parent 000636a229
commit f3a243698c
30 changed files with 1566 additions and 356 deletions

@@ -0,0 +1,243 @@
import json
from typing import Optional, List, Dict, Any
from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field
from common.extensions import db
from common.models.interaction import SpecialistRetriever
from common.models.user import Partner, PartnerService, PartnerTenant, PartnerRAGRetriever
from common.services.utils.translation_services import TranslationServices
from common.utils.business_event_context import current_event
from eveai_chat_workers.retrievers.base_retriever import BaseRetriever, get_retriever_class
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
INSUFFICIENT_INFORMATION_MESSAGE = (
"We do not have the necessary information to provide you with the requested answers. "
"Please accept our apologies. Don't hesitate to ask other questions, and I'll do my best to answer them.")
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: RAG_SPECIALIST
    type_version: 1.1
RAG Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.rag_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
@property
def type(self) -> str:
return "RAG_SPECIALIST"
@property
def type_version(self) -> str:
return "1.1"
def _initialize_retrievers(self) -> List[BaseRetriever]:
"""Initialize all retrievers associated with this specialist"""
retrievers = []
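        # Resolve the partners that serve this tenant through the
        # Partner -> PartnerService -> PartnerTenant association chain.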
partner_ids = (
db.session.query(Partner.id)
.join(PartnerService, Partner.id == PartnerService.partner_id)
.join(PartnerTenant, PartnerService.id == PartnerTenant.partner_service_id)
.filter(PartnerTenant.tenant_id == self.tenant_id)
.distinct()
.all()
)
# Extract the actual partner IDs from the query result
partner_ids_list = [partner_id[0] for partner_id in partner_ids]
# Get all corresponding PartnerRagRetrievers for the partner_ids list
partner_rag_retrievers = (
PartnerRAGRetriever.query
.filter(PartnerRAGRetriever.partner_id.in_(partner_ids_list))
.filter(PartnerRAGRetriever.tenant_id == self.tenant_id)
.all()
)
retriever_executor_class = get_retriever_class("PARTNER_RAG", "1.0")
# Get retriever associations from database
self.log_tuning("_initialize_retrievers", {"Nr of partner retrievers": len(partner_rag_retrievers)})
        for partner_rag_retriever in partner_rag_retrievers:
# Get retriever configuration from database
self.log_tuning("_initialize_retrievers", {
"Partner id": partner_rag_retriever.partner_id,
"Tenant id": partner_rag_retriever.tenant_id,
"Retriever id": partner_rag_retriever.retriever_id,
})
retriever_executor = retriever_executor_class(partner_rag_retriever.tenant_id,
partner_rag_retriever.retriever_id)
# Initialize retriever with its configuration
retrievers.append(retriever_executor)
return retrievers
def _config_task_agents(self):
self._add_task_agent("rag_task", "rag_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
def _config_state_result_relations(self):
self._add_state_result_relation("rag_output")
self._add_state_result_relation("citations")
def _instantiate_specialist(self):
verbose = self.tuning
rag_agents = [self.rag_agent]
rag_tasks = [self.rag_task]
self.rag_crew = EveAICrewAICrew(
self,
"Rag Crew",
agents=rag_agents,
tasks=rag_tasks,
verbose=verbose,
)
self.flow = RAGFlow(
self,
self.rag_crew,
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
if not self._cached_session.interactions:
specialist_phase = "initial"
else:
specialist_phase = self._cached_session.interactions[-1].specialist_results.get('phase', 'initial')
        results = None
        match specialist_phase:
            case "initial":
                results = self.execute_initial_state(arguments, formatted_context, citations)
            case "rag":
                results = self.execute_rag_state(arguments, formatted_context, citations)
            case _:
                # Defensive default: fail loudly instead of dereferencing None below.
                raise ValueError(f"Unknown specialist phase: {specialist_phase}")
        self.log_tuning("RAG Specialist execution ended", {"Results": results.model_dump()})
return results
def execute_initial_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist initial_state execution started", {})
        welcome_message = self.specialist.configuration.get('welcome_message', 'Welcome! You can start asking questions.')
welcome_message = TranslationServices.translate(self.tenant_id, welcome_message, arguments.language)
self.flow.state.answer = welcome_message
self.flow.state.phase = "rag"
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
return results
def execute_rag_state(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist rag_state execution started", {})
insufficient_info_message = TranslationServices.translate(self.tenant_id,
INSUFFICIENT_INFORMATION_MESSAGE,
arguments.language)
formatted_context, citations = self._retrieve_context(arguments)
if formatted_context:
flow_inputs = {
"language": arguments.language,
"question": arguments.question,
"context": formatted_context,
"history": self.formatted_history,
"name": self.specialist.configuration.get('name', ''),
"welcome_message": self.specialist.configuration.get('welcome_message', '')
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
if flow_results.rag_output.insufficient_info:
flow_results.rag_output.answer = insufficient_info_message
rag_output = flow_results.rag_output
else:
rag_output = RAGOutput(answer=insufficient_info_message, insufficient_info=True)
self.flow.state.rag_output = rag_output
self.flow.state.citations = citations
self.flow.state.answer = rag_output.answer
self.flow.state.phase = "rag"
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
return results
class RAGSpecialistInput(BaseModel):
language: Optional[str] = Field(None, alias="language")
question: Optional[str] = Field(None, alias="question")
context: Optional[str] = Field(None, alias="context")
history: Optional[str] = Field(None, alias="history")
name: Optional[str] = Field(None, alias="name")
welcome_message: Optional[str] = Field(None, alias="welcome_message")
class RAGSpecialistResult(SpecialistResult):
rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")
class RAGFlowState(EveAIFlowState):
"""Flow state for RAG specialist that automatically updates from task outputs"""
input: Optional[RAGSpecialistInput] = None
rag_output: Optional[RAGOutput] = None
citations: Optional[List[Dict[str, Any]]] = None
class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
rag_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "RAG Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.rag_crew = rag_crew
self.exception_raised = False
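    # The flow is linear: process_inputs is the @start entry point (its return
    # value is unused) and execute_rag follows via @listen(process_inputs).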
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
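                # No structured payload was attached; fall back to parsing the
                # raw crew output into the RAGOutput schema.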
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.rag_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
self.state.input = RAGSpecialistInput.model_validate(inputs)
        await super().kickoff_async(inputs)
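        # The superclass result is ignored; callers consume the populated state
        # (rag_output, citations, answer) instead.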
return self.state

@@ -1,239 +0,0 @@
from datetime import datetime
from typing import Dict, Any, List
from flask import current_app
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from common.langchain.outputs.base import OutputRegistry
from common.langchain.outputs.rag import RAGOutput
from common.utils.business_event_context import current_event
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult
from eveai_chat_workers.chat_session_cache import get_chat_history
from common.models.interaction import Specialist
from common.utils.model_utils import create_language_template, replace_variable_in_template, \
get_template
from eveai_chat_workers.specialists.base_specialist import BaseSpecialistExecutor
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
class SpecialistExecutor(BaseSpecialistExecutor):
"""
type: STANDARD_RAG
type_version: 1.0
Standard Q&A RAG Specialist implementation that combines retriever results
with LLM processing to generate answers.
"""
def __init__(self, tenant_id: int, specialist_id: int, session_id: str, task_id: str):
super().__init__(tenant_id, specialist_id, session_id, task_id)
# Check and load the specialist
specialist = Specialist.query.get_or_404(specialist_id)
# Set the specific configuration for the RAG Specialist
self.specialist_context = specialist.configuration.get('specialist_context', '')
self.temperature = specialist.configuration.get('temperature', 0.3)
self.tuning = specialist.tuning
# Initialize retrievers
self.retrievers = self._initialize_retrievers()
@property
def type(self) -> str:
return "STANDARD_RAG_SPECIALIST"
@property
def type_version(self) -> str:
return "1.0"
@property
def required_templates(self) -> List[str]:
"""List of required templates for this specialist"""
return ['rag', 'history']
def _detail_question(self, language: str, question: str) -> str:
"""Detail question based on conversation history"""
try:
# Get cached session history
cached_session = get_chat_history(self.session_id)
# Format history for the prompt
formatted_history = "\n\n".join([
f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n"
f"AI:\n{interaction.specialist_results.get('answer')}"
for interaction in cached_session.interactions
])
# Get LLM and template
template, llm = get_template("history", temperature=0.3)
language_template = create_language_template(template, language)
# Create prompt
history_prompt = ChatPromptTemplate.from_template(language_template)
# Create chain
chain = (
history_prompt |
llm |
StrOutputParser()
)
# Execute chain
detailed_question = chain.invoke({
"history": formatted_history,
"question": question
})
if self.tuning:
self.log_tuning("_detail_question", {
"cached_session_id": cached_session.session_id,
"cached_session.interactions": str(cached_session.interactions),
"original_question": question,
"history_used": formatted_history,
"detailed_question": detailed_question,
})
return detailed_question
except Exception as e:
current_app.logger.error(f"Error detailing question: {e}")
return question # Fallback to original question
def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
"""
Execute the RAG specialist to generate an answer
"""
start_time = datetime.now()
try:
with current_event.create_span("Specialist Detail Question"):
self.update_progress("Detail Question Start", {})
# Get required arguments
language = arguments.language
query = arguments.query
detailed_question = self._detail_question(language, query)
self.update_progress("Detail Question End", {})
# Log the start of retrieval process if tuning is enabled
with current_event.create_span("Specialist Retrieval"):
self.log_tuning("Starting context retrieval", {
"num_retrievers": len(self.retrievers),
"all arguments": arguments.model_dump(),
})
self.update_progress("EveAI Retriever Start", {})
# Get retriever-specific arguments
retriever_arguments = arguments.retriever_arguments
# Collect context from all retrievers
all_context = []
for retriever in self.retrievers:
# Get arguments for this specific retriever
retriever_id = str(retriever.retriever_id)
if retriever_id not in retriever_arguments:
current_app.logger.error(f"Missing arguments for retriever {retriever_id}")
continue
# Get the retriever's arguments and update the query
current_retriever_args = retriever_arguments[retriever_id]
if isinstance(retriever_arguments[retriever_id], RetrieverArguments):
updated_args = current_retriever_args.model_dump()
updated_args['query'] = detailed_question
retriever_args = RetrieverArguments(**updated_args)
else:
# Create a new RetrieverArguments instance from the dictionary
current_retriever_args['query'] = detailed_question
retriever_args = RetrieverArguments(**current_retriever_args)
# Each retriever gets its own specific arguments
retriever_result = retriever.retrieve(retriever_args)
all_context.extend(retriever_result)
# Sort by similarity if available and get unique contexts
all_context.sort(key=lambda x: x.similarity, reverse=True)
unique_contexts = []
seen_chunks = set()
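                # The list is pre-sorted by similarity, so the first occurrence
                # kept for each chunk is also its highest-scoring one.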
for ctx in all_context:
if ctx.chunk not in seen_chunks:
unique_contexts.append(ctx)
seen_chunks.add(ctx.chunk)
self.log_tuning("Context retrieval completed", {
"total_contexts": len(all_context),
"unique_contexts": len(unique_contexts),
"average_similarity": sum(ctx.similarity for ctx in unique_contexts) / len(
unique_contexts) if unique_contexts else 0
})
self.update_progress("EveAI Retriever Complete", {})
# Prepare context for LLM
formatted_context = "\n\n".join([
f"SOURCE: {ctx.metadata.document_id}\n\n{ctx.chunk}"
for ctx in unique_contexts
])
with current_event.create_span("Specialist RAG invocation"):
try:
                    self.update_progress("EveAI Chain Start", {})
template, llm = get_template("rag", self.temperature)
language_template = create_language_template(template, language)
full_template = replace_variable_in_template(
language_template,
"{tenant_context}",
self.specialist_context
)
if self.tuning:
self.log_tuning("Template preparation completed", {
"template": full_template,
"context": formatted_context,
"tenant_context": self.specialist_context,
})
# Create prompt
rag_prompt = ChatPromptTemplate.from_template(full_template)
# Setup chain components
setup_and_retrieval = RunnableParallel({
"context": lambda x: formatted_context,
"question": lambda x: x
})
# Get output schema for structured output
output_schema = OutputRegistry.get_schema(self.type)
structured_llm = llm.with_structured_output(output_schema)
chain = setup_and_retrieval | rag_prompt | structured_llm
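                    # The RunnableParallel step pairs the fixed retrieved context
                    # with the incoming question, the prompt renders both, and the
                    # structured LLM call parses the answer into the registered
                    # RAGOutput schema.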
raw_result = chain.invoke(detailed_question)
result = SpecialistResult.create_for_type(
self.type,
self.type_version,
detailed_question=detailed_question,
answer=raw_result.answer,
citations=[ctx.metadata.document_id for ctx in unique_contexts
if ctx.id in raw_result.citations],
insufficient_info=raw_result.insufficient_info
)
if self.tuning:
self.log_tuning("LLM chain execution completed", {
"Result": result.model_dump()
})
self.update_progress("EveAI Chain Complete", {})
except Exception as e:
current_app.logger.error(f"Error in LLM processing: {e}")
if self.tuning:
self.log_tuning("LLM processing error", {"error": str(e)})
raise
return result
except Exception as e:
current_app.logger.error(f'Error in RAG specialist execution: {str(e)}')
raise
# Register the specialist type
OutputRegistry.register("STANDARD_RAG_SPECIALIST", RAGOutput)