From dc235b5d2c22be6288f4adb7e5a31269aba3cb20 Mon Sep 17 00:00:00 2001 From: Josako Date: Thu, 25 Apr 2024 16:13:09 +0200 Subject: [PATCH] implement security --- eveai_app/utils/security.py | 50 +++++++++++++++++++++++++++++++++++ eveai_app/views/auth_views.py | 50 ++++++++++++++++++++++++----------- eveai_app/views/user_views.py | 7 ++++- 3 files changed, 90 insertions(+), 17 deletions(-) create mode 100644 eveai_app/utils/security.py diff --git a/eveai_app/utils/security.py b/eveai_app/utils/security.py new file mode 100644 index 0000000..0f7df8c --- /dev/null +++ b/eveai_app/utils/security.py @@ -0,0 +1,50 @@ +from functools import wraps +from flask_jwt_extended import get_jwt, verify_jwt_in_request + + +def super_required(): + def wrapper(fn): + @wraps(fn) + def decorator(*args, **kwargs): + verify_jwt_in_request() + claims = get_jwt() + if not claims['is_super']: + return {'message': 'Authentication Error: Super users only!'}, 403 + else: + return fn(*args, **kwargs) + + return decorator + return wrapper + + +# Decorators + + +def admin_required(): + def wrapper(fn): + @wraps(fn) + def decorator(*args, **kwargs): + verify_jwt_in_request() + claims = get_jwt() + if not claims['is_admin']: + return {'message': 'Authentication Error: Admins only!'}, 403 + else: + return fn(*args, **kwargs) + + return decorator + return wrapper + + +def tester_required(): + def wrapper(fn): + @wraps(fn) + def decorator(*args, **kwargs): + verify_jwt_in_request() + claims = get_jwt() + if not claims['is_tester']: + return {'message': 'Authentication Error: Testers only!'}, 403 + else: + return fn(*args, **kwargs) + + return decorator + return wrapper diff --git a/eveai_app/views/auth_views.py b/eveai_app/views/auth_views.py index 9c04bb8..1289b5e 100644 --- a/eveai_app/views/auth_views.py +++ b/eveai_app/views/auth_views.py @@ -1,5 +1,5 @@ from datetime import datetime as dt, timezone as tz -from flask import request, redirect, url_for, flash, render_template, Blueprint, jsonify +from flask import request, redirect, url_for, flash, render_template, Blueprint, jsonify, session from ..models.user import User, Tenant from ..extensions import db, bcrypt from .auth_forms import LoginForm @@ -14,27 +14,45 @@ def login(): if request.method == 'POST': email = request.form.get('email') password = request.form.get('password') - remember_me = True if request.form.get('remember_me') else False + # remember_me = True if request.form.get('remember_me') else False user = User.query.filter_by(email=email).first() + tenant = Tenant.query.filter_by(id=user.tenant_id).first() if user: - if bcrypt.check_password_hash(user.password, password): - response = jsonify({'msg': 'Login Successful'}) - flash('Logged in successfully!', category='success') - access_token = create_access_token( - identity=user.id, - additional_claims={'tenant': user.tenant_id}) - refresh_token = create_refresh_token( - identity=user.id, - additional_claims={'tenant': user.tenant_id}) - set_access_cookies(response, access_token) - set_refresh_cookies(response, refresh_token) + if user.is_active: + if bcrypt.check_password_hash(user.password, password): + response = jsonify({'msg': 'Login Successful'}) + flash('Logged in successfully!', category='success') - return redirect(url_for('user_bp.user')) + # set session information + # session['user_id'] = user.id + # session['user_name'] = user.user_name + # session['email'] = user.email + # session['tenant_id'] = user.tenant_id + # session['tenant_name'] = tenant.name + + # set JWT header information + additional_claims = {'tenant': user.tenant_id, + 'is_super': user.is_super, + 'is_admin': user.is_admin, + 'is_tester': user.is_tester} + access_token = create_access_token( + identity=user.id, + additional_claims=additional_claims) + refresh_token = create_refresh_token( + identity=user.id, + additional_claims=additional_claims) + set_access_cookies(response, access_token) + set_refresh_cookies(response, refresh_token) + response.headers['Location'] = url_for('user_bp.user') + + return response, 302 + else: + flash('Incorrect email/password combination, try again.', category='error') else: - flash('Incorrect password, try again.', category='error') + flash('Account disabled. Please contact your administrator.', category='error') else: - flash('Email does not exist.', category='error') + flash('Incorrect email/password combination, try again.', category='error') form = LoginForm() return render_template('login.html', form=form) diff --git a/eveai_app/views/user_views.py b/eveai_app/views/user_views.py index 1f6c282..06622e9 100644 --- a/eveai_app/views/user_views.py +++ b/eveai_app/views/user_views.py @@ -1,15 +1,18 @@ # from . import user_bp from datetime import datetime as dt, timezone as tz -from flask import request, redirect, url_for, flash, render_template, Blueprint +from flask import request, redirect, url_for, flash, render_template, Blueprint, session +from flask_jwt_extended import verify_jwt_in_request, get_jwt, get_jwt_identity, jwt_required from ..models.user import User, Tenant from ..extensions import db, bcrypt from .user_forms import TenantForm, UserForm from ..utils.database import Database +from ..utils.security import admin_required, super_required, tester_required user_bp = Blueprint('user_bp', __name__, url_prefix='/user') @user_bp.route('/tenant', methods=['GET', 'POST']) +@super_required def tenant(): if request.method == 'POST': # Handle the required attributes @@ -62,6 +65,8 @@ def tenant(): @user_bp.route('/user', methods=['GET', 'POST']) +@admin_required +@jwt_required() def user(): if request.method == 'POST': # Handle the required attributes