260 lines
11 KiB
Python
260 lines
11 KiB
Python
# views/security_views.py
|
|
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session, jsonify
|
|
from flask_security import current_user, login_required, login_user, logout_user
|
|
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
|
|
from flask_security.forms import LoginForm
|
|
from flask_wtf.csrf import CSRFError, generate_csrf
|
|
from urllib.parse import urlparse
|
|
from datetime import datetime as dt, timezone as tz
|
|
|
|
from itsdangerous import URLSafeTimedSerializer
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from common.models.user import User, ConsentStatus
|
|
from common.services.user import TenantServices, UserServices
|
|
from common.utils.eveai_exceptions import EveAIException, EveAINoActiveLicense, EveAIUserExpired
|
|
from common.utils.nginx_utils import prefixed_url_for
|
|
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, ForgotPasswordForm
|
|
from common.extensions import db
|
|
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
|
|
from common.utils.security import set_tenant_session_data, is_valid_tenant
|
|
|
|
security_bp = Blueprint('security_bp', __name__)
|
|
|
|
|
|
@security_bp.before_request
def log_before_request():
    """Blueprint-level before-request hook.

    Currently a placeholder: it performs no work and returns ``None``, so
    request handling continues to the matched view.
    """
    return None
|
|
|
|
|
|
@security_bp.after_request
def log_after_request(response):
    """Blueprint-level after-request hook.

    Currently a placeholder: the response object is handed back untouched.

    :param response: the response produced for the current request.
    :return: the same ``response`` object, unmodified.
    """
    # Nothing is logged or altered yet; the hook only passes the response on.
    return response
|
|
|
|
|
|
@security_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate submitted credentials.

    Already-authenticated users are redirected to ``basic_bp.index``.

    On POST with a valid form the view:
      * looks the user up by e-mail and verifies the password,
      * rejects accounts whose ``valid_to`` date lies before today (UTC),
      * calls ``is_valid_tenant`` for the user's tenant,
      * logs the user in and redirects to a role-dependent start page.

    Any ``EveAIException`` raised by those checks, or a falsy result from
    ``login_user``, flashes a message, logs the failure and aborts with 401.
    An invalid form is logged and the login page is rendered again.

    :return: a redirect response or the rendered ``security/login_user.html``.
    """
    if current_user.is_authenticated:
        return redirect(prefixed_url_for('basic_bp.index', for_redirect=True))

    form = LoginForm()

    if request.method == 'POST':
        try:
            if form.validate_on_submit():
                try:
                    user = User.query.filter_by(email=form.email.data).first()
                    if user is None or not verify_and_update_password(form.password.data, user):
                        # Same message for unknown e-mail and wrong password.
                        raise EveAIException('Invalid email or password')
                    # Check if the user's account is still valid based on valid_to
                    today = dt.now(tz=tz.utc).date()
                    if user.valid_to is not None and today > user.valid_to:
                        current_app.logger.warning(
                            f"Login blocked for expired user {user.id} ({user.email}); "
                            f"today={today}, valid_to={user.valid_to}"
                        )
                        raise EveAIUserExpired()
                    # Return value is ignored here; failures are only seen as
                    # EveAIException subclasses caught below.
                    is_valid_tenant(user.tenant_id)
                except EveAIException as e:
                    flash(f'Failed to login user: {str(e)}', 'danger')
                    current_app.logger.error(f'Failed to login user: {str(e)}')
                    abort(401)

                if login_user(user):
                    current_app.logger.info(f'Login successful! Current User is {current_user.email}')
                    db.session.commit()
                    # Super Users and Partner Admins both start on the tenants list.
                    if current_user.has_roles('Super User') or current_user.has_roles('Partner Admin'):
                        return redirect(prefixed_url_for('user_bp.tenants', for_redirect=True))
                    # After login, rely on global consent guard; just go to default start
                    return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))
                else:
                    flash('Invalid username or password', 'danger')
                    current_app.logger.error(f'Invalid username or password for given email: {user.email}')
                    abort(401)
            else:
                current_app.logger.error(f'Invalid login form: {form.errors}')

        except CSRFError:
            current_app.logger.warning('CSRF token mismatch during login attempt')
            flash('Your session has expired. Please try logging in again.', 'danger')
            return redirect(prefixed_url_for('security_bp.login', for_redirect=True))

    if request.method == 'GET':
        # The returned token is not used here; the call is kept in case its
        # side effect (presumably priming the session's CSRF token before the
        # form is rendered) is relied on. NOTE(review): confirm.
        generate_csrf()

    return render_template('security/login_user.html', login_user_form=form)
|
|
|
|
|
|
@security_bp.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
    """Log the current user out and send them to the index page.

    :return: a redirect response to ``basic_bp.index``.
    """
    logout_user()
    landing_url = prefixed_url_for('basic_bp.index', for_redirect=True)
    return redirect(landing_url)
|
|
|
|
|
|
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
    """Activate the account identified by an e-mail confirmation token.

    The token is resolved to an e-mail address with ``confirm_token``. If
    that raises, the user is redirected to ``basic_bp.confirm_email_fail``.
    An unknown e-mail yields a 404. An already active account is redirected
    to the login page. Otherwise the account is marked active and confirmed,
    the change is committed, a reset e-mail is sent and the user is
    redirected to ``basic_bp.confirm_email_ok``.

    :param token: confirmation token taken from the URL.
    :return: a redirect response.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        # Log the cause so broken or expired links can be diagnosed.
        current_app.logger.error(f'Invalid confirmation link detected - error: {e}')
        flash('The confirmation link is invalid or has expired.', 'danger')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True))

    user = User.query.filter_by(email=email).first_or_404()
    if user.active:
        flash('Account already confirmed. Please login.', 'success')
        return redirect(prefixed_url_for('security_bp.login', for_redirect=True))

    # One timestamp so updated_at and confirmed_at are exactly equal.
    now = dt.now(tz.utc)
    user.active = True
    user.updated_at = now
    user.confirmed_at = now

    try:
        db.session.add(user)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        # Uses the local `email` string rather than touching the ORM object
        # again after the rollback.
        current_app.logger.error(f'Failed to activate account for {email}: {e}')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True))

    # NOTE(review): presumably sent so the newly confirmed user can set a
    # password - confirm against send_reset_email.
    send_reset_email(user)
    return redirect(prefixed_url_for('basic_bp.confirm_email_ok', for_redirect=True))
