Files
eveAI/eveai_app/views/user_views.py
2025-07-14 18:58:54 +02:00

750 lines
32 KiB
Python

import json
import uuid
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app
from flask_security import roles_accepted, current_user
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
import ast
from common.models.user import User, Tenant, Role, TenantDomain, TenantProject, PartnerTenant, TenantMake
from common.extensions import db, security, minio_client, simple_encryption, cache_manager
from common.utils.dynamic_field_utils import create_default_config_from_type_config
from common.utils.security_utils import send_confirmation_email, send_reset_email
from config.type_defs.service_types import SERVICE_TYPES
from .user_forms import TenantForm, CreateUserForm, EditUserForm, TenantDomainForm, TenantSelectionForm, \
TenantProjectForm, EditTenantProjectForm, TenantMakeForm, EditTenantForm, EditTenantMakeForm
from common.utils.database import Database
from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed
from common.utils.simple_encryption import generate_api_key
from common.utils.nginx_utils import prefixed_url_for
from common.utils.eveai_exceptions import EveAIException
from common.utils.document_utils import set_logging_information, update_logging_information
from common.services.user import TenantServices
from common.services.user import UserServices
from common.utils.mail_utils import send_email
from eveai_app.views.list_views.user_list_views import get_tenants_list_view, get_users_list_view, \
get_tenant_domains_list_view, get_tenant_projects_list_view, get_tenant_makes_list_view
from eveai_app.views.list_views.list_view_utils import render_list_view
user_bp = Blueprint('user_bp', __name__, url_prefix='/user')
@user_bp.before_request
def log_before_request():
current_app.logger.debug(f'Before request: {request.path} =====================================')
@user_bp.after_request
def log_after_request(response):
return response
# Tenant Management -------------------------------------------------------------------------------
@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin')
def tenant():
if not UserServices.can_user_create_tenant():
current_app.logger.error(f'User {current_user.email} cannot create tenant')
flash(f"You don't have the appropriate permissions to create a tenant", 'danger')
return redirect(prefixed_url_for('user_bp.tenants'))
form = TenantForm()
if request.method == 'GET':
code = f"TENANT-{str(uuid.uuid4())}"
form.code.data = code
if form.validate_on_submit():
# Handle the required attributes
new_tenant = Tenant()
form.populate_obj(new_tenant)
timestamp = dt.now(tz.utc)
new_tenant.created_at = timestamp
new_tenant.updated_at = timestamp
# Add the new tenant to the database and commit the changes
try:
db.session.add(new_tenant)
db.session.commit()
if current_user.has_roles('Partner Admin') and 'partner' in session:
# Always associate with the partner for Partner Admins
TenantServices.associate_tenant_with_partner(new_tenant.id)
elif current_user.has_roles('Super User') and form.assign_to_partner.data and 'partner' in session:
# Super User chose to associate with partner
TenantServices.associate_tenant_with_partner(new_tenant.id)
except IntegrityError as e:
db.session.rollback()
# Check for the specific error about duplicate tenant name
if "tenant_name_key" in str(e) or "duplicate key value" in str(e):
flash(f"A tenant with the name '{form.name.data}' already exists. Please choose a different name.",
'danger')
else:
current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
flash(f'Failed to add tenant to database. Error: {str(e)}', 'danger')
return render_template('user/tenant.html', form=form)
except SQLAlchemyError as e:
current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
flash(f'Failed to add tenant to database. Error: {str(e)}', 'danger')
return render_template('user/tenant.html', form=form)
except EveAIException as e:
current_app.logger.error(f'Error associating Tenant {new_tenant.id} to Partner. Error: {str(e)}')
flash(f'Error associating Tenant to Partner. Error: {str(e)}', 'danger')
return render_template('user/tenant.html', form=form)
current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database")
flash(f"Successfully created tenant {new_tenant.id} in Database", 'success')
# Create schema for new tenant
current_app.logger.info(f"Creating schema for tenant {new_tenant.id}")
Database(new_tenant.id).create_tenant_schema()
# Create MinIO bucket for new tenant
current_app.logger.info(f"Creating MinIO bucket for tenant {new_tenant.id}")
minio_client.create_tenant_bucket(new_tenant.id)
return redirect(prefixed_url_for('user_bp.tenants'))
else:
form_validation_failed(request, form)
return render_template('user/tenant.html', form=form)
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin')
def edit_tenant(tenant_id):
tenant = Tenant.query.get_or_404(tenant_id) # This will return a 404 if no tenant is found
form = EditTenantForm(obj=tenant)
if form.validate_on_submit():
# Populate the tenant with form data
form.populate_obj(tenant)
# Convert default_tenant_make_id to integer if not empty
if form.default_tenant_make_id.data:
tenant.default_tenant_make_id = int(form.default_tenant_make_id.data)
else:
tenant.default_tenant_make_id = None
db.session.commit()
flash('Tenant updated successfully.', 'success')
if session.get('tenant'):
if session['tenant'].get('id') == tenant_id:
session['tenant'] = tenant.to_dict()
# return redirect(url_for(f"user/tenant/tenant_id"))
else:
form_validation_failed(request, form)
return render_template('user/tenant.html', form=form, tenant_id=tenant_id)
@user_bp.route('/tenants', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin') # Allow both roles
def tenants():
# Get configuration and render the list view
config = get_tenants_list_view()
return render_list_view('list_view.html', **config)
@user_bp.route('/handle_tenant_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin')
def handle_tenant_selection():
action = request.form['action']
if action == 'create_tenant':
return redirect(prefixed_url_for('user_bp.tenant'))
tenant_identification = request.form['selected_row']
tenant_id = ast.literal_eval(tenant_identification).get('value')
if not UserServices.can_user_edit_tenant(tenant_id):
current_app.logger.info(f"User not authenticated to edit tenant {tenant_id}.")
flash(f"You are not authenticated to manage tenant {tenant_id}", 'danger')
return redirect(prefixed_url_for('user_bp.tenants'))
the_tenant = Tenant.query.get(tenant_id)
# set tenant information in the session
session['tenant'] = the_tenant.to_dict()
# remove catalog-related items from the session
session.pop('catalog_id', None)
session.pop('catalog_name', None)
match action:
case 'edit_tenant':
return redirect(prefixed_url_for('user_bp.edit_tenant', tenant_id=tenant_id))
case 'select_tenant':
return redirect(prefixed_url_for('user_bp.tenant_overview'))
# Add more conditions for other actions
return redirect(prefixed_url_for('tenants'))
@user_bp.route('/tenant_overview', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_overview():
tenant_id = session['tenant']['id']
tenant = Tenant.query.get_or_404(tenant_id)
form = EditTenantForm(obj=tenant)
# Zet de waarde van default_tenant_make_id
if tenant.default_tenant_make_id:
form.default_tenant_make_id.data = str(tenant.default_tenant_make_id)
# Haal de naam van de default make op als deze bestaat
default_make_name = None
if tenant.default_tenant_make:
default_make_name = tenant.default_tenant_make.name
return render_template('user/tenant_overview.html', form=form, default_make_name=default_make_name)
# User Management ---------------------------------------------------------------------------------
@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin', 'Partner Admin')
def user():
tenant_id = session.get('tenant').get('id')
form = CreateUserForm()
form.tenant_id.data = session.get('tenant').get('id') # It is only possible to create users for the session tenant
if form.validate_on_submit():
current_app.logger.info(f"Adding User for tenant {session['tenant']['id']} ")
new_user = User(user_name=form.user_name.data,
email=form.email.data,
first_name=form.first_name.data,
last_name=form.last_name.data,
valid_to=form.valid_to.data,
tenant_id=form.tenant_id.data,
fs_uniquifier=uuid.uuid4().hex,
)
timestamp = dt.now(tz.utc)
new_user.created_at = timestamp
new_user.updated_at = timestamp
# Add roles
for role_id in form.roles.data:
the_role = Role.query.get(role_id)
new_user.roles.append(the_role)
# Add the new user to the database and commit the changes
try:
db.session.add(new_user)
db.session.commit()
# security.datastore.set_uniquifier(new_user)
try:
send_confirmation_email(new_user)
current_app.logger.info(f'User {new_user.id} with name {new_user.user_name} added to database'
f'Confirmation email sent to {new_user.email}')
flash('User added successfully and confirmation email sent.', 'success')
except Exception as e:
current_app.logger.error(f'Failed to send confirmation email to {new_user.email}. Error: {str(e)}')
flash('User added successfully, but failed to send confirmation email. '
'Please contact the administrator.', 'warning')
return redirect(prefixed_url_for('user_bp.view_users'))
except Exception as e:
current_app.logger.error(f'Failed to add user with name {new_user.user_name}. Error: {str(e)}')
db.session.rollback()
flash(f'Failed to add user. Email or user name already exists.', 'danger')
else:
form_validation_failed(request, form)
return render_template('user/user.html', form=form)
@user_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin', 'Partner Admin')
def edit_user(user_id):
user = User.query.get_or_404(user_id) # This will return a 404 if no user is found
form = EditUserForm(obj=user)
if form.validate_on_submit():
# Populate the user with form data
user.first_name = form.first_name.data
user.last_name = form.last_name.data
user.valid_to = form.valid_to.data
user.updated_at = dt.now(tz.utc)
# Update roles
current_roles = set(role.id for role in user.roles)
selected_roles = set(form.roles.data)
if UserServices.validate_role_assignments(selected_roles):
# Add new roles
for role_id in selected_roles - current_roles:
role = Role.query.get(role_id)
if role:
user.roles.append(role)
# Remove unselected roles
for role_id in current_roles - selected_roles:
role = Role.query.get(role_id)
if role:
user.roles.remove(role)
else:
flash('Trying to assign unauthorized roles', 'danger')
current_app.logger.error(f"Trying to assign unauthorized roles by user {user_id},"
f"tenant {session['tenant']['id']}")
return redirect(prefixed_url_for('user_bp.edit_user', user_id=user_id))
db.session.commit()
flash('User updated successfully.', 'success')
return redirect(
prefixed_url_for('user_bp.edit_user',
user_id=user.id)) # Assuming there's a user profile view to redirect to
else:
form_validation_failed(request, form)
form.roles.data = [role.id for role in user.roles]
return render_template('user/edit_user.html', form=form, user_id=user_id)
@user_bp.route('/view_users')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_users():
tenant_id = session.get('tenant').get('id')
config = get_users_list_view(tenant_id)
return render_list_view('list_view.html', **config)
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_user_action():
action = request.form['action']
if action == 'create_user':
return redirect(prefixed_url_for('user_bp.user'))
user_identification = request.form['selected_row']
user_id = ast.literal_eval(user_identification).get('value')
user = User.query.get_or_404(user_id)
if action == 'edit_user':
return redirect(prefixed_url_for('user_bp.edit_user', user_id=user_id))
elif action == 'resend_confirmation_email':
send_confirmation_email(user)
flash(f'Confirmation email sent to {user.email}.', 'success')
elif action == 'send_password_reset_email':
send_reset_email(user)
flash(f'Password reset email sent to {user.email}.', 'success')
elif action == 'reset_uniquifier':
reset_uniquifier(user)
flash(f'Uniquifier reset for {user.user_name}.', 'success')
return redirect(prefixed_url_for('user_bp.view_users'))
# Tenant Domain Management (Probably obsolete )------------------------------------------------------------------------
@user_bp.route('/tenant_domains')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_domains():
tenant_id = session['tenant']['id']
config = get_tenant_domains_list_view(tenant_id)
return render_list_view('list_view.html', **config)
@user_bp.route('/handle_tenant_domain_action', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_domain_action():
action = request.form['action']
if action == 'create_tenant_domain':
return redirect(prefixed_url_for('user_bp.tenant_domain'))
tenant_domain_identification = request.form['selected_row']
tenant_domain_id = ast.literal_eval(tenant_domain_identification).get('value')
if action == 'edit_tenant_domain':
return redirect(prefixed_url_for('user_bp.edit_tenant_domain', tenant_domain_id=tenant_domain_id))
# Add more conditions for other actions
return redirect(prefixed_url_for('tenant_domains'))
@user_bp.route('/tenant_domain', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_domain():
form = TenantDomainForm()
if form.validate_on_submit():
new_tenant_domain = TenantDomain()
form.populate_obj(new_tenant_domain)
new_tenant_domain.tenant_id = session['tenant']['id']
set_logging_information(new_tenant_domain, dt.now(tz.utc))
# Add the new user to the database and commit the changes
try:
db.session.add(new_tenant_domain)
db.session.commit()
flash('Tenant Domain added successfully.', 'success')
current_app.logger.info(
f'Tenant Domain {new_tenant_domain.domain} added for tenant {session["tenant"]["id"]}')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add Tenant Domain. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to create Tenant Domain {new_tenant_domain.domain}. '
f'for tenant {session["tenant"]["id"]}'
f'Error: {str(e)}')
else:
flash('Please fill in all required fields.', 'information')
return render_template('user/tenant_domain.html', form=form)
@user_bp.route('/tenant_domain/<int:tenant_domain_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_domain(tenant_domain_id):
tenant_domain = TenantDomain.query.get_or_404(tenant_domain_id) # This will return a 404 if no user is found
form = TenantDomainForm(obj=tenant_domain)
if request.method == 'POST' and form.validate_on_submit():
form.populate_obj(tenant_domain)
update_logging_information(tenant_domain, dt.now(tz.utc))
try:
db.session.add(tenant_domain)
db.session.commit()
flash('Tenant Domain updated successfully.', 'success')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update Tenant Domain. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to update Tenant Domain {tenant_domain.id}. '
f'for tenant {session["tenant"]["id"]}'
f'Error: {str(e)}')
return redirect(
prefixed_url_for('user_bp.tenant_domains',
tenant_id=session['tenant']['id'])) # Assuming there's a user profile view to redirect to
else:
form_validation_failed(request, form)
return render_template('user/edit_tenant_domain.html', form=form, tenant_domain_id=tenant_domain_id)
# Tenant Project Management -----------------------------------------------------------------------
@user_bp.route('/tenant_project', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_project():
form = TenantProjectForm()
if request.method == 'GET':
# Initialize the API key
new_api_key = generate_api_key(prefix="EveAI")
form.unencrypted_api_key.data = new_api_key
form.visual_api_key.data = f"EVEAI-...{new_api_key[-4:]}"
if form.validate_on_submit():
new_tenant_project = TenantProject()
form.populate_obj(new_tenant_project)
new_tenant_project.tenant_id = session['tenant']['id']
new_tenant_project.encrypted_api_key = simple_encryption.encrypt_api_key(new_tenant_project.unencrypted_api_key)
set_logging_information(new_tenant_project, dt.now(tz.utc))
# Add new Tenant Project to the database
try:
db.session.add(new_tenant_project)
db.session.commit()
# Send email notification
services = [SERVICE_TYPES[service]['name']
for service in form.services.data
if service in SERVICE_TYPES]
email_sent = send_api_key_notification(
tenant_id=session['tenant']['id'],
tenant_name=session['tenant']['name'],
project_name=new_tenant_project.name,
api_key=new_tenant_project.unencrypted_api_key,
services=services,
responsible_email=form.responsible_email.data
)
if email_sent:
flash('Tenant Project created successfully and notification email sent.', 'success')
else:
flash('Tenant Project created successfully but failed to send notification email.', 'warning')
current_app.logger.info(f'Tenant Project {new_tenant_project.name} added for tenant '
f'{session['tenant']['id']}.')
return redirect(prefixed_url_for('user_bp.tenant_projects'))
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to create Tenant Project. Error: {str(e)}', 'danger')
current_app.logger.error(f"Failed to create Tenant Project for tenant {session['tenant']['id']}. "
f"Error: {str(e)}")
return render_template('user/tenant_project.html', form=form)
@user_bp.route('/tenant_projects', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_projects():
tenant_id = session['tenant']['id']
config = get_tenant_projects_list_view(tenant_id)
return render_list_view('list_view.html', **config)
@user_bp.route('/handle_tenant_project_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_project_selection():
action = request.form.get('action')
if action == 'create_tenant_project':
return redirect(prefixed_url_for('user_bp.tenant_project'))
tenant_project_identification = request.form.get('selected_row')
tenant_project_id = ast.literal_eval(tenant_project_identification).get('value')
tenant_project = TenantProject.query.get_or_404(tenant_project_id)
if action == 'edit_tenant_project':
return redirect(prefixed_url_for('user_bp.edit_tenant_project', tenant_project_id=tenant_project_id))
elif action == 'invalidate_tenant_project':
tenant_project.active = False
try:
db.session.add(tenant_project)
db.session.commit()
flash('Tenant Project invalidated successfully.', 'success')
current_app.logger.info(f'Tenant Project {tenant_project.name} invalidated for tenant '
f'{session['tenant']['id']}.')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to invalidate Tenant Project {tenant_project.name}. Error: {str(e)}', 'danger')
current_app.logger.error(f"Failed to invalidate Tenant Project for tenant {session['tenant']['id']}. "
f"Error: {str(e)}")
elif action == 'delete_tenant_project':
return redirect(prefixed_url_for('user_bp.delete_tenant_project', tenant_project_id=tenant_project_id))
return redirect(prefixed_url_for('user_bp.tenant_projects'))
@user_bp.route('/tenant_project/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_project(tenant_project_id):
tenant_project = TenantProject.query.get_or_404(tenant_project_id)
tenant_id = session['tenant']['id']
form = EditTenantProjectForm(obj=tenant_project)
if form.validate_on_submit():
form.populate_obj(tenant_project)
update_logging_information(tenant_project, dt.now(tz.utc))
try:
db.session.add(tenant_project)
db.session.commit()
flash('Tenant Project updated successfully.', 'success')
current_app.logger.info(f'Tenant Project {tenant_project.name} updated for tenant {tenant_id}.')
return redirect(prefixed_url_for('user_bp.tenant_projects'))
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update Tenant Project. Error: {str(e)}', 'danger')
current_app.logger.error(f"Failed to update Tenant Project {tenant_project.name} for tenant {tenant_id}. ")
return render_template('user/edit_tenant.html', form=form, tenant_project_id=tenant_project_id)
@user_bp.route('/tenant_project/delete/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def delete_tenant_project(tenant_project_id):
tenant_id = session['tenant']['id']
tenant_project = TenantProject.query.get_or_404(tenant_project_id)
# Ensure project belongs to current tenant
if tenant_project.tenant_id != tenant_id:
flash('You do not have permission to delete this project.', 'danger')
return redirect(prefixed_url_for('user_bp.tenant_projects'))
if request.method == 'GET':
return render_template('user/confirm_delete_tenant_project.html',
tenant_project=tenant_project)
try:
project_name = tenant_project.name
db.session.delete(tenant_project)
db.session.commit()
flash(f'Tenant Project "{project_name}" successfully deleted.', 'success')
current_app.logger.info(f'Tenant Project {project_name} deleted for tenant {tenant_id}')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to delete Tenant Project. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to delete Tenant Project {tenant_project_id}. Error: {str(e)}')
return redirect(prefixed_url_for('user_bp.tenant_projects'))
# Tenant Make Management --------------------------------------------------------------------------
@user_bp.route('/tenant_make', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_make():
form = TenantMakeForm()
customisation_config = cache_manager.customisations_config_cache.get_config("CHAT_CLIENT_CUSTOMISATION")
default_customisation_options = create_default_config_from_type_config(customisation_config["configuration"])
form.add_dynamic_fields("configuration", customisation_config, default_customisation_options)
if form.validate_on_submit():
tenant_id = session['tenant']['id']
new_tenant_make = TenantMake()
form.populate_obj(new_tenant_make)
new_tenant_make.tenant_id = tenant_id
customisation_options = form.get_dynamic_data("configuration")
new_tenant_make.chat_customisation_options = json.dumps(customisation_options)
# Verwerk allowed_languages als array
new_tenant_make.allowed_languages = form.allowed_languages.data if form.allowed_languages.data else None
set_logging_information(new_tenant_make, dt.now(tz.utc))
try:
db.session.add(new_tenant_make)
db.session.commit()
flash('Tenant Make successfully added!', 'success')
current_app.logger.info(f'Tenant Make {new_tenant_make.name} successfully added for tenant {tenant_id}!')
# Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type)
return redirect(prefixed_url_for('user_bp.tenant_makes', tenant_make_id=new_tenant_make.id))
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add Tenant Make. Error: {e}', 'danger')
current_app.logger.error(f'Failed to add Tenant Make {new_tenant_make.name}'
f'for tenant {tenant_id}. Error: {str(e)}')
return render_template('user/tenant_make.html', form=form)
@user_bp.route('/tenant_makes', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_makes():
tenant_id = session['tenant']['id']
config = get_tenant_makes_list_view(tenant_id)
return render_list_view('list_view.html', **config)
@user_bp.route('/tenant_make/<int:tenant_make_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_make(tenant_make_id):
"""Edit an existing tenant make configuration."""
# Get the tenant make or return 404
tenant_make = TenantMake.query.get_or_404(tenant_make_id)
# Create form instance with the tenant make
form = EditTenantMakeForm(request.form, obj=tenant_make)
# Initialiseer de allowed_languages selectie met huidige waarden
if request.method == 'GET':
if tenant_make.allowed_languages:
form.allowed_languages.data = tenant_make.allowed_languages
customisation_config = cache_manager.customisations_config_cache.get_config("CHAT_CLIENT_CUSTOMISATION")
form.add_dynamic_fields("configuration", customisation_config, tenant_make.chat_customisation_options)
if form.validate_on_submit():
# Update basic fields
form.populate_obj(tenant_make)
tenant_make.chat_customisation_options = form.get_dynamic_data("configuration")
# Verwerk allowed_languages als array
tenant_make.allowed_languages = form.allowed_languages.data if form.allowed_languages.data else None
# Update logging information
update_logging_information(tenant_make, dt.now(tz.utc))
# Save changes to database
try:
db.session.add(tenant_make)
db.session.commit()
flash('Tenant Make updated successfully!', 'success')
current_app.logger.info(f'Tenant Make {tenant_make.id} updated successfully')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update tenant make. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to update tenant make {tenant_make_id}. Error: {str(e)}')
return render_template('user/edit_tenant_make.html', form=form, tenant_make_id=tenant_make_id)
return redirect(prefixed_url_for('user_bp.tenant_makes'))
else:
form_validation_failed(request, form)
return render_template('user/edit_tenant_make.html', form=form, tenant_make_id=tenant_make_id)
@user_bp.route('/handle_tenant_make_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_make_selection():
action = request.form['action']
if action == 'create_tenant_make':
return redirect(prefixed_url_for('user_bp.tenant_make'))
tenant_make_identification = request.form.get('selected_row')
tenant_make_id = ast.literal_eval(tenant_make_identification).get('value')
if action == 'edit_tenant_make':
return redirect(prefixed_url_for('user_bp.edit_tenant_make', tenant_make_id=tenant_make_id))
elif action == 'set_as_default':
# Set this make as the default for the tenant
tenant_id = session['tenant']['id']
tenant = Tenant.query.get(tenant_id)
tenant.default_tenant_make_id = tenant_make_id
try:
db.session.commit()
flash(f'Default tenant make updated successfully.', 'success')
# Update session data if necessary
if 'tenant' in session:
session['tenant'] = tenant.to_dict()
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update default tenant make. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to update default tenant make. Error: {str(e)}')
# Altijd teruggaan naar de tenant_makes pagina
return redirect(prefixed_url_for('user_bp.tenant_makes'))
def reset_uniquifier(user):
security.datastore.set_uniquifier(user)
db.session.add(user)
db.session.commit()
send_reset_email(user)
def get_notification_email(tenant_id, user_email=None):
"""
Determine which email address to use for notification.
Priority: Provided email > Primary contact > Default email
"""
if user_email:
return user_email
# Try to find primary contact
primary_contact = User.query.filter_by(
tenant_id=tenant_id,
is_primary_contact=True
).first()
if primary_contact:
return primary_contact.email
return "pieter@askeveai.com"
def send_api_key_notification(tenant_id, tenant_name, project_name, api_key, services, responsible_email=None):
"""
Send API key notification email
"""
recipient_email = get_notification_email(tenant_id, responsible_email)
# Prepare email content
context = {
'tenant_id': tenant_id,
'tenant_name': tenant_name,
'project_name': project_name,
'api_key': api_key,
'services': services,
'year': dt.now(tz.utc).year,
'promo_image_url': current_app.config.get('PROMOTIONAL_IMAGE_URL',
'https://static.askeveai.com/promo/default.jpg')
}
try:
# Create email message
msg = send_email(
subject='Your new API-key from Ask Eve AI (Evie)',
html=render_template('email/api_key_notification.html', **context),
to_email=recipient_email,
to_name=recipient_email,
)
current_app.logger.info(f"API key notification sent to {recipient_email} for tenant {tenant_id}")
return True
except Exception as e:
current_app.logger.error(f"Failed to send API key notification email: {str(e)}")
return False