diff --git a/config/logging_config.py b/config/logging_config.py index e44f83e..a8605f1 100644 --- a/config/logging_config.py +++ b/config/logging_config.py @@ -13,6 +13,26 @@ GRAYLOG_PORT = int(os.environ.get('GRAYLOG_PORT', 12201)) env = os.environ.get('FLASK_ENV', 'development') +def pad_string(s, target_length=100, pad_char='-'): + """ + Pads a string with the specified character until it reaches the target length. + + Args: + s: The original string + target_length: The desired total length + pad_char: Character to use for padding + + Returns: + The padded string + """ + current_length = len(s) + if current_length >= target_length: + return s + + padding_needed = target_length - current_length - 1 + return s + " " + (pad_char * padding_needed) + + class TuningLogRecord(logging.LogRecord): """Extended LogRecord that handles both tuning and business event logging""" @@ -153,7 +173,7 @@ class TuningLogger: level=level, pathname='', lineno=0, - msg=message, + msg=pad_string(message, 100, '-'), args=(), exc_info=None ) diff --git a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py index 87bfe82..9c580cf 100644 --- a/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/RAG_SPECIALIST/1_0.py @@ -139,16 +139,8 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]): self.exception_raised = True raise e - async def execute_async(self, inputs=None): - async with current_event.create_span_async("RAG Specialist Execution"): - self.specialist_executor.log_tuning("Inputs retrieved", inputs) - self.state.input = RAGSpecialistInput.model_validate(inputs) - self.specialist.update_progress("EveAI Flow Start", {"name": "RAG"}) - try: - result = await super().kickoff_async() - except Exception as e: - current_app.logger.error(f"Error kicking of Flow: {str(e)}") - - self.specialist.update_progress("EveAI Flow End", {"name": "RAG"}) - - return self.state + async def kickoff_async(self, inputs=None): + current_app.logger.debug(f"Async kickoff {self.name}") + self.state.input = RAGSpecialistInput.model_validate(inputs) + result = await super().kickoff_async(inputs) + return self.state diff --git a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py index 2b0d70e..4cff274 100644 --- a/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py +++ b/eveai_chat_workers/specialists/SPIN_SPECIALIST/1_0.py @@ -64,7 +64,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor): self._add_pydantic_output("rag_task", RAGOutput, "rag_output") self._add_pydantic_output("spin_questions_task", SPINOutput, "spin_questions") self._add_pydantic_output("identification_questions_task", LeadInfoOutput, "lead_identification_questions") - self._add_pydantic_output("rag_consolidation_task", RAGOutput, "rag_output") + self._add_pydantic_output("rag_consolidation_task", RAGOutput, "final_output") def _instantiate_specialist(self): verbose = self.tuning @@ -275,7 +275,6 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): additional_questions = self.state.lead_info.questions + "\n" if self.state.spin: additional_questions = additional_questions + self.state.spin.questions - current_app.logger.debug(f"Additional Questions: {additional_questions}") inputs["additional_questions"] = additional_questions try: crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs) @@ -291,19 +290,8 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]): self.exception_raised = True raise e - async def execute_async(self, inputs=None): + async def kickoff_async(self, inputs=None): current_app.logger.debug(f"Async kickoff {self.name}") - async with current_event.create_span_async("SPIN Specialist Execution"): - self.specialist_executor.log_tuning("Inputs retrieved", inputs) - self.state.input = SPINSpecialistInput.model_validate(inputs) - self.specialist.update_progress("EveAI Flow Start", {"name": "SPIN"}) - try: - current_app.logger.debug(f"Async super kickoff {self.name}") - result = await super().kickoff_async() - current_app.logger.debug(f"Async super kickoff {self.name} ended") - except Exception as e: - current_app.logger.error(f"Error kicking of Flow: {str(e)}") - - self.specialist.update_progress("EveAI Flow End", {"name": "SPIN"}) - - return self.state + self.state.input = SPINSpecialistInput.model_validate(inputs) + result = await super().kickoff_async(inputs) + return self.state diff --git a/eveai_chat_workers/specialists/base_specialist.py b/eveai_chat_workers/specialists/base_specialist.py index cb252a0..cc14eb1 100644 --- a/eveai_chat_workers/specialists/base_specialist.py +++ b/eveai_chat_workers/specialists/base_specialist.py @@ -20,7 +20,7 @@ class BaseSpecialistExecutor(ABC): self.session_id = session_id self.task_id = task_id self.tuning = False - self.tuning_logger = None + self.tuning_logger: TuningLogger = None self._setup_tuning_logger() self.ept = ExecutionProgressTracker() diff --git a/eveai_chat_workers/specialists/crewai_base_classes.py b/eveai_chat_workers/specialists/crewai_base_classes.py index 16750f5..d99c9fb 100644 --- a/eveai_chat_workers/specialists/crewai_base_classes.py +++ b/eveai_chat_workers/specialists/crewai_base_classes.py @@ -42,10 +42,7 @@ class EveAICrewAIAgent(Agent): """ current_app.logger.debug(f"Task Execution {task.name} by {self.name}") # with current_event.create_span(f"Task Execution {task.name} by {self.name}"): - self.specialist.log_tuning("EveAI Agent Task Start", - {"name": self.name, - 'task': task.name, - }) + self.specialist.log_tuning(f"EveAI Agent {self.name}, Task {task.name} Start", {}) self.specialist.update_progress("EveAI Agent Task Start", {"name": self.name, 'task': task.name, @@ -53,17 +50,11 @@ class EveAICrewAIAgent(Agent): result = super().execute_task(task, context, tools) - self.specialist.log_tuning("EveAI Agent Task Complete", - {"name": self.name, - 'task': task.name, - 'result': result, - }) + self.specialist.log_tuning(f"EveAI Agent {self.name}, Task {task.name} Complete", {}) self.specialist.update_progress("EveAI Agent Task Complete", {"name": self.name, 'task': task.name, }) - - current_app.logger.debug(f"Task Execution Ended {task.name} by {self.name}") return result @@ -90,25 +81,6 @@ class EveAICrewAICrew(Crew): self.specialist = specialist self.name = name - def kickoff( - self, - inputs: Optional[Dict[str, Any]] = None, - ) -> CrewOutput: - with current_event.create_span(f"Crew {self.name} kickoff"): - start_time = time.time() - results = super().kickoff(inputs) - end_time = time.time() - metrics = { - "total_tokens": self.usage_metrics.total_tokens, - "prompt_tokens": self.usage_metrics.prompt_tokens, - "completion_tokens": self.usage_metrics.completion_tokens, - "time_elapsed": end_time - start_time, - "interaction_type": "Crew Execution" - } - current_event.log_llm_metrics(metrics) - - return results - async def kickoff_async( self, inputs: Optional[Dict[str, Any]] = None, @@ -124,6 +96,7 @@ class EveAICrewAICrew(Crew): "time_elapsed": end_time - start_time, "interaction_type": "Crew Execution" } + self.specialist.log_tuning(f"Crew {self.name} async metrics", metrics) current_event.log_llm_metrics(metrics) return results @@ -141,12 +114,21 @@ class EveAICrewAIFlow(Flow): self.specialist.log_tuning("Initializing EveAICrewAIFlow", {"name": self.name}) self.specialist.update_progress("EveAI Flow Initialisation", {"name": self.name}) - def kickoff(self, inputs=None): - result = asyncio.run(self.execute_async(inputs=inputs)) + async def kickoff_async(self, inputs=None) -> Any: + """Properly override the library's async method""" + async with current_event.create_span_async(f"Flow {self.name} execution"): + self.specialist.log_tuning("Inputs retrieved", inputs) + self.specialist.update_progress("EveAI Flow Start", {"name": self.name}) - @abstractmethod - async def execute_async(self, inputs=None): - raise NotImplementedError + try: + # Call parent's kickoff_async to handle start methods + result = await super().kickoff_async(inputs) + except Exception as e: + current_app.logger.error(f"Error in Flow kickoff_async: {str(e)}") + raise + + self.specialist.update_progress("EveAI Flow End", {"name": self.name}) + return self.state class EveAIFlowState(BaseModel):