from flask import current_app, render_template, request, redirect, session, flash from flask_security import current_user from itsdangerous import URLSafeTimedSerializer from common.models.user import Role, ConsentStatus from common.utils.nginx_utils import prefixed_url_for from common.utils.mail_utils import send_email def confirm_token(token, expiration=3600): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) try: email = serializer.loads(token, salt=current_app.config['SECURITY_PASSWORD_SALT'], max_age=expiration) except Exception as e: current_app.logger.error(f'Error confirming token: {e}') raise return email def generate_reset_token(email): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT']) def generate_confirmation_token(email): serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT']) def send_confirmation_email(user): token = generate_confirmation_token(user.email) confirm_url = prefixed_url_for('security_bp.confirm_email', token=token, _external=True) html = render_template('email/activate.html', confirm_url=confirm_url) subject = "Please confirm your email" try: send_email(user.email, f"{user.first_name} {user.last_name}", "Confirm your email", html) current_app.logger.info(f'Confirmation email sent to {user.email} with url: {confirm_url}') except Exception as e: current_app.logger.error(f'Failed to send confirmation email to {user.email}. Error: {str(e)}') raise def send_reset_email(user): token = generate_reset_token(user.email) reset_url = prefixed_url_for('security_bp.reset_password', token=token, _external=True) html = render_template('email/reset_password.html', reset_url=reset_url) subject = "Reset Your Password" try: send_email(user.email, f"{user.first_name} {user.last_name}", subject, html) current_app.logger.info(f'Reset email sent to {user.email} with url: {reset_url}') except Exception as e: current_app.logger.error(f'Failed to send reset email to {user.email}. Error: {str(e)}') raise def get_current_user_roles(): """Get the roles of the currently authenticated user. Returns: List of Role objects or empty list if no user is authenticated """ if current_user.is_authenticated: return current_user.roles return [] def current_user_has_role(role_name): """Check if the current user has the specified role. Args: role_name (str): Name of the role to check Returns: bool: True if user has the role, False otherwise """ if not current_user.is_authenticated: return False return any(role.name == role_name for role in current_user.roles) def current_user_roles(): """Get the roles of the currently authenticated user. Returns: List of Role objects or empty list if no user is authenticated """ if current_user.is_authenticated: return current_user.roles return [] def all_user_roles(): roles = [(role.id, role.name) for role in Role.query.all()] def is_exempt_endpoint(endpoint: str) -> bool: """Check if the endpoint is exempt from consent guard""" if not endpoint: return False cfg = current_app.config or {} endpoints_cfg = set(cfg.get('CONSENT_GUARD_EXEMPT_ENDPOINTS', [])) prefix_cfg = list(cfg.get('CONSENT_GUARD_EXEMPT_PREFIXES', [])) default_endpoints = { 'security_bp.login', 'security_bp.logout', 'security_bp.confirm_email', 'security_bp.forgot_password', 'security_bp.reset_password', 'security_bp.reset_password_request', 'user_bp.tenant_consent', 'user_bp.no_consent', 'user_bp.tenant_consent_renewal', 'user_bp.consent_renewal', 'user_bp.view_tenant_consents', 'user_bp.accept_tenant_consent', 'user_bp.view_consent_markdown', 'basic_bp.view_content', } default_prefixes = [ 'security_bp.', 'healthz_bp.', ] endpoints = default_endpoints.union(endpoints_cfg) prefixes = default_prefixes + [p for p in prefix_cfg if isinstance(p, str)] for p in prefixes: if endpoint.startswith(p): return True if endpoint in endpoints: return True return False def enforce_tenant_consent_ui(): """Check if the user has consented to the terms of service""" path = getattr(request, 'path', '') or '' if path.startswith('/healthz') or path.startswith('/_healthz'): return None if not current_user.is_authenticated: return None endpoint = request.endpoint or '' if is_exempt_endpoint(endpoint) or request.method == 'OPTIONS': return None # Global bypass: Super User and Partner Admin always allowed if current_user.has_roles('Super User') or current_user.has_roles('Partner Admin'): return None tenant_id = getattr(current_user, 'tenant_id', None) if not tenant_id: tenant_id = session.get('tenant', {}).get('id') if session.get('tenant') else None if not tenant_id: return redirect(prefixed_url_for('security_bp.login', for_redirect=True)) raw_status = session.get('consent_status', ConsentStatus.NOT_CONSENTED) # Coerce string to ConsentStatus enum if needed status = raw_status try: if isinstance(raw_status, str): # Accept formats like 'CONSENTED' or 'ConsentStatus.CONSENTED' name = raw_status.split('.')[-1] from common.models.user import ConsentStatus as CS status = getattr(CS, name, CS.NOT_CONSENTED) except Exception: status = ConsentStatus.NOT_CONSENTED if status == ConsentStatus.CONSENTED: return None if status == ConsentStatus.NOT_CONSENTED: if current_user.has_roles('Tenant Admin'): return redirect(prefixed_url_for('user_bp.tenant_consent', for_redirect=True)) return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True)) if status == ConsentStatus.RENEWAL_REQUIRED: if current_user.has_roles('Tenant Admin'): flash( "You need to renew your consent to our DPA or T&Cs. Failing to do so in time will stop you from accessing our services.", "danger") elif current_user.has_roles('Partner Admin'): flash( "Please ensure renewal of our DPA or T&Cs for the current Tenant. Failing to do so in time will stop the tenant from accessing our services.", "danger") else: flash( "Please inform your administrator or partner to renew your consent to our DPA or T&Cs. Failing to do so in time will stop you from accessing our services.", "danger") return None current_app.logger.debug('Unknown consent status') return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))