238 lines
9.6 KiB
Python
238 lines
9.6 KiB
Python
# from . import user_bp
|
|
import uuid
|
|
from datetime import datetime as dt, timezone as tz
|
|
from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app
|
|
from flask_security import hash_password, roles_required, roles_accepted
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from common.models.user import User, Tenant, Role
|
|
from common.extensions import db
|
|
from .user_forms import TenantForm, CreateUserForm, EditUserForm
|
|
from common.utils.database import Database
|
|
|
|
user_bp = Blueprint('user_bp', __name__, url_prefix='/user')
|
|
|
|
|
|
def _split_csv(raw):
    """Split a comma-separated form value into trimmed, non-empty items.

    Returns an empty list when ``raw`` is ``None`` or an empty string.
    """
    if not raw:
        return []
    return [item.strip() for item in raw.split(',') if item.strip()]


@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_required('Super User')
def tenant():
    """Create a new tenant and its database schema.

    Renders ``user/tenant.html`` with a ``TenantForm`` until the form
    validates. On a valid submission the tenant row is committed, a schema is
    created for it through ``Database(...).create_tenant_schema()`` and the
    client is redirected to ``basic_bp.index``. If the commit fails, the
    session is rolled back, the error is logged and flashed, and the form is
    rendered again.
    """
    form = TenantForm()
    if not form.validate_on_submit():
        return render_template('user/tenant.html', form=form)

    # Handle the required attributes.
    # NOTE(review): the keyword names `licence_start_date` / `lic_end_date`
    # differ from the form's `license_*` field names and from each other --
    # confirm they match the Tenant model's columns.
    new_tenant = Tenant(
        name=form.name.data,
        website=form.website.data,
        default_language=form.default_language.data,
        allowed_languages=form.allowed_languages.data,
        embedding_model=form.embedding_model.data,
        llm_model=form.llm_model.data,
        licence_start_date=form.license_start_date.data,
        lic_end_date=form.license_end_date.data,
        allowed_monthly_interactions=form.allowed_monthly_interactions.data,
    )

    # Handle embedding variables. Each one is stored as a plain list; the
    # previous trailing commas turned three of them into 1-tuples.
    new_tenant.html_tags = _split_csv(form.html_tags.data)
    new_tenant.html_end_tags = _split_csv(form.html_end_tags.data)
    new_tenant.html_included_elements = _split_csv(form.html_included_elements.data)
    new_tenant.html_excluded_elements = _split_csv(form.html_excluded_elements.data)

    # Handle timestamps: creation and last update start out identical.
    timestamp = dt.now(tz.utc)
    new_tenant.created_at = timestamp
    new_tenant.updated_at = timestamp

    # Add the new tenant to the database and commit the changes.
    try:
        db.session.add(new_tenant)
        db.session.commit()
    except SQLAlchemyError as e:
        # Leave the session usable for the rest of the request.
        db.session.rollback()
        current_app.logger.error(f'Failed to add tenant to database. Error: {str(e)}')
        flash(f'Failed to add tenant to database. Error: {str(e)}')
        return render_template('user/tenant.html', form=form)

    # Create schema for new tenant.
    current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database")
    flash(f"Successfully created tenant {new_tenant.id} in Database")
    current_app.logger.info(f"Creating schema for tenant {new_tenant.id}")
    Database(new_tenant.id).create_tenant_schema()
    return redirect(url_for('basic_bp.index'))
