Files
eveAI/common/utils/business_event_context.py
Josako b6ee7182de - Adding Prometheus and grafana services in development
- Adding Prometheus metrics to the business events
- Ensure asynchronous behaviour of crewai specialists.
- Adapt Business events to working in mixed synchronous / asynchronous contexts
- Extend business events with specialist information
- Started adding a grafana dashboard (TBC)
2025-03-24 16:39:22 +01:00

52 lines
1.6 KiB
Python

from werkzeug.local import LocalProxy, LocalStack
import asyncio
from contextvars import ContextVar
import contextvars
# Werkzeug LocalStack of active business events. Kept for backward
# compatibility; _get_current_event() falls back to its top entry when the
# contextvar below holds None.
_business_event_stack = LocalStack()
# ContextVar holding the active business event (None when unset). Added for
# async support; _get_current_event() consults it before the stack.
_business_event_contextvar = ContextVar('business_event', default=None)
def _get_current_event():
    """Return the business event that is currently active.

    The contextvar is consulted first (async support); when it holds
    ``None`` the top of the stack is used instead (sync support).

    Raises:
        RuntimeError: if neither the contextvar nor the stack holds an event.
    """
    active = _business_event_contextvar.get()
    if active is None:
        # Nothing stored in the contextvar: fall back to the stack.
        active = _business_event_stack.top
        if active is None:
            raise RuntimeError("No business event context found. Are you sure you're in a business event?")
    return active
# Proxy for the active business event, resolved through _get_current_event();
# that lookup raises RuntimeError when no business event is active.
current_event = LocalProxy(_get_current_event)
class BusinessEventContext:
    """Context manager that makes ``event`` the current business event.

    Usable with both ``with`` and ``async with``. On entry the event is
    pushed onto the module-level stack and stored in the module-level
    contextvar; on exit the stack is popped and the contextvar is reset to
    the value it had before entry. Both forms yield the event itself.
    """

    def __init__(self, event):
        """Remember ``event``; nothing is registered until the context is entered."""
        self.event = event
        self._token = None  # contextvar token from the most recent entry

    def _activate(self):
        """Register the event with the stack and the contextvar; return it."""
        _business_event_stack.push(self.event)
        self._token = _business_event_contextvar.set(self.event)
        return self.event

    def _deactivate(self):
        """Undo ``_activate``: pop the stack and restore the contextvar."""
        try:
            _business_event_stack.pop()
        finally:
            # Always restore the contextvar, even if the pop above fails.
            # The token is cleared from the instance before it is used: a
            # token may be reset only once, so leaving a spent token behind
            # would make a repeated exit raise RuntimeError.
            token, self._token = self._token, None
            if token is not None:
                _business_event_contextvar.reset(token)

    def __enter__(self):
        return self._activate()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        self._deactivate()

    async def __aenter__(self):
        return self._activate()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        self._deactivate()