import uuid
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app
from flask_security import roles_accepted, current_user
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
import ast

from common.models.user import User, Tenant, Role, TenantDomain, TenantProject, PartnerTenant
from common.extensions import db, security, minio_client, simple_encryption
from common.services.user_services import UserServices
from common.utils.security_utils import send_confirmation_email, send_reset_email
from config.type_defs.service_types import SERVICE_TYPES
from .user_forms import TenantForm, CreateUserForm, EditUserForm, TenantDomainForm, TenantSelectionForm, \
    TenantProjectForm, EditTenantProjectForm
from common.utils.database import Database
from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed
from common.utils.simple_encryption import generate_api_key
from common.utils.nginx_utils import prefixed_url_for
from common.utils.eveai_exceptions import EveAIException
from common.utils.document_utils import set_logging_information, update_logging_information
from common.services.tenant_services import TenantServices
from common.services.user_services import UserServices
from common.utils.mail_utils import send_email

user_bp = Blueprint('user_bp', __name__, url_prefix='/user')


def _selected_row_value(raw_selection):
    """Return the ``'value'`` entry of a submitted ``selected_row`` field, or None.

    The table macro posts the selected row as the text of a Python dict literal.
    The text comes straight from the request, so a missing, empty or malformed
    value must not escalate into an unhandled exception (HTTP 500).

    :param raw_selection: raw form value (may be None when no row was selected).
    :return: the value stored under ``'value'``, or None when it cannot be read.
    """
    if not raw_selection:
        return None
    try:
        parsed = ast.literal_eval(raw_selection)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed.get('value')


def _no_row_selected(list_endpoint):
    """Flash a 'no row selected' message and redirect to the given list endpoint."""
    flash('No valid row was selected.', 'danger')
    return redirect(prefixed_url_for(list_endpoint))


@user_bp.before_request
def log_before_request():
    """Write a debug line with the request path before every blueprint request."""
    current_app.logger.debug(f'Before request: {request.path} =====================================')


@user_bp.after_request
def log_after_request(response):
    """Pass the response through unchanged (hook kept for symmetry with the before hook)."""
    return response


@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin')
def tenant():
    """Create a new tenant, its database schema and its MinIO bucket.

    GET pre-fills the form with a generated tenant code. A valid POST stores the
    tenant, optionally links it to the partner in the session, then creates the
    schema and the bucket and redirects to the tenant selection page.
    """
    if not UserServices.can_user_create_tenant():
        current_app.logger.error(f'User {current_user.email} cannot create tenant')
        flash("You don't have the appropriate permissions to create a tenant", 'danger')
        return redirect(prefixed_url_for('user_bp.select_tenant'))

    form = TenantForm()
    if request.method == 'GET':
        form.code.data = f"TENANT-{str(uuid.uuid4())}"

    if form.validate_on_submit():
        # Handle the required attributes
        new_tenant = Tenant()
        form.populate_obj(new_tenant)
        timestamp = dt.now(tz.utc)
        new_tenant.created_at = timestamp
        new_tenant.updated_at = timestamp

        # Add the new tenant to the database and commit the changes
        try:
            db.session.add(new_tenant)
            db.session.commit()

            if 'partner' in session:
                # Partner Admins are always associated with their partner;
                # Super Users only when they asked for it in the form.
                if current_user.has_roles('Partner Admin') or (
                        current_user.has_roles('Super User') and form.assign_to_partner.data):
                    TenantServices.associate_tenant_with_partner(new_tenant.id)
        except IntegrityError as e:
            db.session.rollback()
            # Check for the specific error about duplicate tenant name
            if "tenant_name_key" in str(e) or "duplicate key value" in str(e):
                flash(f"A tenant with the name '{form.name.data}' already exists. "
                      f"Please choose a different name.", 'danger')
            else:
                current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
                flash(f'Failed to add tenant to database. Error: {str(e)}', 'danger')
            return render_template('user/tenant.html', form=form)
        except SQLAlchemyError as e:
            # Roll back so the session is usable for the rest of the request
            db.session.rollback()
            current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
            flash(f'Failed to add tenant to database. Error: {str(e)}', 'danger')
            return render_template('user/tenant.html', form=form)
        except EveAIException as e:
            # NOTE(review): the tenant row is already committed at this point, but its
            # schema and bucket are not created on this path - confirm this is intended.
            current_app.logger.error(f'Error associating Tenant {new_tenant.id} to Partner. Error: {str(e)}')
            flash(f'Error associating Tenant to Partner. Error: {str(e)}', 'danger')
            return render_template('user/tenant.html', form=form)

        current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database")
        flash(f"Successfully created tenant {new_tenant.id} in Database", 'success')

        # Create schema for new tenant
        current_app.logger.info(f"Creating schema for tenant {new_tenant.id}")
        Database(new_tenant.id).create_tenant_schema()

        # Create MinIO bucket for new tenant
        current_app.logger.info(f"Creating MinIO bucket for tenant {new_tenant.id}")
        minio_client.create_tenant_bucket(new_tenant.id)

        return redirect(prefixed_url_for('user_bp.select_tenant'))
    else:
        form_validation_failed(request, form)

    return render_template('user/tenant.html', form=form)


