- Check for consent before allowing users to perform activities in the administrative app.

This commit is contained in:
Josako
2025-10-14 16:20:30 +02:00
parent 37819cd7e5
commit 3ea3a06de6
11 changed files with 316 additions and 23 deletions

View File

@@ -1,5 +1,5 @@
# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session, jsonify
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
@@ -11,7 +11,7 @@ from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
from common.models.user import User, ConsentStatus
from common.services.user import TenantServices
from common.services.user import TenantServices, UserServices
from common.utils.eveai_exceptions import EveAIException, EveAINoActiveLicense
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, ForgotPasswordForm
@@ -59,22 +59,8 @@ def login():
return redirect(prefixed_url_for('user_bp.tenants', for_redirect=True))
if current_user.has_roles('Partner Admin'):
return redirect(prefixed_url_for('user_bp.tenants', for_redirect=True))
consent_status = TenantServices.get_consent_status(user.tenant_id)
match consent_status:
case ConsentStatus.CONSENTED:
return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))
case ConsentStatus.NOT_CONSENTED:
if current_user.has_roles('Tenant Admin'):
return redirect(prefixed_url_for('user_bp.tenant_consent', for_redirect=True))
else:
return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
case ConsentStatus.RENEWAL_REQUIRED:
if current_user.has_roles('Tenant Admin'):
return redirect(prefixed_url_for('user_bp.tenant_consent_renewal', for_redirect=True))
else:
return redirect(prefixed_url_for('user_bp.consent_renewal', for_redirect=True))
case _:
return redirect(prefixed_url_for('basic_bp.index', for_redirect=True))
# After login, rely on global consent guard; just go to default start
return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))
else:
flash('Invalid username or password', 'danger')
current_app.logger.error(f'Invalid username or password for given email: {user.email}')
@@ -160,6 +146,102 @@ def reset_password(token):
return render_template('security/reset_password.html', reset_password_form=form)
@security_bp.route('/consent/sign', methods=['POST'])
@login_required
def consent_sign():
try:
# Determine tenant context
tenant_id = None
# Payload may provide a tenant_id for admins signing for others
if request.is_json:
payload = request.get_json(silent=True) or {}
tenant_id = payload.get('tenant_id')
consent_data = payload.get('consent_data', {})
else:
tenant_id = request.form.get('tenant_id')
consent_data = {}
if tenant_id is None:
# default to user's tenant (Tenant Admin)
tenant_id = current_user.tenant_id
tenant_id = int(tenant_id)
# Authorization
allowed = False
if current_user.has_roles('Super User'):
allowed = True
elif current_user.has_roles('Partner Admin') and UserServices.can_user_edit_tenant(tenant_id):
allowed = True
elif current_user.has_roles('Tenant Admin') and getattr(current_user, 'tenant_id', None) == tenant_id:
allowed = True
if not allowed:
abort(403)
# Determine consent versions/types to record
cts = current_app.config.get('CONSENT_TYPES', [])
from common.models.user import TenantConsent, ConsentVersion, PartnerService
from common.services.user.partner_services import PartnerServices
# Resolve partner and management service if available in session (for Partner Admin)
partner_id = None
partner_service_id = None
try:
if 'partner' in session and session['partner'].get('services'):
partner_id = session['partner'].get('id')
mgmt = PartnerServices.get_management_service()
if mgmt:
partner_service_id = mgmt.get('id')
except Exception:
pass
# Fallbacks if not Partner Admin context
if partner_id is None:
# Try find partner by tenant (one-to-one in model)
from common.models.user import Partner, Tenant
t = Tenant.query.get(tenant_id)
if t and t.partner:
partner_id = t.partner.id
if partner_service_id is None and partner_id is not None:
ps = PartnerService.query.filter_by(partner_id=partner_id, type='MANAGEMENT_SERVICE').first()
if ps:
partner_service_id = ps.id
# For each consent type, record acceptance of latest version
now = dt.now(tz.utc)
for ct in cts:
cv = ConsentVersion.query.filter_by(consent_type=ct).order_by(ConsentVersion.consent_valid_from.desc()).first()
if not cv:
current_app.logger.error(f'No ConsentVersion found for type {ct}; skipping')
continue
tc = TenantConsent(
tenant_id=tenant_id,
partner_id=partner_id or 0,
partner_service_id=partner_service_id or 0,
user_id=current_user.id,
consent_type=ct,
consent_version=cv.consent_version,
consent_data=consent_data or {}
)
db.session.add(tc)
db.session.commit()
if request.is_json or 'application/json' in request.headers.get('Accept', ''):
return jsonify({'ok': True, 'tenant_id': tenant_id}), 200
# Default UX: go to overview
return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))
except CSRFError:
if request.is_json:
return jsonify({'ok': False, 'error': 'csrf_error'}), 400
flash('Session expired. Please retry.', 'danger')
return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
except Exception as e:
current_app.logger.error(f'Consent signing failed: {e}')
db.session.rollback()
if request.is_json:
return jsonify({'ok': False, 'error': str(e)}), 400
flash('Failed to sign consent.', 'danger')
return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))