Files
eveAI/eveai_app/views/security_views.py

252 lines
10 KiB
Python

# views/security_views.py
from flask import Blueprint, render_template, redirect, request, flash, current_app, abort, session, jsonify
from flask_security import current_user, login_required, login_user, logout_user
from flask_security.utils import verify_and_update_password, get_message, do_flash, config_value, hash_password
from flask_security.forms import LoginForm
from flask_wtf.csrf import CSRFError, generate_csrf
from urllib.parse import urlparse
from datetime import datetime as dt, timezone as tz
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
from common.models.user import User, ConsentStatus
from common.services.user import TenantServices, UserServices
from common.utils.eveai_exceptions import EveAIException, EveAINoActiveLicense
from common.utils.nginx_utils import prefixed_url_for
from eveai_app.views.security_forms import SetPasswordForm, ResetPasswordForm, ForgotPasswordForm
from common.extensions import db
from common.utils.security_utils import confirm_token, send_confirmation_email, send_reset_email
from common.utils.security import set_tenant_session_data, is_valid_tenant
# Blueprint that the login, logout, e-mail confirmation, password reset and
# consent routes defined below are registered on.
security_bp = Blueprint('security_bp', __name__)
@security_bp.before_request
def log_before_request():
    """Before-request hook for the security blueprint; currently a no-op."""
    return None
@security_bp.after_request
def log_after_request(response):
    """After-request hook for the security blueprint.

    Hands the response back unchanged.
    """
    outgoing = response
    return outgoing
@security_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the submitted credentials.

    Already-authenticated users are redirected to the index page. On a valid
    POST the user is looked up by e-mail, the password is verified and the
    tenant is validated; any failure flashes a message and aborts with 401.
    After a successful login, 'Super User' and 'Partner Admin' accounts are
    redirected to the tenants list and everyone else to the tenant overview.
    """
    if current_user.is_authenticated:
        return redirect(prefixed_url_for('basic_bp.index', for_redirect=True))

    form = LoginForm()

    if request.method == 'POST':
        try:
            if not form.validate_on_submit():
                current_app.logger.error(f'Invalid login form: {form.errors}')
            else:
                try:
                    user = User.query.filter_by(email=form.email.data).first()
                    credentials_ok = (
                        user is not None
                        and verify_and_update_password(form.password.data, user)
                    )
                    if not credentials_ok:
                        raise EveAIException('Invalid email or password')
                    is_valid_tenant(user.tenant_id)
                except EveAIException as e:
                    failure = f'Failed to login user: {str(e)}'
                    flash(failure, 'danger')
                    current_app.logger.error(failure)
                    abort(401)

                if not login_user(user):
                    flash('Invalid username or password', 'danger')
                    current_app.logger.error(f'Invalid username or password for given email: {user.email}')
                    abort(401)

                current_app.logger.info(f'Login successful! Current User is {current_user.email}')
                db.session.commit()

                # Super Users and Partner Admins both start on the tenants list;
                # everyone else relies on the global consent guard and goes to
                # the default start page.
                if current_user.has_roles('Super User') or current_user.has_roles('Partner Admin'):
                    landing_endpoint = 'user_bp.tenants'
                else:
                    landing_endpoint = 'user_bp.tenant_overview'
                return redirect(prefixed_url_for(landing_endpoint, for_redirect=True))
        except CSRFError:
            current_app.logger.warning('CSRF token mismatch during login attempt')
            flash('Your session has expired. Please try logging in again.', 'danger')
            return redirect(prefixed_url_for('security_bp.login', for_redirect=True))

    if request.method == 'GET':
        # Called for its effect of generating a CSRF token; the return value
        # is not needed here.
        generate_csrf()

    return render_template('security/login_user.html', login_user_form=form)
@security_bp.route('/logout', methods=['GET', 'POST'])
@login_required
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    index_url = prefixed_url_for('basic_bp.index', for_redirect=True)
    return redirect(index_url)
@security_bp.route('/confirm_email/<token>', methods=['GET', 'POST'])
def confirm_email(token):
    """Activate the account identified by an e-mail confirmation token.

    Args:
        token: Confirmation token taken from the URL.

    Returns:
        A redirect to the failure page when the token is rejected or the
        database update fails, to the login page when the account is already
        active, and otherwise to the success page after the account has been
        activated and a reset e-mail has been sent.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        # Record why the token was rejected instead of discarding the error.
        current_app.logger.warning(f'Email confirmation token rejected: {e}')
        flash('The confirmation link is invalid or has expired.', 'danger')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True))

    user = User.query.filter_by(email=email).first_or_404()

    if user.active:
        flash('Account already confirmed. Please login.', 'success')
        return redirect(prefixed_url_for('security_bp.login', for_redirect=True))

    # Use one timestamp so updated_at and confirmed_at are exactly equal.
    confirmed_on = dt.now(tz.utc)
    user.active = True
    user.updated_at = confirmed_on
    user.confirmed_at = confirmed_on

    try:
        db.session.add(user)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        current_app.logger.error(f'Failed to confirm account for {email}: {e}')
        return redirect(prefixed_url_for('basic_bp.confirm_email_fail', for_redirect=True))

    # A reset e-mail is sent right after activation
    # (presumably so the user can choose a password - confirm).
    send_reset_email(user)
    return redirect(prefixed_url_for('basic_bp.confirm_email_ok', for_redirect=True))
