- Improvements to EntitlementsDomain & Services - Prechecks in Document domain - Add audit information to LicenseUsage
101 lines
3.5 KiB
Python
101 lines
3.5 KiB
Python
"""API client for the EveAI client."""
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
import requests
|
|
from datetime import datetime
|
|
from flask import Flask
|
|
|
|
|
|
class ApiClient:
|
|
"""Handles all API communications with the EveAI backend."""
|
|
|
|
def __init__(self):
|
|
"""Initialize the API client."""
|
|
self.session = requests.Session()
|
|
self.base_url = None
|
|
self.auth_manager = None
|
|
|
|
def init_app(self, app, auth_manager) -> None:
|
|
self.base_url = app.config["API_BASE_URL"]
|
|
self.auth_manager = auth_manager
|
|
|
|
def _get_headers(self) -> Dict[str, str]:
|
|
"""Get headers with current authentication token."""
|
|
return {
|
|
'Authorization': f'Bearer {self.auth_manager.get_token()}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
def get_specialist_config(self, specialist_id: int) -> Dict[str, Any]:
|
|
"""Get specialist configuration from the API."""
|
|
response = self.session.get(
|
|
f"{self.base_url}/api/v1/specialist-execution/specialist_arguments",
|
|
headers=self._get_headers(),
|
|
json={'specialist_id': specialist_id}
|
|
)
|
|
response.raise_for_status()
|
|
response_dict = response.json()
|
|
return response_dict.get('arguments', {})
|
|
|
|
def start_session(self) -> str:
|
|
"""Start a new specialist execution session."""
|
|
response = self.session.get(
|
|
f"{self.base_url}/api/v1/specialist-execution/start_session",
|
|
headers=self._get_headers()
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()['session_id']
|
|
|
|
def execute_specialist(
|
|
self,
|
|
specialist_id: int,
|
|
arguments: Dict[str, Any],
|
|
session_id: str
|
|
) -> Dict[str, str]:
|
|
"""Start specialist execution."""
|
|
response = self.session.post(
|
|
f"{self.base_url}/api/v1/specialist-execution",
|
|
headers=self._get_headers(),
|
|
json={
|
|
'specialist_id': specialist_id,
|
|
'arguments': arguments,
|
|
'session_id': session_id,
|
|
'user_timezone': 'UTC' # TODO: Get from system
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
response_json = response.json()
|
|
|
|
# Return both task_id and stream_url if available
|
|
return {
|
|
'task_id': response_json['task_id'],
|
|
'stream_url': response_json.get('stream_url',
|
|
f"{self.base_url}/specialist-execution/{response_json['task_id']}/stream")
|
|
}
|
|
|
|
def handle_request_error(self, error: requests.RequestException) -> Dict[str, Any]:
|
|
"""Handle request errors and return appropriate error information."""
|
|
if isinstance(error, requests.HTTPError):
|
|
try:
|
|
error_data = error.response.json()
|
|
return {
|
|
'error': error_data.get('error', 'Unknown error'),
|
|
'message': error_data.get('message', str(error)),
|
|
'status_code': error.response.status_code
|
|
}
|
|
except ValueError:
|
|
return {
|
|
'error': 'HTTP Error',
|
|
'message': str(error),
|
|
'status_code': error.response.status_code if error.response else 500
|
|
}
|
|
return {
|
|
'error': error.__class__.__name__,
|
|
'message': str(error),
|
|
'status_code': 500
|
|
}
|
|
|
|
def close(self) -> None:
|
|
"""Close the API client session."""
|
|
self.session.close()
|