Files
eveAI/eveai_app/views/interaction_views.py
Josako 50773fe602 - Adding functionality for listing and editing assets
- Started adding functionality for creating a 'full_documents' list view.
2025-07-03 11:14:10 +02:00

889 lines
39 KiB
Python

import ast
import json
import uuid
from datetime import datetime as dt, timezone as tz
import time
from common.extensions import minio_client
from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify, url_for
from flask_security import roles_accepted, current_user
from langchain.agents import Agent
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from common.models.document import Embedding, DocumentVersion, Retriever
from common.models.interaction import (ChatSession, Interaction, InteractionEmbedding, Specialist, SpecialistRetriever,
EveAIAgent, EveAITask, EveAITool, EveAIAsset, SpecialistMagicLink)
from common.extensions import db, cache_manager
from common.models.user import SpecialistMagicLinkTenant
from common.services.interaction.specialist_services import SpecialistServices
from common.utils.execution_progress import ExecutionProgressTracker
from common.utils.model_logging_utils import set_logging_information, update_logging_information
from common.utils.middleware import mw_before_request
from common.utils.nginx_utils import prefixed_url_for
from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro
from .interaction_forms import (SpecialistForm, EditSpecialistForm, EditEveAIAgentForm, EditEveAITaskForm,
EditEveAIToolForm, ExecuteSpecialistForm,
SpecialistMagicLinkForm, EditSpecialistMagicLinkForm)
# Blueprint that groups the interaction views in this module; its routes are registered under /interaction.
interaction_bp = Blueprint('interaction_bp', __name__, url_prefix='/interaction')
@interaction_bp.before_request
def log_before_request():
    """Emit a debug log line containing the request path before each blueprint request."""
    logger = current_app.logger
    logger.debug(f'Before request: {request.path} =====================================')
@interaction_bp.after_request
def log_after_request(response):
    """Hand the response back untouched.

    Registered as an after-request hook; it currently performs no logging.
    """
    return response
# Routes for Chat Session Management --------------------------------------------------------------
@interaction_bp.before_request
def before_request():
    """Run the shared request middleware before every blueprint request.

    Any exception raised by the middleware is logged and then re-raised unchanged.
    """
    try:
        mw_before_request()
    except Exception as exc:
        current_app.logger.error(f'Error switching schema in Interaction Blueprint: {exc}')
        raise
@interaction_bp.route('/chat_sessions', methods=['GET', 'POST'])
def chat_sessions():
    """Render a paginated overview of chat sessions, newest session start first.

    The ``page`` and ``per_page`` query parameters control pagination (defaults 1 and 10).
    """
    # NOTE(review): unlike most routes in this blueprint, no @roles_accepted is applied here —
    # confirm this is intentional.
    page_number = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 10, type=int)

    pagination = (ChatSession.query
                  .order_by(desc(ChatSession.session_start))
                  .paginate(page=page_number, per_page=page_size, error_out=False))

    columns = [('id', ''), ('session_id', ''), ('session_start', ''), ('session_end', '')]
    rows = prepare_table_for_macro(pagination.items, columns)
    return render_template('interaction/chat_sessions.html', rows=rows, pagination=pagination)
