Files
eveAI/eveai_app/views/user_views.py
Josako 659588deab Changes Documents - llm and languagefields on tenant, processing on documents
first version of Adding Documents (excl. embeddings)
2024-05-02 00:12:27 +02:00

248 lines
8.9 KiB
Python

# from . import user_bp
import uuid
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, url_for, flash, render_template, Blueprint, session
from flask_security import hash_password, roles_required, roles_accepted
from ..models.user import User, Tenant, Role
from ..extensions import db
from .user_forms import TenantForm, CreateUserForm, EditUserForm
from ..utils.database import Database
# Blueprint that the tenant and user administration views below register on;
# every route attached to it is served under the '/user' URL prefix.
user_bp = Blueprint('user_bp', __name__, url_prefix='/user')
@user_bp.route('/tenant', methods=['GET', 'POST'])
@roles_required('Super User')
def tenant():
    """Show the tenant creation form and, on POST, create a new tenant.

    On POST the ``name`` and ``website`` form fields are required. Optional
    language, embedding, LLM and license fields are copied onto the new
    ``Tenant`` only when a non-empty value was submitted. After a successful
    commit the tenant schema is created through
    ``Database(new_tenant.id).create_tenant_schema()``. The outcome (error or
    success) is flashed, and the form template is rendered in every case.
    """
    if request.method == 'POST':
        # Handle the required attributes
        name = request.form.get('name')
        website = request.form.get('website')

        error = None
        if not name:
            error = 'Tenant name is required.'
        elif not website:
            error = 'Tenant website is required.'

        new_tenant = None
        if error is None:
            new_tenant = Tenant(name=name, website=website)

            # Optional single-value fields. Truthiness is used so that both a
            # missing field (None) and an empty submission ('') are skipped.
            # NOTE(review): handle_tenant_selection reads
            # `default_embedding_model`, while this view writes
            # `default_embedding` / `allowed_embeddings` -- confirm which
            # attribute names the Tenant model actually declares.
            for field in ('default_language', 'default_embedding', 'default_llm_model'):
                value = request.form.get(field)
                if value:
                    setattr(new_tenant, field, value)

            # Optional multi-value fields. getlist() returns a list, so an
            # empty selection is detected with truthiness, not `!= ''`.
            for field in ('allowed_languages', 'allowed_embeddings', 'allowed_llm_models'):
                values = request.form.getlist(field)
                if values:
                    setattr(new_tenant, field, values)

            # License data: report malformed input instead of raising.
            lic_start = request.form.get('license_start_date')
            lic_end = request.form.get('license_end_date')
            monthly = request.form.get('allowed_monthly_interactions')
            try:
                if lic_start:
                    new_tenant.license_start_date = dt.strptime(lic_start, '%Y-%m-%d')
                if lic_end:
                    new_tenant.license_end_date = dt.strptime(lic_end, '%Y-%m-%d')
            except ValueError:
                error = 'License dates must be formatted as YYYY-MM-DD.'

            if error is None and monthly:
                try:
                    new_tenant.allowed_monthly_interactions = int(monthly)
                except ValueError:
                    error = 'Allowed monthly interactions must be a whole number.'

        if error is None:
            # Handle Timestamps
            timestamp = dt.now(tz.utc)
            new_tenant.created_at = timestamp
            new_tenant.updated_at = timestamp

            # Add the new tenant to the database and commit the changes
            try:
                db.session.add(new_tenant)
                db.session.commit()
            except Exception as e:
                # Leave the session usable for the rest of the request and
                # flash a readable message rather than the raw args tuple.
                db.session.rollback()
                error = str(e)
            else:
                # Create schema for new tenant
                Database(new_tenant.id).create_tenant_schema()

        if error:
            flash(error)
        else:
            flash('Tenant added successfully.')

    form = TenantForm()
    return render_template('user/tenant.html', form=form)
@user_bp.route('/tenant/<int:tenant_id>', methods=['GET', 'POST'])
@roles_required('Super User')
def edit_tenant(tenant_id):
    """Show the edit form for one tenant and save it on a valid POST.

    Responds with 404 when no tenant has the given id. On a valid submission
    the form data is copied onto the tenant, ``updated_at`` is refreshed and
    the change is committed. If the session's ``tenant`` entry refers to the
    same tenant id, it is replaced with the updated ``tenant.to_dict()``.
    """
    tenant = Tenant.query.get_or_404(tenant_id)
    form = TenantForm(obj=tenant)

    if request.method == 'POST' and form.validate_on_submit():
        # Populate the tenant with form data
        form.populate_obj(tenant)
        # Keep the modification timestamp current, as the other views in this
        # module do when they create or change a record.
        tenant.updated_at = dt.now(tz.utc)
        db.session.commit()
        flash('Tenant updated successfully.', 'success')

        # Keep the tenant stored in the session in step with the saved data.
        session_tenant = session.get('tenant')
        if session_tenant and session_tenant.get('id') == tenant_id:
            session['tenant'] = tenant.to_dict()

    return render_template('user/edit_tenant.html', form=form, tenant_id=tenant_id)
