# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
from flask_wtf.csrf import CSRFError, generate_csrf
from urllib.parse import urlparse
from datetime import datetime as dt, timezone as tz
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError

from common.models.user import User
from common.utils.eveai_exceptions import EveAIException, EveAINoActiveLicense
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, ForgotPasswordForm
from common.extensions import db
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
from common.utils.security import set_tenant_session_data, is_valid_tenant

security_bp = Blueprint('security_bp', __name__)


@security_bp.before_request
def log_before_request():
    """Blueprint-level pre-request hook; currently a deliberate no-op."""
    pass


@security_bp.after_request
def log_after_request(response):
    """Blueprint-level post-request hook; returns the response unchanged."""
    return response


@security_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the user on POST.

    Already-authenticated users are redirected to the index page. On a valid
    POST the credentials are verified and ``is_valid_tenant`` is called for
    the user's tenant; any ``EveAIException`` raised in that step aborts the
    request with HTTP 401. After a successful ``login_user`` call, users with
    the 'Super User' role are sent to tenant selection and everyone else to
    the tenant overview. An invalid form is logged and the form is rendered
    again.
    """
    if current_user.is_authenticated:
        return redirect(prefixed_url_for('basic_bp.index'))

    form = LoginForm()

    if request.method == 'POST':
        try:
            if form.validate_on_submit():
                try:
                    user = User.query.filter_by(email=form.email.data).first()
                    if user is None or not verify_and_update_password(form.password.data, user):
                        raise EveAIException('Invalid email or password')
                    is_valid_tenant(user.tenant_id)
                except EveAIException as e:
                    flash(f'Failed to login user: {e}', 'danger')
                    current_app.logger.error(f'Failed to login user: {e}')
                    abort(401)

                # abort() raises, so ``user`` is always bound and verified here.
                if login_user(user):
                    current_app.logger.info(f'Login successful! Current User is {current_user.email}')
                    # Persist whatever the login step changed on the session
                    # (presumably login tracking / password re-hash -- TODO confirm).
                    db.session.commit()
                    if current_user.has_roles('Super User'):
                        return redirect(prefixed_url_for('user_bp.select_tenant'))
                    return redirect(prefixed_url_for('user_bp.tenant_overview'))

                flash('Invalid username or password', 'danger')
                current_app.logger.error(f'Invalid username or password for given email: {user.email}')
                abort(401)
            else:
                current_app.logger.error(f'Invalid login form: {form.errors}')
        except CSRFError:
            current_app.logger.warning('CSRF token mismatch during login attempt')
            flash('Your session has expired. Please try logging in again.', 'danger')
            return redirect(prefixed_url_for('security_bp.login'))

    if request.method == 'GET':
        # Called for its side effect only; the return value was never used.
        # NOTE(review): presumably ensures a CSRF token exists before the
        # template renders -- confirm this call is still needed.
        generate_csrf()

    return render_template('security/login_user.html', login_user_form=form)


@security_bp.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    return redirect(prefixed_url_for('basic_bp.index'))


# The <token> converter is required: the view takes ``token`` as an argument.
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
    """Activate the account that belongs to an e-mail confirmation token.

    Args:
        token: Value taken from the URL and passed to ``confirm_token``,
            whose result is used as the e-mail address to look up.

    Returns:
        A redirect to the failure page when the token cannot be confirmed or
        the database update fails, to the login page when the account is
        already active, and to the success page otherwise. Responds with 404
        when no user matches the e-mail address.
    """
    try:
        email = confirm_token(token)
    except Exception:
        flash('The confirmation link is invalid or has expired.', 'danger')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))

    user = User.query.filter_by(email=email).first_or_404()

    if user.active:
        flash('Account already confirmed. Please login.', 'success')
        return redirect(prefixed_url_for('security_bp.login'))

    now = dt.now(tz.utc)
    user.active = True
    user.updated_at = now
    user.confirmed_at = now

    try:
        db.session.add(user)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        # Log with the local ``email`` rather than touching the ORM object
        # after a rollback.
        current_app.logger.error(f'Failed to confirm email for {email}: {e}')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))

    # NOTE(review): presumably lets the freshly confirmed user choose a
    # password -- verify against send_reset_email.
    send_reset_email(user)
    return redirect(prefixed_url_for('basic_bp.confirm_email_ok'))


@security_bp.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Show the forgot-password form and send a reset e-mail on submit.

    The confirmation message and the redirect to the login page are the same
    whether or not an account exists for the submitted address, so the
    response does not reveal which e-mail addresses are registered.
    """
    form = ForgotPasswordForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            send_reset_email(user)
        flash('An email with instructions to reset your password has been sent.', 'info')
        return redirect(prefixed_url_for('security_bp.login'))

    return render_template('security/forgot_password.html', form=form)


# The <token> converter is required: the view takes ``token`` as an argument.
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the holder of a reset token choose a new password.

    Args:
        token: Value taken from the URL and passed to ``confirm_token``,
            whose result is used as the e-mail address to look up.

    Returns:
        A redirect to the forgot-password page when the token cannot be
        confirmed, a redirect to the login page once the new password has
        been stored, and the rendered reset form otherwise (including after a
        failed database commit). Responds with 404 when no user matches the
        e-mail address.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        flash('The reset link is invalid or has expired.', 'danger')
        # NOTE(review): this writes the raw token to the log -- consider
        # dropping or truncating it.
        current_app.logger.error(f'Invalid reset link detected: {token} - error: {e}')
        # Send the user back to the form where a new link can be requested.
        return redirect(prefixed_url_for('security_bp.forgot_password'))

    user = User.query.filter_by(email=email).first_or_404()
    form = ResetPasswordForm()

    if form.validate_on_submit():
        user.password = hash_password(form.password.data)
        user.updated_at = dt.now(tz.utc)
        try:
            db.session.commit()
        except SQLAlchemyError as e:
            # Mirror confirm_email: never leave the session in a failed state.
            db.session.rollback()
            current_app.logger.error(f'Failed to reset password for {email}: {e}')
            flash('Your password could not be updated. Please try again.', 'danger')
        else:
            flash('Your password has been updated.', 'success')
            return redirect(prefixed_url_for('security_bp.login'))

    return render_template('security/reset_password.html', reset_password_form=form)