Files
eveAI/eveai_app/views/security_views.py
Josako 1807435339 - Introduction of dynamic Retrievers & Specialists
- Introduction of dynamic Processors
- Introduction of caching system
- Introduction of a better template manager
- Adaptation of ModelVariables to support dynamic Processors / Retrievers / Specialists
- Start adaptation of chat client
2024-11-15 10:00:53 +01:00

148 lines
5.4 KiB
Python

# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
from flask_wtf.csrf import CSRFError, generate_csrf
from urllib.parse import urlparse
from datetime import datetime as dt, timezone as tz
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
from common.models.user import User
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, RequestResetForm
from common.extensions import db
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
from common.utils.security import set_tenant_session_data
security_bp = Blueprint('security_bp', __name__)
@security_bp.before_request
def log_before_request():
pass
@security_bp.after_request
def log_after_request(response):
return response
@security_bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(prefixed_url_for('basic_bp.index'))
form = LoginForm()
if request.method == 'POST':
try:
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user is None or not verify_and_update_password(form.password.data, user):
flash('Invalid username or password', 'danger')
current_app.logger.error(f'Failed to login user')
return redirect(prefixed_url_for('security_bp.login'))
if login_user(user):
current_app.logger.info(f'Login successful! Current User is {current_user.email}')
db.session.commit()
if current_user.has_roles('Super User'):
return redirect(prefixed_url_for('user_bp.select_tenant'))
else:
return redirect(prefixed_url_for('user_bp.tenant_overview'))
else:
flash('Invalid username or password', 'danger')
current_app.logger.error(f'Failed to login user {user.email}')
abort(401)
else:
current_app.logger.error(f'Invalid login form: {form.errors}')
except CSRFError:
current_app.logger.warning('CSRF token mismatch during login attempt')
flash('Your session has expired. Please try logging in again.', 'danger')
return redirect(prefixed_url_for('security_bp.login'))
if request.method == 'GET':
csrf_token = generate_csrf()
return render_template('security/login_user.html', login_user_form=form)
@security_bp.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
logout_user()
return redirect(prefixed_url_for('basic_bp.index'))
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
try:
email = confirm_token(token)
except Exception as e:
flash('The confirmation link is invalid or has expired.', 'danger')
return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))
user = User.query.filter_by(email=email).first_or_404()
if user.active:
flash('Account already confirmed. Please login.', 'success')
return redirect(prefixed_url_for('security_bp.login'))
else:
user.active = True
user.updated_at = dt.now(tz.utc)
user.confirmed_at = dt.now(tz.utc)
try:
db.session.add(user)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
return redirect(prefixed_url_for('basic_bp.confirm_email_fail'))
send_reset_email(user)
return redirect(prefixed_url_for('basic_bp.confirm_email_ok'))
@security_bp.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
form = RequestResetForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
send_reset_email(user)
flash('An email with instructions to reset your password has been sent.', 'info')
return redirect(prefixed_url_for('security_bp.login'))
return render_template('security/reset_password_request.html', form=form)
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
try:
email = confirm_token(token)
except Exception as e:
flash('The reset link is invalid or has expired.', 'danger')
current_app.logger.error(f'Invalid reset link detected: {token} - error: {e}')
return redirect(prefixed_url_for('security_bp.reset_password_request'))
user = User.query.filter_by(email=email).first_or_404()
form = ResetPasswordForm()
if form.validate_on_submit():
user.password = hash_password(form.password.data)
user.updated_at = dt.now(tz.utc)
db.session.commit()
flash('Your password has been updated.', 'success')
return redirect(prefixed_url_for('security_bp.login'))
return render_template('security/reset_password.html', reset_password_form=form)