Files
eveAI/tests/specialist_execution/test_specialist_client.py
Josako 25213f2004 - Implementation of specialist execution api, including SSE protocol
- eveai_chat becomes deprecated and should be replaced with SSE
- Adaptation of STANDARD_RAG specialist
- Base class definition allowing to realise specialists with crewai framework
- Implementation of SPIN_SPECIALIST
- Implementation of test app for testing specialists (test_specialist_client). Also serves as an example for future SSE-based client
- Improvements to startup scripts to better handle and scale multiple connections
- Small improvements to the interaction forms and views
- Caching implementation improved and augmented with additional caches
2025-02-20 05:50:16 +01:00

226 lines
8.4 KiB
Python

# test_specialist_client.py
from pathlib import Path
import requests
import json
from datetime import datetime
import sseclient
from typing import Dict, Any
import yaml
import os
from termcolor import colored
import sys
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(project_root)
# Configuration Constants
API_BASE_URL = "http://macstudio.ask-eve-ai-local.com:8080/api/api/v1"
TENANT_ID = 2 # Replace with your tenant ID
API_KEY = "EveAI-5096-5466-6143-1487-8085-4174-2080-7208" # Replace with your API key
SPECIALIST_TYPE = "SPIN_SPECIALIST" # Replace with your specialist type
SPECIALIST_ID = 5 # Replace with your specialist ID
ROOT_FOLDER = "../.."
def get_auth_token() -> str:
"""Get authentication token from API"""
response = requests.post(
f"{API_BASE_URL}/auth/token",
json={
"tenant_id": TENANT_ID,
"api_key": API_KEY
}
)
print(colored(f"Status Code: {response.status_code}", "cyan"))
print(colored(f"Response Headers: {response.headers}", "cyan"))
print(colored(f"Response Content: {response.text}", "cyan"))
if response.status_code == 200:
return response.json()['access_token']
else:
raise Exception(f"Authentication failed: {response.text}")
def get_session_id(auth_token: str) -> str:
"""Get a new session ID from the API"""
headers = {'Authorization': f'Bearer {auth_token}'}
response = requests.get(
f"{API_BASE_URL}/specialist-execution/start_session",
headers=headers
)
response.raise_for_status()
return response.json()["session_id"]
def load_specialist_config() -> Dict[str, Any]:
"""Load specialist configuration from YAML file"""
config_path = f"{ROOT_FOLDER}/config/specialists/{SPECIALIST_TYPE}/1.0.0.yaml"
if not os.path.exists(config_path):
print(colored(f"Error: Configuration file not found: {config_path}", "red"))
sys.exit(1)
with open(config_path, 'r') as f:
return yaml.safe_load(f)
def get_argument_value(arg_name: str, arg_config: Dict[str, Any], previous_value: Any = None) -> Any:
"""Get argument value from user input"""
arg_type = arg_config.get('type', 'str')
description = arg_config.get('description', '')
# Show previous value if it exists
previous_str = f" (previous: {previous_value})" if previous_value is not None else ""
while True:
print(colored(f"\n{arg_name}: {description}{previous_str}", "cyan"))
value = input(colored("Enter value (or press Enter for previous): ", "yellow"))
if not value and previous_value is not None:
return previous_value
try:
if arg_type == 'int':
return int(value)
elif arg_type == 'float':
return float(value)
elif arg_type == 'bool':
return value.lower() in ('true', 'yes', '1', 't')
else:
return value
except ValueError:
print(colored(f"Invalid input for type {arg_type}. Please try again.", "red"))
def get_specialist_arguments(config: Dict[str, Any], previous_args: Dict[str, Any] = None) -> Dict[str, Any]:
"""Get all required arguments for specialist execution"""
arguments = {}
previous_args = previous_args or {}
for arg_name, arg_config in config.get('arguments', {}).items():
previous_value = previous_args.get(arg_name)
arguments[arg_name] = get_argument_value(arg_name, arg_config, previous_value)
return arguments
def process_specialist_updates(task_id: str, auth_token: str):
"""Process SSE updates from specialist execution"""
headers = {'Authorization': f'Bearer {auth_token}'}
url = f"{API_BASE_URL}/specialist-execution/{task_id}/stream"
print(colored("\nConnecting to execution stream...", "cyan"))
with requests.get(url, headers=headers, stream=True) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if not line.startswith('data: '):
continue
# Extract the data part
data = line[6:] # Skip 'data: '
try:
update = json.loads(data)
update_type = update['processing_type']
data = update['data']
timestamp = update.get('timestamp', datetime.now().isoformat())
# Print updates in different colors based on type
if update_type.endswith('Start'):
print(colored(f"\n[{timestamp}] {update_type}: {data}", "blue"))
elif update_type == 'EveAI Specialist Error':
print(colored(f"\n[{timestamp}] Error: {data}", "red"))
break
elif update_type == 'EveAI Specialist Complete':
print(colored(f"\n[{timestamp}] {update_type}: {data}", "green"))
print(colored(f"\n[{timestamp}] {type(data)}", "green"))
print(colored("Full Results:\n", "grey"))
formatted_data = json.dumps(data, indent=4)
print(colored(formatted_data, "grey"))
print(colored("Answer:\n", "cyan"))
answer = data.get('result', {}).get('rag_output', {}).get('answer', "")
print(colored(answer, "cyan"))
break
elif update_type.endswith('Complete'):
print(colored(f"\n[{timestamp}] {update_type}: {data}", "green"))
else:
print(colored(f"\n[{timestamp}] {update_type}: {data.get('message', '')}", "white"))
except json.JSONDecodeError:
print(colored(f"Error decoding message: {data}", "red"))
except Exception as e:
print(colored(f"Error processing message: {str(e)}", "red"))
def main():
try:
# Get authentication token
print(colored("Getting authentication token...", "cyan"))
auth_token = get_auth_token()
# Load specialist configuration
print(colored(f"Loading specialist configuration {SPECIALIST_TYPE}", "cyan"))
config = load_specialist_config()
previous_args = None
while True:
try:
# Get new session ID
print(colored("Getting session ID...", "cyan"))
session_id = get_session_id(auth_token)
print(colored(f"New session ID: {session_id}", "cyan"))
# Get arguments
arguments = get_specialist_arguments(config, previous_args)
previous_args = arguments
# Start specialist execution
print(colored("\nStarting specialist execution...", "cyan"))
headers = {
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
}
response = requests.post(
f"{API_BASE_URL}/specialist-execution",
headers=headers,
json={
'specialist_id': SPECIALIST_ID,
'arguments': arguments,
'session_id': session_id,
'user_timezone': 'UTC'
}
)
response.raise_for_status()
execution_data = response.json()
task_id = execution_data['task_id']
print(colored(f"Execution queued with Task ID: {task_id}", "cyan"))
# Process updates
process_specialist_updates(task_id, auth_token)
# Ask if user wants to continue
if input(colored("\nRun another execution? (y/n): ", "yellow")).lower() != 'y':
break
except KeyboardInterrupt:
print(colored("\nExecution cancelled by user", "yellow"))
if input(colored("Run another execution? (y/n): ", "yellow")).lower() != 'y':
break
except requests.exceptions.HTTPError as e:
print(colored(f"\nHTTP Error: {e.response.status_code} - {e.response.text}", "red"))
if input(colored("Try again? (y/n): ", "yellow")).lower() != 'y':
break
except Exception as e:
print(colored(f"\nError: {str(e)}", "red"))
sys.exit(1)
if __name__ == "__main__":
main()