- Add 'Partner Admin' role to actual functionality in eveai_app

This commit is contained in:
Josako
2025-04-15 17:12:46 +02:00
parent 3eed546879
commit 5f58417d24
12 changed files with 281 additions and 135 deletions

View File

@@ -1,14 +1,12 @@
# from . import user_bp
import uuid
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify
from flask import request, redirect, flash, render_template, Blueprint, session, current_app
from flask_mailman import EmailMessage
from flask_security import hash_password, roles_required, roles_accepted, current_user
from itsdangerous import URLSafeTimedSerializer
from flask_security import roles_accepted, current_user
from sqlalchemy.exc import SQLAlchemyError
import ast
from common.models.user import User, Tenant, Role, TenantDomain, TenantProject, Partner
from common.models.user import User, Tenant, Role, TenantDomain, TenantProject, PartnerTenant
from common.extensions import db, security, minio_client, simple_encryption
from common.services.user_service import UserService
from common.utils.security_utils import send_confirmation_email, send_reset_email
@@ -19,8 +17,9 @@ from common.utils.database import Database
from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed
from common.utils.simple_encryption import generate_api_key
from common.utils.nginx_utils import prefixed_url_for
from common.utils.eveai_exceptions import EveAIDoublePartner, EveAIException
from common.utils.eveai_exceptions import EveAIException
from common.utils.document_utils import set_logging_information, update_logging_information
from common.services.tenant_service import TenantService
user_bp = Blueprint('user_bp', __name__, url_prefix='/user')
@@ -36,7 +35,7 @@ def log_after_request(response):
@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_required('Super User')
@roles_accepted('Super User', 'Partner Admin')
def tenant():
form = TenantForm()
if request.method == 'GET':
@@ -48,7 +47,6 @@ def tenant():
new_tenant = Tenant()
form.populate_obj(new_tenant)
# Handle Timestamps
timestamp = dt.now(tz.utc)
new_tenant.created_at = timestamp
new_tenant.updated_at = timestamp
@@ -57,11 +55,24 @@ def tenant():
try:
db.session.add(new_tenant)
db.session.commit()
if current_user.has_roles('Partner Admin') and 'partner' in session:
# Always associate with the partner for Partner Admins
TenantService.associate_tenant_with_partner(new_tenant.id)
elif current_user.has_roles('Super User') and form.assign_to_partner.data and 'partner' in session:
# Super User chose to associate with partner
TenantService.associate_tenant_with_partner(new_tenant.id)
except SQLAlchemyError as e:
current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
flash(f'Failed to add tenant to database. Error: {str(e)}', 'danger')
return render_template('user/tenant.html', form=form)
except EveAIException as e:
current_app.logger.error(f'Error associating Tenant {new_tenant.id} to Partner. Error: {str(e)}')
flash(f'Error associating Tenant to Partner. Error: {str(e)}', 'danger')
return render_template('user/tenant.html', form=form)
current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database")
flash(f"Successfully created tenant {new_tenant.id} in Database", 'success')
@@ -81,15 +92,11 @@ def tenant():
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_required('Super User')
@roles_accepted('Super User', 'Partner Admin')
def edit_tenant(tenant_id):
tenant = Tenant.query.get_or_404(tenant_id) # This will return a 404 if no tenant is found
form = TenantForm(obj=tenant)
if request.method == 'GET':
# Populate the form with tenant data
form.populate_obj(tenant)
if form.validate_on_submit():
# Populate the tenant with form data
form.populate_obj(tenant)
@@ -109,6 +116,7 @@ def edit_tenant(tenant_id):
@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin', 'Partner Admin')
def user():
tenant_id = session.get('tenant').get('id')
form = CreateUserForm()
form.tenant_id.data = session.get('tenant').get('id') # It is only possible to create users for the session tenant
if form.validate_on_submit():
@@ -141,7 +149,7 @@ def user():
try:
send_confirmation_email(new_user)
current_app.logger.info(f'User {new_user.id} with name {new_user.user_name} added to database'
f'Confirmation email sent to {new_user.email}')
f'Confirmation email sent to {new_user.email}')
flash('User added successfully and confirmation email sent.', 'success')
except Exception as e:
current_app.logger.error(f'Failed to send confirmation email to {new_user.email}. Error: {str(e)}')
@@ -205,14 +213,40 @@ def edit_user(user_id):
@user_bp.route('/select_tenant', methods=['GET', 'POST'])
@roles_required('Super User')
@roles_accepted('Super User', 'Partner Admin') # Allow both roles
def select_tenant():
filter_form = TenantSelectionForm(request.form)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# Start with a base query
query = Tenant.query
# Apply different filters based on user role
if current_user.has_roles('Partner Admin') and 'partner' in session:
# Get the partner's management service
management_service = next((service for service in session['partner']['services']
if service.get('type') == 'MANAGEMENT_SERVICE'), None)
if management_service:
# Get the partner's own tenant
partner_tenant_id = session['partner']['tenant_id']
# Get tenants managed by this partner through PartnerTenant relationships
managed_tenant_ids = db.session.query(PartnerTenant.tenant_id).filter_by(
partner_service_id=management_service['id']
).all()
# Convert list of tuples to flat list
managed_tenant_ids = [tenant_id for (tenant_id,) in managed_tenant_ids]
# Include partner's own tenant in the list
allowed_tenant_ids = [partner_tenant_id] + managed_tenant_ids
# Filter query to only show allowed tenants
query = query.filter(Tenant.id.in_(allowed_tenant_ids))
# Apply form filters (for both Super User and Partner Admin)
if filter_form.validate_on_submit():
if filter_form.types.data:
query = query.filter(Tenant.type.in_(filter_form.types.data))
@@ -220,6 +254,7 @@ def select_tenant():
search = f"%{filter_form.search.data}%"
query = query.filter(Tenant.name.ilike(search))
# Finalize query
query = query.order_by(Tenant.name)
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
tenants = pagination.items
@@ -230,7 +265,7 @@ def select_tenant():
@user_bp.route('/handle_tenant_selection', methods=['POST'])
@roles_required('Super User')
@roles_accepted('Super User', 'Partner Admin')
def handle_tenant_selection():
action = request.form['action']
if action == 'create_tenant':
@@ -238,6 +273,10 @@ def handle_tenant_selection():
tenant_identification = request.form['selected_row']
tenant_id = ast.literal_eval(tenant_identification).get('value')
if not TenantService.can_user_edit_tenant(tenant_id):
current_app.logger.info(f"User not authenticated to edit tenant {tenant_id}.")
flash(f"You are not authenticated to manage tenant {tenant_id}", 'danger')
return redirect(prefixed_url_for('select_tenant'))
the_tenant = Tenant.query.get(tenant_id)
# set tenant information in the session
@@ -259,7 +298,7 @@ def handle_tenant_selection():
@user_bp.route('/view_users')
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_users():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
@@ -279,7 +318,7 @@ def view_users():
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_user_action():
action = request.form['action']
if action == 'create_user':
@@ -305,7 +344,7 @@ def handle_user_action():
@user_bp.route('/view_tenant_domains')
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_tenant_domains():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
@@ -324,7 +363,7 @@ def view_tenant_domains():
@user_bp.route('/handle_tenant_domain_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_domain_action():
action = request.form['action']
if action == 'create_tenant_domain':
@@ -340,7 +379,7 @@ def handle_tenant_domain_action():
@user_bp.route('/tenant_domain', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_domain():
form = TenantDomainForm()
if form.validate_on_submit():
@@ -354,7 +393,8 @@ def tenant_domain():
db.session.add(new_tenant_domain)
db.session.commit()
flash('Tenant Domain added successfully.', 'success')
current_app.logger.info(f'Tenant Domain {new_tenant_domain.domain} added for tenant {session["tenant"]["id"]}')
current_app.logger.info(
f'Tenant Domain {new_tenant_domain.domain} added for tenant {session["tenant"]["id"]}')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add Tenant Domain. Error: {str(e)}', 'danger')
@@ -368,7 +408,7 @@ def tenant_domain():
@user_bp.route('/tenant_domain/<int:tenant_domain_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_domain(tenant_domain_id):
tenant_domain = TenantDomain.query.get_or_404(tenant_domain_id) # This will return a 404 if no user is found
form = TenantDomainForm(obj=tenant_domain)
@@ -396,7 +436,7 @@ def edit_tenant_domain(tenant_domain_id):
@user_bp.route('/tenant_overview', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_overview():
tenant_id = session['tenant']['id']
tenant = Tenant.query.get_or_404(tenant_id)
@@ -405,7 +445,7 @@ def tenant_overview():
@user_bp.route('/tenant_project', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_project():
form = TenantProjectForm()
if request.method == 'GET':
@@ -458,7 +498,7 @@ def tenant_project():
@user_bp.route('/tenant_projects', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def tenant_projects():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
@@ -478,7 +518,7 @@ def tenant_projects():
@user_bp.route('/handle_tenant_project_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_tenant_project_selection():
action = request.form.get('action')
if action == 'create_tenant_project':
@@ -508,8 +548,8 @@ def handle_tenant_project_selection():
return redirect(prefixed_url_for('user_bp.tenant_projects'))
@user_bp.route('/tenant_project/<int:tenant_project_id>', methods=['GET','POST'])
@roles_accepted('Super User', 'Tenant Admin')
@user_bp.route('/tenant_project/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_tenant_project(tenant_project_id):
tenant_project = TenantProject.query.get_or_404(tenant_project_id)
tenant_id = session['tenant']['id']
@@ -535,7 +575,7 @@ def edit_tenant_project(tenant_project_id):
@user_bp.route('/tenant_project/delete/<int:tenant_project_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def delete_tenant_project(tenant_project_id):
tenant_id = session['tenant']['id']
tenant_project = TenantProject.query.get_or_404(tenant_project_id)
@@ -570,18 +610,6 @@ def reset_uniquifier(user):
send_reset_email(user)
def set_logging_information(obj, timestamp):
obj.created_at = timestamp
obj.updated_at = timestamp
obj.created_by = current_user.id
obj.updated_by = current_user.id
def update_logging_information(obj, timestamp):
obj.updated_at = timestamp
obj.updated_by = current_user.id
def get_notification_email(tenant_id, user_email=None):
"""
Determine which email address to use for notification.