Files
eveAI/eveai_app/views/security_views.py

152 lines
6.1 KiB
Python

# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
from urllib.parse import urlparse
from datetime import datetime as dt, timezone as tz
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
from common.models.user import User
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, RequestResetForm
from common.extensions import db
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
from common.utils.security import set_tenant_session_data
# Blueprint that groups the authentication-related views defined in this module
# (login, logout, e-mail confirmation and password reset).
security_bp = Blueprint(name='security_bp', import_name=__name__)
@security_bp.before_request
def log_before_request():
    """Log the incoming request and the current user before a view runs.

    Registered as a ``before_request`` hook on ``security_bp``. It emits the
    HTTP method and URL at debug level, followed by either the e-mail of the
    authenticated user or a note that nobody is logged in. It returns
    ``None`` implicitly.
    """
    current_app.logger.debug(f"Before request (security_bp): {request.method} {request.url}")
    if current_user and current_user.is_authenticated:
        # These messages previously said "After request", which made the
        # before/after log lines indistinguishable.
        current_app.logger.debug(f"Before request (security_bp): Current User: {current_user.email}")
    else:
        current_app.logger.debug("Before request (security_bp): No user logged in")
@security_bp.after_request
def log_after_request(response):
    """Log the outcome of a request handled by ``security_bp``.

    Emits the HTTP method, URL and response status at debug level, then the
    e-mail of the authenticated user or a note that nobody is logged in.

    Returns the ``response`` object unchanged.
    """
    log = current_app.logger
    log.debug(f"After request (security_bp): {request.method} {request.url} - Status: {response.status}")

    authenticated = current_user and current_user.is_authenticated
    if not authenticated:
        log.debug("After request (security_bp): No user logged in")
    else:
        log.debug(f"After request (security_bp): Current User: {current_user.email}")

    return response
@security_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the submitted credentials.

    Flow:
      * An already authenticated user is redirected to the index page.
      * A form that does not validate (including a plain GET) is logged and
        the login template is rendered.
      * An unknown e-mail or wrong password flashes an error and redirects
        back to the login page.
      * If ``login_user`` refuses the user, an error is flashed and the
        request is aborted with HTTP 401.
      * On success the session is committed and the user is redirected to
        tenant selection ('Super User' role) or to the tenant overview.
    """
    if current_user.is_authenticated:
        return redirect(prefixed_url_for('basic_bp.index'))

    form = LoginForm()

    if not form.validate_on_submit():
        current_app.logger.debug(f'Invalid login form: {form.errors}')
        return render_template('security/login_user.html', login_user_form=form)

    current_app.logger.debug(f'Validating login form: {form.email.data}')
    user = User.query.filter_by(email=form.email.data).first()

    # Short-circuit keeps the password check from running for unknown users.
    credentials_ok = user is not None and verify_and_update_password(form.password.data, user)
    if not credentials_ok:
        flash('Invalid username or password', 'danger')
        return redirect(prefixed_url_for('security_bp.login'))

    if not login_user(user):
        flash('Invalid username or password', 'danger')
        current_app.logger.debug(f'Failed to login user {user.email}')
        abort(401)

    current_app.logger.info(f'Login successful! Current User is {current_user.email}')
    db.session.commit()

    if current_user.has_roles('Super User'):
        landing_endpoint = 'user_bp.select_tenant'
    else:
        landing_endpoint = 'user_bp.tenant_overview'
    return redirect(prefixed_url_for(landing_endpoint))
@security_bp.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
    """Log the current user out and redirect to the index page.

    Debug messages are written immediately before and after the
    ``logout_user()`` call.
    """
    log = current_app.logger

    log.debug('Logging out')
    logout_user()
    log.debug('After Logout')

    home_url = prefixed_url_for('basic_bp.index')
    return redirect(home_url)
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
    """Activate the account identified by an e-mail confirmation token.

    Args:
        token: Confirmation token taken from the URL; it is passed to
            ``confirm_token`` to obtain the e-mail address it refers to.

    Returns:
        A redirect response:
          * to ``basic_bp.confirm_email_fail`` when the token cannot be
            resolved or the database update fails,
          * to ``security_bp.login`` when the account is already active,
          * to ``basic_bp.confirm_email_ok`` after a successful activation.

    A 404 is raised via ``first_or_404`` when no user matches the e-mail
    resolved from the token.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        # NOTE(review): the exception types raised by confirm_token are not
        # visible here, so the broad catch is kept as-is.
        flash('The confirmation link is invalid or has expired.', 'danger')
        current_app.logger.debug(f'Invalid confirmation link detected: {token} - error: {e}')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))

    user = User.query.filter_by(email=email).first_or_404()
    current_app.logger.debug(f'Trying to confirm email for user {user.email}')

    if user.active:
        flash('Account already confirmed. Please login.', 'success')
        current_app.logger.debug(f'Account for user {user.email} was already activated')
        return redirect(prefixed_url_for('security_bp.login'))

    # One timestamp for both columns so updated_at and confirmed_at agree.
    now = dt.now(tz.utc)
    user.active = True
    user.updated_at = now
    user.confirmed_at = now

    try:
        db.session.add(user)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        # A failed activation is an operational error, not debug detail.
        current_app.logger.error(f'Failed to confirm email for user {email}: {e}')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))

    current_app.logger.debug(f'Account for user {user.email} was confirmed.')
    # Presumably lets the newly confirmed user choose a password; verify
    # against send_reset_email.
    send_reset_email(user)
    return redirect(prefixed_url_for('basic_bp.confirm_email_ok'))