# The <int:tenant_id> converter is required: without it Flask never supplies the
# ``tenant_id`` argument the view function expects.
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin')
def edit_tenant(tenant_id):
    """Edit an existing tenant; responds with 404 when the tenant does not exist."""
    the_tenant = Tenant.query.get_or_404(tenant_id)
    form = TenantForm(obj=the_tenant)

    if form.validate_on_submit():
        # Populate the tenant with form data
        form.populate_obj(the_tenant)
        try:
            db.session.commit()
        except SQLAlchemyError as e:
            db.session.rollback()
            current_app.logger.error(f'Failed to update tenant {tenant_id}. Error: {str(e)}')
            flash(f'Failed to update tenant. Error: {str(e)}', 'danger')
        else:
            flash('Tenant updated successfully.', 'success')
            # Keep the session copy in sync when the edited tenant is the active one
            session_tenant = session.get('tenant')
            if session_tenant and session_tenant.get('id') == tenant_id:
                session['tenant'] = the_tenant.to_dict()
    else:
        form_validation_failed(request, form)

    return render_template('user/tenant.html', form=form, tenant_id=tenant_id)


@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin', 'Partner Admin')
def user():
    """Create a user for the tenant in the session and send a confirmation email."""
    tenant_id = session.get('tenant').get('id')
    form = CreateUserForm()
    # It is only possible to create users for the session tenant
    form.tenant_id.data = tenant_id

    if form.validate_on_submit():
        current_app.logger.info(f"Adding User for tenant {tenant_id} ")

        new_user = User(user_name=form.user_name.data,
                        email=form.email.data,
                        first_name=form.first_name.data,
                        last_name=form.last_name.data,
                        valid_to=form.valid_to.data,
                        tenant_id=form.tenant_id.data,
                        fs_uniquifier=uuid.uuid4().hex,
                        )

        timestamp = dt.now(tz.utc)
        new_user.created_at = timestamp
        new_user.updated_at = timestamp

        # Add roles; unknown role ids are skipped, as edit_user does.
        # NOTE(review): edit_user checks UserServices.validate_role_assignments();
        # confirm whether the same check should be applied here.
        for role_id in form.roles.data:
            the_role = Role.query.get(role_id)
            if the_role:
                new_user.roles.append(the_role)

        # Add the new user to the database and commit the changes
        try:
            db.session.add(new_user)
            db.session.commit()
            try:
                send_confirmation_email(new_user)
                current_app.logger.info(f'User {new_user.id} with name {new_user.user_name} added to database. '
                                        f'Confirmation email sent to {new_user.email}')
                flash('User added successfully and confirmation email sent.', 'success')
            except Exception as e:
                current_app.logger.error(f'Failed to send confirmation email to {new_user.email}. Error: {str(e)}')
                flash('User added successfully, but failed to send confirmation email. '
                      'Please contact the administrator.', 'warning')

            return redirect(prefixed_url_for('user_bp.view_users'))
        except Exception as e:
            current_app.logger.error(f'Failed to add user with name {new_user.user_name}. Error: {str(e)}')
            db.session.rollback()
            flash('Failed to add user. Email or user name already exists.', 'danger')
    else:
        form_validation_failed(request, form)

    return render_template('user/user.html', form=form)


