Files
eveAI/eveai_app/views/user_views.py
2024-06-04 15:10:51 +02:00

461 lines
19 KiB
Python

# from . import user_bp
import uuid
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app, jsonify
from flask_security import hash_password, roles_required, roles_accepted, current_user
from itsdangerous import URLSafeTimedSerializer
from sqlalchemy.exc import SQLAlchemyError
import ast
from common.models.user import User, Tenant, Role, TenantDomain
from common.extensions import db, kms_client, security
from common.utils.security_utils import send_confirmation_email, send_reset_email
from .user_forms import TenantForm, CreateUserForm, EditUserForm, TenantDomainForm
from common.utils.database import Database
from common.utils.view_assistants import prepare_table_for_macro, form_validation_failed
from common.utils.key_encryption import generate_api_key
from common.utils.nginx_utils import prefixed_url_for
user_bp = Blueprint('user_bp', __name__, url_prefix='/user')
@user_bp.before_request
def log_before_request():
current_app.logger.debug(f"Before request (user_bp): {request.method} {request.url}")
if current_user and current_user.is_authenticated:
current_app.logger.debug(f"After request (user_bp): Current User: {current_user.email}")
else:
current_app.logger.debug(f"After request (user_bp): No user logged in")
@user_bp.after_request
def log_after_request(response):
current_app.logger.debug(f"After request (user_bp): {request.method} {request.url} - Status: {response.status}")
if current_user and current_user.is_authenticated:
current_app.logger.debug(f"After request (user_bp): Current User: {current_user.email}")
else:
current_app.logger.debug(f"After request (user_bp): No user logged in")
return response
@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_required('Super User')
def tenant():
form = TenantForm()
if form.validate_on_submit():
current_app.logger.debug('Creating new tenant')
# Handle the required attributes
new_tenant = Tenant(name=form.name.data,
website=form.website.data,
default_language=form.default_language.data,
allowed_languages=form.allowed_languages.data,
embedding_model=form.embedding_model.data,
llm_model=form.llm_model.data,
license_start_date=form.license_start_date.data,
license_end_date=form.license_end_date.data,
allowed_monthly_interactions=form.allowed_monthly_interactions.data)
# Handle Embedding Variables
new_tenant.html_tags = form.html_tags.data.split(',') if form.html_tags.data else []
new_tenant.html_end_tags = form.html_end_tags.data.split(',') if form.html_end_tags.data else []
new_tenant.html_included_elements = form.html_included_elements.data.split(
',') if form.html_included_elements.data else []
new_tenant.html_excluded_elements = form.html_excluded_elements.data.split(
',') if form.html_excluded_elements.data else []
current_app.logger.debug(f'html_tags: {new_tenant.html_tags},'
f'html_end_tags: {new_tenant.html_end_tags},'
f'html_included_elements: {new_tenant.html_included_elements},'
f'html_excluded_elements: {new_tenant.html_excluded_elements}')
# Handle Timestamps
timestamp = dt.now(tz.utc)
new_tenant.created_at = timestamp
new_tenant.updated_at = timestamp
# Add the new tenant to the database and commit the changes
try:
db.session.add(new_tenant)
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
flash(f'Failed to add tenant to database. Error: {str(e)}')
return render_template('user/tenant.html', form=form)
# Create schema for new tenant
current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database")
flash(f"Successfully created tenant {new_tenant.id} in Database")
current_app.logger.info(f"Creating schema for tenant {new_tenant.id}")
Database(new_tenant.id).create_tenant_schema()
return redirect(prefixed_url_for('basic_bp.index'))
else:
form_validation_failed(request, form)
return render_template('user/tenant.html', form=form)
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_required('Super User')
def edit_tenant(tenant_id):
tenant = Tenant.query.get_or_404(tenant_id) # This will return a 404 if no tenant is found
form = TenantForm(obj=tenant)
if request.method == 'GET':
# Populate the form with tenant data
form.populate_obj(tenant)
if tenant.html_tags:
form.html_tags.data = ', '.join(tenant.html_tags)
if tenant.html_end_tags:
form.html_end_tags.data = ', '.join(tenant.html_end_tags)
if tenant.html_included_elements:
form.html_included_elements.data = ', '.join(tenant.html_included_elements)
if tenant.html_excluded_elements:
form.html_excluded_elements.data = ', '.join(tenant.html_excluded_elements)
if form.validate_on_submit():
# Populate the tenant with form data
form.populate_obj(tenant)
# Then handle the special fields manually
tenant.html_tags = [tag.strip() for tag in form.html_tags.data.split(',') if tag.strip()]
tenant.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',') if tag.strip()]
tenant.html_included_elements = [elem.strip() for elem in form.html_included_elements.data.split(',') if
elem.strip()]
tenant.html_excluded_elements = [elem.strip() for elem in form.html_excluded_elements.data.split(',') if
elem.strip()]
db.session.commit()
flash('Tenant updated successfully.', 'success')
if session.get('tenant'):
if session['tenant'].get('id') == tenant_id:
session['tenant'] = tenant.to_dict()
# return redirect(url_for(f"user/tenant/tenant_id"))
else:
form_validation_failed(request, form)
return render_template('user/edit_tenant.html', form=form, tenant_id=tenant_id)
@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def user():
form = CreateUserForm()
form.tenant_id.data = session.get('tenant').get('id') # It is only possible to create users for the session tenant
if form.validate_on_submit():
current_app.logger.info(f"Adding User for tenant {session['tenant']['id']} ")
if form.password.data != form.confirm_password.data:
flash('Passwords do not match.', 'danger')
return render_template('user/user.html', form=form)
# Handle the required attributes
hashed_password = hash_password(form.password.data)
new_user = User(user_name=form.user_name.data,
email=form.email.data,
password=hashed_password,
first_name=form.first_name.data,
last_name=form.last_name.data,
valid_to=form.valid_to.data,
tenant_id=form.tenant_id.data
)
timestamp = dt.now(tz.utc)
new_user.created_at = timestamp
new_user.updated_at = timestamp
# Handle the relations
tenant_id = request.form.get('tenant_id')
# the_tenant = Tenant.query.get(tenant_id)
# new_user.tenant = the_tenant
# Add roles
for role_id in form.roles.data:
the_role = Role.query.get(role_id)
new_user.roles.append(the_role)
# Add the new user to the database and commit the changes
try:
db.session.add(new_user)
db.session.commit()
security.datastore.set_uniquifier()
send_confirmation_email(new_user)
current_app.logger.debug(f'User {new_user.id} with name {new_user.user_name} added to database'
f'Confirmation email sent to {new_user.email}')
flash('User added successfully and confirmation email sent.', 'success')
return redirect(prefixed_url_for('user_bp.view_users'))
except Exception as e:
current_app.logger.error(f'Failed to add user with name {new_user.user_name}. Error: {str(e)}')
db.session.rollback()
flash(f'Failed to add user. Email or user name already exists.', 'danger')
else:
form_validation_failed(request, form)
return render_template('user/user.html', form=form)
@user_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_user(user_id):
user = User.query.get_or_404(user_id) # This will return a 404 if no user is found
form = EditUserForm(obj=user)
if form.validate_on_submit():
# Populate the user with form data
user.first_name = form.first_name.data
user.last_name = form.last_name.data
user.valid_to = form.valid_to.data
user.updated_at = dt.now(tz.utc)
# Update roles
current_roles = set(role.id for role in user.roles)
selected_roles = set(form.roles.data)
# Add new roles
for role_id in selected_roles - current_roles:
role = Role.query.get(role_id)
if role:
user.roles.append(role)
# Remove unselected roles
for role_id in current_roles - selected_roles:
role = Role.query.get(role_id)
if role:
user.roles.remove(role)
db.session.commit()
flash('User updated successfully.', 'success')
return redirect(
prefixed_url_for('user_bp.edit_user',
user_id=user.id)) # Assuming there's a user profile view to redirect to
else:
form_validation_failed(request, form)
form.roles.data = [role.id for role in user.roles]
return render_template('user/edit_user.html', form=form, user_id=user_id)
@user_bp.route('/select_tenant')
@roles_required('Super User')
def select_tenant():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
query = Tenant.query.order_by(Tenant.name) # Fetch all tenants from the database
pagination = query.paginate(page=page, per_page=per_page)
tenants = pagination.items
rows = prepare_table_for_macro(tenants, [('id', ''), ('name', ''), ('website', '')])
return render_template('user/select_tenant.html', rows=rows, pagination=pagination)
@user_bp.route('/handle_tenant_selection', methods=['POST'])
@roles_required('Super User')
def handle_tenant_selection():
tenant_identification = request.form['selected_row']
tenant_id = ast.literal_eval(tenant_identification).get('value')
the_tenant = Tenant.query.get(tenant_id)
session['tenant'] = the_tenant.to_dict()
session['default_language'] = the_tenant.default_language
session['embedding_model'] = the_tenant.embedding_model
session['llm_model'] = the_tenant.llm_model
action = request.form['action']
match action:
case 'view_users':
return redirect(prefixed_url_for('user_bp.view_users', tenant_id=tenant_id))
case 'edit_tenant':
return redirect(prefixed_url_for('user_bp.edit_tenant', tenant_id=tenant_id))
case 'select_tenant':
return redirect(prefixed_url_for('basic_bp.session_defaults'))
# Add more conditions for other actions
return redirect(prefixed_url_for('select_tenant'))
@user_bp.route('/view_users')
@roles_accepted('Super User', 'Tenant Admin')
def view_users():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
tenant_id = session.get('tenant').get('id')
query = User.query.filter_by(tenant_id=tenant_id).order_by(User.user_name)
pagination = query.paginate(page=page, per_page=per_page)
users = pagination.items
# prepare table data
rows = prepare_table_for_macro(users, [('id', ''), ('user_name', ''), ('email', '')])
# Render the users in a template
return render_template('user/view_users.html', rows=rows, pagination=pagination)
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_user_action():
user_identification = request.form['selected_row']
user_id = ast.literal_eval(user_identification).get('value')
user = User.query.get_or_404(user_id)
action = request.form['action']
if action == 'edit_user':
return redirect(prefixed_url_for('user_bp.edit_user', user_id=user_id))
elif action == 'resend_confirmation_email':
send_confirmation_email(user)
flash(f'Confirmation email sent to {user.email}.', 'success')
elif action == 'reset_uniquifier':
reset_uniquifier(user)
flash(f'Uniquifier reset for {user.user_name}.', 'success')
return redirect(prefixed_url_for('user_bp.view_users'))
@user_bp.route('/view_tenant_domains')
@roles_accepted('Super User', 'Tenant Admin')
def view_tenant_domains():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
tenant_id = session.get('tenant').get('id')
query = TenantDomain.query.filter_by(tenant_id=tenant_id).order_by(TenantDomain.domain)
pagination = query.paginate(page=page, per_page=per_page)
tenant_domains = pagination.items
# prepare table data
rows = prepare_table_for_macro(tenant_domains, [('id', ''), ('domain', ''), ('valid_to', '')])
# Render the users in a template
return render_template('user/view_tenant_domains.html', rows=rows, pagination=pagination)
@user_bp.route('/handle_tenant_domain_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_tenant_domain_action():
tenant_domain_identification = request.form['selected_row']
tenant_domain_id = ast.literal_eval(tenant_domain_identification).get('value')
action = request.form['action']
if action == 'edit_tenant_domain':
return redirect(prefixed_url_for('user_bp.edit_tenant_domain', tenant_domain_id=tenant_domain_id))
# Add more conditions for other actions
return redirect(prefixed_url_for('view_tenant_domains'))
@user_bp.route('/tenant_domain', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def tenant_domain():
form = TenantDomainForm()
if form.validate_on_submit():
new_tenant_domain = TenantDomain()
form.populate_obj(new_tenant_domain)
new_tenant_domain.tenant_id = session['tenant']['id']
set_logging_information(new_tenant_domain, dt.now(tz.utc))
# Add the new user to the database and commit the changes
try:
db.session.add(new_tenant_domain)
db.session.commit()
flash('Tenant Domain added successfully.', 'success')
current_app.logger.info(f'Tenant Domain {new_tenant_domain.domain} added for tenant {session["tenant"]["id"]}')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add Tenant Domain. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to create Tenant Domain {new_tenant_domain.domain}. '
f'for tenant {session["tenant"]["id"]}'
f'Error: {str(e)}')
else:
flash('Please fill in all required fields.', 'information')
return render_template('user/tenant_domain.html', form=form)
@user_bp.route('/tenant_domain/<int:tenant_domain_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_tenant_domain(tenant_domain_id):
tenant_domain = TenantDomain.query.get_or_404(tenant_domain_id) # This will return a 404 if no user is found
form = TenantDomainForm(obj=tenant_domain)
if request.method == 'POST' and form.validate_on_submit():
form.populate_obj(tenant_domain)
update_logging_information(tenant_domain, dt.now(tz.utc))
try:
db.session.add(tenant_domain)
db.session.commit()
flash('Tenant Domain updated successfully.', 'success')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update Tenant Domain. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to update Tenant Domain {tenant_domain.id}. '
f'for tenant {session["tenant"]["id"]}'
f'Error: {str(e)}')
return redirect(
prefixed_url_for('user_bp.view_tenant_domains',
tenant_id=session['tenant']['id'])) # Assuming there's a user profile view to redirect to
else:
form_validation_failed(request, form)
return render_template('user/edit_tenant_domain.html', form=form, tenant_domain_id=tenant_domain_id)
@user_bp.route('/check_chat_api_key', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def check_chat_api_key():
tenant_id = session['tenant']['id']
tenant = Tenant.query.get_or_404(tenant_id)
if tenant.encrypted_chat_api_key:
return jsonify({'api_key_exists': True})
return jsonify({'api_key_exists': False})
@user_bp.route('/generate_chat_api_key', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def generate_chat_api_key():
tenant = Tenant.query.get_or_404(session['tenant']['id'])
new_api_key = generate_api_key(prefix="EveAI-CHAT")
tenant.encrypted_chat_api_key = kms_client.encrypt_api_key(new_api_key)
update_logging_information(tenant, dt.now(tz.utc))
try:
db.session.add(tenant)
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
current_app.logger.error(f'Unable to store api key for tenant {tenant.id}. Error: {str(e)}')
return jsonify({'new_api_key': 'API key generated successfully.', 'api_key': new_api_key}), 200
@user_bp.route('/tenant_overview', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def tenant_overview():
current_app.logger.debug('Rendering tenant overview')
current_app.logger.debug(f'current_user: {current_user}')
current_app.logger.debug(f'Current user roles: {current_user.roles}')
tenant_id = session['tenant']['id']
current_app.logger.debug(f'Generating overview for tenant {tenant_id}')
tenant = Tenant.query.get_or_404(tenant_id)
form = TenantForm(obj=tenant)
return render_template('user/tenant_overview.html', form=form)
def reset_uniquifier(user):
security.datastore.set_uniquifier(user)
db.session.add(user)
db.session.commit()
send_reset_email(user)
def set_logging_information(obj, timestamp):
obj.created_at = timestamp
obj.updated_at = timestamp
obj.created_by = current_user.id
obj.updated_by = current_user.id
def update_logging_information(obj, timestamp):
obj.updated_at = timestamp
obj.updated_by = current_user.id