- Improvements to EntitlementsDomain & Services - Prechecks in Document domain - Add audit information to LicenseUsage
62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
import time
from typing import Any, Dict

import requests
from flask import current_app, Flask

from eveai_client.platform.config.config import Config
|
|
|
|
|
|
class AuthManager:
    """Handles authentication and token management for the EveAI client.

    The manager is created unconfigured; call :meth:`init_app` with a Flask
    application before requesting a token. A fetched token is kept in an
    in-memory cache and reused until shortly before its recorded expiry.
    """

    # Seconds to wait for the token endpoint. Without an explicit timeout
    # ``requests`` may block indefinitely on an unresponsive server.
    REQUEST_TIMEOUT_SECONDS = 10

    # Safety margin: a cached token is treated as expired this many seconds
    # before its recorded expiry.
    EXPIRY_BUFFER_SECONDS = 30

    # Lifetime assumed when the token response carries no ``expires_in``.
    DEFAULT_TOKEN_LIFETIME_SECONDS = 3600

    def __init__(self):
        """Initialize the authentication manager with an empty token cache."""
        # Holds 'token' and 'expiry' (epoch seconds) once a token is fetched.
        self.token_cache: Dict[str, Any] = {}
        # Connection settings; populated by init_app().
        self.api_base_url = None
        self.tenant_id = None
        self.api_key = None

    def init_app(self, app: "Flask") -> None:
        """Load the API connection settings from the application config.

        Args:
            app: Application whose ``config`` provides ``API_BASE_URL``,
                ``TENANT_ID`` and ``API_KEY``.

        Raises:
            KeyError: If any of the three settings is missing from the config.
        """
        self.api_base_url = app.config['API_BASE_URL']
        self.tenant_id = app.config['TENANT_ID']
        self.api_key = app.config['API_KEY']

    def get_token(self) -> str:
        """Get a valid authentication token.

        Returns:
            The cached token while it is still valid, otherwise a token
            freshly fetched from the API.
        """
        if self._is_token_valid():
            return self.token_cache['token']

        return self._fetch_new_token()

    def _is_token_valid(self) -> bool:
        """Check if the cached token is still valid.

        Returns:
            ``False`` when nothing is cached or when the token expires within
            ``EXPIRY_BUFFER_SECONDS``; ``True`` otherwise.
        """
        if not self.token_cache:
            return False

        # Compare against the expiry minus the safety buffer.
        return time.time() < self.token_cache['expiry'] - self.EXPIRY_BUFFER_SECONDS

    def _fetch_new_token(self) -> str:
        """Fetch a new token from the API and store it in the cache.

        Returns:
            The newly issued access token.

        Raises:
            RuntimeError: If the manager has not been configured via
                :meth:`init_app`.
            Exception: If the API answers with a status code other than 200.
        """
        # Fail early with a clear message instead of an opaque TypeError from
        # int(None) or a malformed URL when init_app() was never called.
        if self.api_base_url is None or self.tenant_id is None or self.api_key is None:
            raise RuntimeError(
                "AuthManager is not configured; call init_app() first"
            )

        response = requests.post(
            f"{self.api_base_url}/api/v1/auth/token",
            json={
                "tenant_id": int(self.tenant_id),
                "api_key": self.api_key,
            },
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            raise Exception(f"Authentication failed: {response.text}")

        token_data = response.json()
        lifetime = token_data.get('expires_in', self.DEFAULT_TOKEN_LIFETIME_SECONDS)
        self.token_cache = {
            'token': token_data['access_token'],
            'expiry': time.time() + lifetime,
        }

        return self.token_cache['token']

    def clear_token(self) -> None:
        """Clear the cached token so the next request fetches a new one."""
        self.token_cache = {}
|