@security_bp.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Show the forgot-password form and send a reset e-mail on submit.

    Returns:
        The rendered form on GET or on a failed validation; a redirect to the
        login page after a valid submission.
    """
    form = ForgotPasswordForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            send_reset_email(user)
        # Flash and redirect the same way whether or not the address belongs
        # to an account, so the response cannot be used to discover which
        # e-mail addresses are registered.
        flash('An email with instructions to reset your password has been sent.', 'info')
        return redirect(prefixed_url_for('security_bp.login', for_redirect=True))
    return render_template('security/forgot_password.html', form=form)
@security_bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the holder of a valid reset token choose a new password.

    Args:
        token: Password-reset token taken from the URL.

    Returns:
        A redirect to the forgot-password page when the token is rejected, a
        redirect to the login page after the password has been updated, and
        otherwise the rendered reset form.
    """
    try:
        email = confirm_token(token)
    except Exception as e:
        flash('The reset link is invalid or has expired.', 'danger')
        current_app.logger.error(f'Invalid reset link detected: {token} - error: {e}')
        # Send the user to the view that issues new reset links. This used to
        # target 'security_bp.reset_password_request', for which this module
        # registers no route.
        return redirect(prefixed_url_for('security_bp.forgot_password', for_redirect=True))

    user = User.query.filter_by(email=email).first_or_404()
    form = ResetPasswordForm()

    if form.validate_on_submit():
        user.password = hash_password(form.password.data)
        user.updated_at = dt.now(tz.utc)
        db.session.commit()
        flash('Your password has been updated.', 'success')
        return redirect(prefixed_url_for('security_bp.login', for_redirect=True))

    return render_template('security/reset_password.html', reset_password_form=form)
def _consent_request_inputs():
    """Return ``(tenant_id, consent_data)`` read from the current request.

    A JSON body may carry ``tenant_id`` and ``consent_data``; a form post may
    carry ``tenant_id`` only. When no tenant is given, the current user's
    tenant is used. Raises if the tenant id cannot be converted to ``int``.
    """
    if request.is_json:
        payload = request.get_json(silent=True) or {}
        tenant_id = payload.get('tenant_id')
        consent_data = payload.get('consent_data', {})
    else:
        tenant_id = request.form.get('tenant_id')
        consent_data = {}

    if tenant_id is None:
        # Default to the user's own tenant (Tenant Admin case).
        tenant_id = current_user.tenant_id
    return int(tenant_id), consent_data


def _may_sign_consent_for(tenant_id):
    """Return True when the current user may sign consent for ``tenant_id``."""
    if current_user.has_roles('Super User'):
        return True
    if current_user.has_roles('Partner Admin') and UserServices.can_user_edit_tenant(tenant_id):
        return True
    return bool(
        current_user.has_roles('Tenant Admin')
        and getattr(current_user, 'tenant_id', None) == tenant_id
    )