|
|
|
|
|
|
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_required('Super User')
def edit_tenant(tenant_id):
    """Display and update an existing tenant.

    Responds with 404 when no tenant has ``tenant_id``. On GET the list-valued
    HTML settings are shown as comma-separated text. On a valid POST the form
    is copied onto the tenant, the comma-separated fields are parsed back into
    lists, ``updated_at`` is refreshed and the change is committed. If the
    tenant stored in the session is the one being edited, the session copy is
    replaced with the updated data. The edit page is rendered in every case.
    """
    tenant = Tenant.query.get_or_404(tenant_id)  # 404 if no tenant is found
    form = TenantForm(obj=tenant)

    list_fields = (
        'html_tags',
        'html_end_tags',
        'html_included_elements',
        'html_excluded_elements',
    )

    if request.method == 'GET':
        # `obj=tenant` already seeded the form; only the list-valued fields
        # need converting to text. (Calling populate_obj() here would copy
        # form -> tenant, i.e. the wrong direction for a GET.)
        for field_name in list_fields:
            values = getattr(tenant, field_name)
            if values:
                getattr(form, field_name).data = ', '.join(values)

    if request.method == 'POST' and form.validate_on_submit():
        # Populate the tenant with form data
        form.populate_obj(tenant)
        # Then handle the special fields manually: text -> list of trimmed,
        # non-empty items. `or ''` guards against a field with no data.
        for field_name in list_fields:
            raw = getattr(form, field_name).data or ''
            setattr(tenant, field_name,
                    [part.strip() for part in raw.split(',') if part.strip()])
        tenant.updated_at = dt.now(tz.utc)

        try:
            db.session.commit()
        except SQLAlchemyError as e:
            # Leave the session usable and report the failure instead of a 500.
            db.session.rollback()
            current_app.logger.error(f'Failed to update tenant {tenant_id}. Error: {str(e)}')
            flash(f'Failed to update tenant. Error: {str(e)}')
            return render_template('user/edit_tenant.html', form=form, tenant_id=tenant_id)

        flash('Tenant updated successfully.', 'success')
        # Keep the session's copy of the tenant in sync when it is the one edited.
        if session.get('tenant'):
            if session['tenant'].get('id') == tenant_id:
                session['tenant'] = tenant.to_dict()

    return render_template('user/edit_tenant.html', form=form, tenant_id=tenant_id)
|
|
|
|
|
|
@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def user():
    """Create a new user for a tenant.

    Renders ``user/user.html`` with a ``CreateUserForm``. On a valid
    submission whose two password fields match, the password is hashed with
    ``hash_password``, a ``User`` is built with a fresh ``fs_uniquifier`` and
    creation/update timestamps, linked to the tenant given by the submitted
    ``tenant_id`` and to the selected roles, and committed. The outcome is
    flashed and the form page is rendered again in every case.
    """
    form = CreateUserForm()
    if form.validate_on_submit():
        if form.password.data != form.confirm_password.data:
            flash('Passwords do not match.')
            # Stop here: without this return the user was created anyway.
            return render_template('user/user.html', form=form)

        # Handle the required attributes
        hashed_password = hash_password(form.password.data)
        new_user = User(
            user_name=form.user_name.data,
            email=form.email.data,
            password=hashed_password,
            first_name=form.first_name.data,
            last_name=form.last_name.data,
            is_active=form.is_active.data,
            valid_to=form.valid_to.data,
            tenant_id=form.tenant_id.data,
        )

        new_user.fs_uniquifier = str(uuid.uuid4())
        timestamp = dt.now(tz.utc)
        new_user.created_at = timestamp
        new_user.updated_at = timestamp

        # Handle the relations
        tenant_id = request.form.get('tenant_id')
        the_tenant = Tenant.query.get(tenant_id)
        new_user.tenant = the_tenant

        # Add roles, skipping ids that do not resolve to a Role so that
        # None is never appended to the relationship.
        for role_id in form.roles.data or []:
            the_role = Role.query.get(role_id)
            if the_role:
                new_user.roles.append(the_role)

        # Add the new user to the database and commit the changes
        try:
            db.session.add(new_user)
            db.session.commit()
            flash('User added successfully.')
        except SQLAlchemyError as e:
            db.session.rollback()
            current_app.logger.error(f'Failed to add user. Error: {str(e)}')
            flash(f'Failed to add user. Error: {str(e)}')

    return render_template('user/user.html', form=form)
