Files
eveAI/eveai_app/views/document_views.py
Josako 43547287b1 - Refining & Enhancing dynamic fields
- Creating a specialized Form class for handling dynamic fields
- Refinement of HTML-macros to handle dynamic fields
- Introduction of dynamic fields for Catalogs
2024-10-29 09:17:44 +01:00

667 lines
28 KiB
Python

import ast
from datetime import datetime as dt, timezone as tz
from babel.messages.setuptools_frontend import update_catalog
from flask import request, redirect, flash, render_template, Blueprint, session, current_app
from flask_security import roles_accepted, current_user
from sqlalchemy import desc
from sqlalchemy.orm import aliased
from werkzeug.utils import secure_filename
from sqlalchemy.exc import SQLAlchemyError
import requests
from requests.exceptions import SSLError
from urllib.parse import urlparse, unquote
import io
import json
from common.models.document import Document, DocumentVersion, Catalog, Retriever
from common.extensions import db, minio_client
from common.utils.document_utils import validate_file_type, create_document_stack, start_embedding_task, process_url, \
process_multiple_urls, get_documents_list, edit_document, \
edit_document_version, refresh_document
from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \
EveAIDoubleURLException
from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddURLsForm, \
CatalogForm, EditCatalogForm, RetrieverForm, EditRetrieverForm
from common.utils.middleware import mw_before_request
from common.utils.celery_utils import current_celery
from common.utils.nginx_utils import prefixed_url_for
from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro, form_to_dict
from .document_list_view import DocumentListView
from .document_version_list_view import DocumentVersionListView
from config.catalog_types import CATALOG_TYPES
from config.retriever_types import RETRIEVER_TYPES
document_bp = Blueprint('document_bp', __name__, url_prefix='/document')
@document_bp.before_request
def log_before_request():
current_app.logger.debug(f"Before request (document_bp): {request.method} {request.url}")
@document_bp.after_request
def log_after_request(response):
current_app.logger.debug(
f"After request (document_bp): {request.method} {request.url} - Status: {response.status}")
return response
@document_bp.before_request
def before_request():
try:
mw_before_request()
except Exception as e:
current_app.logger.error(f'Error switching schema in Document Blueprint: {e}')
for role in current_user.roles:
current_app.logger.debug(f'User {current_user.email} has role {role.name}')
raise
@document_bp.route('/catalog', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def catalog():
form = CatalogForm()
if form.validate_on_submit():
tenant_id = session.get('tenant').get('id')
new_catalog = Catalog()
form.populate_obj(new_catalog)
# Handle Embedding Variables
new_catalog.html_tags = [tag.strip() for tag in form.html_tags.data.split(',')] if form.html_tags.data else []
new_catalog.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',')] \
if form.html_end_tags.data else []
new_catalog.html_included_elements = [tag.strip() for tag in form.html_included_elements.data.split(',')] \
if form.html_included_elements.data else []
new_catalog.html_excluded_elements = [tag.strip() for tag in form.html_excluded_elements.data.split(',')] \
if form.html_excluded_elements.data else []
new_catalog.html_excluded_classes = [cls.strip() for cls in form.html_excluded_classes.data.split(',')] \
if form.html_excluded_classes.data else []
set_logging_information(new_catalog, dt.now(tz.utc))
try:
db.session.add(new_catalog)
db.session.commit()
flash('Catalog successfully added!', 'success')
current_app.logger.info(f'Catalog {new_catalog.name} successfully added for tenant {tenant_id}!')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add catalog. Error: {e}', 'danger')
current_app.logger.error(f'Failed to add catalog {new_catalog.name}'
f'for tenant {tenant_id}. Error: {str(e)}')
return render_template('document/catalog.html', form=form)
@document_bp.route('/catalogs', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def catalogs():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
query = Catalog.query.order_by(Catalog.id)
pagination = query.paginate(page=page, per_page=per_page)
the_catalogs = pagination.items
# prepare table data
rows = prepare_table_for_macro(the_catalogs, [('id', ''), ('name', ''), ('type', '')])
# Render the catalogs in a template
return render_template('document/catalogs.html', rows=rows, pagination=pagination)
@document_bp.route('/handle_catalog_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_catalog_selection():
catalog_identification = request.form.get('selected_row')
catalog_id = ast.literal_eval(catalog_identification).get('value')
action = request.form['action']
catalog = Catalog.query.get_or_404(catalog_id)
if action == 'set_session_catalog':
current_app.logger.info(f'Setting session catalog to {catalog.name}')
session['catalog_id'] = catalog_id
session['catalog_name'] = catalog.name
current_app.logger.info(f'Finished setting session catalog to {catalog.name}')
elif action == 'edit_catalog':
return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=catalog_id))
return redirect(prefixed_url_for('document_bp.catalogs'))
@document_bp.route('/catalog/<int:catalog_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_catalog(catalog_id):
catalog = Catalog.query.get_or_404(catalog_id)
tenant_id = session.get('tenant').get('id')
form = EditCatalogForm(request.form, obj=catalog)
configuration_config = CATALOG_TYPES[catalog.type]["configuration"]
form.add_dynamic_fields("configuration", configuration_config, catalog.configuration)
# Convert arrays to comma-separated strings for display
if request.method == 'GET':
form.html_tags.data = ', '.join(catalog.html_tags or '')
form.html_end_tags.data = ', '.join(catalog.html_end_tags or '')
form.html_included_elements.data = ', '.join(catalog.html_included_elements or '')
form.html_excluded_elements.data = ', '.join(catalog.html_excluded_elements or '')
form.html_excluded_classes.data = ', '.join(catalog.html_excluded_classes or '')
if request.method == 'POST' and form.validate_on_submit():
form.populate_obj(catalog)
# Handle Embedding Variables
catalog.html_tags = [tag.strip() for tag in form.html_tags.data.split(',')] if form.html_tags.data else []
catalog.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',')] \
if form.html_end_tags.data else []
catalog.html_included_elements = [tag.strip() for tag in form.html_included_elements.data.split(',')] \
if form.html_included_elements.data else []
catalog.html_excluded_elements = [tag.strip() for tag in form.html_excluded_elements.data.split(',')] \
if form.html_excluded_elements.data else []
catalog.html_excluded_classes = [cls.strip() for cls in form.html_excluded_classes.data.split(',')] \
if form.html_excluded_classes.data else []
catalog.configuration = form.get_dynamic_data('configuration')
update_logging_information(catalog, dt.now(tz.utc))
try:
db.session.add(catalog)
db.session.commit()
flash('Catalog successfully updated successfully!', 'success')
current_app.logger.info(f'Catalog {catalog.name} successfully updated for tenant {tenant_id}')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update catalog. Error: {e}', 'danger')
current_app.logger.error(f'Failed to update catalog {catalog_id} for tenant {tenant_id}. Error: {str(e)}')
return redirect(prefixed_url_for('document_bp.catalogs'))
else:
form_validation_failed(request, form)
return render_template('document/edit_catalog.html', form=form, catalog_id=catalog_id)
@document_bp.route('/retriever', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def retriever():
form = RetrieverForm()
if form.validate_on_submit():
tenant_id = session.get('tenant').get('id')
new_retriever = Retriever()
form.populate_obj(new_retriever)
new_retriever.catalog_id = form.catalog.data.id
set_logging_information(new_retriever, dt.now(tz.utc))
try:
db.session.add(new_retriever)
db.session.commit()
flash('Retriever successfully added!', 'success')
current_app.logger.info(f'Catalog {new_retriever.name} successfully added for tenant {tenant_id}!')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to add retriever. Error: {e}', 'danger')
current_app.logger.error(f'Failed to add retriever {new_retriever.name}'
f'for tenant {tenant_id}. Error: {str(e)}')
# Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type)
return redirect(prefixed_url_for('document_bp.retriever', retriever_id=new_retriever.id))
return render_template('document/retriever.html', form=form)
@document_bp.route('/retriever/<int:retriever_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_retriever(retriever_id):
"""Edit an existing retriever configuration."""
# Get the retriever or return 404
retriever = Retriever.query.get_or_404(retriever_id)
if retriever.catalog_id:
# If catalog_id is just an ID, fetch the Catalog object
retriever.catalog = Catalog.query.get(retriever.catalog_id)
else:
retriever.catalog = None
# Create form instance with the retriever
form = EditRetrieverForm(request.form, obj=retriever)
configuration_config = RETRIEVER_TYPES[retriever.type]["configuration"]
form.add_dynamic_fields("configuration", configuration_config, retriever.configuration)
if form.validate_on_submit():
# Update basic fields
form.populate_obj(retriever)
retriever.configuration = form.get_dynamic_data('configuration')
# Update catalog relationship
retriever.catalog_id = form.catalog.data.id if form.catalog.data else None
# Update logging information
update_logging_information(retriever, dt.now(tz.utc))
# Save changes to database
try:
db.session.add(retriever)
db.session.commit()
flash('Retriever updated successfully!', 'success')
current_app.logger.info(f'Retriever {retriever.id} updated successfully')
except SQLAlchemyError as e:
db.session.rollback()
flash(f'Failed to update retriever. Error: {str(e)}', 'danger')
current_app.logger.error(f'Failed to update retriever {retriever_id}. Error: {str(e)}')
return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id)
return redirect(prefixed_url_for('document_bp.retrievers'))
else:
form_validation_failed(request, form)
current_app.logger.debug(f"Rendering Template for {retriever_id}")
return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id)
@document_bp.route('/retrievers', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def retrievers():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
query = Retriever.query.order_by(Retriever.id)
pagination = query.paginate(page=page, per_page=per_page)
the_retrievers = pagination.items
# prepare table data
rows = prepare_table_for_macro(the_retrievers,
[('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')])
# Render the catalogs in a template
return render_template('document/retrievers.html', rows=rows, pagination=pagination)
@document_bp.route('/handle_retriever_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_retriever_selection():
retriever_identification = request.form.get('selected_row')
retriever_id = ast.literal_eval(retriever_identification).get('value')
action = request.form['action']
if action == 'edit_retriever':
return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=retriever_id))
return redirect(prefixed_url_for('document_bp.retrievers'))
@document_bp.route('/add_document', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_document():
form = AddDocumentForm()
if form.validate_on_submit():
try:
tenant_id = session['tenant']['id']
catalog_id = session['catalog_id']
file = form.file.data
filename = secure_filename(file.filename)
extension = filename.rsplit('.', 1)[1].lower()
validate_file_type(extension)
current_app.logger.debug(f'Language on form: {form.language.data}')
api_input = {
'catalog_id': catalog_id,
'name': form.name.data,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data,
'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None,
}
current_app.logger.debug(f'Creating document stack with input {api_input}')
new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
flash(str(e), 'error')
except Exception as e:
current_app.logger.error(f'Error adding document: {str(e)}')
flash('An error occurred while adding the document.', 'error')
return render_template('document/add_document.html', form=form)
@document_bp.route('/add_url', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_url():
form = AddURLForm()
if form.validate_on_submit():
try:
tenant_id = session['tenant']['id']
catalog_id = session['catalog_id']
url = form.url.data
file_content, filename, extension = process_url(url, tenant_id)
api_input = {
'catalog_id': catalog_id,
'name': form.name.data or filename,
'url': url,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data,
'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None,
}
new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
except EveAIDoubleURLException:
flash(f'A document with url {url} already exists. No new document created.', 'info')
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
flash(str(e), 'error')
except Exception as e:
current_app.logger.error(f'Error adding document: {str(e)}')
flash('An error occurred while adding the document.', 'error')
return render_template('document/add_url.html', form=form)
@document_bp.route('/add_urls', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_urls():
form = AddURLsForm()
if form.validate_on_submit():
try:
tenant_id = session['tenant']['id']
urls = form.urls.data.split('\n')
urls = [url.strip() for url in urls if url.strip()]
api_input = {
'name': form.name.data,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}
results = process_multiple_urls(urls, tenant_id, api_input)
for result in results:
if result['status'] == 'success':
flash(
f"Processed URL: {result['url']} - Document ID: {result['document_id']}, Version ID: {result['document_version_id']}",
'success')
else:
flash(f"Error processing URL: {result['url']} - {result['message']}", 'error')
return redirect(prefixed_url_for('document_bp.documents'))
except Exception as e:
current_app.logger.error(f'Error adding multiple URLs: {str(e)}')
flash('An error occurred while adding the URLs.', 'error')
return render_template('document/add_urls.html', form=form)
@document_bp.route('/documents', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def documents():
view = DocumentListView(Document, 'document/documents.html', per_page=10)
return view.get()
@document_bp.route('/handle_document_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_document_selection():
document_identification = request.form['selected_row']
if isinstance(document_identification, int) or document_identification.isdigit():
doc_id = int(document_identification)
else:
# If it's not an integer, assume it's a string representation of a dictionary
try:
doc_id = ast.literal_eval(document_identification).get('value')
except (ValueError, AttributeError):
flash('Invalid document selection.', 'error')
return redirect(prefixed_url_for('document_bp.documents'))
action = request.form['action']
match action:
case 'edit_document':
return redirect(prefixed_url_for('document_bp.edit_document_view', document_id=doc_id))
case 'document_versions':
return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id))
case 'refresh_document':
refresh_document_view(doc_id)
return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id))
case 're_embed_latest_versions':
re_embed_latest_versions()
# Add more conditions for other actions
return redirect(prefixed_url_for('document_bp.documents'))
@document_bp.route('/edit_document/<int:document_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_document_view(document_id):
# Use an alias for the Catalog to avoid column name conflicts
CatalogAlias = aliased(Catalog)
# Query for the document and its catalog
result = db.session.query(Document, CatalogAlias.name.label('catalog_name')) \
.join(CatalogAlias, Document.catalog_id == CatalogAlias.id) \
.filter(Document.id == document_id) \
.first_or_404()
doc, catalog_name = result
form = EditDocumentForm(obj=doc)
if request.method == 'GET':
# Populate form with current values
form.name.data = doc.name
form.valid_from.data = doc.valid_from
form.valid_to.data = doc.valid_to
if form.validate_on_submit():
updated_doc, error = edit_document(
document_id,
form.name.data,
form.valid_from.data,
form.valid_to.data
)
if updated_doc:
flash(f'Document {updated_doc.id} updated successfully', 'success')
return redirect(prefixed_url_for('document_bp.documents'))
else:
flash(f'Error updating document: {error}', 'danger')
else:
form_validation_failed(request, form)
return render_template('document/edit_document.html', form=form, document_id=document_id, catalog_name=catalog_name)
@document_bp.route('/edit_document_version/<int:document_version_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def edit_document_version_view(document_version_id):
doc_vers = DocumentVersion.query.get_or_404(document_version_id)
form = EditDocumentVersionForm(obj=doc_vers)
if form.validate_on_submit():
updated_version, error = edit_document_version(
document_version_id,
form.user_context.data
)
if updated_version:
flash(f'Document Version {updated_version.id} updated successfully', 'success')
return redirect(prefixed_url_for('document_bp.document_versions', document_id=updated_version.doc_id))
else:
flash(f'Error updating document version: {error}', 'danger')
else:
form_validation_failed(request, form)
return render_template('document/edit_document_version.html', form=form, document_version_id=document_version_id,
doc_details=f'Document {doc_vers.document.name}')
@document_bp.route('/document_versions/<int:document_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def document_versions(document_id):
doc = Document.query.get_or_404(document_id)
doc_desc = f'Document {doc.name}'
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
query = (DocumentVersion.query.filter_by(doc_id=document_id)
.order_by(DocumentVersion.language)
.order_by(desc(DocumentVersion.id)))
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
doc_langs = pagination.items
rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''),
('object_name', ''), ('file_type', ''),
('processing', ''), ('processing_started_at', ''),
('processing_finished_at', ''), ('processing_error', '')])
return render_template('document/document_versions.html', rows=rows, pagination=pagination, document=doc_desc)
@document_bp.route('/handle_document_version_selection', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_document_version_selection():
document_version_identification = request.form['selected_row']
if isinstance(document_version_identification, int) or document_version_identification.isdigit():
doc_vers_id = int(document_version_identification)
else:
# If it's not an integer, assume it's a string representation of a dictionary
try:
doc_vers_id = ast.literal_eval(document_version_identification).get('value')
except (ValueError, AttributeError):
flash('Invalid document version selection.', 'error')
return redirect(prefixed_url_for('document_bp.document_versions_list'))
action = request.form['action']
current_app.logger.debug(f'Triggered Document Version Action: {action}')
match action:
case 'edit_document_version':
return redirect(prefixed_url_for('document_bp.edit_document_version_view', document_version_id=doc_vers_id))
case 'process_document_version':
process_version(doc_vers_id)
# Add more conditions for other actions
doc_vers = DocumentVersion.query.get_or_404(doc_vers_id)
return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_vers.doc_id))
@document_bp.route('/library_operations', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def library_operations():
return render_template('document/library_operations.html')
@document_bp.route('/handle_library_selection', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def handle_library_selection():
action = request.form['action']
match action:
case 're_embed_latest_versions':
re_embed_latest_versions()
case 'refresh_all_documents':
refresh_all_documents()
return redirect(prefixed_url_for('document_bp.library_operations'))
@document_bp.route('/document_versions_list', methods=['GET'])
@roles_accepted('Super User', 'Tenant Admin')
def document_versions_list():
current_app.logger.debug('Getting document versions list')
view = DocumentVersionListView(DocumentVersion, 'document/document_versions_list_view.html', per_page=20)
current_app.logger.debug('Got document versions list')
return view.get()
def refresh_all_documents():
for doc in Document.query.all():
refresh_document(doc.id)
def refresh_document_view(document_id):
new_version, result = refresh_document(document_id, session['tenant']['id'])
if new_version:
flash(f'Document refreshed. New version: {new_version.id}. Task ID: {result}', 'success')
else:
flash(f'Error refreshing document: {result}', 'danger')
return redirect(prefixed_url_for('document_bp.documents'))
def re_embed_latest_versions():
docs = Document.query.all()
for doc in docs:
latest_doc_version = DocumentVersion.query.filter_by(doc_id=doc.id).order_by(desc(DocumentVersion.id)).first()
if latest_doc_version:
process_version(latest_doc_version.id)
def process_version(version_id):
task = current_celery.send_task('create_embeddings',
args=[session['tenant']['id'], version_id,],
queue='embeddings')
current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} '
f'for tenant {session["tenant"]["id"]}, '
f'Document Version {version_id}. '
f'Embedding creation task: {task.id}')
flash(f'Processing for document version {version_id} retriggered successfully...', 'success')
return redirect(prefixed_url_for('document_bp.documents'))
def set_logging_information(obj, timestamp):
obj.created_at = timestamp
obj.updated_at = timestamp
obj.created_by = current_user.id
obj.updated_by = current_user.id
def update_logging_information(obj, timestamp):
obj.updated_at = timestamp
obj.updated_by = current_user.id
def log_session_state(session, msg=""):
current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}")
current_app.logger.debug(f"{msg} - Session new: {session.new}")
def fetch_html(url):
# Fetches HTML content from a URL
try:
response = requests.get(url)
except SSLError as e:
current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. "
f"Error Encountered: {e}")
if current_app.config.get('DEBUG'): # only allow when in a development environment
current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. "
f"Only while in development environment.")
response = requests.get(url, verify=False) # Disable SSL verification
else:
response = None
response.raise_for_status() # Will raise an exception for bad requests
return response.content