@security_bp.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
    """Show the password-reset request form and send the reset e-mail.

    When the form validates, a reset e-mail is sent if a user with the
    submitted address exists. The same informational message is flashed and
    the same redirect to the login page is returned whether or not a matching
    user was found. Otherwise the request form is rendered.
    """
    form = RequestResetForm()

    if not form.validate_on_submit():
        return render_template('security/reset_password_request.html', form=form)

    account = User.query.filter_by(email=form.email.data).first()
    if account:
        send_reset_email(account)

    flash('An email with instructions to reset your password has been sent.', 'info')
    return redirect(prefixed_url_for('security_bp.login'))
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the user identified by a reset token choose a new password.

    Args:
        token: Reset token taken from the URL; it is passed to
            ``confirm_token`` to obtain the e-mail address it refers to.

    Returns:
        * A redirect to ``security_bp.reset_password_request`` when the token
          cannot be resolved.
        * A redirect to ``security_bp.login`` after the new password has been
          stored.
        * The rendered reset form otherwise (first visit, validation errors,
          or a failed database commit).

    A 404 is raised via ``first_or_404`` when no user matches the e-mail
    resolved from the token.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        # NOTE(review): the exception types raised by confirm_token are not
        # visible here, so the broad catch is kept as-is.
        flash('The reset link is invalid or has expired.', 'danger')
        current_app.logger.debug(f'Invalid reset link detected: {token} - error: {e}')
        return redirect(prefixed_url_for('security_bp.reset_password_request'))

    user = User.query.filter_by(email=email).first_or_404()
    form = ResetPasswordForm()

    if form.validate_on_submit():
        user.password = hash_password(form.password.data)
        user.updated_at = dt.now(tz.utc)
        try:
            db.session.commit()
        except SQLAlchemyError as e:
            # Mirror confirm_email: roll back so the session stays usable,
            # then let the user retry from the re-rendered form.
            db.session.rollback()
            current_app.logger.error(f'Failed to reset password for user {email}: {e}')
            flash('Your password could not be updated. Please try again.', 'danger')
        else:
            flash('Your password has been updated.', 'success')
            return redirect(prefixed_url_for('security_bp.login'))

    return render_template('security/reset_password.html', reset_password_form=form)