# test_specialist_client.py
"""Interactive command-line client for exercising the specialist-execution API.

The script authenticates against the API, fetches the argument definitions of
one specialist, prompts the user for argument values, starts an execution and
prints the streamed progress updates until the execution completes or fails.
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

import requests
import sseclient
import yaml
from termcolor import colored

project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(project_root)

# Configuration Constants
API_BASE_URL = "http://macstudio.ask-eve-ai-local.com:8080/api/api/v1"
TENANT_ID = 2  # Replace with your tenant ID
# NOTE(review): a credential is committed in source; consider loading it from
# the environment or a local, untracked config file instead.
API_KEY = "EveAI-5096-5466-6143-1487-8085-4174-2080-7208"  # Replace with your API key
SPECIALIST_ID = 5  # Replace with your specialist ID

# Seconds to wait for the server on regular (non-streaming) requests, so that
# an unreachable server produces an error instead of hanging the client.
REQUEST_TIMEOUT = 30


def get_auth_token() -> str:
    """Request an access token for TENANT_ID / API_KEY.

    Prints the status code, headers and body of the response for debugging.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        Exception: if the API answers with any status other than 200.
    """
    response = requests.post(
        f"{API_BASE_URL}/auth/token",
        json={
            "tenant_id": TENANT_ID,
            "api_key": API_KEY,
        },
        timeout=REQUEST_TIMEOUT,
    )

    print(colored(f"Status Code: {response.status_code}", "cyan"))
    print(colored(f"Response Headers: {response.headers}", "cyan"))
    print(colored(f"Response Content: {response.text}", "cyan"))

    if response.status_code == 200:
        return response.json()['access_token']
    raise Exception(f"Authentication failed: {response.text}")


def get_session_id(auth_token: str) -> str:
    """Ask the API to start a session and return its ``session_id``.

    Raises:
        requests.exceptions.HTTPError: if the API answers with an error status.
    """
    headers = {'Authorization': f'Bearer {auth_token}'}
    response = requests.get(
        f"{API_BASE_URL}/specialist-execution/start_session",
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()["session_id"]


def get_specialist_config(auth_token: str, specialist_id: int) -> Dict[str, Any]:
    """Fetch the argument definitions of a specialist.

    Returns:
        The ``arguments`` mapping of the JSON response, or an empty dict when
        the response has no such key.

    Raises:
        Exception: if the API answers with any status other than 200.
    """
    headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json',
    }

    # The specialist id is sent as a JSON body on a GET request.
    response = requests.get(
        f"{API_BASE_URL}/specialist-execution/specialist_arguments",
        headers=headers,
        json={
            'specialist_id': specialist_id,
        },
        timeout=REQUEST_TIMEOUT,
    )

    print(colored(f"Status Code: {response.status_code}", "cyan"))
    if response.status_code == 200:
        config_data = response.json()
        return config_data.get('arguments', {})
    raise Exception(f"Failed to get specialist configuration: {response.text}")


def get_argument_value(arg_name: str, arg_config: Dict[str, Any], previous_value: Any = None) -> Any:
    """Prompt the user for one argument value and convert it to its type.

    Args:
        arg_name: Name shown in the prompt.
        arg_config: Argument definition; ``type`` (default ``'str'``) selects
            the conversion and ``description`` is shown in the prompt.
        previous_value: Value returned when the user just presses Enter.

    Returns:
        The converted value. Input that cannot be converted to ``int`` or
        ``float`` is rejected and the prompt is repeated.
    """
    arg_type = arg_config.get('type', 'str')
    description = arg_config.get('description', '')

    # Show previous value if it exists
    previous_str = f" (previous: {previous_value})" if previous_value is not None else ""

    while True:
        print(colored(f"\n{arg_name}: {description}{previous_str}", "cyan"))
        value = input(colored("Enter value (or press Enter for previous): ", "yellow"))

        if not value and previous_value is not None:
            return previous_value

        try:
            if arg_type == 'int':
                return int(value)
            if arg_type == 'float':
                return float(value)
            if arg_type == 'bool':
                # Anything outside this set (including empty input) is False.
                return value.lower() in ('true', 'yes', '1', 't')
            return value
        except ValueError:
            print(colored(f"Invalid input for type {arg_type}. Please try again.", "red"))


def get_specialist_arguments(config: Dict[str, Any],
                             previous_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Prompt for every argument listed under ``config['arguments']``.

    Args:
        config: Mapping whose ``arguments`` entry maps argument names to their
            definitions.
        previous_args: Values from an earlier run, offered as defaults.

    Returns:
        A dict mapping each argument name to the value the user entered.
    """
    previous_args = previous_args or {}
    return {
        arg_name: get_argument_value(arg_name, arg_config, previous_args.get(arg_name))
        for arg_name, arg_config in config.get('arguments', {}).items()
    }


def _extract_answer(result: Any) -> Any:
    """Return ``result['result']['rag_output']['answer']`` or ``""`` if absent.

    Any level that is missing, null or not a dict yields the empty string
    instead of raising, so a malformed final payload cannot abort processing.
    """
    node = result
    for key in ('result', 'rag_output', 'answer'):
        if not isinstance(node, dict):
            return ""
        node = node.get(key)
    return "" if node is None else node


