# views/security_views.py from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session from flask_security import current_user, login_required, login_user, logout_user from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password from flask_security.forms import LoginForm from flask_wtf.csrf import CSRFError, generate_csrf from urllib.parse import urlparse from datetime import datetime as dt, timezone as tz from itsdangerous import URLSafeTimedSerializer from sqlalchemy.exc import SQLAlchemyError from common.models.user import User, ConsentStatus from common.services.user import TenantServices from common.utils.eveai_exceptions import EveAIException, EveAINoActiveLicense from common.utils.nginx_utils import prefixed_url_for from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, ForgotPasswordForm from common.extensions import db from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email from common.utils.security import set_tenant_session_data, is_valid_tenant security_bp = Blueprint('security_bp', __name__) @security_bp.before_request def log_before_request(): pass @security_bp.after_request def log_after_request(response): return response @security_bp.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(prefixed_url_for('basic_bp.index', for_redirect=True)) form = LoginForm() if request.method == 'POST': try: if form.validate_on_submit(): try: user = User.query.filter_by(email=form.email.data).first() if user is None or not verify_and_update_password(form.password.data, user): raise EveAIException('Invalid email or password') is_valid_tenant(user.tenant_id) except EveAIException as e: flash(f'Failed to login user: {str(e)}', 'danger') current_app.logger.error(f'Failed to login user: {str(e)}') abort(401) if login_user(user): current_app.logger.info(f'Login successful! Current User is {current_user.email}') db.session.commit() if current_user.has_roles('Super User'): return redirect(prefixed_url_for('user_bp.tenants', for_redirect=True)) if current_user.has_roles('Partner Admin'): return redirect(prefixed_url_for('user_bp.tenants', for_redirect=True)) consent_status = TenantServices.get_consent_status(user.tenant_id) match consent_status: case ConsentStatus.CONSENTED: return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True)) case ConsentStatus.NOT_CONSENTED: if current_user.has_roles('Tenant Admin'): return redirect(prefixed_url_for('user_bp.tenant_consent', for_redirect=True)) else: return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True)) case ConsentStatus.RENEWAL_REQUIRED: if current_user.has_roles('Tenant Admin'): return redirect(prefixed_url_for('user_bp.tenant_consent_renewal', for_redirect=True)) else: return redirect(prefixed_url_for('user_bp.consent_renewal', for_redirect=True)) case _: return redirect(prefixed_url_for('basic_bp.index', for_redirect=True)) else: flash('Invalid username or password', 'danger') current_app.logger.error(f'Invalid username or password for given email: {user.email}') abort(401) else: current_app.logger.error(f'Invalid login form: {form.errors}') except CSRFError: current_app.logger.warning('CSRF token mismatch during login attempt') flash('Your session has expired. Please try logging in again.', 'danger') return redirect(prefixed_url_for('security_bp.login', for_redirect=True)) if request.method == 'GET': csrf_token = generate_csrf() return render_template('security/login_user.html', login_user_form=form) @security_bp.route('/logout', methods=['GET', 'POST']) @login_required def logout(): logout_user() return redirect(prefixed_url_for('basic_bp.index', for_redirect=True)) @security_bp.route('/confirm_email/', methods=['GET', 'POST']) def confirm_email(token): try: email = confirm_token(token) except Exception as e: flash('The confirmation link is invalid or has expired.', 'danger') return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True)) user = User.query.filter_by(email=email).first_or_404() if user.active: flash('Account already confirmed. Please login.', 'success') return redirect(prefixed_url_for('security_bp.login', for_redirect=True)) else: user.active = True user.updated_at = dt.now(tz.utc) user.confirmed_at = dt.now(tz.utc) try: db.session.add(user) db.session.commit() except SQLAlchemyError as e: db.session.rollback() return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True)) send_reset_email(user) return redirect(prefixed_url_for('basic_bp.confirm_email_ok', for_redirect=True)) @security_bp.route('/forgot_password', methods=['GET', 'POST']) def forgot_password(): form = ForgotPasswordForm() if form.validate_on_submit(): user = User.query.filter_by(email=form.email.data).first() if user: send_reset_email(user) flash('An email with instructions to reset your password has been sent.', 'info') return redirect(prefixed_url_for('security_bp.login', for_redirect=True)) return render_template('security/forgot_password.html', form=form) @security_bp.route('/reset_password/', methods=['GET', 'POST']) def reset_password(token): try: email = confirm_token(token) except Exception as e: flash('The reset link is invalid or has expired.', 'danger') current_app.logger.error(f'Invalid reset link detected: {token} - error: {e}') return redirect(prefixed_url_for('security_bp.reset_password_request', for_redirect=True)) user = User.query.filter_by(email=email).first_or_404() form = ResetPasswordForm() if form.validate_on_submit(): user.password = hash_password(form.password.data) user.updated_at = dt.now(tz.utc) db.session.commit() flash('Your password has been updated.', 'success') return redirect(prefixed_url_for('security_bp.login', for_redirect=True)) return render_template('security/reset_password.html', reset_password_form=form)