Files
eveAI/eveai_app/views/interaction_views.py

549 lines
24 KiB
Python

import ast
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify, url_for
from flask_security import roles_accepted
from langchain.agents import Agent
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from common.models.document import Embedding, DocumentVersion, Retriever
from common.models.interaction import (ChatSession, Interaction, InteractionEmbedding, Specialist, SpecialistRetriever,
EveAIAgent, EveAITask, EveAITool, EveAIAssetVersion)
from common.extensions import db, cache_manager
from common.utils.asset_utils import create_asset_stack, add_asset_version_file
from common.utils.model_logging_utils import set_logging_information, update_logging_information
from common.utils.middleware import mw_before_request
from common.utils.nginx_utils import prefixed_url_for
from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro
from common.utils.specialist_utils import initialize_specialist
from config.type_defs.specialist_types import SPECIALIST_TYPES
from .interaction_forms import (SpecialistForm, EditSpecialistForm, EditEveAIAgentForm, EditEveAITaskForm,
EditEveAIToolForm, AddEveAIAssetForm, EditEveAIAssetVersionForm)
# Blueprint grouping the interaction-related views (chat sessions, specialists, agents, tasks, tools, assets).
# Every route registered on it is served under the '/interaction' URL prefix.
interaction_bp = Blueprint('interaction_bp', __name__, url_prefix='/interaction')
@interaction_bp.before_request
def log_before_request():
    """Write a debug log line with the request path before each request on this blueprint."""
    message = f'Before request: {request.path} ====================================='
    current_app.logger.debug(message)
@interaction_bp.after_request
def log_after_request(response):
    """After-request hook for this blueprint; currently hands the response back untouched.

    :param response: the response object produced by the view.
    :return: the same response object, unmodified.
    """
    return response
# Routes for Chat Session Management --------------------------------------------------------------
@interaction_bp.before_request
def before_request():
    """Run the shared request middleware before every request on this blueprint.

    Any exception raised by ``mw_before_request`` is logged and then re-raised
    unchanged, so the request does not continue after a middleware failure.
    """
    try:
        mw_before_request()
    except Exception as e:
        # NOTE(review): the log text suggests the middleware switches the database schema - confirm.
        error_message = f'Error switching schema in Interaction Blueprint: {e}'
        current_app.logger.error(error_message)
        raise
