- Possibility to view the document version the consent is given to - Blocking functionality is no valid consent
204 lines
7.5 KiB
Python
204 lines
7.5 KiB
Python
from flask import current_app, render_template, request, redirect, session, flash
|
|
from flask_security import current_user
|
|
from itsdangerous import URLSafeTimedSerializer
|
|
|
|
from common.models.user import Role, ConsentStatus
|
|
from common.utils.nginx_utils import prefixed_url_for
|
|
from common.utils.mail_utils import send_email
|
|
|
|
|
|
def confirm_token(token, expiration=3600):
|
|
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
|
try:
|
|
email = serializer.loads(token, salt=current_app.config['SECURITY_PASSWORD_SALT'], max_age=expiration)
|
|
except Exception as e:
|
|
current_app.logger.error(f'Error confirming token: {e}')
|
|
raise
|
|
return email
|
|
|
|
|
|
def generate_reset_token(email):
|
|
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
|
return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT'])
|
|
|
|
|
|
def generate_confirmation_token(email):
|
|
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
|
return serializer.dumps(email, salt=current_app.config['SECURITY_PASSWORD_SALT'])
|
|
|
|
|
|
def send_confirmation_email(user):
|
|
token = generate_confirmation_token(user.email)
|
|
confirm_url = prefixed_url_for('security_bp.confirm_email', token=token, _external=True)
|
|
|
|
html = render_template('email/activate.html', confirm_url=confirm_url)
|
|
subject = "Please confirm your email"
|
|
|
|
try:
|
|
send_email(user.email, f"{user.first_name} {user.last_name}", "Confirm your email", html)
|
|
current_app.logger.info(f'Confirmation email sent to {user.email} with url: {confirm_url}')
|
|
except Exception as e:
|
|
current_app.logger.error(f'Failed to send confirmation email to {user.email}. Error: {str(e)}')
|
|
raise
|
|
|
|
|
|
def send_reset_email(user):
|
|
token = generate_reset_token(user.email)
|
|
reset_url = prefixed_url_for('security_bp.reset_password', token=token, _external=True)
|
|
|
|
html = render_template('email/reset_password.html', reset_url=reset_url)
|
|
subject = "Reset Your Password"
|
|
|
|
try:
|
|
send_email(user.email, f"{user.first_name} {user.last_name}", subject, html)
|
|
current_app.logger.info(f'Reset email sent to {user.email} with url: {reset_url}')
|
|
except Exception as e:
|
|
current_app.logger.error(f'Failed to send reset email to {user.email}. Error: {str(e)}')
|
|
raise
|
|
|
|
|
|
def get_current_user_roles():
|
|
"""Get the roles of the currently authenticated user.
|
|
|
|
Returns:
|
|
List of Role objects or empty list if no user is authenticated
|
|
"""
|
|
if current_user.is_authenticated:
|
|
return current_user.roles
|
|
return []
|
|
|
|
|
|
def current_user_has_role(role_name):
|
|
"""Check if the current user has the specified role.
|
|
|
|
Args:
|
|
role_name (str): Name of the role to check
|
|
|
|
Returns:
|
|
bool: True if user has the role, False otherwise
|
|
"""
|
|
if not current_user.is_authenticated:
|
|
return False
|
|
|
|
return any(role.name == role_name for role in current_user.roles)
|
|
|
|
|
|
def current_user_roles():
|
|
"""Get the roles of the currently authenticated user.
|
|
|
|
Returns:
|
|
List of Role objects or empty list if no user is authenticated
|
|
"""
|
|
if current_user.is_authenticated:
|
|
return current_user.roles
|
|
return []
|
|
|
|
|
|
def all_user_roles():
|
|
roles = [(role.id, role.name) for role in Role.query.all()]
|
|
|
|
|
|
def is_exempt_endpoint(endpoint: str) -> bool:
|
|
"""Check if the endpoint is exempt from consent guard"""
|
|
if not endpoint:
|
|
return False
|
|
cfg = current_app.config or {}
|
|
endpoints_cfg = set(cfg.get('CONSENT_GUARD_EXEMPT_ENDPOINTS', []))
|
|
prefix_cfg = list(cfg.get('CONSENT_GUARD_EXEMPT_PREFIXES', []))
|
|
|
|
default_endpoints = {
|
|
'security_bp.login',
|
|
'security_bp.logout',
|
|
'security_bp.confirm_email',
|
|
'security_bp.forgot_password',
|
|
'security_bp.reset_password',
|
|
'security_bp.reset_password_request',
|
|
'user_bp.tenant_consent',
|
|
'user_bp.no_consent',
|
|
'user_bp.tenant_consent_renewal',
|
|
'user_bp.consent_renewal',
|
|
'user_bp.view_tenant_consents',
|
|
'user_bp.accept_tenant_consent',
|
|
'user_bp.view_consent_markdown',
|
|
'basic_bp.view_content',
|
|
}
|
|
default_prefixes = [
|
|
'security_bp.',
|
|
'healthz_bp.',
|
|
]
|
|
endpoints = default_endpoints.union(endpoints_cfg)
|
|
prefixes = default_prefixes + [p for p in prefix_cfg if isinstance(p, str)]
|
|
for p in prefixes:
|
|
if endpoint.startswith(p):
|
|
return True
|
|
if endpoint in endpoints:
|
|
return True
|
|
return False
|
|
|
|
|
|
def enforce_tenant_consent_ui():
|
|
"""Check if the user has consented to the terms of service"""
|
|
path = getattr(request, 'path', '') or ''
|
|
if path.startswith('/healthz') or path.startswith('/_healthz'):
|
|
current_app.logger.debug(f'Health check request, bypassing consent guard: {path}')
|
|
return None
|
|
|
|
if not current_user.is_authenticated:
|
|
current_app.logger.debug('Not authenticated, bypassing consent guard')
|
|
return None
|
|
|
|
endpoint = request.endpoint or ''
|
|
if is_exempt_endpoint(endpoint) or request.method == 'OPTIONS':
|
|
current_app.logger.debug(f'Endpoint exempt from consent guard: {endpoint}')
|
|
return None
|
|
|
|
# Global bypass: Super User and Partner Admin always allowed
|
|
if current_user.has_roles('Super User') or current_user.has_roles('Partner Admin'):
|
|
current_app.logger.debug('Global bypass: Super User or Partner Admin')
|
|
return None
|
|
|
|
tenant_id = getattr(current_user, 'tenant_id', None)
|
|
if not tenant_id:
|
|
tenant_id = session.get('tenant', {}).get('id') if session.get('tenant') else None
|
|
if not tenant_id:
|
|
return redirect(prefixed_url_for('security_bp.login', for_redirect=True))
|
|
|
|
raw_status = session.get('consent_status', ConsentStatus.NOT_CONSENTED)
|
|
# Coerce string to ConsentStatus enum if needed
|
|
status = raw_status
|
|
try:
|
|
if isinstance(raw_status, str):
|
|
# Accept formats like 'CONSENTED' or 'ConsentStatus.CONSENTED'
|
|
name = raw_status.split('.')[-1]
|
|
from common.models.user import ConsentStatus as CS
|
|
status = getattr(CS, name, CS.NOT_CONSENTED)
|
|
except Exception:
|
|
status = ConsentStatus.NOT_CONSENTED
|
|
|
|
if status == ConsentStatus.CONSENTED:
|
|
current_app.logger.debug('User has consented')
|
|
return None
|
|
|
|
if status == ConsentStatus.NOT_CONSENTED:
|
|
current_app.logger.debug('User has not consented')
|
|
if current_user.has_roles('Tenant Admin'):
|
|
return redirect(prefixed_url_for('user_bp.tenant_consent', for_redirect=True))
|
|
return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
|
|
if status == ConsentStatus.RENEWAL_REQUIRED:
|
|
current_app.logger.debug('Consent renewal required')
|
|
if current_user.has_roles('Tenant Admin'):
|
|
flash(
|
|
"You need to renew your consent to our DPA or T&Cs. Failing to do so in time will stop you from accessing our services.",
|
|
"danger")
|
|
elif current_user.has_roles('Partner Admin'):
|
|
flash(
|
|
"Please ensure renewal of our DPA or T&Cs for the current Tenant. Failing to do so in time will stop the tenant from accessing our services.",
|
|
"danger")
|
|
else:
|
|
flash(
|
|
"Please inform your administrator or partner to renew your consent to our DPA or T&Cs. Failing to do so in time will stop you from accessing our services.",
|
|
"danger")
|
|
return None
|
|
current_app.logger.debug('Unknown consent status')
|
|
return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
|