def _consent_partner_context(tenant_id):
    """Return ``(partner_id, partner_service_id)`` to store with the consent.

    The partner stored in the session is tried first; otherwise the tenant's
    partner and that partner's 'MANAGEMENT_SERVICE' record are looked up.
    Either value may be None when nothing is found.
    """
    from common.models.user import PartnerService, Tenant
    from common.services.user.partner_services import PartnerServices

    partner_id = None
    partner_service_id = None

    # Best effort: a problem with the session data must not block signing.
    try:
        if 'partner' in session and session['partner'].get('services'):
            partner_id = session['partner'].get('id')
            mgmt = PartnerServices.get_management_service()
            if mgmt:
                partner_service_id = mgmt.get('id')
    except Exception as e:
        current_app.logger.warning(f'Could not resolve partner from session: {e}')

    # Fallbacks when the session did not provide a partner context.
    if partner_id is None:
        tenant = Tenant.query.get(tenant_id)
        if tenant and tenant.partner:
            partner_id = tenant.partner.id
    if partner_service_id is None and partner_id is not None:
        service = PartnerService.query.filter_by(
            partner_id=partner_id, type='MANAGEMENT_SERVICE'
        ).first()
        if service:
            partner_service_id = service.id

    return partner_id, partner_service_id


def _record_consents(tenant_id, partner_id, partner_service_id, consent_data):
    """Add one TenantConsent per configured consent type and commit.

    For each type in the CONSENT_TYPES config value, the ConsentVersion with
    the most recent ``consent_valid_from`` is recorded. Types that have no
    ConsentVersion are logged and skipped.
    """
    from common.models.user import ConsentVersion, TenantConsent

    for consent_type in current_app.config.get('CONSENT_TYPES', []):
        latest = (
            ConsentVersion.query
            .filter_by(consent_type=consent_type)
            .order_by(ConsentVersion.consent_valid_from.desc())
            .first()
        )
        if not latest:
            current_app.logger.error(f'No ConsentVersion found for type {consent_type}; skipping')
            continue
        db.session.add(TenantConsent(
            tenant_id=tenant_id,
            partner_id=partner_id or 0,
            partner_service_id=partner_service_id or 0,
            user_id=current_user.id,
            consent_type=consent_type,
            consent_version=latest.consent_version,
            consent_data=consent_data or {},
        ))
    db.session.commit()


@security_bp.route('/consent/sign', methods=['POST'])
@login_required
def consent_sign():
    """Record the current user's acceptance of the configured consent types.

    The tenant comes from the request (JSON body or form field ``tenant_id``)
    and defaults to the user's own tenant. Super Users may sign for any
    tenant, Partner Admins for tenants they can edit, and Tenant Admins for
    their own tenant only.

    Returns:
        ``{'ok': True, 'tenant_id': ...}`` with status 200 for JSON clients,
        or a redirect to the tenant overview. On failure, a JSON error with
        status 400 or a redirect to the no-consent page.

    Raises:
        werkzeug ``Forbidden`` (HTTP 403) when the user is not allowed to sign
        for the requested tenant.
    """
    try:
        tenant_id, consent_data = _consent_request_inputs()
        if _may_sign_consent_for(tenant_id):
            partner_id, partner_service_id = _consent_partner_context(tenant_id)
            _record_consents(tenant_id, partner_id, partner_service_id, consent_data)

            if request.is_json or 'application/json' in request.headers.get('Accept', ''):
                return jsonify({'ok': True, 'tenant_id': tenant_id}), 200
            # Default UX: go to overview
            return redirect(prefixed_url_for('user_bp.tenant_overview', for_redirect=True))
    except CSRFError:
        if request.is_json:
            return jsonify({'ok': False, 'error': 'csrf_error'}), 400
        flash('Session expired. Please retry.', 'danger')
        return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))
    except Exception as e:
        current_app.logger.error(f'Consent signing failed: {e}')
        db.session.rollback()
        if request.is_json:
            return jsonify({'ok': False, 'error': str(e)}), 400
        flash('Failed to sign consent.', 'danger')
        return redirect(prefixed_url_for('user_bp.no_consent', for_redirect=True))

    # Only reached when the authorization check failed. The abort happens
    # outside the try block because abort() raises an exception that the
    # broad handler above would otherwise catch and turn into a 400/redirect.
    abort(403)