- Improvements to EntitlementsDomain & Services - Prechecks in Document domain - Add audit information to LicenseUsage
125 lines
4.5 KiB
Python
125 lines
4.5 KiB
Python
import json
|
|
from datetime import datetime as dt
|
|
|
|
import requests
|
|
from flask import current_app
|
|
from eveai_client.platform.extensions import ui
|
|
|
|
|
|
def process_stream_updates(api_client, stream_url, task_id):
    """Process SSE stream updates and generate data for UI updates.

    Opens ``stream_url`` as a streaming GET request through
    ``api_client.session`` and turns every ``data: `` line of the stream
    into one or more update dicts.

    Args:
        api_client: Object exposing ``session.get(...)`` and
            ``_get_headers()``; used to open the stream.
        stream_url: URL of the SSE endpoint to read.
        task_id: Identifier copied into every per-message update.

    Yields:
        dict: Update records. Every record has ``type`` (``'info'``,
        ``'error'`` or ``'success'``) and ``message``. The first record
        carries ``connected: True``. Per-message records carry ``task_id``
        and a ``processing_status`` dict. Terminal records (specialist
        complete, specialist error, or a connection/stream failure)
        additionally carry ``content`` and ``complete: True``; nothing is
        yielded after a terminal record.
    """
    print("Processing SSE stream updates...")
    response = None
    try:
        # Initial update
        yield {
            'type': 'info',
            'message': 'Connected to stream...',
            'connected': True
        }

        # Get the SSE stream response
        response = api_client.session.get(
            stream_url,
            headers=api_client._get_headers(),
            stream=True,
            timeout=120
        )
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode('utf-8')
            if not line.startswith('data: '):
                continue

            # Extract the data part
            data = line[6:]  # Skip 'data: '
            print(f"SSE Line Received: {data}")
            try:
                update = json.loads(data)
                update_type = update.get('processing_type', 'Unknown')
                update_data = update.get('data', {})

                # Create a log message
                message = f"{update_type}"
                if isinstance(update_data, dict):
                    if 'name' in update_data:
                        message += f" - {update_data.get('name', '')}"
                    if 'task' in update_data:
                        message += f" - {update_data.get('task', '')}"

                # Determine log level
                log_type = 'error' if update_type.lower().endswith('error') else 'info'

                # Yield status update for logging
                yield {
                    'type': log_type,
                    'message': message,
                    'task_id': task_id,  # Make sure task_id is included
                    'processing_status': {
                        'message': 'Processing request',
                        'state': 'processing'
                    }
                }

                # If processing complete, send the answer
                if update_type == 'EveAI Specialist Complete':
                    # `or {}` guards against explicit JSON nulls so the
                    # fallback answer is used instead of raising.
                    result = update_data.get('result') or {}
                    rag_output = result.get('rag_output') or {}
                    answer = rag_output.get('answer', "I'm sorry, I couldn't generate a response.")

                    yield {
                        'type': 'success',
                        'message': 'Processing complete',
                        'content': answer,
                        'complete': True,  # Signal completion
                        'task_id': task_id,  # Make sure task_id is included
                        'processing_status': {
                            'message': 'Complete',
                            'state': 'ready'
                        }
                    }
                    # Terminal event: stop reading so a later timeout or
                    # disconnect cannot emit a second "complete" update.
                    return

                elif update_type == 'EveAI Specialist Error':
                    error_message = update_data.get('message', "An error occurred while processing your request.")

                    yield {
                        'type': 'error',
                        'message': f'Error: {error_message}',
                        'content': f"Error: {error_message}",
                        'complete': True,  # Signal completion
                        'task_id': task_id,  # Make sure task_id is included
                        'processing_status': {
                            'message': 'Complete',
                            'state': 'ready'
                        }
                    }
                    # Terminal event: see the completion branch above.
                    return

            except json.JSONDecodeError as je:
                yield {
                    'type': 'error',
                    'message': f'Error decoding message: {je}',
                    'task_id': task_id,
                }
            except Exception as e:
                # A malformed single message must not abort the stream.
                yield {
                    'type': 'error',
                    'message': f'Error processing message: {str(e)}',
                    'task_id': task_id
                }

    except Exception as e:
        # Connection-level failure: report it and signal completion so the
        # consumer does not wait forever.
        yield {
            'type': 'error',
            'message': f'Error: {str(e)}',
            'connected': False,
            'content': f"Error: {str(e)}",
            'complete': True,  # Signal completion even on error
            'task_id': task_id,
            'processing_status': {
                'message': 'Complete',
                'state': 'ready'
            }
        }
    finally:
        # Release the streaming connection on every exit path, including
        # the consumer closing the generator early (GeneratorExit).
        if response is not None:
            response.close()