# NOTE(review): this view does not check that the user belongs to the session tenant.
@user_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin', 'Partner Admin')
def edit_user(user_id):
    """Edit name, validity and roles of a user; responds with 404 for unknown ids."""
    the_user = User.query.get_or_404(user_id)
    form = EditUserForm(obj=the_user)

    if form.validate_on_submit():
        current_roles = set(role.id for role in the_user.roles)
        selected_roles = set(form.roles.data)

        # Validate before touching the user so a refused request leaves no pending changes
        if not UserServices.validate_role_assignments(selected_roles):
            flash('Trying to assign unauthorized roles', 'danger')
            current_app.logger.error(f"Trying to assign unauthorized roles by user {user_id}, "
                                     f"tenant {session['tenant']['id']}")
            return redirect(prefixed_url_for('user_bp.edit_user', user_id=user_id))

        # Populate the user with form data
        the_user.first_name = form.first_name.data
        the_user.last_name = form.last_name.data
        the_user.valid_to = form.valid_to.data
        the_user.updated_at = dt.now(tz.utc)

        # Add new roles
        for role_id in selected_roles - current_roles:
            role = Role.query.get(role_id)
            if role:
                the_user.roles.append(role)

        # Remove unselected roles
        for role_id in current_roles - selected_roles:
            role = Role.query.get(role_id)
            if role:
                the_user.roles.remove(role)

        db.session.commit()
        flash('User updated successfully.', 'success')
        return redirect(prefixed_url_for('user_bp.edit_user', user_id=the_user.id))
    else:
        form_validation_failed(request, form)

    # Show the roles currently stored for the user
    form.roles.data = [role.id for role in the_user.roles]
    return render_template('user/edit_user.html', form=form, user_id=user_id)


