- Move global config files to globals iso global folder, as the name global conflicts with python language

- Creation of Traicie Vancancy Definition specialist
- Allow to invoke non-interaction specialists from withing Evie's mgmt interface (eveai_app)
- Improvements to crewai specialized classes
- Introduction to json editor for showing specialists arguments and results in a better way
- Introduction of more complex pagination (adding extra arguments) by adding a global 'get_pagination_html'
- Allow follow-up of ChatSession / Specialist execution
- Improvement in logging of Specialists (but needs to be finished)
This commit is contained in:
Josako
2025-05-26 11:26:03 +02:00
parent d789e431ca
commit 1fdbd2ff45
94 changed files with 1657 additions and 443 deletions

View File

@@ -0,0 +1,145 @@
import json
from os import wait
from typing import Optional, List
from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field
from common.utils.business_event_context import current_event
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: RAG_SPECIALIST
type_version: 1.0
RAG Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.rag_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
@property
def type(self) -> str:
return "RAG_SPECIALIST"
@property
def type_version(self) -> str:
return "1.0"
def _config_task_agents(self):
self._add_task_agent("rag_task", "rag_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
def _instantiate_specialist(self):
verbose = self.tuning
rag_agents = [self.rag_agent]
rag_tasks = [self.rag_task]
self.rag_crew = EveAICrewAICrew(
self,
"Rag Crew",
agents=rag_agents,
tasks=rag_tasks,
verbose=verbose,
)
self.flow = RAGFlow(
self,
self.rag_crew,
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("RAG Specialist execution started", {})
flow_inputs = {
"language": arguments.language,
"query": arguments.query,
"context": formatted_context,
"history": self.formatted_history,
"name": self.specialist.configuration.get('name', ''),
"company": self.specialist.configuration.get('company', ''),
}
# crew_results = self.rag_crew.kickoff(inputs=flow_inputs)
# current_app.logger.debug(f"Test Crew Output received: {crew_results}")
flow_results = self.flow.kickoff(inputs=flow_inputs)
flow_state = self.flow.state
results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
update_data = {}
if flow_state.rag_output: # Fallback
update_data["rag_output"] = flow_state.rag_output
results = results.model_copy(update=update_data)
self.log_tuning(f"RAG Specialist execution ended", {"Results": results.model_dump()})
return results
class RAGSpecialistInput(BaseModel):
language: Optional[str] = Field(None, alias="language")
query: Optional[str] = Field(None, alias="query")
context: Optional[str] = Field(None, alias="context")
citations: Optional[List[int]] = Field(None, alias="citations")
history: Optional[str] = Field(None, alias="history")
name: Optional[str] = Field(None, alias="name")
company: Optional[str] = Field(None, alias="company")
class RAGSpecialistResult(SpecialistResult):
rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")
class RAGFlowState(EveAIFlowState):
"""Flow state for RAG specialist that automatically updates from task outputs"""
input: Optional[RAGSpecialistInput] = None
rag_output: Optional[RAGOutput] = None
class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
rag_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "RAG Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.rag_crew = rag_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.rag_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = RAGSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -0,0 +1,301 @@
import asyncio
import json
from os import wait
from typing import Optional, List
from crewai.flow.flow import start, listen, and_
from flask import current_app
from gevent import sleep
from pydantic import BaseModel, Field
from common.extensions import cache_manager
from common.models.user import Tenant
from common.utils.business_event_context import current_event
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.globals.identification.identification_v1_0 import LeadInfoOutput
from eveai_chat_workers.outputs.globals.spin.spin_v1_0 import SPINOutput
from eveai_chat_workers.outputs.globals.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState
from common.utils.pydantic_utils import flatten_pydantic_model
class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
"""
type: SPIN_SPECIALIST
type_version: 1.0
SPIN Specialist Executor class
"""
def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
self.rag_crew = None
self.spin_crew = None
self.identification_crew = None
self.rag_consolidation_crew = None
super().__init__(tenant_id, specialist_id, session_id, task_id)
# Load the Tenant & set language
self.tenant = Tenant.query.get_or_404(tenant_id)
if self.specialist.configuration['tenant_language'] is None:
self.specialist.configuration['tenant_language'] = self.tenant.language
@property
def type(self) -> str:
return "SPIN_SPECIALIST"
@property
def type_version(self) -> str:
return "1.0"
def _config_task_agents(self):
self._add_task_agent("rag_task", "rag_agent")
self._add_task_agent("spin_detect_task", "spin_detection_agent")
self._add_task_agent("spin_questions_task", "spin_sales_specialist_agent")
self._add_task_agent("identification_detection_task", "identification_agent")
self._add_task_agent("identification_questions_task", "identification_agent")
self._add_task_agent("email_lead_drafting_task", "email_content_agent")
self._add_task_agent("email_lead_engagement_task", "email_engagement_agent")
self._add_task_agent("email_lead_retrieval_task", "email_engagement_agent")
self._add_task_agent("rag_consolidation_task", "rag_communication_agent")
def _config_pydantic_outputs(self):
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
self._add_pydantic_output("spin_questions_task", SPINOutput, "spin_questions")
self._add_pydantic_output("identification_questions_task", LeadInfoOutput, "lead_identification_questions")
self._add_pydantic_output("rag_consolidation_task", RAGOutput, "final_output")
def _instantiate_specialist(self):
verbose = self.tuning
rag_agents = [self.rag_agent]
rag_tasks = [self.rag_task]
self.rag_crew = EveAICrewAICrew(
self,
"Rag Crew",
agents=rag_agents,
tasks=rag_tasks,
verbose=verbose,
)
spin_agents = [self.spin_detection_agent, self.spin_sales_specialist_agent]
spin_tasks = [self.spin_detect_task, self.spin_questions_task]
self.spin_crew = EveAICrewAICrew(
self,
"SPIN Crew",
agents=spin_agents,
tasks=spin_tasks,
verbose=verbose,
)
identification_agents = [self.identification_agent]
identification_tasks = [self.identification_detection_task, self.identification_questions_task]
self.identification_crew = EveAICrewAICrew(
self,
"Identification Crew",
agents=identification_agents,
tasks=identification_tasks,
verbose=verbose,
)
consolidation_agents = [self.rag_communication_agent]
consolidation_tasks = [self.rag_consolidation_task]
self.rag_consolidation_crew = EveAICrewAICrew(
self,
"Rag Consolidation Crew",
agents=consolidation_agents,
tasks=consolidation_tasks,
verbose=verbose,
)
self.flow = SPINFlow(
self,
self.rag_crew,
self.spin_crew,
self.identification_crew,
self.rag_consolidation_crew
)
def execute(self, arguments: SpecialistArguments, formatted_context, citations) -> SpecialistResult:
self.log_tuning("SPIN Specialist execution started", {})
flow_inputs = {
"language": arguments.language,
"query": arguments.query,
"context": formatted_context,
"history": self.formatted_history,
"historic_spin": json.dumps(self.latest_spin, indent=2),
"historic_lead_info": json.dumps(self.latest_lead_info, indent=2),
"name": self.specialist.configuration.get('name', ''),
"company": self.specialist.configuration.get('company', ''),
"products": self.specialist.configuration.get('products', ''),
"product_information": self.specialist.configuration.get('product_information', ''),
"engagement_options": self.specialist.configuration.get('engagement_options', ''),
"tenant_language": self.specialist.configuration.get('tenant_language', ''),
"nr_of_questions": self.specialist.configuration.get('nr_of_questions', ''),
"identification": arguments.identification,
}
flow_results = self.flow.kickoff(inputs=flow_inputs)
flow_state = self.flow.state
results = SPINSpecialistResult.create_for_type(self.type, self.type_version)
update_data = {}
if flow_state.final_output:
update_data["rag_output"] = flow_state.final_output
elif flow_state.rag_output: # Fallback
update_data["rag_output"] = flow_state.rag_output
if flow_state.spin:
update_data["spin"] = flow_state.spin
if flow_state.lead_info:
update_data["lead_info"] = flow_state.lead_info
results = results.model_copy(update=update_data)
self.log_tuning(f"SPIN Specialist execution ended", {"Results": results.model_dump()})
return results
class SPINSpecialistInput(BaseModel):
language: Optional[str] = Field(None, alias="language")
query: Optional[str] = Field(None, alias="query")
context: Optional[str] = Field(None, alias="context")
citations: Optional[List[int]] = Field(None, alias="citations")
history: Optional[str] = Field(None, alias="history")
historic_spin: Optional[str] = Field(None, alias="historic_spin")
historic_lead_info: Optional[str] = Field(None, alias="historic_lead_info")
name: Optional[str] = Field(None, alias="name")
company: Optional[str] = Field(None, alias="company")
products: Optional[str] = Field(None, alias="products")
product_information: Optional[str] = Field(None, alias="product_information")
engagement_options: Optional[str] = Field(None, alias="engagement_options")
tenant_language: Optional[str] = Field(None, alias="tenant_language")
nr_of_questions: Optional[int] = Field(None, alias="nr_of_questions")
identification: Optional[str] = Field(None, alias="identification")
class SPINSpecialistResult(SpecialistResult):
rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")
spin: Optional[SPINOutput] = Field(None, alias="Spin Output")
lead_info: Optional[LeadInfoOutput] = Field(None, alias="Lead Info Output")
class SPINFlowState(EveAIFlowState):
"""Flow state for SPIN specialist that automatically updates from task outputs"""
input: Optional[SPINSpecialistInput] = None
rag_output: Optional[RAGOutput] = None
lead_info: Optional[LeadInfoOutput] = None
spin: Optional[SPINOutput] = None
final_output: Optional[RAGOutput] = None
class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
def __init__(self,
specialist_executor: CrewAIBaseSpecialistExecutor,
rag_crew: EveAICrewAICrew,
spin_crew: EveAICrewAICrew,
identification_crew: EveAICrewAICrew,
rag_consolidation_crew: EveAICrewAICrew,
**kwargs):
super().__init__(specialist_executor, "SPIN Specialist Flow", **kwargs)
self.specialist_executor = specialist_executor
self.rag_crew = rag_crew
self.spin_crew = spin_crew
self.identification_crew = identification_crew
self.rag_consolidation_crew = rag_consolidation_crew
self.exception_raised = False
@start()
def process_inputs(self):
return ""
@listen(process_inputs)
async def execute_rag(self):
inputs = self.state.input.model_dump()
try:
current_app.logger.debug("In execute_rag")
crew_output = await self.rag_crew.kickoff_async(inputs=inputs)
current_app.logger.debug(f"Crew execution ended with output:\n{crew_output}")
self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.rag_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
@listen(process_inputs)
async def execute_spin(self):
inputs = self.state.input.model_dump()
try:
crew_output = await self.spin_crew.kickoff_async(inputs=inputs)
current_app.logger.info(f"SPIN Crew Executed, output: {crew_output.model_dump()}")
self.specialist_executor.log_tuning("Spin Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = SPINOutput.model_validate(raw_json)
self.state.spin = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW spin_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
@listen(process_inputs)
async def execute_identification(self):
inputs = self.state.input.model_dump()
try:
crew_output = await self.identification_crew.kickoff_async(inputs=inputs)
self.specialist_executor.log_tuning("Identification Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = LeadInfoOutput.model_validate(raw_json)
self.state.lead_info = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW identification_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
@listen(and_(execute_rag, execute_spin, execute_identification))
async def consolidate(self):
inputs = self.state.input.model_dump()
if self.state.rag_output:
inputs["prepared_answers"] = self.state.rag_output.answer
additional_questions = ""
if self.state.lead_info:
additional_questions = self.state.lead_info.questions + "\n"
if self.state.spin:
additional_questions = additional_questions + self.state.spin.questions
inputs["additional_questions"] = additional_questions
current_app.logger.debug(f"Prepared Answers: \n{inputs['prepared_answers']}")
current_app.logger.debug(f"Additional Questions: \n{additional_questions}")
try:
crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs)
current_app.logger.debug(f"Consolidation output after crew execution:\n{crew_output}")
self.specialist_executor.log_tuning("RAG Consolidation Crew Output", crew_output.model_dump())
output_pydantic = crew_output.pydantic
if not output_pydantic:
raw_json = json.loads(crew_output.raw)
output_pydantic = RAGOutput.model_validate(raw_json)
self.state.final_output = output_pydantic
return crew_output
except Exception as e:
current_app.logger.error(f"CREW rag_consolidation_crew Kickoff Error: {str(e)}")
self.exception_raised = True
raise e
async def kickoff_async(self, inputs=None):
current_app.logger.debug(f"Async kickoff {self.name}")
self.state.input = SPINSpecialistInput.model_validate(inputs)
result = await super().kickoff_async(inputs)
return self.state

View File

@@ -0,0 +1,239 @@
from datetime import datetime
from typing import Dict, Any, List
from flask import current_app
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from common.langchain.outputs.base import OutputRegistry
from common.langchain.outputs.rag import RAGOutput
from common.utils.business_event_context import current_event
from eveai_chat_workers.specialists.specialist_typing import SpecialistArguments, SpecialistResult
from eveai_chat_workers.chat_session_cache import get_chat_history
from common.models.interaction import Specialist
from common.utils.model_utils import create_language_template, replace_variable_in_template, \
get_template
from eveai_chat_workers.specialists.base_specialist import BaseSpecialistExecutor
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
class SpecialistExecutor(BaseSpecialistExecutor):
"""
type: STANDARD_RAG
type_version: 1.0
Standard Q&A RAG Specialist implementation that combines retriever results
with LLM processing to generate answers.
"""
def __init__(self, tenant_id: int, specialist_id: int, session_id: str, task_id: str):
super().__init__(tenant_id, specialist_id, session_id, task_id)
# Check and load the specialist
specialist = Specialist.query.get_or_404(specialist_id)
# Set the specific configuration for the RAG Specialist
self.specialist_context = specialist.configuration.get('specialist_context', '')
self.temperature = specialist.configuration.get('temperature', 0.3)
self.tuning = specialist.tuning
# Initialize retrievers
self.retrievers = self._initialize_retrievers()
@property
def type(self) -> str:
return "STANDARD_RAG_SPECIALIST"
@property
def type_version(self) -> str:
return "1.0"
@property
def required_templates(self) -> List[str]:
"""List of required templates for this specialist"""
return ['rag', 'history']
def _detail_question(self, language: str, question: str) -> str:
"""Detail question based on conversation history"""
try:
# Get cached session history
cached_session = get_chat_history(self.session_id)
# Format history for the prompt
formatted_history = "\n\n".join([
f"HUMAN:\n{interaction.specialist_results.get('detailed_query')}\n\n"
f"AI:\n{interaction.specialist_results.get('answer')}"
for interaction in cached_session.interactions
])
# Get LLM and template
template, llm = get_template("history", temperature=0.3)
language_template = create_language_template(template, language)
# Create prompt
history_prompt = ChatPromptTemplate.from_template(language_template)
# Create chain
chain = (
history_prompt |
llm |
StrOutputParser()
)
# Execute chain
detailed_question = chain.invoke({
"history": formatted_history,
"question": question
})
if self.tuning:
self.log_tuning("_detail_question", {
"cached_session_id": cached_session.session_id,
"cached_session.interactions": str(cached_session.interactions),
"original_question": question,
"history_used": formatted_history,
"detailed_question": detailed_question,
})
return detailed_question
except Exception as e:
current_app.logger.error(f"Error detailing question: {e}")
return question # Fallback to original question
def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
"""
Execute the RAG specialist to generate an answer
"""
start_time = datetime.now()
try:
with current_event.create_span("Specialist Detail Question"):
self.update_progress("Detail Question Start", {})
# Get required arguments
language = arguments.language
query = arguments.query
detailed_question = self._detail_question(language, query)
self.update_progress("Detail Question End", {})
# Log the start of retrieval process if tuning is enabled
with current_event.create_span("Specialist Retrieval"):
self.log_tuning("Starting context retrieval", {
"num_retrievers": len(self.retrievers),
"all arguments": arguments.model_dump(),
})
self.update_progress("EveAI Retriever Start", {})
# Get retriever-specific arguments
retriever_arguments = arguments.retriever_arguments
# Collect context from all retrievers
all_context = []
for retriever in self.retrievers:
# Get arguments for this specific retriever
retriever_id = str(retriever.retriever_id)
if retriever_id not in retriever_arguments:
current_app.logger.error(f"Missing arguments for retriever {retriever_id}")
continue
# Get the retriever's arguments and update the query
current_retriever_args = retriever_arguments[retriever_id]
if isinstance(retriever_arguments[retriever_id], RetrieverArguments):
updated_args = current_retriever_args.model_dump()
updated_args['query'] = detailed_question
retriever_args = RetrieverArguments(**updated_args)
else:
# Create a new RetrieverArguments instance from the dictionary
current_retriever_args['query'] = detailed_question
retriever_args = RetrieverArguments(**current_retriever_args)
# Each retriever gets its own specific arguments
retriever_result = retriever.retrieve(retriever_args)
all_context.extend(retriever_result)
# Sort by similarity if available and get unique contexts
all_context.sort(key=lambda x: x.similarity, reverse=True)
unique_contexts = []
seen_chunks = set()
for ctx in all_context:
if ctx.chunk not in seen_chunks:
unique_contexts.append(ctx)
seen_chunks.add(ctx.chunk)
self.log_tuning("Context retrieval completed", {
"total_contexts": len(all_context),
"unique_contexts": len(unique_contexts),
"average_similarity": sum(ctx.similarity for ctx in unique_contexts) / len(
unique_contexts) if unique_contexts else 0
})
self.update_progress("EveAI Retriever Complete", {})
# Prepare context for LLM
formatted_context = "\n\n".join([
f"SOURCE: {ctx.metadata.document_id}\n\n{ctx.chunk}"
for ctx in unique_contexts
])
with current_event.create_span("Specialist RAG invocation"):
try:
self.update_progress(self.task_id, "EveAI Chain Start", {})
template, llm = get_template("rag", self.temperature)
language_template = create_language_template(template, language)
full_template = replace_variable_in_template(
language_template,
"{tenant_context}",
self.specialist_context
)
if self.tuning:
self.log_tuning("Template preparation completed", {
"template": full_template,
"context": formatted_context,
"tenant_context": self.specialist_context,
})
# Create prompt
rag_prompt = ChatPromptTemplate.from_template(full_template)
# Setup chain components
setup_and_retrieval = RunnableParallel({
"context": lambda x: formatted_context,
"question": lambda x: x
})
# Get output schema for structured output
output_schema = OutputRegistry.get_schema(self.type)
structured_llm = llm.with_structured_output(output_schema)
chain = setup_and_retrieval | rag_prompt | structured_llm
raw_result = chain.invoke(detailed_question)
result = SpecialistResult.create_for_type(
self.type,
self.type_version,
detailed_query=detailed_question,
answer=raw_result.answer,
citations=[ctx.metadata.document_id for ctx in unique_contexts
if ctx.id in raw_result.citations],
insufficient_info=raw_result.insufficient_info
)
if self.tuning:
self.log_tuning("LLM chain execution completed", {
"Result": result.model_dump()
})
self.update_progress("EveAI Chain Complete", {})
except Exception as e:
current_app.logger.error(f"Error in LLM processing: {e}")
if self.tuning:
self.log_tuning("LLM processing error", {"error": str(e)})
raise
return result
except Exception as e:
current_app.logger.error(f'Error in RAG specialist execution: {str(e)}')
raise
# Register the specialist type
OutputRegistry.register("STANDARD_RAG_SPECIALIST", RAGOutput)