# eveai_api/api/specialist_execution_api.py
import uuid

from flask import Response, stream_with_context, current_app
from flask_restx import Namespace, Resource, fields
from flask_jwt_extended import jwt_required, get_jwt_identity

from common.extensions import cache_manager
from common.utils.celery_utils import current_celery
from common.utils.execution_progress import ExecutionProgressTracker
from eveai_api.api.auth import requires_service
from common.models.interaction import Specialist

specialist_execution_ns = Namespace('specialist-execution', description='Specialist execution operations')

specialist_start_session_response = specialist_execution_ns.model('StartSessionResponse', {
    'session_id': fields.String(required=True, description='A new Chat session ID'),
})


@specialist_execution_ns.route('/start_session', methods=['GET'])
class StartSession(Resource):
    """Hand out a fresh chat session identifier."""

    @jwt_required()
    @requires_service("SPECIALIST_API")
    @specialist_execution_ns.response(201, 'New Session ID created Successfully', specialist_start_session_response)
    def get(self):
        """Generate a new session ID.

        Returns:
            A ``(body, 201)`` tuple whose body holds a random UUID4 string
            under ``session_id``. Nothing is persisted by this endpoint.
        """
        new_session_id = str(uuid.uuid4())
        return {
            'session_id': new_session_id,
        }, 201


specialist_execution_input = specialist_execution_ns.model('SpecialistExecutionInput', {
    'specialist_id': fields.Integer(required=True, description='ID of the specialist to use'),
    'arguments': fields.Raw(required=True, description='Dynamic arguments for specialist and retrievers'),
    'session_id': fields.String(required=True, description='Chat session ID'),
    'user_timezone': fields.String(required=True, description='User timezone')
})

specialist_execution_response = specialist_execution_ns.model('SpecialistExecutionResponse', {
    'task_id': fields.String(description='ID of specialist execution task, to be used to retrieve execution stream'),
    'status': fields.String(description='Status of the execution'),
    'stream_url': fields.String(description='Stream URL'),
})


@specialist_execution_ns.route('')
class StartExecution(Resource):
    """Queue a specialist execution as a Celery task."""

    @jwt_required()
    @requires_service('SPECIALIST_API')
    @specialist_execution_ns.expect(specialist_execution_input)
    @specialist_execution_ns.response(201, 'Specialist execution successfully queued.', specialist_execution_response)
    def post(self):
        """Start execution of a specialist.

        The JWT identity is used as the tenant ID and is sent, together with
        the request payload fields, to the ``execute_specialist`` task on the
        ``llm_interactions`` queue.

        Returns:
            A ``(body, 201)`` tuple containing the Celery task ID, the status
            ``'queued'`` and the URL on which progress can be streamed.
        """
        tenant_id = get_jwt_identity()
        data = specialist_execution_ns.payload

        # Send task to queue
        task = current_celery.send_task(
            'execute_specialist',
            args=[
                tenant_id,
                data['specialist_id'],
                data['arguments'],
                data['session_id'],
                data['user_timezone'],
            ],
            queue='llm_interactions'
        )

        return {
            'task_id': task.id,
            'status': 'queued',
            'stream_url': f'/api/v1/specialist-execution/{task.id}/stream'
        }, 201


# The URL variable is required: ``get`` takes ``task_id`` and ``StartExecution``
# advertises ``.../<task id>/stream`` as the stream URL.
@specialist_execution_ns.route('/<string:task_id>/stream')
class ExecutionStream(Resource):
    """Stream progress updates of a queued specialist execution."""

    @jwt_required()
    @requires_service('SPECIALIST_API')
    def get(self, task_id: str):
        """Get streaming updates for a specialist execution.

        Args:
            task_id: ID of the task as returned by ``StartExecution.post``.

        Returns:
            A ``text/event-stream`` response fed by
            ``ExecutionProgressTracker.get_updates(task_id)``.
        """
        progress_tracker = ExecutionProgressTracker()
        return Response(
            stream_with_context(progress_tracker.get_updates(task_id)),
            mimetype='text/event-stream',
            headers={
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive'
            }
        )


specialist_arguments_input = specialist_execution_ns.model('SpecialistArgumentsInput', {
    'specialist_id': fields.Integer(required=True, description='ID of the specialist to use'),
})

specialist_arguments_response = specialist_execution_ns.model('SpecialistArgumentsResponse', {
    'arguments': fields.Raw(description='Dynamic list of attributes for the specialist.'),
})


@specialist_execution_ns.route('/specialist_arguments', methods=['GET'])
class SpecialistArgument(Resource):
    """Expose the argument definitions of a specialist's configuration."""

    @jwt_required()
    @requires_service('SPECIALIST_API')
    @specialist_execution_ns.expect(specialist_arguments_input)
    @specialist_execution_ns.response(200, 'Specialist configuration fetched.', specialist_arguments_response)
    @specialist_execution_ns.response(404, 'Specialist configuration not found.')
    @specialist_execution_ns.response(500, 'Internal Server Error')
    def get(self):
        """Return the arguments declared in a specialist's configuration.

        The specialist is looked up by the ``specialist_id`` given in the
        request payload; its configuration is fetched from the specialists
        config cache using the specialist's type and type version.

        Returns:
            A ``({'arguments': ...}, 200)`` tuple on success.

        Raises:
            HTTPException: 404 when the specialist, its configuration or the
                ``arguments`` entry is missing; 500 when the lookup itself
                fails unexpectedly.
        """
        data = specialist_execution_ns.payload
        specialist_id = data['specialist_id']

        specialist = None
        configuration = None
        # Only the lookups are guarded. ``abort`` raises an HTTPException,
        # which is an ``Exception`` subclass, so calling it inside this
        # ``try`` would turn every intended 404 into a 500.
        try:
            specialist = Specialist.query.get(specialist_id)
            if specialist:
                configuration = cache_manager.specialists_config_cache.get_config(
                    specialist.type,
                    specialist.type_version,
                )
                current_app.logger.debug(f"Configuration returned: {configuration}")
        except Exception as e:
            current_app.logger.error(f"Error while retrieving Specialist configuration: {str(e)}")
            specialist_execution_ns.abort(500, 'Unexpected Error while fetching Specialist configuration.')

        if not specialist:
            specialist_execution_ns.abort(404, 'Error fetching Specialist')
        if not configuration:
            specialist_execution_ns.abort(404, 'Error fetching Specialist configuration.')
        if 'arguments' not in configuration:
            specialist_execution_ns.abort(404, 'No arguments found in specialist configuration.')

        return {
            'arguments': configuration['arguments'],
        }, 200