@user_bp.route('/user', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def user():
    """Show the user creation form and, on a valid POST, create a new user.

    When the password and its confirmation differ, a message is flashed and
    the form is rendered again without creating anything. Otherwise a
    ``User`` is built from the form, linked to the tenant given by the
    ``tenant_id`` form field and to the selected roles, and committed. A
    failed commit is rolled back and the error is flashed.
    """
    form = CreateUserForm()
    if form.validate_on_submit():
        if form.password.data != form.confirm_password.data:
            flash('Passwords do not match.')
            # Stop here: a mismatch must not fall through to user creation.
            return render_template('user/user.html', form=form)

        # Handle the required attributes
        hashed_password = hash_password(form.password.data)
        new_user = User(
            user_name=form.user_name.data,
            email=form.email.data,
            password=hashed_password,
            first_name=form.first_name.data,
            last_name=form.last_name.data,
            is_active=form.is_active.data,
            valid_to=form.valid_to.data,
            tenant_id=form.tenant_id.data
        )
        new_user.fs_uniquifier = str(uuid.uuid4())

        timestamp = dt.now(tz.utc)
        new_user.created_at = timestamp
        new_user.updated_at = timestamp

        # Handle the relations
        tenant_id = request.form.get('tenant_id')
        the_tenant = Tenant.query.get(tenant_id)
        new_user.tenant = the_tenant

        # Add roles, skipping ids that match no Role (same guard as edit_user)
        for role_id in form.roles.data:
            the_role = Role.query.get(role_id)
            if the_role:
                new_user.roles.append(the_role)

        # Add the new user to the database and commit the changes
        try:
            db.session.add(new_user)
            db.session.commit()
            flash('User added successfully.')
        except Exception as e:
            db.session.rollback()
            flash(f'Failed to add user. Error: {str(e)}')

    return render_template('user/user.html', form=form)
@user_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_user(user_id):
    """Show the edit form for one user and save it on a valid POST.

    Responds with 404 when no user has the given id. A valid submission
    updates the name, active flag, validity date, ``updated_at`` and the role
    assignments, commits, and redirects back to this edit page. In every
    other case the form is rendered with the user's current roles selected.
    """
    user = User.query.get_or_404(user_id)
    form = EditUserForm(obj=user)

    submitted_and_valid = request.method == 'POST' and form.validate_on_submit()
    if not submitted_and_valid:
        # Nothing to save: show the form with the roles the user has now.
        form.roles.data = [assigned.id for assigned in user.roles]
        return render_template('user/edit_user.html', form=form, user_id=user_id)

    # Copy the editable fields from the form onto the user
    user.first_name = form.first_name.data
    user.last_name = form.last_name.data
    user.is_active = form.is_active.data
    user.valid_to = form.valid_to.data
    user.updated_at = dt.now(tz.utc)

    # Work out which role ids were added and which were dropped
    held_ids = {assigned.id for assigned in user.roles}
    wanted_ids = set(form.roles.data)

    # Attach roles that were newly selected
    for new_id in wanted_ids.difference(held_ids):
        to_add = Role.query.get(new_id)
        if to_add:
            user.roles.append(to_add)

    # Detach roles that are no longer selected
    for stale_id in held_ids.difference(wanted_ids):
        to_drop = Role.query.get(stale_id)
        if to_drop:
            user.roles.remove(to_drop)

    db.session.commit()
    flash('User updated successfully.', 'success')
    return redirect(url_for('user_bp.edit_user', user_id=user.id))
@user_bp.route('/select_tenant')
@roles_required('Super User')
def select_tenant():
    """Render the tenant selection page with every tenant in the database."""
    return render_template('user/select_tenant.html', tenants=Tenant.query.all())
@user_bp.route('/handle_tenant_selection', methods=['POST'])
@roles_required('Super User')
def handle_tenant_selection():
    """Store the chosen tenant in the session and dispatch on the action.

    Reads ``tenant_id`` and ``action`` from the posted form. Responds with 404
    when the tenant does not exist. The tenant dict and its default language,
    embedding model and LLM model are written to the session, then the
    request is redirected to the view matching ``action``; unknown actions
    return to the tenant selection page.
    """
    tenant_id = request.form['tenant_id']
    # get_or_404 instead of get: an unknown id must not crash on None below.
    the_tenant = Tenant.query.get_or_404(tenant_id)

    session['tenant'] = the_tenant.to_dict()
    session['default_language'] = the_tenant.default_language
    session['default_embedding_model'] = the_tenant.default_embedding_model
    session['default_llm_model'] = the_tenant.default_llm_model

    action = request.form['action']
    if action == 'view_users':
        return redirect(url_for('user_bp.view_users', tenant_id=tenant_id))
    elif action == 'edit_tenant':
        return redirect(url_for('user_bp.edit_tenant', tenant_id=tenant_id))
    # Add more conditions for other actions

    # Blueprint endpoints need the 'user_bp.' prefix to be resolvable.
    return redirect(url_for('user_bp.select_tenant'))
@user_bp.route('/view_users/<int:tenant_id>')
@roles_accepted('Super User', 'Tenant Admin')
def view_users(tenant_id):
    """Render the list of users that belong to the given tenant.

    ``tenant_id`` arrives already converted to ``int`` by the ``<int:...>``
    URL converter, so no further casting is needed.
    """
    users = User.query.filter_by(tenant_id=tenant_id).all()
    # Render the users in a template
    return render_template('user/view_users.html', users=users)
@user_bp.route('/handle_user_action', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_user_action():
    """Dispatch an action posted from the user list.

    Reads ``user_id`` and ``action`` from the posted form. ``edit_user``
    redirects to that user's edit page. Any other action returns to the user
    list of the tenant the user belongs to, or responds with 404 when the
    user does not exist.
    """
    user_id = request.form['user_id']
    action = request.form['action']

    if action == 'edit_user':
        return redirect(url_for('user_bp.edit_user', user_id=user_id))
    # Add more conditions for other actions

    # The view_users endpoint lives on this blueprint and requires a
    # tenant_id, so take it from the user the action was posted for.
    target_user = User.query.get_or_404(user_id)
    return redirect(url_for('user_bp.view_users', tenant_id=target_user.tenant_id))