Optimizing admin interface for user domain, completing security views

This commit is contained in:
Josako
2024-06-03 09:37:59 +02:00
parent e5a36798bf
commit fcc0caeb09
24 changed files with 523 additions and 174 deletions

View File

@@ -1,13 +1,20 @@
# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
from urllib.parse import urlparse
import datetime as dt
from datetime import datetime as dt, timezone as tz
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
from common.models.user import User
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, RequestResetForm
from common.extensions import db
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
from common.utils.security import set_tenant_session_data
security_bp = Blueprint('security_bp', __name__)
@@ -15,11 +22,19 @@ security_bp = Blueprint('security_bp', __name__)
@security_bp.before_request
def log_before_request():
current_app.logger.debug(f"Before request (security_bp): {request.method} {request.url}")
if current_user and current_user.is_authenticated:
current_app.logger.debug(f"After request (security_bp): Current User: {current_user.email}")
else:
current_app.logger.debug(f"After request (security_bp): No user logged in")
@security_bp.after_request
def log_after_request(response):
current_app.logger.debug(f"After request (security_bp): {request.method} {request.url} - Status: {response.status}")
if current_user and current_user.is_authenticated:
current_app.logger.debug(f"After request (security_bp): Current User: {current_user.email}")
else:
current_app.logger.debug(f"After request (security_bp): No user logged in")
return response
@@ -34,11 +49,19 @@ def login():
current_app.logger.debug(f'Validating login form: {form.email.data}')
user = User.query.filter_by(email=form.email.data).first()
if user is None or not verify_and_update_password(form.password.data, user):
flash('Invalid username or password')
flash('Invalid username or password', 'danger')
return redirect(prefixed_url_for('security_bp.login'))
login_user(user, remember=form.remember.data)
return redirect(prefixed_url_for('user_bp.tenant_overview'))
if login_user(user):
current_app.logger.info(f'Login successful! Current User is {current_user.email}')
db.session.commit()
return redirect(prefixed_url_for('user_bp.tenant_overview'))
else:
flash('Invalid username or password', 'danger')
current_app.logger.debug(f'Failed to login user {user.email}')
abort(401)
else:
current_app.logger.debug(f'Invalid login form: {form.errors}')
return render_template('security/login_user.html', login_user_form=form)
@@ -50,3 +73,76 @@ def logout():
logout_user()
current_app.logger.debug('After Logout')
return redirect(prefixed_url_for('basic_bp.index'))
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
try:
email = confirm_token(token)
except Exception as e:
flash('The confirmation link is invalid or has expired.', 'danger')
current_app.logger.debug(f'Invalid confirmation link detected: {token} - error: {e}')
return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))
user = User.query.filter_by(email=email).first_or_404()
current_app.logger.debug(f'Trying to confirm email for user {user.email}')
if user.active:
flash('Account already confirmed. Please login.', 'success')
current_app.logger.debug(f'Account for user {user.email} was already activated')
return redirect(prefixed_url_for('security_bp.login'))
else:
current_app.logger.debug(f'Trying to confirm email for user {user.email}')
user.active = True
user.updated_at = dt.now(tz.utc)
user.confirmed_at = dt.now(tz.utc)
try:
db.session.add(user)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.debug(f'Failed to confirm email for user {user.email}: {e}')
return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))
current_app.logger.debug(f'Account for user {user.email} was confirmed.')
send_reset_email(user)
return redirect(prefixed_url_for('basic_bp.confirm_email_ok'))
@security_bp.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
form = RequestResetForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
send_reset_email(user)
flash('An email with instructions to reset your password has been sent.', 'info')
return redirect(prefixed_url_for('security_bp.login'))
return render_template('security/reset_password_request.html', form=form)
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
try:
email = confirm_token(token)
except Exception as e:
flash('The reset link is invalid or has expired.', 'danger')
current_app.logger.debug(f'Invalid reset link detected: {token} - error: {e}')
return redirect(prefixed_url_for('security_bp.reset_password_request'))
user = User.query.filter_by(email=email).first_or_404()
form = ResetPasswordForm()
if form.validate_on_submit():
user.password = hash_password(form.password.data)
user.updated_at = dt.now(tz.utc)
db.session.commit()
flash('Your password has been updated.', 'success')
return redirect(prefixed_url_for('security_bp.login'))
return render_template('security/reset_password.html', reset_password_form=form)