|
|
|
|
|
|
@security_bp.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Show the forgot-password form and trigger a reset e-mail on submit.

    On a valid submission a reset e-mail is sent when a user with the given
    address exists. The same confirmation message is flashed in either case
    and the user is redirected to the login page. Otherwise the form is
    rendered.

    :return: a redirect to the login page or the rendered
        ``security/forgot_password.html`` template.
    """
    form = ForgotPasswordForm()

    # Guard clause: anything but a valid submission just shows the form.
    if not form.validate_on_submit():
        return render_template('security/forgot_password.html', form=form)

    account = User.query.filter_by(email=form.email.data).first()
    if account:
        send_reset_email(account)

    # NOTE(review): indentation was lost in the source; flash/redirect are
    # taken to run for every valid submission, not only for known users.
    flash('An email with instructions to reset your password has been sent.', 'info')
    login_url = prefixed_url_for('security_bp.login', for_redirect=True)
    return redirect(login_url)
|
|
|
|
|
|
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the holder of a valid reset token choose a new password.

    The token is resolved to an e-mail address with ``confirm_token``. If
    that raises, the user is sent back to the forgot-password page. An
    unknown e-mail yields a 404. On a valid form submission the hashed
    password is stored and the user is redirected to the login page. If the
    commit fails, the session is rolled back and the form is shown again.

    :param token: reset token taken from the URL.
    :return: a redirect response or the rendered
        ``security/reset_password.html`` template.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        flash('The reset link is invalid or has expired.', 'danger')
        current_app.logger.error(f'Invalid reset link detected: {token} - error: {e}')
        # This file defines no 'reset_password_request' endpoint; the page
        # that requests a new link is the forgot_password view.
        return redirect(prefixed_url_for('security_bp.forgot_password', for_redirect=True))

    user = User.query.filter_by(email=email).first_or_404()
    form = ResetPasswordForm()
    if form.validate_on_submit():
        user.password = hash_password(form.password.data)
        user.updated_at = dt.now(tz.utc)
        try:
            db.session.commit()
        except SQLAlchemyError as e:
            # Same rollback-on-failure handling as confirm_email.
            db.session.rollback()
            current_app.logger.error(f'Failed to update password for {email}: {e}')
            flash('Failed to update your password. Please try again.', 'danger')
            return render_template('security/reset_password.html', reset_password_form=form)
        flash('Your password has been updated.', 'success')
        return redirect(prefixed_url_for('security_bp.login', for_redirect=True))
    return render_template('security/reset_password.html', reset_password_form=form)
|
|
|
|
|
|
@security_bp.route('/consent/sign', methods=['POST'])
@login_required
def consent_sign():
    """Record acceptance of the latest consent versions for a tenant.

    The target tenant comes from the JSON payload or form field
    ``tenant_id`` and defaults to the current user's tenant. Signing is
    allowed for Super Users, for Partner Admins that
    ``UserServices.can_user_edit_tenant`` accepts, and for Tenant Admins of
    that same tenant; anyone else gets a 403.

    For every type in the ``CONSENT_TYPES`` config entry a ``TenantConsent``
    row is added for the most recent ``ConsentVersion``. All rows are
    committed together.

    :return: ``{'ok': True, 'tenant_id': ...}`` with status 200 for JSON
        clients, otherwise a redirect to ``user_bp.tenant_overview``. On
        failure: a 400 JSON error for JSON requests, otherwise a flash
        message and a redirect to ``user_bp.no_consent``.
    """
    try:
        tenant_id, consent_data = _read_consent_request()
        allowed = _may_sign_consent_for(tenant_id)
        if allowed:
            _record_consents(tenant_id, consent_data)
    except CSRFError:
        if request.is_json:
            return jsonify({'ok': False, 'error': 'csrf_error'}), 400
        flash('Session expired. Please retry.', 'danger')
        return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
    except Exception as e:
        current_app.logger.error(f'Consent signing failed: {e}')
        db.session.rollback()
        if request.is_json:
            return jsonify({'ok': False, 'error': str(e)}), 400
        flash('Failed to sign consent.', 'danger')
        return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))

    if not allowed:
        # Raised outside the try block: inside it, the HTTP exception from
        # abort() would be caught by `except Exception` and turned into a
        # 400 response or redirect instead of a 403.
        abort(403)

    if request.is_json or 'application/json' in request.headers.get('Accept', ''):
        return jsonify({'ok': True, 'tenant_id': tenant_id}), 200
    # Default UX: go to overview
    return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))


def _read_consent_request():
    """Return ``(tenant_id, consent_data)`` parsed from the current request.

    ``tenant_id`` is an ``int``; ``int()`` raises for values it cannot parse.
    ``consent_data`` is read from JSON requests only.
    """
    # Payload may provide a tenant_id for admins signing for others
    if request.is_json:
        payload = request.get_json(silent=True) or {}
        tenant_id = payload.get('tenant_id')
        consent_data = payload.get('consent_data', {})
    else:
        tenant_id = request.form.get('tenant_id')
        consent_data = {}

    if tenant_id is None or tenant_id == '':
        # Default to the user's own tenant (Tenant Admin). A blank form
        # field is treated like a missing one instead of failing in int().
        tenant_id = current_user.tenant_id
    return int(tenant_id), consent_data


def _may_sign_consent_for(tenant_id):
    """Return True when the current user may sign consent for ``tenant_id``."""
    if current_user.has_roles('Super User'):
        return True
    if current_user.has_roles('Partner Admin') and UserServices.can_user_edit_tenant(tenant_id):
        return True
    return bool(
        current_user.has_roles('Tenant Admin')
        and getattr(current_user, 'tenant_id', None) == tenant_id
    )


def _resolve_partner_context(tenant_id):
    """Return ``(partner_id, partner_service_id)`` for ``tenant_id``.

    Either value may be ``None`` when it cannot be determined.
    """
    # Imported locally, as in the original view body.
    from common.models.user import PartnerService, Tenant
    from common.services.user.partner_services import PartnerServices

    partner_id = None
    partner_service_id = None

    # Resolve partner and management service if available in session (for Partner Admin)
    try:
        if 'partner' in session and session['partner'].get('services'):
            partner_id = session['partner'].get('id')
            mgmt = PartnerServices.get_management_service()
            if mgmt:
                partner_service_id = mgmt.get('id')
    except Exception as exc:
        # Best effort: the fallbacks below still run, but the failure is
        # logged instead of being silently discarded.
        current_app.logger.warning(f'Could not resolve partner context from session: {exc}')

    # Fallbacks if not Partner Admin context
    if partner_id is None:
        # Try to find the partner via the tenant record.
        tenant = Tenant.query.get(tenant_id)
        if tenant and tenant.partner:
            partner_id = tenant.partner.id
    if partner_service_id is None and partner_id is not None:
        service = PartnerService.query.filter_by(
            partner_id=partner_id, type='MANAGEMENT_SERVICE'
        ).first()
        if service:
            partner_service_id = service.id

    return partner_id, partner_service_id


def _record_consents(tenant_id, consent_data):
    """Add one ``TenantConsent`` per configured consent type and commit once."""
    # Imported locally, as in the original view body.
    from common.models.user import TenantConsent, ConsentVersion

    consent_types = current_app.config.get('CONSENT_TYPES', [])
    partner_id, partner_service_id = _resolve_partner_context(tenant_id)

    # For each consent type, record acceptance of latest version
    for consent_type in consent_types:
        latest = (
            ConsentVersion.query
            .filter_by(consent_type=consent_type)
            .order_by(ConsentVersion.consent_valid_from.desc())
            .first()
        )
        if not latest:
            current_app.logger.error(f'No ConsentVersion found for type {consent_type}; skipping')
            continue
        db.session.add(TenantConsent(
            tenant_id=tenant_id,
            # 0 is stored when no partner / partner service could be resolved.
            partner_id=partner_id or 0,
            partner_service_id=partner_service_id or 0,
            user_id=current_user.id,
            consent_type=consent_type,
            consent_version=latest.consent_version,
            consent_data=consent_data or {},
        ))
    db.session.commit()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|