@user_bp.route('/select_tenant', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin')  # Allow both roles
def select_tenant():
    """List tenants (paginated, optionally filtered) so one can be selected."""
    filter_form = TenantSelectionForm(request.form)
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # Start with a base query
    query = Tenant.query

    current_app.logger.debug("We proberen het scherm op te bouwen")
    current_app.logger.debug(f"Session: {session}")

    # Apply different filters based on user role
    if current_user.has_roles('Partner Admin') and 'partner' in session:
        current_app.logger.debug("We zitten in partner mode")
        partner = session['partner']

        # Get the partner's management service
        management_service = next((service for service in partner['services']
                                   if service.get('type') == 'MANAGEMENT_SERVICE'), None)

        # Always restrict a partner to its own tenant; without a management service
        # the list must not fall back to every tenant in the system.
        allowed_tenant_ids = [partner['tenant_id']]

        if management_service:
            # Tenants managed by this partner through PartnerTenant relationships
            managed_rows = db.session.query(PartnerTenant.tenant_id).filter_by(
                partner_service_id=management_service['id']
            ).all()
            # Convert list of 1-tuples to a flat list
            allowed_tenant_ids += [managed_id for (managed_id,) in managed_rows]

        # Filter query to only show allowed tenants
        query = query.filter(Tenant.id.in_(allowed_tenant_ids))

    current_app.logger.debug("We zitten na partner service selectie")

    # Apply form filters (for both Super User and Partner Admin)
    if filter_form.validate_on_submit():
        if filter_form.types.data:
            query = query.filter(Tenant.type.in_(filter_form.types.data))
        if filter_form.search.data:
            search = f"%{filter_form.search.data}%"
            query = query.filter(Tenant.name.ilike(search))

    # Finalize query
    query = query.order_by(Tenant.name)
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    tenants = pagination.items

    rows = prepare_table_for_macro(tenants, [('id', ''), ('name', ''), ('website', ''), ('type', '')])

    return render_template('user/select_tenant.html', rows=rows, pagination=pagination, filter_form=filter_form)


@user_bp.route('/handle_tenant_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin')
def handle_tenant_selection():
    """Dispatch the action chosen on the tenant selection page."""
    action = request.form['action']
    if action == 'create_tenant':
        return redirect(prefixed_url_for('user_bp.tenant'))

    tenant_id = _selected_row_value(request.form.get('selected_row'))
    if tenant_id is None:
        return _no_row_selected('user_bp.select_tenant')

    if not UserServices.can_user_edit_tenant(tenant_id):
        current_app.logger.info(f"User not authenticated to edit tenant {tenant_id}.")
        flash(f"You are not authenticated to manage tenant {tenant_id}", 'danger')
        return redirect(prefixed_url_for('user_bp.select_tenant'))

    # 404 instead of an AttributeError when the id no longer exists
    the_tenant = Tenant.query.get_or_404(tenant_id)

    # set tenant information in the session
    session['tenant'] = the_tenant.to_dict()
    session['default_language'] = the_tenant.default_language
    session['llm_model'] = the_tenant.llm_model
    # remove catalog-related items from the session
    session.pop('catalog_id', None)
    session.pop('catalog_name', None)

    if action == 'edit_tenant':
        return redirect(prefixed_url_for('user_bp.edit_tenant', tenant_id=tenant_id))
    if action == 'select_tenant':
        return redirect(prefixed_url_for('user_bp.tenant_overview'))

    # Unknown action: back to the list (endpoint needs the blueprint prefix)
    return redirect(prefixed_url_for('user_bp.select_tenant'))


@user_bp.route('/view_users')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_users():
    """Show a paginated list of the users of the session tenant."""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    tenant_id = session.get('tenant').get('id')
    query = User.query.filter_by(tenant_id=tenant_id).order_by(User.user_name)

    pagination = query.paginate(page=page, per_page=per_page)
    users = pagination.items

    # prepare table data
    rows = prepare_table_for_macro(users, [('id', ''), ('user_name', ''), ('email', '')])

    # Render the users in a template
    return render_template('user/view_users.html', rows=rows, pagination=pagination)


# NOTE(review): the selected user is not checked against the session tenant.
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_user_action():
    """Dispatch the action chosen on the user list page."""
    action = request.form['action']
    if action == 'create_user':
        return redirect(prefixed_url_for('user_bp.user'))

    user_id = _selected_row_value(request.form.get('selected_row'))
    if user_id is None:
        return _no_row_selected('user_bp.view_users')

    the_user = User.query.get_or_404(user_id)

    if action == 'edit_user':
        return redirect(prefixed_url_for('user_bp.edit_user', user_id=user_id))
    elif action == 'resend_confirmation_email':
        send_confirmation_email(the_user)
        flash(f'Confirmation email sent to {the_user.email}.', 'success')
    elif action == 'send_password_reset_email':
        send_reset_email(the_user)
        flash(f'Password reset email sent to {the_user.email}.', 'success')
    elif action == 'reset_uniquifier':
        reset_uniquifier(the_user)
        flash(f'Uniquifier reset for {the_user.user_name}.', 'success')

    return redirect(prefixed_url_for('user_bp.view_users'))


@user_bp.route('/view_tenant_domains')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_tenant_domains():
    """Show a paginated list of the domains of the session tenant."""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    tenant_id = session.get('tenant').get('id')
    query = TenantDomain.query.filter_by(tenant_id=tenant_id).order_by(TenantDomain.domain)

    pagination = query.paginate(page=page, per_page=per_page)
    tenant_domains = pagination.items

    # prepare table data
    rows = prepare_table_for_macro(tenant_domains, [('id', ''), ('domain', ''), ('valid_to', '')])

    # Render the tenant domains in a template
    return render_template('user/view_tenant_domains.html', rows=rows, pagination=pagination)


@user_bp.route('/handle_tenant_domain_action', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_domain_action():
    """Dispatch the action chosen on the tenant domain list page."""
    action = request.form['action']
    if action == 'create_tenant_domain':
        return redirect(prefixed_url_for('user_bp.tenant_domain'))

    tenant_domain_id = _selected_row_value(request.form.get('selected_row'))
    if tenant_domain_id is None:
        return _no_row_selected('user_bp.view_tenant_domains')

    if action == 'edit_tenant_domain':
        return redirect(prefixed_url_for('user_bp.edit_tenant_domain', tenant_domain_id=tenant_domain_id))

    # Unknown action: back to the list (endpoint needs the blueprint prefix)
    return redirect(prefixed_url_for('user_bp.view_tenant_domains'))


@user_bp.route('/tenant_domain', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_domain():
    """Create a domain for the session tenant."""
    form = TenantDomainForm()
    if form.validate_on_submit():
        tenant_id = session['tenant']['id']
        new_tenant_domain = TenantDomain()
        form.populate_obj(new_tenant_domain)
        new_tenant_domain.tenant_id = tenant_id
        set_logging_information(new_tenant_domain, dt.now(tz.utc))

        # Add the new tenant domain to the database and commit the changes
        try:
            db.session.add(new_tenant_domain)
            db.session.commit()
            flash('Tenant Domain added successfully.', 'success')
            current_app.logger.info(f'Tenant Domain {new_tenant_domain.domain} added for tenant {tenant_id}')
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to add Tenant Domain. Error: {str(e)}', 'danger')
            current_app.logger.error(f'Failed to create Tenant Domain {new_tenant_domain.domain} '
                                     f'for tenant {tenant_id}. '
                                     f'Error: {str(e)}')
    else:
        # NOTE(review): this also fires on a plain GET, and 'information' differs from
        # the flash categories used elsewhere in this module - confirm both are intended.
        flash('Please fill in all required fields.', 'information')

    return render_template('user/tenant_domain.html', form=form)


# NOTE(review): the domain is not checked against the session tenant.
@user_bp.route('/tenant_domain/<int:tenant_domain_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_domain(tenant_domain_id):
    """Edit a tenant domain; responds with 404 when the domain does not exist."""
    the_domain = TenantDomain.query.get_or_404(tenant_domain_id)
    form = TenantDomainForm(obj=the_domain)

    if request.method == 'POST' and form.validate_on_submit():
        tenant_id = session['tenant']['id']
        form.populate_obj(the_domain)
        update_logging_information(the_domain, dt.now(tz.utc))
        try:
            db.session.add(the_domain)
            db.session.commit()
            flash('Tenant Domain updated successfully.', 'success')
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to update Tenant Domain. Error: {str(e)}', 'danger')
            current_app.logger.error(f'Failed to update Tenant Domain {the_domain.id} '
                                     f'for tenant {tenant_id}. '
                                     f'Error: {str(e)}')
        return redirect(prefixed_url_for('user_bp.view_tenant_domains', tenant_id=tenant_id))
    else:
        form_validation_failed(request, form)

    return render_template('user/edit_tenant_domain.html', form=form, tenant_domain_id=tenant_domain_id)


@user_bp.route('/tenant_overview', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_overview():
    """Show the session tenant in a form-backed overview page."""
    tenant_id = session['tenant']['id']
    the_tenant = Tenant.query.get_or_404(tenant_id)
    form = TenantForm(obj=the_tenant)
    return render_template('user/tenant_overview.html', form=form)


@user_bp.route('/tenant_project', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_project():
    """Create a tenant project with a fresh API key and mail the key to its owner."""
    form = TenantProjectForm()
    if request.method == 'GET':
        # Initialize the API key
        new_api_key = generate_api_key(prefix="EveAI")
        form.unencrypted_api_key.data = new_api_key
        form.visual_api_key.data = f"EVEAI-...{new_api_key[-4:]}"

    if form.validate_on_submit():
        tenant_id = session['tenant']['id']
        new_tenant_project = TenantProject()
        form.populate_obj(new_tenant_project)
        new_tenant_project.tenant_id = tenant_id
        new_tenant_project.encrypted_api_key = simple_encryption.encrypt_api_key(
            new_tenant_project.unencrypted_api_key)
        set_logging_information(new_tenant_project, dt.now(tz.utc))

        # Add new Tenant Project to the database
        try:
            db.session.add(new_tenant_project)
            db.session.commit()

            # Send email notification
            services = [SERVICE_TYPES[service]['name']
                        for service in form.services.data
                        if service in SERVICE_TYPES]

            email_sent = send_api_key_notification(
                tenant_id=tenant_id,
                tenant_name=session['tenant']['name'],
                project_name=new_tenant_project.name,
                api_key=new_tenant_project.unencrypted_api_key,
                services=services,
                responsible_email=form.responsible_email.data
            )

            if email_sent:
                flash('Tenant Project created successfully and notification email sent.', 'success')
            else:
                flash('Tenant Project created successfully but failed to send notification email.', 'warning')

            # The tenant id is bound to a local so the f-string does not reuse its own
            # quote character, which is a SyntaxError before Python 3.12.
            current_app.logger.info(f'Tenant Project {new_tenant_project.name} added for tenant '
                                    f'{tenant_id}.')

            return redirect(prefixed_url_for('user_bp.tenant_projects'))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to create Tenant Project. Error: {str(e)}', 'danger')
            current_app.logger.error(f"Failed to create Tenant Project for tenant {tenant_id}. "
                                     f"Error: {str(e)}")

    return render_template('user/tenant_project.html', form=form)


@user_bp.route('/tenant_projects', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_projects():
    """Show a paginated list of the projects of the session tenant."""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    tenant_id = session['tenant']['id']
    query = TenantProject.query.filter_by(tenant_id=tenant_id).order_by(TenantProject.id)

    pagination = query.paginate(page=page, per_page=per_page)
    the_tenant_projects = pagination.items

    # prepare table data
    rows = prepare_table_for_macro(the_tenant_projects, [('id', ''), ('name', ''), ('visual_api_key', ''),
                                                         ('responsible_email', ''), ('active', '')])

    # Render the tenant projects in a template
    return render_template('user/tenant_projects.html', rows=rows, pagination=pagination)


@user_bp.route('/handle_tenant_project_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_project_selection():
    """Dispatch the action chosen on the tenant project list page."""
    action = request.form.get('action')
    if action == 'create_tenant_project':
        return redirect(prefixed_url_for('user_bp.tenant_project'))

    tenant_project_id = _selected_row_value(request.form.get('selected_row'))
    if tenant_project_id is None:
        return _no_row_selected('user_bp.tenant_projects')

    the_project = TenantProject.query.get_or_404(tenant_project_id)
    tenant_id = session['tenant']['id']

    if action == 'edit_tenant_project':
        return redirect(prefixed_url_for('user_bp.edit_tenant_project', tenant_project_id=tenant_project_id))
    elif action == 'invalidate_tenant_project':
        # Same ownership rule as delete_tenant_project
        if the_project.tenant_id != tenant_id:
            flash('You do not have permission to invalidate this project.', 'danger')
            return redirect(prefixed_url_for('user_bp.tenant_projects'))

        the_project.active = False
        try:
            db.session.add(the_project)
            db.session.commit()
            flash('Tenant Project invalidated successfully.', 'success')
            current_app.logger.info(f'Tenant Project {the_project.name} invalidated for tenant '
                                    f'{tenant_id}.')
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to invalidate Tenant Project {the_project.name}. Error: {str(e)}', 'danger')
            current_app.logger.error(f"Failed to invalidate Tenant Project for tenant {tenant_id}. "
                                     f"Error: {str(e)}")
    elif action == 'delete_tenant_project':
        return redirect(prefixed_url_for('user_bp.delete_tenant_project', tenant_project_id=tenant_project_id))

    return redirect(prefixed_url_for('user_bp.tenant_projects'))


@user_bp.route('/tenant_project/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_project(tenant_project_id):
    """Edit a project of the session tenant; responds with 404 for unknown ids."""
    the_project = TenantProject.query.get_or_404(tenant_project_id)
    tenant_id = session['tenant']['id']

    # Same ownership rule as delete_tenant_project
    if the_project.tenant_id != tenant_id:
        flash('You do not have permission to edit this project.', 'danger')
        return redirect(prefixed_url_for('user_bp.tenant_projects'))

    form = EditTenantProjectForm(obj=the_project)

    if form.validate_on_submit():
        form.populate_obj(the_project)
        update_logging_information(the_project, dt.now(tz.utc))

        try:
            db.session.add(the_project)
            db.session.commit()
            flash('Tenant Project updated successfully.', 'success')
            current_app.logger.info(f'Tenant Project {the_project.name} updated for tenant {tenant_id}.')
            return redirect(prefixed_url_for('user_bp.tenant_projects'))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to update Tenant Project. Error: {str(e)}', 'danger')
            current_app.logger.error(f"Failed to update Tenant Project {the_project.name} for tenant {tenant_id}. "
                                     f"Error: {str(e)}")

    # NOTE(review): 'user/edit_tenant.html' looks like it may be meant to be a
    # tenant-project template - confirm before changing.
    return render_template('user/edit_tenant.html', form=form, tenant_project_id=tenant_project_id)


@user_bp.route('/tenant_project/delete/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def delete_tenant_project(tenant_project_id):
    """Confirm (GET) and perform (POST) the deletion of a project of the session tenant."""
    tenant_id = session['tenant']['id']
    the_project = TenantProject.query.get_or_404(tenant_project_id)

    # Ensure project belongs to current tenant
    if the_project.tenant_id != tenant_id:
        flash('You do not have permission to delete this project.', 'danger')
        return redirect(prefixed_url_for('user_bp.tenant_projects'))

    if request.method == 'GET':
        return render_template('user/confirm_delete_tenant_project.html', tenant_project=the_project)

    try:
        # Read the name before the delete; it is still needed for the messages
        project_name = the_project.name
        db.session.delete(the_project)
        db.session.commit()
        flash(f'Tenant Project "{project_name}" successfully deleted.', 'success')
        current_app.logger.info(f'Tenant Project {project_name} deleted for tenant {tenant_id}')
    except SQLAlchemyError as e:
        db.session.rollback()
        flash(f'Failed to delete Tenant Project. Error: {str(e)}', 'danger')
        current_app.logger.error(f'Failed to delete Tenant Project {tenant_project_id}. Error: {str(e)}')

    return redirect(prefixed_url_for('user_bp.tenant_projects'))


def reset_uniquifier(user):
    """Assign a new uniquifier to the user, persist it and send a reset email."""
    security.datastore.set_uniquifier(user)
    db.session.add(user)
    db.session.commit()
    send_reset_email(user)


def get_notification_email(tenant_id, user_email=None):
    """
    Determine which email address to use for notification.
    Priority: Provided email > Primary contact > Default email
    """
    if user_email:
        return user_email

    # Try to find primary contact
    primary_contact = User.query.filter_by(
        tenant_id=tenant_id,
        is_primary_contact=True
    ).first()

    if primary_contact:
        return primary_contact.email

    return "pieter@askeveai.com"


def send_api_key_notification(tenant_id, tenant_name, project_name, api_key, services, responsible_email=None):
    """
    Send API key notification email.

    The recipient is chosen by get_notification_email(). Returns True when the
    email was handed to send_email() without an exception, False otherwise; the
    failure is logged and never raised to the caller.
    """
    recipient_email = get_notification_email(tenant_id, responsible_email)

    # Prepare email content
    context = {
        'tenant_id': tenant_id,
        'tenant_name': tenant_name,
        'project_name': project_name,
        'api_key': api_key,
        'services': services,
        'year': dt.now(tz.utc).year,
        'promo_image_url': current_app.config.get('PROMOTIONAL_IMAGE_URL',
                                                  'https://static.askeveai.com/promo/default.jpg')
    }

    try:
        send_email(
            subject='Your new API-key from Ask Eve AI (Evie)',
            html=render_template('email/api_key_notification.html', **context),
            to_email=recipient_email,
            to_name=recipient_email,
        )
        current_app.logger.info(f"API key notification sent to {recipient_email} for tenant {tenant_id}")
        return True
    except Exception as e:
        # Best effort: the project is already stored, so only report the failure
        current_app.logger.error(f"Failed to send API key notification email: {str(e)}")
        return False