@interaction_bp.route('/handle_chat_session_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_chat_session_selection():
    """Redirect to the view that matches the action chosen for the selected chat session.

    The posted ``selected_row`` is parsed as a Python literal and its ``value`` entry is
    used as the chat session id. Unknown actions lead back to the chat session overview.
    """
    selected_row = ast.literal_eval(request.form['selected_row'])
    cs_id = selected_row.get('value')
    action = request.form['action']
    current_app.logger.debug(f'Handle Chat Session Selection Action: {action}')

    # Supported actions and the endpoint each one leads to; add further actions here
    endpoint_by_action = {
        'view_chat_session': 'interaction_bp.view_chat_session',
        'chat_session_interactions': 'interaction_bp.session_interactions',
    }
    if action in endpoint_by_action:
        return redirect(prefixed_url_for(endpoint_by_action[action], chat_session_id=cs_id))

    return redirect(prefixed_url_for('interaction_bp.chat_sessions'))
@interaction_bp.route('/view_chat_session/<int:chat_session_id>', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_chat_session(chat_session_id):
    """Show one chat session with its interactions and the documents linked to each of them.

    Responds with 404 when no chat session with the given id exists.
    """
    chat_session = ChatSession.query.get_or_404(chat_session_id)

    # Interactions of this session, outer-joined with their specialist (if any)
    interaction_query = (
        Interaction.query
        .filter_by(chat_session_id=chat_session.id)
        .join(Specialist, Interaction.specialist_id == Specialist.id, isouter=True)
        .add_columns(
            Interaction.id,
            Interaction.question_at,
            Interaction.specialist_arguments,
            Interaction.specialist_results,
            Specialist.name.label('specialist_name'),
            Specialist.type.label('specialist_type')
        )
        .order_by(Interaction.question_at)
    )
    interactions = interaction_query.all()

    # Each result row starts with the Interaction entity, followed by the added columns
    interaction_ids = [interaction.id for interaction, *_ in interactions]

    # Document references of the embeddings attached to those interactions
    embedding_query = (
        db.session.query(InteractionEmbedding.interaction_id,
                         DocumentVersion.url,
                         DocumentVersion.object_name)
        .join(Embedding, InteractionEmbedding.embedding_id == Embedding.id)
        .join(DocumentVersion, Embedding.doc_vers_id == DocumentVersion.id)
        .filter(InteractionEmbedding.interaction_id.in_(interaction_ids))
    )

    # Group the document references per interaction id
    embeddings_dict = {}
    for interaction_id, url, object_name in embedding_query:
        embeddings_dict.setdefault(interaction_id, []).append({'url': url, 'object_name': object_name})

    return render_template('interaction/view_chat_session.html',
                           chat_session=chat_session,
                           interactions=interactions,
                           embeddings_dict=embeddings_dict)
@interaction_bp.route('/view_chat_session_by_session_id/<session_id>', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_chat_session_by_session_id(session_id):
    """
    Accept a session_id (string) and delegate to view_chat_session with the matching
    chat_session_id (integer). Responds with 404 when no chat session has that session_id.
    """
    # Look the chat session up by its session_id
    matching_session = ChatSession.query.filter_by(session_id=session_id).first_or_404()
    # With the chat session at hand, the existing view can be reused
    return view_chat_session(matching_session.id)
def show_chat_session(chat_session):
    """Render the chat session page for an already loaded chat session object."""
    session_interactions_list = Interaction.query.filter_by(chat_session_id=chat_session.id).all()
    return render_template(
        'interaction/view_chat_session.html',
        chat_session=chat_session,
        interactions=session_interactions_list,
    )
# Routes for Specialist Management ----------------------------------------------------------------
@interaction_bp.route('/specialist', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def specialist():
    """Create a new specialist and its retriever associations.

    On a valid submission the specialist is stored, initialised and the user is redirected
    to its edit page. In every other case (GET, invalid form, failure) the creation form is
    rendered.
    """
    form = SpecialistForm()
    if not form.validate_on_submit():
        return render_template('interaction/specialist.html', form=form)

    tenant_id = session.get('tenant').get('id')
    try:
        new_specialist = Specialist()
        # Fields are copied one by one: populate_obj gives problems with QueryMultipleSelectField
        new_specialist.name = form.name.data
        new_specialist.type = form.type.data
        new_specialist.type_version = cache_manager.specialists_version_tree_cache.get_latest_version(
            new_specialist.type)
        new_specialist.active = form.active.data
        new_specialist.tuning = form.tuning.data
        set_logging_information(new_specialist, dt.now(tz.utc))

        db.session.add(new_specialist)
        # Flushing assigns the specialist id without committing the transaction
        db.session.flush()

        # Link every selected retriever to the new specialist
        db.session.add_all([
            SpecialistRetriever(specialist_id=new_specialist.id, retriever_id=retriever.id)
            for retriever in form.retrievers.data
        ])

        # Specialist and associations are committed in a single transaction
        db.session.commit()
        flash('Specialist successfully added!', 'success')
        current_app.logger.info(f'Specialist {new_specialist.name} successfully added for tenant {tenant_id}!')

        # Initialise the newly created specialist
        SpecialistServices.initialize_specialist(new_specialist.id, new_specialist.type, new_specialist.type_version)
        return redirect(prefixed_url_for('interaction_bp.edit_specialist', specialist_id=new_specialist.id))
    except Exception as exc:
        db.session.rollback()
        current_app.logger.error(f'Failed to add specialist. Error: {str(exc)}', exc_info=True)
        flash(f'Failed to add specialist. Error: {str(exc)}', 'danger')

    return render_template('interaction/specialist.html', form=form)
@interaction_bp.route('/specialist/<int:specialist_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_specialist(specialist_id):
    """Edit a specialist: basic fields, dynamic configuration and retriever associations.

    Args:
        specialist_id: Primary key of the specialist; an unknown id results in a 404.

    Returns:
        A redirect to the specialists overview after a successful save, otherwise the
        rendered edit page (GET, validation failure, or database error).
    """
    specialist = Specialist.query.get_or_404(specialist_id)
    form = EditSpecialistForm(request.form, obj=specialist)
    specialist_config = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.type_version)
    form.add_dynamic_fields("configuration", specialist_config, specialist.configuration)

    # Component tables shown on the edit page
    agent_rows, task_rows, tool_rows = (
        prepare_table_for_macro(components, [('id', ''), ('name', ''), ('type', ''), ('type_version', '')])
        for components in (specialist.agents, specialist.tasks, specialist.tools)
    )

    # Construct the SVG overview path
    svg_filename = f"{specialist.type}_{specialist.type_version}_overview.svg"
    svg_path = url_for('static', filename=f'assets/specialists/{svg_filename}')

    def render_edit_page():
        # Single place that renders the edit page, shared by every non-redirect exit
        return render_template('interaction/edit_specialist.html',
                               form=form,
                               specialist_id=specialist_id,
                               agent_rows=agent_rows,
                               task_rows=task_rows,
                               tool_rows=tool_rows,
                               prefixed_url_for=prefixed_url_for,
                               svg_path=svg_path, )

    if request.method == 'GET':
        # Get the actual Retriever objects for the associated retriever_ids
        retriever_ids = [sr.retriever_id for sr in specialist.retrievers]
        form.retrievers.data = Retriever.query.filter(Retriever.id.in_(retriever_ids)).all()
        if specialist.description is None:
            # The form was already populated from the model above, so the default has to be put
            # on the form field itself; assigning it to the model here would never be displayed
            # and would only leave an uncommitted change on the instance.
            metadata = specialist_config.get('metadata') or {}
            form.description.data = metadata.get('description', '')

    if not form.validate_on_submit():
        form_validation_failed(request, form)
        return render_edit_page()

    # Update the basic fields
    specialist.name = form.name.data
    specialist.description = form.description.data
    specialist.tuning = form.tuning.data
    specialist.active = form.active.data
    # Update the configuration dynamic fields
    specialist.configuration = form.get_dynamic_data("configuration")

    # Synchronise the retriever associations with the selection made in the form
    current_retrievers = {sr.retriever_id: sr for sr in specialist.retrievers}
    selected_ids = {r.id for r in form.retrievers.data}
    for retriever_id in set(current_retrievers) - selected_ids:
        db.session.delete(current_retrievers[retriever_id])
    for retriever_id in selected_ids - set(current_retrievers):
        db.session.add(SpecialistRetriever(specialist_id=specialist.id, retriever_id=retriever_id))

    # Update logging information
    update_logging_information(specialist, dt.now(tz.utc))

    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        flash(f'Failed to update specialist. Error: {str(e)}', 'danger')
        current_app.logger.error(f'Failed to update specialist {specialist_id}. Error: {str(e)}')
        return render_edit_page()

    flash('Specialist updated successfully!', 'success')
    current_app.logger.info(f'Specialist {specialist.id} updated successfully')
    return redirect(prefixed_url_for('interaction_bp.specialists'))
@interaction_bp.route('/specialists', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def specialists():
    """Render a paginated overview of specialists ordered by id.

    The ``page`` and ``per_page`` query parameters control pagination (defaults 1 and 10).
    """
    page_number = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 10, type=int)

    pagination = Specialist.query.order_by(Specialist.id).paginate(page=page_number, per_page=page_size)

    # Columns displayed in the overview table
    columns = [('id', ''), ('name', ''), ('type', ''), ('type_version', ''), ('active', ''), ]
    rows = prepare_table_for_macro(pagination.items, columns)

    return render_template('interaction/specialists.html', rows=rows, pagination=pagination)
@interaction_bp.route('/handle_specialist_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_specialist_selection():
    """Dispatch the action chosen on the specialists overview.

    ``create_specialist`` needs no selection. Every other action uses the posted
    ``selected_row``, which is parsed as a Python literal whose ``value`` entry is the
    specialist id.

    Returns:
        A redirect to the create, edit or execute view, or back to the specialists
        overview when the action is unknown or no usable row was selected.
    """
    action = request.form.get('action')
    if action == 'create_specialist':
        return redirect(prefixed_url_for('interaction_bp.specialist'))

    # A missing or malformed selection used to raise inside literal_eval and end in a 500;
    # treat it as "nothing selected" instead.
    try:
        specialist_id = ast.literal_eval(request.form.get('selected_row')).get('value')
    except (ValueError, SyntaxError, TypeError, AttributeError):
        specialist_id = None
    if specialist_id is None:
        flash('Please select a specialist first.', 'warning')
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    if action == "edit_specialist":
        return redirect(prefixed_url_for('interaction_bp.edit_specialist', specialist_id=specialist_id))
    elif action == "execute_specialist":
        return redirect(prefixed_url_for('interaction_bp.execute_specialist', specialist_id=specialist_id))

    return redirect(prefixed_url_for('interaction_bp.specialists'))
# Routes for Agent management ---------------------------------------------------------------------
@interaction_bp.route('/agent/<int:agent_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_agent(agent_id):
    """Return the agent edit form component for AJAX requests.

    Args:
        agent_id: Primary key of the agent; an unknown id results in a 404.

    Returns:
        The rendered form component when the request carries
        ``X-Requested-With: XMLHttpRequest``; otherwise a redirect to the specialists
        overview.
    """
    agent = EveAIAgent.query.get_or_404(agent_id)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Returning None from a view makes Flask fail with a server error, so plain browser
        # requests are sent back to the overview instead.
        flash('The agent editor is only available as an in-page component.', 'warning')
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    form = EditEveAIAgentForm(obj=agent)
    # Return just the form portion for AJAX requests
    return render_template('interaction/components/edit_agent.html',
                           form=form,
                           agent=agent,
                           title="Edit Agent",
                           description="Configure the agent with company-specific details if required",
                           submit_text="Save Agent")
@interaction_bp.route('/agent/<int:agent_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def save_agent(agent_id):
    """Persist the submitted agent form and report the outcome as JSON.

    A falsy ``agent_id`` creates a new agent; otherwise the existing agent is updated
    (404 when it does not exist). The response is a JSON object with ``success`` and
    ``message`` keys.
    """
    if agent_id:
        agent = EveAIAgent.query.get_or_404(agent_id)
    else:
        agent = EveAIAgent()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAIAgentForm(obj=agent)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(agent)
        update_logging_information(agent, dt.now(tz.utc))
        if not agent_id:
            # A new agent still has to be attached to the session
            db.session.add(agent)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Agent saved successfully'})
    except Exception as exc:
        db.session.rollback()
        current_app.logger.error(f'Failed to save agent {agent_id} for tenant {tenant_id}. Error: {str(exc)}')
        return jsonify({'success': False, 'message': f"Failed to save agent {agent_id}: {str(exc)}"})
# Routes for Task management ----------------------------------------------------------------------
@interaction_bp.route('/task/<int:task_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_task(task_id):
    """Return the task edit form component for AJAX requests.

    Args:
        task_id: Primary key of the task; an unknown id results in a 404.

    Returns:
        The rendered form component when the request carries
        ``X-Requested-With: XMLHttpRequest``; otherwise a redirect to the specialists
        overview.
    """
    task = EveAITask.query.get_or_404(task_id)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Returning None from a view makes Flask fail with a server error, so plain browser
        # requests are sent back to the overview instead.
        flash('The task editor is only available as an in-page component.', 'warning')
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    form = EditEveAITaskForm(obj=task)
    return render_template('interaction/components/edit_task.html',
                           form=form,
                           task=task,
                           title="Edit Task",
                           description="Configure the task with company-specific details if required",
                           submit_text="Save Task"
                           )
@interaction_bp.route('/task/<int:task_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def save_task(task_id):
    """Persist the submitted task form and report the outcome as JSON.

    A falsy ``task_id`` creates a new task; otherwise the existing task is updated
    (404 when it does not exist). The response is a JSON object with ``success`` and
    ``message`` keys.
    """
    if task_id:
        task = EveAITask.query.get_or_404(task_id)
    else:
        task = EveAITask()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAITaskForm(obj=task)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(task)
        update_logging_information(task, dt.now(tz.utc))
        if not task_id:
            # A new task still has to be attached to the session
            db.session.add(task)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Task saved successfully'})
    except Exception as exc:
        db.session.rollback()
        current_app.logger.error(f'Failed to save task {task_id} for tenant {tenant_id}. Error: {str(exc)}')
        return jsonify({'success': False, 'message': f"Failed to save task {task_id}: {str(exc)}"})
# Routes for Tool management ----------------------------------------------------------------------
@interaction_bp.route('/tool/<int:tool_id>/edit', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tool(tool_id):
    """Return the tool edit form component for AJAX requests.

    Args:
        tool_id: Primary key of the tool; an unknown id results in a 404.

    Returns:
        The rendered form component when the request carries
        ``X-Requested-With: XMLHttpRequest``; otherwise a redirect to the specialists
        overview.
    """
    tool = EveAITool.query.get_or_404(tool_id)

    if request.headers.get('X-Requested-With') != 'XMLHttpRequest':
        # Returning None from a view makes Flask fail with a server error, so plain browser
        # requests are sent back to the overview instead.
        flash('The tool editor is only available as an in-page component.', 'warning')
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    form = EditEveAIToolForm(obj=tool)
    return render_template('interaction/components/edit_tool.html',
                           form=form,
                           tool=tool,
                           title="Edit Tool",
                           description="Configure the tool with company-specific details if required",
                           submit_text="Save Tool"
                           )
@interaction_bp.route('/tool/<int:tool_id>/save', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def save_tool(tool_id):
    """Persist the submitted tool form and report the outcome as JSON.

    A falsy ``tool_id`` creates a new tool; otherwise the existing tool is updated
    (404 when it does not exist). The response is a JSON object with ``success`` and
    ``message`` keys.
    """
    if tool_id:
        tool = EveAITool.query.get_or_404(tool_id)
    else:
        tool = EveAITool()
    tenant_id = session.get('tenant').get('id')
    form = EditEveAIToolForm(obj=tool)

    if not form.validate_on_submit():
        return jsonify({'success': False, 'message': 'Validation failed'})

    try:
        form.populate_obj(tool)
        update_logging_information(tool, dt.now(tz.utc))
        if not tool_id:
            # A new tool still has to be attached to the session
            db.session.add(tool)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Tool saved successfully'})
    except Exception as exc:
        db.session.rollback()
        current_app.logger.error(f'Failed to save tool {tool_id} for tenant {tenant_id}. Error: {str(exc)}')
        return jsonify({'success': False, 'message': f"Failed to save tool {tool_id}: {str(exc)}"})
# Component selection handlers --------------------------------------------------------------------
@interaction_bp.route('/handle_agent_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_agent_selection():
    """Dispatch the action chosen for the selected agent row.

    The posted ``selected_row`` is parsed as a Python literal and its ``value`` entry is
    used as the agent id.

    Returns:
        A redirect to the agent edit view for ``edit_agent``; any other action redirects
        to the specialists overview.
    """
    agent_identification = request.form['selected_row']
    agent_id = ast.literal_eval(agent_identification).get('value')
    action = request.form.get('action')

    if action == "edit_agent":
        return redirect(prefixed_url_for('interaction_bp.edit_agent', agent_id=agent_id))

    # The edit_specialist route requires a specialist_id, which is not known here, so its URL
    # cannot be built; fall back to the specialists overview.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
@interaction_bp.route('/handle_task_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_task_selection():
    """Dispatch the action chosen for the selected task row.

    The posted ``selected_row`` is parsed as a Python literal and its ``value`` entry is
    used as the task id.

    Returns:
        A redirect to the task edit view for ``edit_task``; any other action redirects
        to the specialists overview.
    """
    task_identification = request.form['selected_row']
    task_id = ast.literal_eval(task_identification).get('value')
    action = request.form.get('action')

    if action == "edit_task":
        return redirect(prefixed_url_for('interaction_bp.edit_task', task_id=task_id))

    # The edit_specialist route requires a specialist_id, which is not known here, so its URL
    # cannot be built; fall back to the specialists overview.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
@interaction_bp.route('/handle_tool_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tool_selection():
    """Dispatch the action chosen for the selected tool row.

    The posted ``selected_row`` is parsed as a Python literal and its ``value`` entry is
    used as the tool id.

    Returns:
        A redirect to the tool edit view for ``edit_tool``; any other action redirects
        to the specialists overview.
    """
    tool_identification = request.form['selected_row']
    tool_id = ast.literal_eval(tool_identification).get('value')
    action = request.form.get('action')

    if action == "edit_tool":
        return redirect(prefixed_url_for('interaction_bp.edit_tool', tool_id=tool_id))

    # The edit_specialist route requires a specialist_id, which is not known here, so its URL
    # cannot be built; fall back to the specialists overview.
    return redirect(prefixed_url_for('interaction_bp.specialists'))
# Specialist Execution ----------------------------------------------------------------------------
@interaction_bp.route('/execute_specialist/<int:specialist_id>', methods=['GET', 'POST'])
def execute_specialist(specialist_id):
    """Collect the arguments for a specialist and start its execution.

    Specialists whose configuration has a truthy ``chat`` entry (the default when the key
    is absent) are rejected. After a valid submission the user is redirected to the
    interactions of the newly started session.
    """
    # NOTE(review): unlike most routes in this blueprint, no @roles_accepted is applied here —
    # confirm this is intentional.
    specialist = Specialist.query.get_or_404(specialist_id)
    specialist_config = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.type_version)

    if specialist_config.get('chat', True):
        flash("Only specialists that don't require interactions can be executed this way!", 'error')
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    form = ExecuteSpecialistForm(request.form, obj=specialist)
    if 'arguments' in specialist_config:
        form.add_dynamic_fields('arguments', specialist_config)

    if not form.validate_on_submit():
        return render_template('interaction/execute_specialist.html', form=form)

    # Only the dynamic arguments are of interest here
    arguments = form.get_dynamic_data("arguments")
    current_app.logger.debug(f"Executing specialist {specialist.id} with arguments: {arguments}")

    session_id = SpecialistServices.start_session()
    tenant = session.get('tenant')
    execution_task = SpecialistServices.execute_specialist(
        tenant_id=tenant.get('id'),
        specialist_id=specialist_id,
        specialist_arguments=arguments,
        session_id=session_id,
        user_timezone=tenant.get('timezone')
    )
    current_app.logger.debug(f"Execution task for specialist {specialist.id} created: {execution_task}")

    return redirect(prefixed_url_for('interaction_bp.session_interactions_by_session_id', session_id=session_id))
# Interaction Mgmt --------------------------------------------------------------------------------
@interaction_bp.route('/session_interactions_by_session_id/<session_id>', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def session_interactions_by_session_id(session_id):
    """
    Show all interactions for a given session_id (string).

    The chat session is looked up at most 10 times, pausing 1 second between consecutive
    lookups. When it is still missing, a waiting page is rendered once; if the request
    already came from that waiting page (``waiting=true``), the user is redirected to the
    specialists overview.
    """
    waiting_message = request.args.get('waiting', 'false') == 'true'

    max_tries = 10
    chat_session = None
    for attempt in range(1, max_tries + 1):
        chat_session = ChatSession.query.filter_by(session_id=session_id).first()
        if chat_session:
            break
        # Pause only between lookups, not after the last one
        if attempt < max_tries:
            time.sleep(1)

    if chat_session:
        # Session found: display its interactions
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 10, type=int)
        pagination = (Interaction.query
                      .filter_by(chat_session_id=chat_session.id)
                      .order_by(Interaction.question_at)
                      .paginate(page=page, per_page=per_page, error_out=False))
        columns = [('id', ''), ('question_at', ''), ('detailed_question_at', ''),
                   ('answer_at', ''), ('processing_error', '')]
        rows = prepare_table_for_macro(pagination.items, columns)

        def make_page_url(page_num):
            # Build the URL of another page for the same session_id
            return prefixed_url_for('interaction_bp.session_interactions_by_session_id', session_id=session_id,
                                    page=page_num)

        return render_template('interaction/session_interactions.html',
                               chat_session=chat_session, rows=rows, pagination=pagination,
                               make_page_url=make_page_url)

    # The session was not found within the maximum number of attempts
    flash(f'The chat session with ID {session_id} could not be found after {max_tries} attempts. '
          f'The session may still be in the process of being created or the ID might be incorrect.', 'warning')

    if waiting_message:
        # The waiting page was already shown and the session is still missing
        return redirect(prefixed_url_for('interaction_bp.specialists'))

    # First miss: show a waiting page that refers back to this route with waiting=true
    refresh_url = prefixed_url_for('interaction_bp.session_interactions_by_session_id',
                                   session_id=session_id,
                                   waiting='true')
    return render_template('interaction/waiting_for_session.html',
                           session_id=session_id,
                           refresh_url=refresh_url)
@interaction_bp.route('/session_interactions/<chat_session_id>', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def session_interactions(chat_session_id):
    """
    Show all interactions for a chat session identified by its primary key.

    Responds with 404 when the chat session does not exist; otherwise delegates to
    session_interactions_by_session_id using the session's session_id.
    """
    found_session = ChatSession.query.get_or_404(chat_session_id)
    return session_interactions_by_session_id(found_session.session_id)
# Routes for SpecialistMagicLink Management -------------------------------------------------------
@interaction_bp.route('/specialist_magic_link', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def specialist_magic_link():
    """Create a specialist magic link and its tenant mapping record.

    On GET a fresh magic link code is generated and pre-filled in the form. On a valid
    submission both records are stored in one transaction.

    Returns:
        A redirect to the edit page of the new magic link on success, otherwise the
        rendered creation form.
    """
    form = SpecialistMagicLinkForm()

    if request.method == 'GET':
        form.magic_link_code.data = f"SPECIALIST_ML-{uuid.uuid4()}"

    if form.validate_on_submit():
        tenant_id = session.get('tenant').get('id')
        try:
            new_specialist_magic_link = SpecialistMagicLink()
            # Copy the submitted form fields onto the new magic link
            form.populate_obj(new_specialist_magic_link)
            set_logging_information(new_specialist_magic_link, dt.now(tz.utc))

            # Create 'public' SpecialistMagicLinkTenant
            new_spec_ml_tenant = SpecialistMagicLinkTenant()
            new_spec_ml_tenant.magic_link_code = new_specialist_magic_link.magic_link_code
            new_spec_ml_tenant.tenant_id = tenant_id

            # Define the make valid for this magic link
            specialist = Specialist.query.get(new_specialist_magic_link.specialist_id)
            if specialist is None:
                # Report a readable error instead of an AttributeError on None
                raise ValueError(f'Specialist {new_specialist_magic_link.specialist_id} not found')
            # A specialist may not have a configuration yet; treat that as "no make defined"
            make_id = (specialist.configuration or {}).get('make', None)
            current_app.logger.debug(f"make_id defined in specialist: {make_id}")
            default_make_id = session.get('tenant').get('default_tenant_make_id')
            if make_id:
                new_specialist_magic_link.tenant_make_id = make_id
            elif default_make_id:
                new_specialist_magic_link.tenant_make_id = default_make_id

            db.session.add(new_specialist_magic_link)
            db.session.add(new_spec_ml_tenant)
            db.session.commit()

            flash('Specialist Magic Link successfully added!', 'success')
            current_app.logger.info(f'Specialist {new_specialist_magic_link.name} successfully added for '
                                    f'tenant {tenant_id}!')
            return redirect(prefixed_url_for('interaction_bp.edit_specialist_magic_link',
                                             specialist_magic_link_id=new_specialist_magic_link.id))
        except Exception as e:
            db.session.rollback()
            current_app.logger.error(f'Failed to add specialist magic link. Error: {str(e)}', exc_info=True)
            flash(f'Failed to add specialist magic link. Error: {str(e)}', 'danger')

    return render_template('interaction/specialist_magic_link.html', form=form)
def _qr_code_png_data_uri(payload):
    """Encode ``payload`` as a QR code and return it as a base64 PNG data URI."""
    # Imported lazily so that a missing dependency is raised inside the caller's try block
    import base64
    import io
    import qrcode

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4
    )
    qr.add_data(payload)
    qr.make(fit=True)

    # Render the PNG image in memory
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format='PNG')

    img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f"data:image/png;base64,{img_base64}"


@interaction_bp.route('/specialist_magic_link/<int:specialist_magic_link_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_specialist_magic_link(specialist_magic_link_id):
    """Edit a specialist magic link and show its chat client URL and QR code.

    Args:
        specialist_magic_link_id: Primary key of the magic link; an unknown id (or an
            unknown linked specialist) results in a 404.

    Returns:
        A redirect to the magic links overview after a successful save, otherwise the
        rendered edit page.
    """
    specialist_ml = SpecialistMagicLink.query.get_or_404(specialist_magic_link_id)
    # The extra kwarg specialist_id is required to initialize the form
    form = EditSpecialistMagicLinkForm(request.form, obj=specialist_ml, specialist_id=specialist_ml.specialist_id)

    # The specialist type and type_version give access to the argument definitions
    specialist = Specialist.query.get_or_404(specialist_ml.specialist_id)
    specialist_config = cache_manager.specialists_config_cache.get_config(specialist.type, specialist.type_version)
    form.add_dynamic_fields("arguments", specialist_config, specialist_ml.specialist_args)

    # Set the tenant_make_id default value (0 stands for "no make")
    if request.method == 'GET':
        if specialist_ml.tenant_make_id is None:
            form.tenant_make_id.data = 0
        else:
            form.tenant_make_id.data = specialist_ml.tenant_make_id

    # Build the chat client URL for this magic link
    chat_client_prefix = current_app.config.get('CHAT_CLIENT_PREFIX', 'chat_client/chat/')
    magic_link_code = specialist_ml.magic_link_code
    # The third '/'-separated part of the request URL holds the hostname, plus the port when present
    host_port = request.url.split('/')[2]
    chat_client_url = f"{request.scheme}://{host_port}/{chat_client_prefix}{magic_link_code}"
    form.chat_client_url.data = chat_client_url

    # Generate the QR code as a data URI for direct embedding in HTML
    try:
        data_uri = _qr_code_png_data_uri(chat_client_url)
        form.qr_code_url.data = data_uri
        current_app.logger.debug(f"QR code generated successfully for {magic_link_code}")
        current_app.logger.debug(f"QR code data URI starts with: {data_uri[:50]}...")
    except Exception as e:
        # Best effort: the page stays usable without a QR code
        current_app.logger.error(f"Failed to generate QR code: {str(e)}")
        form.qr_code_url.data = "Error generating QR code"

    if form.validate_on_submit():
        # Update the basic fields
        form.populate_obj(specialist_ml)
        # Update the arguments dynamic fields
        specialist_ml.specialist_args = form.get_dynamic_data("arguments")
        # Handle the tenant_make_id special case (0 = None)
        if form.tenant_make_id.data == 0:
            specialist_ml.tenant_make_id = None
        # Update logging information
        update_logging_information(specialist_ml, dt.now(tz.utc))

        try:
            db.session.commit()
            flash('Specialist Magic Link updated successfully!', 'success')
            current_app.logger.info(f'Specialist Magic Link {specialist_ml.id} updated successfully')
            return redirect(prefixed_url_for('interaction_bp.specialist_magic_links'))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to update specialist Magic Link. Error: {str(e)}', 'danger')
            current_app.logger.error(f'Failed to update specialist Magic Link {specialist_ml.id}. Error: {str(e)}')
    else:
        form_validation_failed(request, form)

    return render_template('interaction/edit_specialist_magic_link.html', form=form)
@interaction_bp.route('/specialist_magic_links', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def specialist_magic_links():
    """Render a paginated overview of specialist magic links ordered by id.

    The ``page`` and ``per_page`` query parameters control pagination (defaults 1 and 10).
    """
    page_number = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 10, type=int)

    pagination = (SpecialistMagicLink.query
                  .order_by(SpecialistMagicLink.id)
                  .paginate(page=page_number, per_page=page_size))

    # Columns displayed in the overview table
    columns = [('id', ''), ('name', ''), ('magic_link_code', ''), ]
    rows = prepare_table_for_macro(pagination.items, columns)

    return render_template('interaction/specialist_magic_links.html', rows=rows, pagination=pagination)
@interaction_bp.route('/handle_specialist_magic_link_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_specialist_magic_link_selection():
    """Dispatch the action chosen on the specialist magic links overview.

    ``create_specialist_magic_link`` needs no selection. Every other action uses the
    posted ``selected_row``, which is parsed as a Python literal whose ``value`` entry is
    the magic link id.

    Returns:
        A redirect to the create or edit view, or back to the magic links overview when
        the action is unknown or no usable row was selected.
    """
    action = request.form.get('action')
    if action == 'create_specialist_magic_link':
        return redirect(prefixed_url_for('interaction_bp.specialist_magic_link'))

    # A missing or malformed selection used to raise inside literal_eval and end in a 500;
    # treat it as "nothing selected" instead.
    try:
        specialist_ml_id = ast.literal_eval(request.form.get('selected_row')).get('value')
    except (ValueError, SyntaxError, TypeError, AttributeError):
        specialist_ml_id = None
    if specialist_ml_id is None:
        flash('Please select a specialist magic link first.', 'warning')
        return redirect(prefixed_url_for('interaction_bp.specialist_magic_links'))

    if action == "edit_specialist_magic_link":
        return redirect(prefixed_url_for('interaction_bp.edit_specialist_magic_link',
                                         specialist_magic_link_id=specialist_ml_id))

    # Unknown action: return to the overview this handler belongs to
    return redirect(prefixed_url_for('interaction_bp.specialist_magic_links'))
# Routes for Asset Management ---------------------------------------------------------------------
@interaction_bp.route('/assets', methods=['GET', 'POST'])
def assets():
    """Render the assets overview by delegating to AssetsListView (10 items per page)."""
    # Imported inside the view, as in the original implementation
    from eveai_app.views.list_views.assets_list_view import AssetsListView

    list_view_options = {
        'model': EveAIAsset,
        'template': 'interaction/assets.html',
        'per_page': 10,
    }
    return AssetsListView(**list_view_options).get()
@interaction_bp.route('/handle_asset_selection', methods=['POST'])
def handle_asset_selection():
    """Dispatch the action chosen on the assets overview.

    The posted ``selected_row`` is used directly as the asset id.

    Returns:
        A redirect to the asset edit view for ``edit_asset`` when a row was selected,
        otherwise a redirect back to the assets overview.
    """
    action = request.form.get('action')
    asset_id = request.form.get('selected_row')
    current_app.logger.debug(f"Action: {action}, Asset ID: {asset_id}")

    if action == 'edit_asset':
        if not asset_id:
            # Without a selection there is no asset_id to build the edit URL with
            flash('Please select an asset first.', 'warning')
            return redirect(prefixed_url_for('interaction_bp.assets'))
        return redirect(prefixed_url_for('interaction_bp.edit_asset', asset_id=asset_id))

    return redirect(prefixed_url_for('interaction_bp.assets'))
@interaction_bp.route('/edit_asset/<int:asset_id>', methods=['GET', 'POST'])
def edit_asset(asset_id):
    """View and edit the JSON content of an asset stored in MinIO.

    Args:
        asset_id: Primary key of the asset; an unknown id results in a 404.

    Returns:
        For POST: a redirect to the assets overview after a successful save, or back to
        this edit page when the submitted content is missing, invalid or cannot be saved.
        For any other method: the rendered editor, or a redirect to the assets overview
        when the asset cannot be loaded. Requests without a tenant in the session or for
        non-JSON assets are redirected to the assets overview.
    """
    asset = EveAIAsset.query.get_or_404(asset_id)
    tenant_id = session.get('tenant', {}).get('id')

    # Redirects use prefixed_url_for, like the rest of this module
    if not tenant_id:
        flash('Geen tenant geselecteerd', 'error')
        return redirect(prefixed_url_for('interaction_bp.assets'))

    # Check whether the file type is supported
    if asset.file_type != 'json':
        flash(
            f'Bestandstype "{asset.file_type}" wordt momenteel niet ondersteund voor bewerking. Alleen JSON-bestanden kunnen worden bewerkt.',
            'warning')
        return redirect(prefixed_url_for('interaction_bp.assets'))

    if request.method == 'POST':
        try:
            # Fetch the edited JSON data from the form
            json_data = request.form.get('json_content')
            if not json_data:
                flash('Geen JSON data ontvangen', 'error')
                return redirect(prefixed_url_for('interaction_bp.edit_asset', asset_id=asset_id))

            # Validate the JSON format; only validity matters, the parsed value is not used
            try:
                json.loads(json_data)
            except json.JSONDecodeError as e:
                flash(f'Ongeldig JSON-formaat: {str(e)}', 'error')
                return redirect(prefixed_url_for('interaction_bp.edit_asset', asset_id=asset_id))

            # Upload the updated JSON to MinIO; only the reported file size is needed here
            _bucket_name, _object_name, file_size = minio_client.upload_asset_file(
                tenant_id,
                asset.id,
                asset.type,
                asset.file_type,
                json_data
            )

            # Update asset metadata
            now = dt.now(tz.utc)
            asset.file_size = file_size
            asset.updated_at = now
            asset.updated_by = current_user.id
            asset.last_used_at = now
            db.session.commit()

            flash('Asset succesvol bijgewerkt', 'success')
            return redirect(prefixed_url_for('interaction_bp.assets'))
        except Exception as e:
            db.session.rollback()
            current_app.logger.error(f"Error saving asset {asset_id}: {str(e)}")
            flash(f'Fout bij het opslaan van het asset: {str(e)}', 'error')
            return redirect(prefixed_url_for('interaction_bp.edit_asset', asset_id=asset_id))

    # Every non-POST request (GET, and the HEAD that Flask adds for GET routes) shows the
    # editor, so the view never falls through without returning a response.
    try:
        # Fetch the file from MinIO
        file_data = minio_client.download_asset_file(
            tenant_id,
            asset.bucket_name,
            asset.object_name
        )
        # Decode the JSON data
        json_content = json.loads(file_data.decode('utf-8'))
        context = {
            'asset': asset,
            'json_content': json.dumps(json_content, indent=2),
            'asset_id': asset_id
        }
        return render_template('interaction/edit_asset.html', **context)
    except json.JSONDecodeError:
        flash('Fout bij het laden van het JSON-bestand: ongeldig JSON-formaat', 'error')
        return redirect(prefixed_url_for('interaction_bp.assets'))
    except Exception as e:
        current_app.logger.error(f"Error loading asset {asset_id}: {str(e)}")
        flash(f'Fout bij het laden van het asset: {str(e)}', 'error')
        return redirect(prefixed_url_for('interaction_bp.assets'))