|
|
|
|
|
|
@user_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_user(user_id):
    """Display and update an existing user's profile fields and roles.

    Responds with 404 when no user has ``user_id``. On a valid POST the name,
    active flag, validity date and ``updated_at`` are updated, the user's
    roles are synchronised with the roles selected in the form, and the
    change is committed. After a successful commit the client is redirected
    back to this view; if the commit fails the session is rolled back and the
    error is logged and flashed. Otherwise the edit page is rendered with the
    user's current roles pre-selected.
    """
    user = User.query.get_or_404(user_id)  # 404 if no user is found
    form = EditUserForm(obj=user)

    if request.method == 'POST' and form.validate_on_submit():
        # Populate the user with form data
        user.first_name = form.first_name.data
        user.last_name = form.last_name.data
        user.is_active = form.is_active.data
        user.valid_to = form.valid_to.data
        user.updated_at = dt.now(tz.utc)

        # Update roles
        current_roles = {role.id for role in user.roles}
        selected_roles = set(form.roles.data or [])
        # Add new roles
        for role_id in selected_roles - current_roles:
            role = Role.query.get(role_id)
            if role:
                user.roles.append(role)
        # Remove unselected roles. Iterate over a snapshot because the
        # relationship list is modified inside the loop.
        for role in [r for r in user.roles if r.id not in selected_roles]:
            user.roles.remove(role)

        try:
            db.session.commit()
        except SQLAlchemyError as e:
            # Leave the session usable and report the failure instead of a 500.
            db.session.rollback()
            current_app.logger.error(f'Failed to update user {user_id}. Error: {str(e)}')
            flash(f'Failed to update user. Error: {str(e)}')
        else:
            flash('User updated successfully.', 'success')
            # Redirect after a successful POST so a reload does not resubmit.
            return redirect(url_for('user_bp.edit_user', user_id=user.id))

    form.roles.data = [role.id for role in user.roles]
    return render_template('user/edit_user.html', form=form, user_id=user_id)
|
|
|
|
|
|
@user_bp.route('/select_tenant')
@roles_required('Super User')
def select_tenant():
    """Render the tenant selection page listing every tenant in the database."""
    return render_template('user/select_tenant.html', tenants=Tenant.query.all())
|
|
|
|
|
|
@user_bp.route('/handle_tenant_selection', methods=['POST'])
|
|
@roles_required('Super User')
|
|
def handle_tenant_selection():
|
|
tenant_id = request.form['tenant_id']
|
|
the_tenant = Tenant.query.get(tenant_id)
|
|
session['tenant'] = the_tenant.to_dict()
|
|
session['default_language'] = the_tenant.default_language
|
|
session['embedding_model'] = the_tenant.embedding_model
|
|
session['llm_model'] = the_tenant.llm_model
|
|
action = request.form['action']
|
|
|
|
match action:
|
|
case 'view_users':
|
|
return redirect(url_for('user_bp.view_users', tenant_id=tenant_id))
|
|
case 'edit_tenant':
|
|
return redirect(url_for('user_bp.edit_tenant', tenant_id=tenant_id))
|
|
case 'select_tenant':
|
|
return redirect(url_for('basic_bp.session_defaults'))
|
|
# Add more conditions for other actions
|
|
return redirect(url_for('select_tenant'))
|
|
|
|
|
|
@user_bp.route('/view_users/<int:tenant_id>')
@roles_accepted('Super User', 'Tenant Admin')
def view_users(tenant_id):
    """Render the list of users whose ``tenant_id`` matches the URL parameter.

    NOTE(review): the tenant id comes from the URL only; nothing here checks
    that a Tenant Admin belongs to that tenant -- confirm this is enforced
    elsewhere.
    """
    # The <int:...> converter already supplies an int, so no cast is needed.
    users = User.query.filter_by(tenant_id=tenant_id).all()

    # Render the users in a template
    return render_template('user/view_users.html', users=users)
|
|
|
|
|
|
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_user_action():
    """Dispatch an action chosen for a user in the user list.

    Reads ``user_id`` and ``action`` from the posted form. ``edit_user``
    redirects to the edit view for that user. Any other action redirects to
    the user list of the tenant stored in the session, or to
    ``basic_bp.index`` when the session holds no tenant.
    """
    user_id = request.form['user_id']
    action = request.form['action']

    if action == 'edit_user':
        return redirect(url_for('user_bp.edit_user', user_id=user_id))
    # Add more conditions for other actions

    # The user list endpoint needs the blueprint prefix and a tenant id; the
    # latter is taken from the tenant dictionary kept in the session.
    tenant_id = (session.get('tenant') or {}).get('id')
    if tenant_id is not None:
        return redirect(url_for('user_bp.view_users', tenant_id=tenant_id))
    return redirect(url_for('basic_bp.index'))
|