def _print_update(update: Dict[str, Any]) -> bool:
    """Print one decoded stream update; return True if it ends the execution."""
    update_type = update['processing_type']
    data = update['data']
    timestamp = update.get('timestamp', datetime.now().isoformat())

    # Print updates in different colors based on type
    if update_type.endswith('Start'):
        print(colored(f"\n[{timestamp}] {update_type}: {data}", "blue"))
        return False

    if update_type == 'EveAI Specialist Error':
        print(colored(f"\n[{timestamp}] Error: {data}", "red"))
        return True

    if update_type == 'EveAI Specialist Complete':
        print(colored(f"\n[{timestamp}] {update_type}: {data}", "green"))
        print(colored(f"\n[{timestamp}] {type(data)}", "green"))
        print(colored("Full Results:\n", "grey"))
        print(colored(json.dumps(data, indent=4), "grey"))
        print(colored("Answer:\n", "cyan"))
        print(colored(_extract_answer(data), "cyan"))
        return True

    if update_type.endswith('Complete'):
        print(colored(f"\n[{timestamp}] {update_type}: {data}", "green"))
        return False

    # Progress payloads normally carry a 'message'; fall back to the raw
    # payload when it is not a dict rather than failing on .get().
    message = data.get('message', '') if isinstance(data, dict) else data
    print(colored(f"\n[{timestamp}] {update_type}: {message}", "white"))
    return False


def process_specialist_updates(task_id: str, auth_token: str) -> None:
    """Print the streamed updates of an execution until it finishes.

    Reads ``data: `` lines from the task's stream endpoint, decodes each as
    JSON and prints it. Stops at the first 'EveAI Specialist Error' or
    'EveAI Specialist Complete' update, or when the server closes the stream.

    Raises:
        requests.exceptions.HTTPError: if the stream endpoint answers with an
            error status.
    """
    headers = {'Authorization': f'Bearer {auth_token}'}
    url = f"{API_BASE_URL}/specialist-execution/{task_id}/stream"

    print(colored("\nConnecting to execution stream...", "cyan"))

    # Only the connect phase is time-limited: the stream itself may stay
    # silent for a long time while the specialist is working.
    with requests.get(url, headers=headers, stream=True,
                      timeout=(REQUEST_TIMEOUT, None)) as response:
        response.raise_for_status()

        for raw_line in response.iter_lines():
            if not raw_line:
                continue

            line = raw_line.decode('utf-8')
            if not line.startswith('data: '):
                continue

            payload = line[6:]  # Skip 'data: '

            try:
                update = json.loads(payload)
            except json.JSONDecodeError:
                print(colored(f"Error decoding message: {payload}", "red"))
                continue

            try:
                finished = _print_update(update)
            except Exception as e:
                # Best effort: one malformed update must not end the stream.
                print(colored(f"Error processing message: {str(e)}", "red"))
                continue

            if finished:
                break


def _ask_yes(prompt: str) -> bool:
    """Show a yellow prompt and return True only if the user answers 'y'."""
    return input(colored(prompt, "yellow")).lower() == 'y'


def main():
    """Run the interactive loop: authenticate, then execute until the user stops.

    Exits with status 1 on any error that is not an HTTP error or a keyboard
    interrupt raised inside the execution loop.
    """
    try:
        # Get authentication token
        print(colored("Getting authentication token...", "cyan"))
        auth_token = get_auth_token()

        # Load specialist configuration
        print(colored("Loading specialist configuration", "cyan"))
        config = {
            'arguments': get_specialist_config(auth_token, SPECIALIST_ID)
        }
        previous_args = None

        while True:
            try:
                # Get new session ID
                print(colored("Getting session ID...", "cyan"))
                session_id = get_session_id(auth_token)
                print(colored(f"New session ID: {session_id}", "cyan"))

                # Get arguments
                arguments = get_specialist_arguments(config, previous_args)
                previous_args = arguments

                # Start specialist execution
                print(colored("\nStarting specialist execution...", "cyan"))
                headers = {
                    'Authorization': f'Bearer {auth_token}',
                    'Content-Type': 'application/json',
                }

                response = requests.post(
                    f"{API_BASE_URL}/specialist-execution",
                    headers=headers,
                    json={
                        'specialist_id': SPECIALIST_ID,
                        'arguments': arguments,
                        'session_id': session_id,
                        'user_timezone': 'UTC',
                    },
                    timeout=REQUEST_TIMEOUT,
                )
                response.raise_for_status()

                execution_data = response.json()
                task_id = execution_data['task_id']
                print(colored(f"Execution queued with Task ID: {task_id}", "cyan"))

                # Process updates
                process_specialist_updates(task_id, auth_token)

                # Ask if user wants to continue
                if not _ask_yes("\nRun another execution? (y/n): "):
                    break

            except KeyboardInterrupt:
                print(colored("\nExecution cancelled by user", "yellow"))
                if not _ask_yes("Run another execution? (y/n): "):
                    break
            except requests.exceptions.HTTPError as e:
                print(colored(f"\nHTTP Error: {e.response.status_code} - {e.response.text}", "red"))
                if not _ask_yes("Try again? (y/n): "):
                    break

    except Exception as e:
        print(colored(f"\nError: {str(e)}", "red"))
        sys.exit(1)


if __name__ == "__main__":
    main()