Files
eveAI/eveai_api/api/specialist_execution_api.py
Josako 1fdbd2ff45 - Move global config files to globals instead of global folder, as the name global conflicts with the Python language
- Creation of Traicie Vacancy Definition specialist
- Allow invoking non-interaction specialists from within Evie's mgmt interface (eveai_app)
- Improvements to crewai specialized classes
- Introduction to json editor for showing specialists arguments and results in a better way
- Introduction of more complex pagination (adding extra arguments) by adding a global 'get_pagination_html'
- Allow follow-up of ChatSession / Specialist execution
- Improvement in logging of Specialists (but needs to be finished)
2025-05-26 11:26:03 +02:00

130 lines
5.6 KiB
Python

# eveai_api/api/specialist_execution_api.py
import uuid
from flask import Response, stream_with_context, current_app
from flask_restx import Namespace, Resource, fields
from flask_jwt_extended import jwt_required, get_jwt_identity
from common.extensions import cache_manager
from common.utils.celery_utils import current_celery
from common.utils.execution_progress import ExecutionProgressTracker
from eveai_api.api.auth import requires_service
from common.models.interaction import Specialist
from common.services.interaction.specialist_services import SpecialistServices
# Namespace under which all specialist-execution routes in this module are registered.
specialist_execution_ns = Namespace(
    'specialist-execution',
    description='Specialist execution operations',
)

# Response model attached to the start_session endpoint.
specialist_start_session_response = specialist_execution_ns.model(
    'StartSessionResponse',
    {
        'session_id': fields.String(required=True, description='A new Chat session ID'),
    },
)
@specialist_execution_ns.route('/start_session', methods=['GET'])
class StartSession(Resource):
    # Resource that obtains a new session ID from SpecialistServices and
    # returns it to the caller.

    @jwt_required()
    @requires_service("SPECIALIST_API")
    @specialist_execution_ns.response(
        201,
        'New Session ID created Successfully',
        specialist_start_session_response,
    )
    def get(self):
        # Delegate session creation to the service layer and wrap the ID in
        # the response body; the status code is 201.
        response_body = {'session_id': SpecialistServices.start_session()}
        return response_body, 201
# Request body model declared for the execution endpoint (POST on the namespace root).
specialist_execution_input = specialist_execution_ns.model(
    'SpecialistExecutionInput',
    {
        'specialist_id': fields.Integer(required=True, description='ID of the specialist to use'),
        'arguments': fields.Raw(required=True, description='Dynamic arguments for specialist and retrievers'),
        'session_id': fields.String(required=True, description='Chat session ID'),
        'user_timezone': fields.String(required=True, description='User timezone'),
    },
)

# Response model declared for a successfully queued execution.
specialist_execution_response = specialist_execution_ns.model(
    'SpecialistExecutionResponse',
    {
        'task_id': fields.String(
            description='ID of specialist execution task, to be used to retrieve execution stream',
        ),
        'status': fields.String(description='Status of the execution'),
        'stream_url': fields.String(description='Stream URL'),
    },
)
@specialist_execution_ns.route('')
class StartExecution(Resource):
    """Queue the execution of a specialist for the authenticated tenant."""

    # Fields the handler reads from the request body; all must be present.
    _REQUIRED_FIELDS = ('specialist_id', 'arguments', 'session_id', 'user_timezone')

    @jwt_required()
    @requires_service('SPECIALIST_API')
    @specialist_execution_ns.expect(specialist_execution_input)
    @specialist_execution_ns.response(201, 'Specialist execution successfully queued.', specialist_execution_response)
    @specialist_execution_ns.response(400, 'Invalid or incomplete request body.')
    def post(self):
        """Start execution of a specialist

        The JWT identity is used as the tenant ID. The request body must be a
        JSON object containing specialist_id, arguments, session_id and
        user_timezone; otherwise the request is rejected with HTTP 400.
        """
        tenant_id = get_jwt_identity()
        data = specialist_execution_ns.payload

        # The expected model is declared without validation, so the body is
        # checked here. Without this, a missing key would raise KeyError (or
        # TypeError for a non-object body) and surface as an HTTP 500.
        if not isinstance(data, dict):
            specialist_execution_ns.abort(400, 'Request body must be a JSON object.')
        missing = [name for name in self._REQUIRED_FIELDS if name not in data]
        if missing:
            specialist_execution_ns.abort(400, f"Missing required field(s): {', '.join(missing)}")

        # Send task to queue
        result = SpecialistServices.execute_specialist(
            tenant_id=tenant_id,
            specialist_id=data['specialist_id'],
            specialist_arguments=data['arguments'],
            session_id=data['session_id'],
            user_timezone=data['user_timezone'],
        )

        # Tell the client where to follow the progress of the queued task.
        result['stream_url'] = f"/api/v1/specialist-execution/{result['task_id']}/stream"
        return result, 201