@interaction_bp.route('/chat_sessions', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def chat_sessions():
    """Render a paginated overview of chat sessions, most recently started first.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: number of sessions per page (default 10).

    The role restriction matches the one on the chat-session selection and
    detail views in this blueprint, so the overview is not the only
    chat-session route left without a role check.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    query = ChatSession.query.order_by(desc(ChatSession.session_start))
    # error_out=False: an out-of-range page yields an empty page instead of aborting with 404.
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    rows = prepare_table_for_macro(
        pagination.items,
        [('id', ''), ('session_id', ''), ('session_start', ''), ('session_end', '')],
    )
    return render_template('interaction/chat_sessions.html', rows=rows, pagination=pagination)
@interaction_bp.route('/handle_chat_session_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_chat_session_selection():
    """Dispatch the action chosen for a row selected in the chat-session table.

    Reads the form fields ``selected_row`` (a Python-literal dict whose
    ``value`` entry is the chat session id) and ``action``. Any action other
    than ``view_chat_session`` sends the user back to the overview.
    """
    selected_row = request.form['selected_row']
    # literal_eval only parses Python literals; it does not execute code.
    cs_id = ast.literal_eval(selected_row).get('value')
    action = request.form['action']

    if action == 'view_chat_session':
        return redirect(prefixed_url_for('interaction_bp.view_chat_session', chat_session_id=cs_id))

    # Further actions can be added above; unknown actions fall back to the overview.
    return redirect(prefixed_url_for('interaction_bp.chat_sessions'))
@interaction_bp.route('/view_chat_session/<int:chat_session_id>', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def view_chat_session(chat_session_id):
    """Show one chat session with its interactions and the documents linked to them.

    :param chat_session_id: primary key of the chat session (404 when unknown).
    :return: the rendered ``interaction/view_chat_session.html`` template.
    """
    chat_session = ChatSession.query.get_or_404(chat_session_id)

    # Each result row holds the Interaction entity followed by the extra columns below;
    # the outer join keeps interactions that have no specialist.
    extra_columns = (
        Interaction.id,
        Interaction.question_at,
        Interaction.specialist_arguments,
        Interaction.specialist_results,
        Specialist.name.label('specialist_name'),
        Specialist.type.label('specialist_type'),
    )
    interactions = (
        Interaction.query
        .filter_by(chat_session_id=chat_session.id)
        .join(Specialist, Interaction.specialist_id == Specialist.id, isouter=True)
        .add_columns(*extra_columns)
        .order_by(Interaction.question_at)
        .all()
    )

    # Look up the document versions behind the embeddings used by these interactions.
    interaction_ids = [row[0].id for row in interactions]
    embedding_query = (
        db.session.query(InteractionEmbedding.interaction_id,
                         DocumentVersion.url,
                         DocumentVersion.object_name)
        .join(Embedding, InteractionEmbedding.embedding_id == Embedding.id)
        .join(DocumentVersion, Embedding.doc_vers_id == DocumentVersion.id)
        .filter(InteractionEmbedding.interaction_id.in_(interaction_ids))
    )

    # Group the document references per interaction id.
    embeddings_dict = {}
    for interaction_id, url, object_name in embedding_query:
        document_ref = {'url': url, 'object_name': object_name}
        embeddings_dict.setdefault(interaction_id, []).append(document_ref)

    return render_template('interaction/view_chat_session.html',
                           chat_session=chat_session,
                           interactions=interactions,
                           embeddings_dict=embeddings_dict)
@interaction_bp.route('/view_chat_session_by_session_id/<session_id>', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def view_chat_session_by_session_id(session_id):
    """Show a chat session looked up by its ``session_id`` instead of its primary key.

    :param session_id: value matched against ``ChatSession.session_id`` (404 when no match).
    :return: the response rendered by ``show_chat_session``.
    """
    chat_session = ChatSession.query.filter_by(session_id=session_id).first_or_404()
    # The rendered page must be returned; a view that returns None makes Flask raise a TypeError.
    return show_chat_session(chat_session)
def show_chat_session(chat_session):
    """Render the chat-session detail template for an already loaded chat session.

    :param chat_session: the ChatSession whose interactions are listed.
    :return: the rendered ``interaction/view_chat_session.html`` template.
    """
    # NOTE(review): unlike view_chat_session, no embeddings_dict is passed and the
    # interactions are plain Interaction objects - confirm the template copes with both shapes.
    session_interactions = Interaction.query.filter_by(chat_session_id=chat_session.id).all()
    return render_template(
        'interaction/view_chat_session.html',
        chat_session=chat_session,
        interactions=session_interactions,
    )
# Routes for Specialist Management ----------------------------------------------------------------
@interaction_bp.route('/specialist', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def specialist():
    """Show the "add specialist" form and create a specialist on a valid submit.

    On success the specialist and its retriever links are committed in one
    transaction, the specialist is initialised, and the user is redirected to
    its edit page. On any failure the transaction is rolled back and the form
    is shown again with an error message.
    """
    form = SpecialistForm()
    if not form.validate_on_submit():
        return render_template('interaction/specialist.html', form=form)

    tenant_id = session.get('tenant').get('id')
    try:
        new_specialist = Specialist()

        # Fields are copied one by one; populate_obj causes trouble with the QueryMultipleSelectField.
        new_specialist.name = form.name.data
        new_specialist.type = form.type.data
        new_specialist.type_version = cache_manager.specialists_version_tree_cache.get_latest_version(
            new_specialist.type)
        new_specialist.tuning = form.tuning.data

        set_logging_information(new_specialist, dt.now(tz.utc))

        db.session.add(new_specialist)
        # Flush so the specialist gets its id while the transaction stays open.
        db.session.flush()

        # Link every selected retriever to the new specialist.
        db.session.add_all([
            SpecialistRetriever(specialist_id=new_specialist.id, retriever_id=retriever.id)
            for retriever in form.retrievers.data
        ])

        # Specialist and retriever links are committed together.
        db.session.commit()
        flash('Specialist successfully added!', 'success')
        current_app.logger.info(f'Specialist {new_specialist.name} successfully added for tenant {tenant_id}!')

        # Initialise the newly created specialist.
        initialize_specialist(new_specialist.id, new_specialist.type, new_specialist.type_version)

        return redirect(prefixed_url_for('interaction_bp.edit_specialist', specialist_id=new_specialist.id))
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Failed to add specialist. Error: {str(e)}', exc_info=True)
        flash(f'Failed to add specialist. Error: {str(e)}', 'danger')

    return render_template('interaction/specialist.html', form=form)
@interaction_bp.route('/specialist/<int:specialist_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_specialist(specialist_id):
    """Show and process the edit form of one specialist.

    GET pre-selects the specialist's retrievers and, when the specialist has
    no description yet, pre-fills the description field with the default from
    the specialist type's metadata. A valid POST stores name, description,
    tuning, the dynamic configuration and the retriever selection, then
    redirects to the specialist overview.

    :param specialist_id: primary key of the specialist (404 when unknown).
    :return: a redirect after a successful save, otherwise the rendered edit page.
    """
    specialist = Specialist.query.get_or_404(specialist_id)
    form = EditSpecialistForm(request.form, obj=specialist)

    specialist_config = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.type_version)
    configuration_config = specialist_config.get('configuration')
    form.add_dynamic_fields("configuration", configuration_config, specialist.configuration)

    agent_rows = prepare_table_for_macro(specialist.agents,
                                         [('id', ''), ('name', ''), ('type', ''), ('type_version', '')])
    task_rows = prepare_table_for_macro(specialist.tasks,
                                        [('id', ''), ('name', ''), ('type', ''), ('type_version', '')])
    tool_rows = prepare_table_for_macro(specialist.tools,
                                        [('id', ''), ('name', ''), ('type', ''), ('type_version', '')])

    # Construct the SVG overview path
    svg_filename = f"{specialist.type}_{specialist.type_version}_overview.svg"
    svg_path = url_for('static', filename=f'assets/specialists/{svg_filename}')

    # Context shared by every rendering of the edit page.
    template_context = dict(
        form=form,
        specialist_id=specialist_id,
        agent_rows=agent_rows,
        task_rows=task_rows,
        tool_rows=tool_rows,
        prefixed_url_for=prefixed_url_for,
        svg_path=svg_path,
    )

    if request.method == 'GET':
        # Get the actual Retriever objects for the associated retriever_ids
        retriever_objects = Retriever.query.filter(
            Retriever.id.in_([sr.retriever_id for sr in specialist.retrievers])
        ).all()
        form.retrievers.data = retriever_objects

        if specialist.description is None:
            # The form was already built from the model, so the default has to go into the
            # form field to be displayed; setting it on the model would never reach the page
            # and would leave the session dirty on a read-only request.
            metadata = specialist_config.get('metadata') or {}
            form.description.data = metadata.get('description', '')

    if not form.validate_on_submit():
        form_validation_failed(request, form)
        return render_template('interaction/edit_specialist.html', **template_context)

    # Update the basic fields
    specialist.name = form.name.data
    specialist.description = form.description.data
    specialist.tuning = form.tuning.data
    # Update the configuration dynamic fields
    specialist.configuration = form.get_dynamic_data("configuration")

    # Compare the stored retriever links with the submitted selection.
    current_retrievers = {sr.retriever_id: sr for sr in specialist.retrievers}
    selected_retrievers = {r.id: r for r in form.retrievers.data}

    # Remove unselected retrievers
    for retriever_id in current_retrievers.keys() - selected_retrievers.keys():
        db.session.delete(current_retrievers[retriever_id])

    # Add new retrievers
    for retriever_id in selected_retrievers.keys() - current_retrievers.keys():
        db.session.add(SpecialistRetriever(specialist_id=specialist.id, retriever_id=retriever_id))

    # Update logging information
    update_logging_information(specialist, dt.now(tz.utc))

    try:
        db.session.commit()
        flash('Specialist updated successfully!', 'success')
        current_app.logger.info(f'Specialist {specialist.id} updated successfully')
        return redirect(prefixed_url_for('interaction_bp.specialists'))
    except SQLAlchemyError as e:
        db.session.rollback()
        flash(f'Failed to update specialist. Error: {str(e)}', 'danger')
        current_app.logger.error(f'Failed to update specialist {specialist_id}. Error: {str(e)}')

    return render_template('interaction/edit_specialist.html', **template_context)
@interaction_bp.route('/specialists', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def specialists():
    """Render a paginated overview of specialists ordered by id.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: number of specialists per page (default 10).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    pagination = Specialist.query.order_by(Specialist.id).paginate(page=page, per_page=per_page)

    # Turn the specialists on the current page into table rows for the template macro.
    rows = prepare_table_for_macro(pagination.items, [('id', ''), ('name', ''), ('type', '')])

    return render_template('interaction/specialists.html', rows=rows, pagination=pagination)
@interaction_bp.route('/handle_specialist_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_specialist_selection():
    """Dispatch the action chosen on the specialist overview.

    ``create_specialist`` needs no selected row. ``edit_specialist`` needs the
    form field ``selected_row`` (a Python-literal dict whose ``value`` entry is
    the specialist id). Everything else, including an edit request without a
    selected row, returns to the overview.
    """
    action = request.form.get('action')
    if action == 'create_specialist':
        return redirect(prefixed_url_for('interaction_bp.specialist'))

    specialist_identification = request.form.get('selected_row')
    # Only parse the selection when one was submitted: ast.literal_eval(None) raises,
    # which would turn "no row selected" into a server error.
    if action == "edit_specialist" and specialist_identification:
        specialist_id = ast.literal_eval(specialist_identification).get('value')
        return redirect(prefixed_url_for('interaction_bp.edit_specialist', specialist_id=specialist_id))

    return redirect(prefixed_url_for('interaction_bp.specialists'))
# Routes for Agent management ---------------------------------------------------------------------
@interaction_bp.route('/agent/<int:agent_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_agent(agent_id):
    """Return the agent edit form fragment for AJAX requests.

    :param agent_id: primary key of the agent (404 when unknown).
    :return: the rendered form component for AJAX requests; any other request
        is redirected to the specialist overview, because this view has no
        full-page rendering.
    """
    agent = EveAIAgent.query.get_or_404(agent_id)
    form = EditEveAIAgentForm(obj=agent)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Without this fallback the view returned None for non-AJAX requests,
        # which Flask rejects with a TypeError.
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    # Return just the form portion for AJAX requests
    return render_template('interaction/components/edit_agent.html',
                           form=form,
                           agent=agent,
                           title="Edit Agent",
                           description="Configure the agent with company-specific details if required",
                           submit_text="Save Agent")
@interaction_bp.route('/agent/<int:agent_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def save_agent(agent_id):
    """Validate and store the submitted agent form, answering with a JSON status.

    :param agent_id: primary key of the agent to update; a falsy id creates a new agent.
    :return: JSON with ``success`` (bool) and a ``message``.
    """
    if agent_id:
        agent = EveAIAgent.query.get_or_404(agent_id)
    else:
        agent = EveAIAgent()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAIAgentForm(obj=agent)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(agent)
        update_logging_information(agent, dt.now(tz.utc))
        # A freshly created agent still has to be attached to the session.
        if not agent_id:
            db.session.add(agent)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Failed to save agent {agent_id} for tenant {tenant_id}. Error: {str(e)}')
        return jsonify({'success': False, 'message': f"Failed to save agent {agent_id}: {str(e)}"})

    return jsonify({'success': True, 'message': 'Agent saved successfully'})
# Routes for Task management ----------------------------------------------------------------------
@interaction_bp.route('/task/<int:task_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_task(task_id):
    """Return the task edit form fragment for AJAX requests.

    :param task_id: primary key of the task (404 when unknown).
    :return: the rendered form component for AJAX requests; any other request
        is redirected to the specialist overview, because this view has no
        full-page rendering.
    """
    task = EveAITask.query.get_or_404(task_id)
    form = EditEveAITaskForm(obj=task)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Without this fallback the view returned None for non-AJAX requests,
        # which Flask rejects with a TypeError.
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    return render_template('interaction/components/edit_task.html',
                           form=form,
                           task=task)
@interaction_bp.route('/task/<int:task_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def save_task(task_id):
    """Validate and store the submitted task form, answering with a JSON status.

    :param task_id: primary key of the task to update; a falsy id creates a new task.
    :return: JSON with ``success`` (bool) and a ``message``.
    """
    if task_id:
        task = EveAITask.query.get_or_404(task_id)
    else:
        task = EveAITask()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAITaskForm(obj=task)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(task)
        update_logging_information(task, dt.now(tz.utc))
        # A freshly created task still has to be attached to the session.
        if not task_id:
            db.session.add(task)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Failed to save task {task_id} for tenant {tenant_id}. Error: {str(e)}')
        return jsonify({'success': False, 'message': f"Failed to save task {task_id}: {str(e)}"})

    return jsonify({'success': True, 'message': 'Task saved successfully'})
# Routes for Tool management ----------------------------------------------------------------------
@interaction_bp.route('/tool/<int:tool_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_tool(tool_id):
    """Return the tool edit form fragment for AJAX requests.

    :param tool_id: primary key of the tool (404 when unknown).
    :return: the rendered form component for AJAX requests; any other request
        is redirected to the specialist overview, because this view has no
        full-page rendering.
    """
    tool = EveAITool.query.get_or_404(tool_id)
    form = EditEveAIToolForm(obj=tool)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Without this fallback the view returned None for non-AJAX requests,
        # which Flask rejects with a TypeError.
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    return render_template('interaction/components/edit_tool.html',
                           form=form,
                           tool=tool)
@interaction_bp.route('/tool/<int:tool_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def save_tool(tool_id):
    """Validate and store the submitted tool form, answering with a JSON status.

    :param tool_id: primary key of the tool to update; a falsy id creates a new tool.
    :return: JSON with ``success`` (bool) and a ``message``.
    """
    if tool_id:
        tool = EveAITool.query.get_or_404(tool_id)
    else:
        tool = EveAITool()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAIToolForm(obj=tool)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(tool)
        update_logging_information(tool, dt.now(tz.utc))
        # A freshly created tool still has to be attached to the session.
        if not tool_id:
            db.session.add(tool)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Failed to save tool {tool_id} for tenant {tenant_id}. Error: {str(e)}')
        return jsonify({'success': False, 'message': f"Failed to save tool {tool_id}: {str(e)}"})

    return jsonify({'success': True, 'message': 'Tool saved successfully'})
# Component selection handlers --------------------------------------------------------------------
@interaction_bp.route('/handle_agent_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_agent_selection():
    """Dispatch the action chosen for a row selected in an agent table.

    Reads the form fields ``selected_row`` (a Python-literal dict whose
    ``value`` entry is the agent id) and ``action``. Any action other than
    ``edit_agent`` returns to the specialist overview.
    """
    agent_identification = request.form['selected_row']
    agent_id = ast.literal_eval(agent_identification).get('value')
    action = request.form.get('action')

    if action == "edit_agent":
        return redirect(prefixed_url_for('interaction_bp.edit_agent', agent_id=agent_id))

    # The edit_specialist route requires a specialist_id, which is not available here,
    # so the fallback targets the specialist overview instead of an unbuildable URL.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
@interaction_bp.route('/handle_task_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_task_selection():
    """Dispatch the action chosen for a row selected in a task table.

    Reads the form fields ``selected_row`` (a Python-literal dict whose
    ``value`` entry is the task id) and ``action``. Any action other than
    ``edit_task`` returns to the specialist overview.
    """
    task_identification = request.form['selected_row']
    task_id = ast.literal_eval(task_identification).get('value')
    action = request.form.get('action')

    if action == "edit_task":
        return redirect(prefixed_url_for('interaction_bp.edit_task', task_id=task_id))

    # The edit_specialist route requires a specialist_id, which is not available here,
    # so the fallback targets the specialist overview instead of an unbuildable URL.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
@interaction_bp.route('/handle_tool_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_tool_selection():
    """Dispatch the action chosen for a row selected in a tool table.

    Reads the form fields ``selected_row`` (a Python-literal dict whose
    ``value`` entry is the tool id) and ``action``. Any action other than
    ``edit_tool`` returns to the specialist overview.
    """
    tool_identification = request.form['selected_row']
    tool_id = ast.literal_eval(tool_identification).get('value')
    action = request.form.get('action')

    if action == "edit_tool":
        return redirect(prefixed_url_for('interaction_bp.edit_tool', tool_id=tool_id))

    # The edit_specialist route requires a specialist_id, which is not available here,
    # so the fallback targets the specialist overview instead of an unbuildable URL.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
# Routes for Asset management ---------------------------------------------------------------------
@interaction_bp.route('/add_asset', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_asset():
    """Show the "add asset" form and create an asset with its first version on a valid submit.

    On success the user is redirected to the edit page of the new asset
    version. On failure the error is logged, a message is flashed and the
    form is shown again.
    """
    form = AddEveAIAssetForm(request.form)
    tenant_id = session.get('tenant').get('id')

    if form.validate_on_submit():
        try:
            current_app.logger.info(f"Adding asset for tenant {tenant_id}")

            api_input = {
                'name': form.name.data,
                'description': form.description.data,
                'type': form.type.data,
                'valid_from': form.valid_from.data,
                'valid_to': form.valid_to.data,
            }
            new_asset, new_asset_version = create_asset_stack(api_input, tenant_id)

            return redirect(prefixed_url_for('interaction_bp.edit_asset_version',
                                             asset_version_id=new_asset_version.id))
        except Exception as e:
            current_app.logger.error(f'Failed to add asset for tenant {tenant_id}: {str(e)}')
            flash('An error occurred while adding asset', 'error')

    # The form must be handed to the template, as every other form view in this
    # blueprint does; otherwise the page has nothing to render its fields or
    # validation errors from.
    return render_template('interaction/add_asset.html', form=form)
@interaction_bp.route('/edit_asset_version/<int:asset_version_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_asset_version(asset_version_id):
    """Show and process the configuration form of one asset version.

    On a valid submit each dynamic configuration value is stored. Uploaded
    files are handed to ``add_asset_version_file`` and replaced in the
    configuration by the object name it returns. Success redirects to the
    asset overview; every other path renders the edit page again.

    :param asset_version_id: primary key of the asset version (404 when unknown).
    :return: a redirect after a successful save, otherwise the rendered edit page.
    """
    asset_version = EveAIAssetVersion.query.get_or_404(asset_version_id)
    form = EditEveAIAssetVersionForm(asset_version)
    asset_config = cache_manager.assets_config_cache.get_config(asset_version.asset.type,
                                                                asset_version.asset.type_version)
    configuration_config = asset_config.get('configuration')
    form.add_dynamic_fields("configuration", configuration_config, asset_version.configuration)

    if not form.validate_on_submit():
        form_validation_failed(request, form)
        # asset_version is passed on every render so the template always gets the same context.
        return render_template('interaction/edit_asset_version.html', form=form,
                               asset_version=asset_version)

    # Update the configuration dynamic fields
    configuration = form.get_dynamic_data("configuration")
    processed_configuration = {}
    tenant_id = session.get('tenant').get('id')

    # Uploaded files are stored through add_asset_version_file; the configuration keeps
    # only the returned object name instead of the file content.
    for field_name, field_value in configuration.items():
        is_uploaded_file = isinstance(field_value, FileStorage) and field_value.filename
        if not is_uploaded_file:
            # Handle normal fields
            processed_configuration[field_name] = field_value
            continue

        try:
            # Upload file and retrieve object_name for the file
            object_name = add_asset_version_file(asset_version, field_name, field_value, tenant_id)
            processed_configuration[field_name] = object_name
        except Exception as e:
            current_app.logger.error(f"Failed to upload file for asset version {asset_version.id}: {str(e)}")
            flash(f"Failed to upload file '{field_value.filename}': {str(e)}", "danger")
            return render_template('interaction/edit_asset_version.html', form=form,
                                   asset_version=asset_version)

    # Update the asset version with processed configuration
    asset_version.configuration = processed_configuration

    # Update logging information
    update_logging_information(asset_version, dt.now(tz.utc))

    try:
        db.session.commit()
        flash('Asset uploaded successfully!', 'success')
        current_app.logger.info(f'Asset Version {asset_version.id} updated successfully')
        return redirect(prefixed_url_for('interaction_bp.assets'))
    except SQLAlchemyError as e:
        db.session.rollback()
        flash(f'Failed to upload asset. Error: {str(e)}', 'danger')
        current_app.logger.error(f'Failed to update asset version {asset_version.id}. Error: {str(e)}')

    return render_template('interaction/edit_asset_version.html', form=form,
                           asset_version=asset_version)