- Correct asynchronous behavior in the EveAICrewAI classes.
This commit is contained in:
@@ -13,6 +13,26 @@ GRAYLOG_PORT = int(os.environ.get('GRAYLOG_PORT', 12201))
|
|||||||
env = os.environ.get('FLASK_ENV', 'development')
|
env = os.environ.get('FLASK_ENV', 'development')
|
||||||
|
|
||||||
|
|
||||||
|
def pad_string(s, target_length=100, pad_char='-'):
|
||||||
|
"""
|
||||||
|
Pads a string with the specified character until it reaches the target length.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
s: The original string
|
||||||
|
target_length: The desired total length
|
||||||
|
pad_char: Character to use for padding
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The padded string
|
||||||
|
"""
|
||||||
|
current_length = len(s)
|
||||||
|
if current_length >= target_length:
|
||||||
|
return s
|
||||||
|
|
||||||
|
padding_needed = target_length - current_length - 1
|
||||||
|
return s + " " + (pad_char * padding_needed)
|
||||||
|
|
||||||
|
|
||||||
class TuningLogRecord(logging.LogRecord):
|
class TuningLogRecord(logging.LogRecord):
|
||||||
"""Extended LogRecord that handles both tuning and business event logging"""
|
"""Extended LogRecord that handles both tuning and business event logging"""
|
||||||
|
|
||||||
@@ -153,7 +173,7 @@ class TuningLogger:
|
|||||||
level=level,
|
level=level,
|
||||||
pathname='',
|
pathname='',
|
||||||
lineno=0,
|
lineno=0,
|
||||||
msg=message,
|
msg=pad_string(message, 100, '-'),
|
||||||
args=(),
|
args=(),
|
||||||
exc_info=None
|
exc_info=None
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -139,16 +139,8 @@ class RAGFlow(EveAICrewAIFlow[RAGFlowState]):
|
|||||||
self.exception_raised = True
|
self.exception_raised = True
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
async def execute_async(self, inputs=None):
|
async def kickoff_async(self, inputs=None):
|
||||||
async with current_event.create_span_async("RAG Specialist Execution"):
|
current_app.logger.debug(f"Async kickoff {self.name}")
|
||||||
self.specialist_executor.log_tuning("Inputs retrieved", inputs)
|
self.state.input = RAGSpecialistInput.model_validate(inputs)
|
||||||
self.state.input = RAGSpecialistInput.model_validate(inputs)
|
result = await super().kickoff_async(inputs)
|
||||||
self.specialist.update_progress("EveAI Flow Start", {"name": "RAG"})
|
return self.state
|
||||||
try:
|
|
||||||
result = await super().kickoff_async()
|
|
||||||
except Exception as e:
|
|
||||||
current_app.logger.error(f"Error kicking of Flow: {str(e)}")
|
|
||||||
|
|
||||||
self.specialist.update_progress("EveAI Flow End", {"name": "RAG"})
|
|
||||||
|
|
||||||
return self.state
|
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ class SpecialistExecutor(CrewAIBaseSpecialistExecutor):
|
|||||||
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
|
self._add_pydantic_output("rag_task", RAGOutput, "rag_output")
|
||||||
self._add_pydantic_output("spin_questions_task", SPINOutput, "spin_questions")
|
self._add_pydantic_output("spin_questions_task", SPINOutput, "spin_questions")
|
||||||
self._add_pydantic_output("identification_questions_task", LeadInfoOutput, "lead_identification_questions")
|
self._add_pydantic_output("identification_questions_task", LeadInfoOutput, "lead_identification_questions")
|
||||||
self._add_pydantic_output("rag_consolidation_task", RAGOutput, "rag_output")
|
self._add_pydantic_output("rag_consolidation_task", RAGOutput, "final_output")
|
||||||
|
|
||||||
def _instantiate_specialist(self):
|
def _instantiate_specialist(self):
|
||||||
verbose = self.tuning
|
verbose = self.tuning
|
||||||
@@ -275,7 +275,6 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
|
|||||||
additional_questions = self.state.lead_info.questions + "\n"
|
additional_questions = self.state.lead_info.questions + "\n"
|
||||||
if self.state.spin:
|
if self.state.spin:
|
||||||
additional_questions = additional_questions + self.state.spin.questions
|
additional_questions = additional_questions + self.state.spin.questions
|
||||||
current_app.logger.debug(f"Additional Questions: {additional_questions}")
|
|
||||||
inputs["additional_questions"] = additional_questions
|
inputs["additional_questions"] = additional_questions
|
||||||
try:
|
try:
|
||||||
crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs)
|
crew_output = await self.rag_consolidation_crew.kickoff_async(inputs=inputs)
|
||||||
@@ -291,19 +290,8 @@ class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
|
|||||||
self.exception_raised = True
|
self.exception_raised = True
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
async def execute_async(self, inputs=None):
|
async def kickoff_async(self, inputs=None):
|
||||||
current_app.logger.debug(f"Async kickoff {self.name}")
|
current_app.logger.debug(f"Async kickoff {self.name}")
|
||||||
async with current_event.create_span_async("SPIN Specialist Execution"):
|
self.state.input = SPINSpecialistInput.model_validate(inputs)
|
||||||
self.specialist_executor.log_tuning("Inputs retrieved", inputs)
|
result = await super().kickoff_async(inputs)
|
||||||
self.state.input = SPINSpecialistInput.model_validate(inputs)
|
return self.state
|
||||||
self.specialist.update_progress("EveAI Flow Start", {"name": "SPIN"})
|
|
||||||
try:
|
|
||||||
current_app.logger.debug(f"Async super kickoff {self.name}")
|
|
||||||
result = await super().kickoff_async()
|
|
||||||
current_app.logger.debug(f"Async super kickoff {self.name} ended")
|
|
||||||
except Exception as e:
|
|
||||||
current_app.logger.error(f"Error kicking of Flow: {str(e)}")
|
|
||||||
|
|
||||||
self.specialist.update_progress("EveAI Flow End", {"name": "SPIN"})
|
|
||||||
|
|
||||||
return self.state
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ class BaseSpecialistExecutor(ABC):
|
|||||||
self.session_id = session_id
|
self.session_id = session_id
|
||||||
self.task_id = task_id
|
self.task_id = task_id
|
||||||
self.tuning = False
|
self.tuning = False
|
||||||
self.tuning_logger = None
|
self.tuning_logger: TuningLogger = None
|
||||||
self._setup_tuning_logger()
|
self._setup_tuning_logger()
|
||||||
self.ept = ExecutionProgressTracker()
|
self.ept = ExecutionProgressTracker()
|
||||||
|
|
||||||
|
|||||||
@@ -42,10 +42,7 @@ class EveAICrewAIAgent(Agent):
|
|||||||
"""
|
"""
|
||||||
current_app.logger.debug(f"Task Execution {task.name} by {self.name}")
|
current_app.logger.debug(f"Task Execution {task.name} by {self.name}")
|
||||||
# with current_event.create_span(f"Task Execution {task.name} by {self.name}"):
|
# with current_event.create_span(f"Task Execution {task.name} by {self.name}"):
|
||||||
self.specialist.log_tuning("EveAI Agent Task Start",
|
self.specialist.log_tuning(f"EveAI Agent {self.name}, Task {task.name} Start", {})
|
||||||
{"name": self.name,
|
|
||||||
'task': task.name,
|
|
||||||
})
|
|
||||||
self.specialist.update_progress("EveAI Agent Task Start",
|
self.specialist.update_progress("EveAI Agent Task Start",
|
||||||
{"name": self.name,
|
{"name": self.name,
|
||||||
'task': task.name,
|
'task': task.name,
|
||||||
@@ -53,17 +50,11 @@ class EveAICrewAIAgent(Agent):
|
|||||||
|
|
||||||
result = super().execute_task(task, context, tools)
|
result = super().execute_task(task, context, tools)
|
||||||
|
|
||||||
self.specialist.log_tuning("EveAI Agent Task Complete",
|
self.specialist.log_tuning(f"EveAI Agent {self.name}, Task {task.name} Complete", {})
|
||||||
{"name": self.name,
|
|
||||||
'task': task.name,
|
|
||||||
'result': result,
|
|
||||||
})
|
|
||||||
self.specialist.update_progress("EveAI Agent Task Complete",
|
self.specialist.update_progress("EveAI Agent Task Complete",
|
||||||
{"name": self.name,
|
{"name": self.name,
|
||||||
'task': task.name,
|
'task': task.name,
|
||||||
})
|
})
|
||||||
|
|
||||||
current_app.logger.debug(f"Task Execution Ended {task.name} by {self.name}")
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@@ -90,25 +81,6 @@ class EveAICrewAICrew(Crew):
|
|||||||
self.specialist = specialist
|
self.specialist = specialist
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|
||||||
def kickoff(
|
|
||||||
self,
|
|
||||||
inputs: Optional[Dict[str, Any]] = None,
|
|
||||||
) -> CrewOutput:
|
|
||||||
with current_event.create_span(f"Crew {self.name} kickoff"):
|
|
||||||
start_time = time.time()
|
|
||||||
results = super().kickoff(inputs)
|
|
||||||
end_time = time.time()
|
|
||||||
metrics = {
|
|
||||||
"total_tokens": self.usage_metrics.total_tokens,
|
|
||||||
"prompt_tokens": self.usage_metrics.prompt_tokens,
|
|
||||||
"completion_tokens": self.usage_metrics.completion_tokens,
|
|
||||||
"time_elapsed": end_time - start_time,
|
|
||||||
"interaction_type": "Crew Execution"
|
|
||||||
}
|
|
||||||
current_event.log_llm_metrics(metrics)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
async def kickoff_async(
|
async def kickoff_async(
|
||||||
self,
|
self,
|
||||||
inputs: Optional[Dict[str, Any]] = None,
|
inputs: Optional[Dict[str, Any]] = None,
|
||||||
@@ -124,6 +96,7 @@ class EveAICrewAICrew(Crew):
|
|||||||
"time_elapsed": end_time - start_time,
|
"time_elapsed": end_time - start_time,
|
||||||
"interaction_type": "Crew Execution"
|
"interaction_type": "Crew Execution"
|
||||||
}
|
}
|
||||||
|
self.specialist.log_tuning(f"Crew {self.name} async metrics", metrics)
|
||||||
current_event.log_llm_metrics(metrics)
|
current_event.log_llm_metrics(metrics)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
@@ -141,12 +114,21 @@ class EveAICrewAIFlow(Flow):
|
|||||||
self.specialist.log_tuning("Initializing EveAICrewAIFlow", {"name": self.name})
|
self.specialist.log_tuning("Initializing EveAICrewAIFlow", {"name": self.name})
|
||||||
self.specialist.update_progress("EveAI Flow Initialisation", {"name": self.name})
|
self.specialist.update_progress("EveAI Flow Initialisation", {"name": self.name})
|
||||||
|
|
||||||
def kickoff(self, inputs=None):
|
async def kickoff_async(self, inputs=None) -> Any:
|
||||||
result = asyncio.run(self.execute_async(inputs=inputs))
|
"""Properly override the library's async method"""
|
||||||
|
async with current_event.create_span_async(f"Flow {self.name} execution"):
|
||||||
|
self.specialist.log_tuning("Inputs retrieved", inputs)
|
||||||
|
self.specialist.update_progress("EveAI Flow Start", {"name": self.name})
|
||||||
|
|
||||||
@abstractmethod
|
try:
|
||||||
async def execute_async(self, inputs=None):
|
# Call parent's kickoff_async to handle start methods
|
||||||
raise NotImplementedError
|
result = await super().kickoff_async(inputs)
|
||||||
|
except Exception as e:
|
||||||
|
current_app.logger.error(f"Error in Flow kickoff_async: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
self.specialist.update_progress("EveAI Flow End", {"name": self.name})
|
||||||
|
return self.state
|
||||||
|
|
||||||
|
|
||||||
class EveAIFlowState(BaseModel):
|
class EveAIFlowState(BaseModel):
|
||||||
|
|||||||
Reference in New Issue
Block a user