Files
eveAI/eveai_client/platform/api/api_client.py
Josako 5c982fcc2c - Added EveAI Client to project
- Improvements to EntitlementsDomain & Services
- Prechecks in Document domain
- Add audit information to LicenseUsage
2025-05-17 15:56:14 +02:00

101 lines
3.5 KiB
Python

"""API client for the EveAI client."""
import json
from typing import Dict, Any, Optional
import requests
from datetime import datetime
from flask import Flask
class ApiClient:
    """Handles all API communications with the EveAI backend.

    An instance is created unconfigured. Call :meth:`init_app` before issuing
    any request so that ``base_url`` and ``auth_manager`` are populated.
    """

    def __init__(self):
        """Initialize the API client with a fresh ``requests`` session."""
        self.session = requests.Session()
        # Both attributes are filled in by init_app(); they stay None until then.
        self.base_url = None
        self.auth_manager = None

    def init_app(self, app, auth_manager) -> None:
        """Bind the client to an application and an authentication manager.

        Args:
            app: Object exposing a ``config`` mapping that contains the
                ``API_BASE_URL`` key.
            auth_manager: Object exposing ``get_token()``; it is queried for
                a token each time request headers are built.
        """
        self.base_url = app.config["API_BASE_URL"]
        self.auth_manager = auth_manager

    def _get_headers(self) -> Dict[str, str]:
        """Get headers with current authentication token.

        The token is fetched from the auth manager on every call, so each
        request carries whatever token ``get_token()`` returns at that time.
        """
        return {
            'Authorization': f'Bearer {self.auth_manager.get_token()}',
            'Content-Type': 'application/json'
        }

    def get_specialist_config(self, specialist_id: int) -> Dict[str, Any]:
        """Get specialist configuration from the API.

        Args:
            specialist_id: Identifier of the specialist to look up.

        Returns:
            The ``arguments`` entry of the JSON response, or an empty dict
            when the response has no such entry.

        Raises:
            requests.HTTPError: If the backend answers with a 4xx/5xx status.
        """
        # NOTE(review): this is a GET request carrying a JSON body. Kept as-is
        # because the backend presumably reads the body; confirm that every
        # proxy in between forwards bodies on GET.
        response = self.session.get(
            f"{self.base_url}/api/v1/specialist-execution/specialist_arguments",
            headers=self._get_headers(),
            json={'specialist_id': specialist_id}
        )
        response.raise_for_status()
        response_dict = response.json()
        return response_dict.get('arguments', {})

    def start_session(self) -> str:
        """Start a new specialist execution session.

        Returns:
            The ``session_id`` value of the JSON response.

        Raises:
            requests.HTTPError: If the backend answers with a 4xx/5xx status.
            KeyError: If the response JSON has no ``session_id`` entry.
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/specialist-execution/start_session",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()['session_id']

    def execute_specialist(
        self,
        specialist_id: int,
        arguments: Dict[str, Any],
        session_id: str
    ) -> Dict[str, str]:
        """Start specialist execution.

        Args:
            specialist_id: Identifier of the specialist to execute.
            arguments: Arguments forwarded to the specialist.
            session_id: Session the execution belongs to.

        Returns:
            A dict with ``task_id`` and ``stream_url``. When the backend does
            not supply a ``stream_url``, one is derived from ``base_url`` and
            the task id.

        Raises:
            requests.HTTPError: If the backend answers with a 4xx/5xx status.
            KeyError: If the response JSON has no ``task_id`` entry.
        """
        response = self.session.post(
            f"{self.base_url}/api/v1/specialist-execution",
            headers=self._get_headers(),
            json={
                'specialist_id': specialist_id,
                'arguments': arguments,
                'session_id': session_id,
                'user_timezone': 'UTC'  # TODO: Get from system
            }
        )
        response.raise_for_status()
        response_json = response.json()
        task_id = response_json['task_id']
        # NOTE(review): the fallback URL has no "/api/v1" prefix, unlike the
        # other endpoints of this client; confirm that this is intended.
        fallback_stream_url = f"{self.base_url}/specialist-execution/{task_id}/stream"
        # Return both task_id and stream_url if available
        return {
            'task_id': task_id,
            'stream_url': response_json.get('stream_url', fallback_stream_url)
        }

    def handle_request_error(self, error: requests.RequestException) -> Dict[str, Any]:
        """Handle request errors and return appropriate error information.

        Args:
            error: The exception raised while performing a request.

        Returns:
            A dict with ``error``, ``message`` and ``status_code`` keys. For an
            ``HTTPError`` whose response body is a JSON object, ``error`` and
            ``message`` are taken from that object. For any other
            ``HTTPError`` the real response status is reported, or 500 when
            no response is attached. Every other exception is reported with
            its class name and status 500.
        """
        if isinstance(error, requests.HTTPError):
            response = error.response
            # Compare with None explicitly: a Response object is falsy for
            # every 4xx/5xx status, so a truthiness test would discard the
            # real status code of exactly the responses handled here.
            status_code = response.status_code if response is not None else 500

            error_data = None
            if response is not None:
                try:
                    error_data = response.json()
                except ValueError:
                    # Body is not JSON; fall through to the generic payload.
                    error_data = None

            # Only a JSON object can carry 'error' / 'message' entries; a
            # list or scalar body is treated like a non-JSON body.
            if isinstance(error_data, dict):
                return {
                    'error': error_data.get('error', 'Unknown error'),
                    'message': error_data.get('message', str(error)),
                    'status_code': status_code
                }
            return {
                'error': 'HTTP Error',
                'message': str(error),
                'status_code': status_code
            }
        return {
            'error': error.__class__.__name__,
            'message': str(error),
            'status_code': 500
        }

    def close(self) -> None:
        """Close the API client session."""
        self.session.close()