diff --git a/config/specialists/RAG_SPECIALIST/1.0.0.yaml b/config/specialists/RAG_SPECIALIST/1.0.0.yaml new file mode 100644 index 0000000..b068790 --- /dev/null +++ b/config/specialists/RAG_SPECIALIST/1.0.0.yaml @@ -0,0 +1,53 @@ +version: "1.0.0" +name: "RAG Specialist" +framework: "crewai" +configuration: + name: + name: "name" + type: "str" + description: "The name by which the specialist is addressed." + required: true + company: + name: "company" + type: "str" + description: "The name of your company. If not provided, your tenant's name will be used." + required: false +arguments: + language: + name: "Language" + type: "str" + description: "Language code to be used for receiving questions and giving answers" + required: true + query: + name: "query" + type: "str" + description: "Query or response to process" + required: true +results: + rag_output: + answer: + name: "answer" + type: "str" + description: "Answer to the query" + required: true + citations: + name: "citations" + type: "List[str]" + description: "List of citations" + required: false + insufficient_info: + name: "insufficient_info" + type: "bool" + description: "Whether there was insufficient information to answer the query" + required: true +agents: + - type: "RAG_AGENT" + version: "1.0" +tasks: + - type: "RAG_TASK" + version: "1.0" +metadata: + author: "Josako" + date_added: "2025-01-08" + changes: "Initial version" + description: "A Specialist that performs Q&A activities" \ No newline at end of file diff --git a/config/type_defs/specialist_types.py b/config/type_defs/specialist_types.py index d5890e7..1970197 100644 --- a/config/type_defs/specialist_types.py +++ b/config/type_defs/specialist_types.py @@ -4,6 +4,10 @@ SPECIALIST_TYPES = { "name": "Q&A RAG Specialist", "description": "Standard Q&A through RAG Specialist", }, + "RAG_SPECIALIST": { + "name": "RAG Specialist", + "description": "Q&A through RAG Specialist", + }, "SPIN_SPECIALIST": { "name": "Spin Sales Specialist", "description": "A specialist that allows 
"""RAG Specialist (type ``RAG_SPECIALIST``, version 1.0) implemented as a CrewAI flow.

The module defines the specialist executor, the pydantic models describing the
flow's inputs/results/state, and the flow that runs the single RAG crew.
"""
import json
# NOTE(review): `wait` is not used anywhere in this module and `os.wait` is only
# available on Unix, so this import fails on Windows — candidate for removal.
from os import wait
from typing import Optional, List

from crewai.flow.flow import start, listen, and_
from flask import current_app
from pydantic import BaseModel, Field

from common.utils.business_event_context import current_event
from eveai_chat_workers.retrievers.retriever_typing import RetrieverArguments
from eveai_chat_workers.specialists.crewai_base_specialist import CrewAIBaseSpecialistExecutor
from eveai_chat_workers.specialists.specialist_typing import SpecialistResult, SpecialistArguments
from eveai_chat_workers.outputs.rag.rag_v1_0 import RAGOutput
from eveai_chat_workers.specialists.crewai_base_classes import EveAICrewAICrew, EveAICrewAIFlow, EveAIFlowState


class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
    """
    type: RAG_SPECIALIST
    type_version: 1.0
    RAG Specialist Executor class

    Wires one task ("rag_task") to one agent ("rag_agent"), wraps them in a
    single crew and runs that crew through :class:`RAGFlow`.
    """

    def __init__(self, tenant_id, specialist_id, session_id, task_id, **kwargs):
        # Defined before the base initialiser runs — presumably because the base
        # class calls _instantiate_specialist() during __init__; TODO confirm.
        self.rag_crew = None

        # NOTE(review): **kwargs is accepted but not forwarded to the base class.
        super().__init__(tenant_id, specialist_id, session_id, task_id)

    @property
    def type(self) -> str:
        """Specialist type identifier."""
        return "RAG_SPECIALIST"

    @property
    def type_version(self) -> str:
        """Version of this specialist type."""
        return "1.0"

    def _config_task_agents(self):
        """Register which agent executes which task."""
        self._add_task_agent("rag_task", "rag_agent")

    def _config_pydantic_outputs(self):
        """Register the pydantic model for the RAG task output under "rag_output"."""
        self._add_pydantic_output("rag_task", RAGOutput, "rag_output")

    def _instantiate_specialist(self):
        """Build the RAG crew and the flow that drives it."""
        # Crew verbosity follows the specialist's tuning setting.
        verbose = self.tuning

        rag_agents = [self.rag_agent]
        rag_tasks = [self.rag_task]
        self.rag_crew = EveAICrewAICrew(
            self,
            "Rag Crew",
            agents=rag_agents,
            tasks=rag_tasks,
            verbose=verbose,
        )

        self.flow = RAGFlow(
            self,
            self.rag_crew,
        )

    def execute(self, arguments: SpecialistArguments) -> SpecialistResult:
        """Retrieve context for the query, run the RAG flow and return its result.

        Args:
            arguments: Specialist arguments; ``language`` and ``query`` are read here.

        Returns:
            A ``RAGSpecialistResult`` whose ``rag_output`` is filled in when the
            flow produced one; otherwise it keeps the value set by
            ``create_for_type``.
        """
        formatted_context, citations = self.retrieve_context(arguments)

        self.log_tuning("RAG Specialist execution started", {})

        flow_inputs = {
            "language": arguments.language,
            "query": arguments.query,
            "context": formatted_context,
            "citations": citations,
            "history": self.formatted_history,
            "name": self.specialist.configuration.get('name', ''),
            "company": self.specialist.configuration.get('company', ''),
        }
        # The flow records its outcome on its state; the return value is not needed.
        self.flow.kickoff(inputs=flow_inputs)

        results = RAGSpecialistResult.create_for_type(self.type, self.type_version)
        rag_output = self.flow.state.rag_output
        if rag_output:
            # Only overwrite the default when the flow actually produced an output
            # (RAGFlow.kickoff logs flow errors and still returns).
            results = results.model_copy(update={"rag_output": rag_output})

        self.log_tuning("RAG Specialist execution ended", {"Results": results.model_dump()})

        return results


class RAGSpecialistInput(BaseModel):
    """Inputs handed to the RAG crew; every field is optional and defaults to None."""
    language: Optional[str] = Field(None, alias="language")
    query: Optional[str] = Field(None, alias="query")
    context: Optional[str] = Field(None, alias="context")
    # NOTE(review): typed as a list of ints here, while the specialist's YAML
    # config declares result citations as List[str] — confirm against
    # retrieve_context().
    citations: Optional[List[int]] = Field(None, alias="citations")
    history: Optional[str] = Field(None, alias="history")
    name: Optional[str] = Field(None, alias="name")
    company: Optional[str] = Field(None, alias="company")


class RAGSpecialistResult(SpecialistResult):
    """Specialist result extended with the RAG task output."""
    rag_output: Optional[RAGOutput] = Field(None, alias="Rag Output")


class RAGFlowState(EveAIFlowState):
    """Flow state for RAG specialist that automatically updates from task outputs"""
    input: Optional[RAGSpecialistInput] = None
    rag_output: Optional[RAGOutput] = None


class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
    """Flow that validates the inputs and runs the RAG crew once."""

    def __init__(self,
                 specialist_executor: CrewAIBaseSpecialistExecutor,
                 rag_crew: EveAICrewAICrew,
                 **kwargs):
        super().__init__(specialist_executor, "RAG Specialist Flow", **kwargs)
        self.specialist_executor = specialist_executor
        self.rag_crew = rag_crew
        # Set to True when the crew or the flow as a whole raised.
        self.exception_raised = False

    @start()
    def process_inputs(self):
        """Flow entry point; does no work and only triggers execute_rag."""
        return ""

    @listen(process_inputs)
    def execute_rag(self):
        """Run the RAG crew and store its structured output on the flow state.

        Returns:
            The crew output object returned by ``rag_crew.kickoff``.

        Raises:
            Exception: any error from the crew run or from parsing its raw
                output is logged, flagged via ``exception_raised`` and re-raised.
        """
        inputs = self.state.input.model_dump()
        try:
            crew_output = self.rag_crew.kickoff(inputs=inputs)
            self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
            output_pydantic = crew_output.pydantic
            if output_pydantic is None:
                # No structured output available: parse the raw text as JSON instead.
                raw_json = json.loads(crew_output.raw)
                output_pydantic = RAGOutput.model_validate(raw_json)
            self.state.rag_output = output_pydantic
            return crew_output
        except Exception as e:
            current_app.logger.error(f"CREW rag_crew Kickoff Error: {str(e)}")
            self.exception_raised = True
            raise

    def kickoff(self, inputs=None):
        """Validate ``inputs``, run the flow inside a tracing span, return the state.

        Flow errors are logged and recorded in ``exception_raised`` rather than
        propagated, so the caller always receives the (possibly incomplete) state.

        Args:
            inputs: Mapping validated into ``RAGSpecialistInput``.

        Returns:
            The flow state (``self.state``).
        """
        with current_event.create_span("RAG Specialist Execution"):
            self.specialist_executor.log_tuning("Inputs retrieved", inputs)
            self.state.input = RAGSpecialistInput.model_validate(inputs)
            # NOTE(review): `self.specialist` is not assigned in this class —
            # presumably set by EveAICrewAIFlow.__init__; confirm.
            self.specialist.update_progress("EveAI Flow Start", {"name": "RAG"})
            try:
                super().kickoff()
            except Exception as e:
                # Best effort: record the failure and fall through so the
                # "Flow End" progress update is still emitted.
                self.exception_raised = True
                current_app.logger.error(f"Error kicking of Flow: {str(e)}")

            self.specialist.update_progress("EveAI Flow End", {"name": "RAG"})

            return self.state