@specialist_execution_ns.route('/<string:task_id>/stream')
class ExecutionStream(Resource):
    # Resource that streams the progress updates of a single execution task.

    @jwt_required()
    @requires_service('SPECIALIST_API')
    def get(self, task_id: str):
        """Get streaming updates for a specialist execution"""
        stream_headers = {
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
        }
        tracker = ExecutionProgressTracker()
        # stream_with_context keeps the request context available while the
        # update generator is being consumed.
        update_stream = stream_with_context(tracker.get_updates(task_id))
        return Response(
            update_stream,
            mimetype='text/event-stream',
            headers=stream_headers,
        )
# Request model declared for the specialist_arguments endpoint.
specialist_arguments_input = specialist_execution_ns.model(
    'SpecialistArgumentsInput',
    {
        'specialist_id': fields.Integer(required=True, description='ID of the specialist to use'),
    },
)

# Response model carrying the 'arguments' section of a specialist configuration.
specialist_arguments_response = specialist_execution_ns.model(
    'SpecialistArgumentsResponse',
    {
        'arguments': fields.Raw(description='Dynamic list of attributes for the specialist.'),
    },
)
@specialist_execution_ns.route('/specialist_arguments', methods=['GET'])
class SpecialistArgument(Resource):
    """Return the 'arguments' section of a specialist's configuration."""

    @jwt_required()
    @requires_service('SPECIALIST_API')
    @specialist_execution_ns.expect(specialist_arguments_input)
    @specialist_execution_ns.response(200, 'Specialist configuration fetched.', specialist_arguments_response)
    @specialist_execution_ns.response(400, 'Invalid or incomplete request body.')
    @specialist_execution_ns.response(404, 'Specialist configuration not found.')
    @specialist_execution_ns.response(500, 'Internal Server Error')
    def get(self):
        """Get the arguments defined in a specialist's configuration

        Responds with 200 and the arguments when they exist, 400 when the
        body lacks specialist_id, 404 when the specialist, its configuration
        or the arguments section cannot be found, and 500 when the lookup
        itself fails unexpectedly.
        """
        data = specialist_execution_ns.payload
        if not isinstance(data, dict) or 'specialist_id' not in data:
            specialist_execution_ns.abort(400, 'Missing required field: specialist_id')
        specialist_id = data['specialist_id']

        # NOTE(review): the lookup below is not explicitly filtered by the
        # caller's tenant in this handler - confirm tenant scoping is enforced
        # elsewhere.

        # Only the lookups that may fail unexpectedly are inside the try.
        # abort() reports its HTTP error by raising an exception, so calling
        # it for the 404 cases inside a broad `except Exception` would catch
        # that exception and turn every intended 404 into a 500.
        try:
            specialist = Specialist.query.get(specialist_id)
            configuration = None
            if specialist:
                configuration = cache_manager.specialists_config_cache.get_config(
                    specialist.type,
                    specialist.type_version,
                )
            has_arguments = bool(configuration) and 'arguments' in configuration
            arguments = configuration['arguments'] if has_arguments else None
        except Exception as e:
            current_app.logger.error(f"Error while retrieving Specialist configuration: {str(e)}")
            specialist_execution_ns.abort(500, 'Unexpected Error while fetching Specialist configuration.')

        # Expected "not found" outcomes, reported outside the try so they
        # reach the client as 404.
        if not specialist:
            specialist_execution_ns.abort(404, 'Error fetching Specialist')
        if not configuration:
            specialist_execution_ns.abort(404, 'Error fetching Specialist configuration.')
        if not has_arguments:
            specialist_execution_ns.abort(404, 'No arguments found in specialist configuration.')

        return {
            'arguments': arguments,
        }, 200