Files
eveAI/eveai_app/views/document_views.py
2025-11-18 11:14:49 +01:00

926 lines
40 KiB
Python

import ast
from datetime import datetime as dt, timezone as tz
from flask import request, redirect, flash, render_template, Blueprint, session, current_app
from flask_security import roles_accepted, current_user
from sqlalchemy import desc
from sqlalchemy.orm import aliased
from werkzeug.utils import secure_filename
from werkzeug.datastructures import CombinedMultiDict
from sqlalchemy.exc import SQLAlchemyError
import requests
from requests.exceptions import SSLError, HTTPError
import json
from common.models.document import Document, DocumentVersion, Catalog, Retriever, Processor
from common.extensions import db, cache_manager, minio_client
from common.models.interaction import Specialist, SpecialistRetriever
from common.utils.document_utils import create_document_stack, start_embedding_task, process_url, \
edit_document as util_edit_document, edit_document_version as util_edit_document_version, refresh_document, \
clean_url, is_file_type_supported_by_catalog
from common.utils.dynamic_field_utils import create_default_config_from_type_config
from common.utils.eveai_exceptions import EveAIException
from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, \
CatalogForm, EditCatalogForm, RetrieverForm, EditRetrieverForm, ProcessorForm, EditProcessorForm
from common.utils.middleware import mw_before_request
from common.utils.celery_utils import current_celery
from common.utils.nginx_utils import prefixed_url_for
from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro
from eveai_app.views.list_views.list_view_utils import render_list_view
from eveai_app.views.list_views.document_list_views import get_catalogs_list_view, get_processors_list_view, \
get_retrievers_list_view, get_documents_list_view, get_documents_processing_list_view
from eveai_app.views.list_views.list_view_utils import render_list_view
# Blueprint that groups all views defined in this module under the '/document' URL prefix.
document_bp = Blueprint('document_bp', __name__, url_prefix='/document')
@document_bp.before_request
def log_before_request():
    """Write a debug log line containing the path of the incoming request."""
    requested_path = request.path
    current_app.logger.debug(f'Before request: {requested_path} =====================================')
@document_bp.after_request
def log_after_request(response):
    """Return the outgoing response unchanged.

    Registered as an after-request hook of the blueprint; it currently performs
    no logging and does not modify the response.
    """
    return response
@document_bp.before_request
def before_request():
    """Run the shared request middleware before every request of this blueprint.

    Any exception raised by the middleware is logged and then re-raised so the
    request does not continue.
    """
    try:
        mw_before_request()
    except Exception as exc:
        message = f'Error switching schema in Document Blueprint: {exc}'
        current_app.logger.error(message)
        raise
# Catalog Management ------------------------------------------------------------------------------
@document_bp.route('/catalog', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def catalog():
    """Create a new catalog (step 1 of catalog creation).

    GET renders the empty creation form. A valid POST stores the catalog with
    the latest version of its type and the configuration registered for that
    version, then redirects to the edit view of the new catalog (step 2).
    On a database error the transaction is rolled back, the error is flashed
    and logged, and the creation form is rendered again.
    """
    form = CatalogForm()
    if form.validate_on_submit():
        tenant_id = session.get('tenant').get('id')
        new_catalog = Catalog()
        form.populate_obj(new_catalog)
        set_logging_information(new_catalog, dt.now(tz.utc))
        # Set type information, including the configuration for backward compatibility
        new_catalog.type_version = cache_manager.catalogs_version_tree_cache.get_latest_version(new_catalog.type)
        new_catalog.configuration = (cache_manager.catalogs_config_cache
                                     .get_config(new_catalog.type, new_catalog.type_version)
                                     .get("configuration", {}))
        try:
            db.session.add(new_catalog)
            db.session.commit()
            flash('Catalog successfully added!', 'success')
            current_app.logger.info(f'Catalog {new_catalog.name} successfully added for tenant {tenant_id}!')
            # Enable step 2 of creation of catalog - edit the newly created catalog.
            # The 'catalog' endpoint takes no catalog_id, so the edit endpoint is the
            # one that matches this redirect (same pattern as processor/retriever).
            return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=new_catalog.id,
                                             for_redirect=True))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to add catalog. Error: {e}', 'danger')
            current_app.logger.error(f'Failed to add catalog {new_catalog.name} '
                                     f'for tenant {tenant_id}. Error: {str(e)}')
    return render_template('document/catalog.html', form=form)
@document_bp.route('/catalogs', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def catalogs():
    """Render the list view of catalogs."""
    # Fetch the list-view configuration and hand it to the generic list renderer
    list_view_config = get_catalogs_list_view()
    return render_list_view('list_view.html', **list_view_config)
@document_bp.route('/handle_catalog_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_catalog_selection():
    """Dispatch an action chosen in the catalogs list view.

    Supported actions (form field 'action'):
      * 'create_catalog'      - redirect to the catalog creation form
      * 'set_session_catalog' - store the selected catalog in the session
      * 'edit_catalog'        - redirect to the edit view of the selected catalog

    Every action except 'create_catalog' needs the form field 'selected_row',
    which is parsed with ast.literal_eval and must yield an object whose
    'value' entry is the catalog id. A missing or malformed selection results
    in a flash message and a redirect back to the list instead of a server error.
    """
    action = request.form['action']
    if action == 'create_catalog':
        return redirect(prefixed_url_for('document_bp.catalog', for_redirect=True))

    try:
        catalog_id = ast.literal_eval(request.form.get('selected_row')).get('value')
    except (ValueError, SyntaxError, TypeError, AttributeError):
        # literal_eval rejects None / empty / malformed input; a non-dict result has no .get
        catalog_id = None
    if catalog_id is None:
        flash('Invalid catalog selection.', 'danger')
        return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))

    catalog = Catalog.query.get_or_404(catalog_id)
    if action == 'set_session_catalog':
        current_app.logger.info(f'Setting session catalog to {catalog.name}')
        session['catalog_id'] = catalog_id
        session['catalog_name'] = catalog.name
        session['catalog'] = catalog.to_dict()
    elif action == 'edit_catalog':
        return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=catalog_id, for_redirect=True))
    return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
@document_bp.route('/catalog/<int:catalog_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_catalog(catalog_id):
    """Edit an existing catalog.

    GET renders the edit form pre-filled from the catalog. A valid POST saves
    the changes, refreshes the catalog stored in the session when it is the
    session catalog, and redirects to the catalogs list. A database error is
    rolled back, flashed and logged; the redirect to the list still happens.
    Responds with 404 when the catalog does not exist.
    """
    catalog = Catalog.query.get_or_404(catalog_id)
    tenant_id = session.get('tenant').get('id')
    form = EditCatalogForm(request.form, obj=catalog)
    if request.method == 'POST' and form.validate_on_submit():
        form.populate_obj(catalog)
        update_logging_information(catalog, dt.now(tz.utc))
        try:
            db.session.add(catalog)
            db.session.commit()
            # Keep the session copy in sync when the edited catalog is the active one
            if session.get('catalog_id') == catalog_id:
                session['catalog'] = catalog.to_dict()
            flash('Catalog updated successfully!', 'success')
            current_app.logger.info(f'Catalog {catalog.name} successfully updated for tenant {tenant_id}')
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to update catalog. Error: {e}', 'danger')
            current_app.logger.error(f'Failed to update catalog {catalog_id} for tenant {tenant_id}. Error: {str(e)}')
        return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
    else:
        form_validation_failed(request, form)
    return render_template('document/edit_catalog.html', form=form, catalog_id=catalog_id)
# Processor Management ----------------------------------------------------------------------------
@document_bp.route('/processor', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def processor():
    """Create a new processor for the session catalog (step 1 of processor creation).

    GET renders the empty creation form. A valid POST stores the processor with
    a default configuration derived from its type configuration and redirects
    to the edit view of the new processor (step 2). On a database error the
    transaction is rolled back, the error is flashed and logged, and the
    creation form is rendered again.
    """
    form = ProcessorForm()
    if form.validate_on_submit():
        tenant_id = session.get('tenant').get('id')
        new_processor = Processor()
        form.populate_obj(new_processor)
        new_processor.catalog_id = session.get('catalog_id')
        processor_config = cache_manager.processors_config_cache.get_config(new_processor.type)
        new_processor.configuration = create_default_config_from_type_config(
            processor_config["configuration"])
        set_logging_information(new_processor, dt.now(tz.utc))
        try:
            db.session.add(new_processor)
            db.session.commit()
            flash('Processor successfully added!', 'success')
            current_app.logger.info(f'Processor {new_processor.name} successfully added for tenant {tenant_id}!')
            # Enable step 2 of creation of processor - add configuration of the processor (dependent on type)
            return redirect(prefixed_url_for('document_bp.edit_processor', processor_id=new_processor.id,
                                             for_redirect=True))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to add processor. Error: {e}', 'danger')
            current_app.logger.error(f'Failed to add processor {new_processor.name} '
                                     f'for tenant {tenant_id}. Error: {str(e)}')
    return render_template('document/processor.html', form=form)
@document_bp.route('/processor/<int:processor_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_processor(processor_id):
    """Edit an existing processor, including its type-dependent configuration.

    Responds with 404 when the processor does not exist. On successful save
    the user is redirected to the processors list; on a validation or database
    failure the edit form is rendered again.
    """
    proc = Processor.query.get_or_404(processor_id)
    # Attach the Catalog object when the processor references one, otherwise None
    proc.catalog = Catalog.query.get(proc.catalog_id) if proc.catalog_id else None

    # Build the form from the processor and extend it with the dynamic configuration fields
    form = EditProcessorForm(request.form, obj=proc)
    type_config = cache_manager.processors_config_cache.get_config(proc.type)
    form.add_dynamic_fields("configuration", type_config, proc.configuration)

    if not form.validate_on_submit():
        form_validation_failed(request, form)
        return render_template('document/edit_processor.html', form=form, processor_id=processor_id)

    # Copy the static fields, then the dynamic configuration, then the audit information
    form.populate_obj(proc)
    proc.configuration = form.get_dynamic_data('configuration')
    update_logging_information(proc, dt.now(tz.utc))

    try:
        db.session.add(proc)
        db.session.commit()
        flash('Processor updated successfully!', 'success')
        current_app.logger.info(f'Processor {proc.id} updated successfully')
    except SQLAlchemyError as exc:
        db.session.rollback()
        flash(f'Failed to update processor. Error: {str(exc)}', 'danger')
        current_app.logger.error(f'Failed to update processor {processor_id}. Error: {str(exc)}')
        return render_template('document/edit_processor.html', form=form, processor_id=processor_id)

    return redirect(prefixed_url_for('document_bp.processors', for_redirect=True))
@document_bp.route('/processors', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def processors():
    """Render the list view of processors of the session catalog.

    Without a session catalog the user is warned and sent to the catalogs list.
    """
    catalog_id = session.get('catalog_id', None)
    if catalog_id:
        # Get configuration and render the list view
        list_view_config = get_processors_list_view(catalog_id)
        return render_list_view('list_view.html', **list_view_config)

    flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
    return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
@document_bp.route('/handle_processor_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_processor_selection():
    """Dispatch an action chosen in the processors list view.

    'create_processor' redirects to the creation form. 'edit_processor'
    redirects to the edit view of the selected processor. Any other action
    returns to the processors list.

    The form field 'selected_row' is parsed with ast.literal_eval and must
    yield an object whose 'value' entry is the processor id. A missing or
    malformed selection results in a flash message and a redirect back to the
    list instead of a server error.
    """
    action = request.form['action']
    if action == 'create_processor':
        return redirect(prefixed_url_for('document_bp.processor', for_redirect=True))

    try:
        processor_id = ast.literal_eval(request.form.get('selected_row')).get('value')
    except (ValueError, SyntaxError, TypeError, AttributeError):
        # literal_eval rejects None / empty / malformed input; a non-dict result has no .get
        processor_id = None
    if processor_id is None:
        flash('Invalid processor selection.', 'danger')
        return redirect(prefixed_url_for('document_bp.processors', for_redirect=True))

    if action == 'edit_processor':
        return redirect(prefixed_url_for('document_bp.edit_processor', processor_id=processor_id,
                                         for_redirect=True))
    return redirect(prefixed_url_for('document_bp.processors', for_redirect=True))
# Retriever Management ----------------------------------------------------------------------------
@document_bp.route('/retriever', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def retriever():
    """Create a new retriever for the session catalog (step 1 of retriever creation).

    GET renders the empty creation form. A valid POST stores the retriever with
    the latest version of its type and redirects to the edit view of the new
    retriever (step 2). On a database error the transaction is rolled back, the
    error is flashed and logged, and the creation form is rendered again.
    """
    form = RetrieverForm()
    if form.validate_on_submit():
        tenant_id = session.get('tenant').get('id')
        new_retriever = Retriever()
        form.populate_obj(new_retriever)
        new_retriever.catalog_id = session.get('catalog_id')
        new_retriever.type_version = cache_manager.retrievers_version_tree_cache.get_latest_version(
            new_retriever.type)
        set_logging_information(new_retriever, dt.now(tz.utc))
        try:
            db.session.add(new_retriever)
            db.session.commit()
            flash('Retriever successfully added!', 'success')
            current_app.logger.info(f'Retriever {new_retriever.name} successfully added for tenant {tenant_id}!')
            # Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type)
            return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=new_retriever.id,
                                             for_redirect=True))
        except SQLAlchemyError as e:
            db.session.rollback()
            flash(f'Failed to add retriever. Error: {e}', 'danger')
            current_app.logger.error(f'Failed to add retriever {new_retriever.name} '
                                     f'for tenant {tenant_id}. Error: {str(e)}')
    return render_template('document/retriever.html', form=form)
@document_bp.route('/retriever/<int:retriever_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_retriever(retriever_id):
    """Edit an existing retriever, including its type-dependent configuration.

    Responds with 404 when the retriever does not exist. On successful save
    the user is redirected to the retrievers list; on a validation or database
    failure the edit form is rendered again.
    """
    retr = Retriever.query.get_or_404(retriever_id)

    # Build the form from the retriever and extend it with the dynamic configuration fields
    form = EditRetrieverForm(request.form, obj=retr)
    type_config = cache_manager.retrievers_config_cache.get_config(retr.type, retr.type_version)
    form.add_dynamic_fields("configuration", type_config, retr.configuration)

    if not form.validate_on_submit():
        form_validation_failed(request, form)
        return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id)

    # Copy the static fields, then the dynamic configuration, then the audit information
    form.populate_obj(retr)
    retr.configuration = form.get_dynamic_data('configuration')
    update_logging_information(retr, dt.now(tz.utc))

    try:
        db.session.add(retr)
        db.session.commit()
        flash('Retriever updated successfully!', 'success')
        current_app.logger.info(f'Retriever {retr.id} updated successfully')
    except SQLAlchemyError as exc:
        db.session.rollback()
        flash(f'Failed to update retriever. Error: {str(exc)}', 'danger')
        current_app.logger.error(f'Failed to update retriever {retriever_id}. Error: {str(exc)}')
        return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id)

    return redirect(prefixed_url_for('document_bp.retrievers', for_redirect=True))
@document_bp.route('/retrievers', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def retrievers():
    """Render the list view of retrievers of the session catalog.

    Without a session catalog the user is warned and sent to the catalogs list.
    """
    catalog_id = session.get('catalog_id', None)
    if catalog_id:
        # Get configuration and render the list view
        list_view_config = get_retrievers_list_view(catalog_id)
        return render_list_view('list_view.html', **list_view_config)

    flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
    return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
@document_bp.route('/handle_retriever_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_retriever_selection():
    """Dispatch an action chosen in the retrievers list view.

    'create_retriever' redirects to the creation form. 'edit_retriever'
    redirects to the edit view of the selected retriever. Any other action
    returns to the retrievers list.

    The form field 'selected_row' is parsed with ast.literal_eval and must
    yield an object whose 'value' entry is the retriever id. A missing or
    malformed selection results in a flash message and a redirect back to the
    list instead of a server error.
    """
    action = request.form['action']
    if action == 'create_retriever':
        return redirect(prefixed_url_for('document_bp.retriever', for_redirect=True))

    try:
        retriever_id = ast.literal_eval(request.form.get('selected_row')).get('value')
    except (ValueError, SyntaxError, TypeError, AttributeError):
        # literal_eval rejects None / empty / malformed input; a non-dict result has no .get
        retriever_id = None
    if retriever_id is None:
        flash('Invalid retriever selection.', 'danger')
        return redirect(prefixed_url_for('document_bp.retrievers', for_redirect=True))

    if action == 'edit_retriever':
        return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=retriever_id,
                                         for_redirect=True))
    return redirect(prefixed_url_for('document_bp.retrievers', for_redirect=True))
# Document Management -----------------------------------------------------------------------------
@document_bp.route('/add_document', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def add_document():
    """Upload a file as a new document in the session catalog.

    Requires a session catalog; without one the user is warned and redirected
    to the catalogs list. On a valid POST the uploaded file is checked against
    the file types supported by the catalog, the document stack is created and
    an embedding task is started, after which the user is redirected to the
    processing overview. Any failure is flashed and the form is rendered again.

    The debug logging in this view is diagnostic only and deliberately
    best-effort: a failure while logging never affects the request.
    """
    # Log early request info to help diagnose upload problems
    try:
        current_app.logger.debug(
            f"[add_document] method={request.method}, content_type={request.content_type}, "
            f"files_keys={list(request.files.keys())}"
        )
    except Exception:
        pass

    # Explicitly bind both form and file data to the form (important for FileField & CSRF)
    form = AddDocumentForm(CombinedMultiDict([request.form, request.files]))
    catalog_id = session.get('catalog_id', None)
    if catalog_id is None:
        flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
        return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))

    catalog = Catalog.query.get_or_404(catalog_id)
    if catalog.configuration and len(catalog.configuration) > 0:
        form.add_dynamic_fields("tagging_fields", catalog.configuration)
    current_app.logger.debug("In Add Document")

    # Extra debug logging to check the CSRF token / payload.
    # Only the presence of the token is logged, never its value, to avoid leaking it.
    try:
        current_app.logger.debug(
            f"[add_document] request.form keys: {list(request.form.keys())}"
        )
        current_app.logger.debug(
            f"[add_document] csrf_token in form? {request.form.get('csrf_token') is not None}"
        )
        has_csrf_field = hasattr(form, 'csrf_token')
        current_app.logger.debug(
            f"[add_document] form has csrf field? {has_csrf_field}"
        )
        if has_csrf_field:
            current_app.logger.debug(
                "[add_document] form.csrf_token field is present on form object"
            )
            # Confirm whether the CSRF value is actually bound to the form
            current_app.logger.debug(
                f"[add_document] csrf bound? data_present={bool(form.csrf_token.data)} field_name={getattr(form.csrf_token, 'name', None)}"
            )
    except Exception:
        pass

    if form.validate_on_submit():
        try:
            current_app.logger.info(f'Adding Document for {catalog_id}')
            tenant_id = session['tenant']['id']
            file = form.file.data
            filename = secure_filename(file.filename)
            # A filename without a dot has no extension to derive the file type from;
            # report that explicitly instead of failing with an IndexError below.
            if '.' not in filename:
                flash('The uploaded file has no file extension, so its type cannot be determined.', 'danger')
                return render_template('document/add_document.html', form=form)
            extension = filename.rsplit('.', 1)[1].lower()
            is_file_type_supported_by_catalog(catalog_id, extension)
            catalog_properties = form.get_dynamic_data("tagging_fields")
            api_input = {
                'catalog_id': catalog_id,
                'name': form.name.data,
                'sub_file_type': form.sub_file_type.data,
                'language': form.language.data,
                'user_context': form.user_context.data,
                'valid_from': form.valid_from.data,
                'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None,
                'catalog_properties': catalog_properties,
            }
            new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
            flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
                  'success')
            return redirect(prefixed_url_for('document_bp.documents_processing', for_redirect=True))
        except EveAIException as e:
            flash(str(e), 'danger')
            current_app.logger.error(f"Error adding document: {str(e)}")
        except Exception as e:
            current_app.logger.error(f'Error adding document: {str(e)}')
            flash('An error occurred while adding the document.', 'danger')
    else:
        # Show and log validation errors when the submit fails
        if request.method == 'POST':
            try:
                current_app.logger.warning(
                    f"[add_document] form validation failed. errors={getattr(form, 'errors', {})}"
                )
                current_app.logger.debug(
                    f"[add_document] request.files keys after validation: {list(request.files.keys())}"
                )
                current_app.logger.debug(
                    f"[add_document] request.form keys after validation: {list(request.form.keys())}"
                )
                current_app.logger.debug(
                    f"[add_document] csrf_token in form after validation? {request.form.get('csrf_token') is not None}"
                )
            except Exception:
                pass
        form_validation_failed(request, form)
    return render_template('document/add_document.html', form=form)
@document_bp.route('/add_url', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def add_url():
    """Add the content behind a URL as a new document in the session catalog.

    Requires a session catalog; without one the user is warned and redirected
    to the catalogs list. On a valid POST the URL is cleaned and fetched, the
    resulting file type is checked against the catalog, the document stack is
    created and an embedding task is started. Failures are flashed and the
    form is rendered again. Debug logging in this view is best-effort only.
    """
    # Log early request info to help diagnose submit problems
    try:
        current_app.logger.debug(
            f"[add_url] method={request.method}, content_type={request.content_type}, files_keys={list(request.files.keys())}"
        )
    except Exception:
        pass

    # Explicitly bind both form and file data (for consistency and clarity)
    form = AddURLForm(CombinedMultiDict([request.form, request.files]))

    catalog_id = session.get('catalog_id', None)
    if catalog_id is None:
        flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
        return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))

    catalog = Catalog.query.get_or_404(catalog_id)
    if catalog.configuration and len(catalog.configuration) > 0:
        form.add_dynamic_fields("tagging_fields", catalog.configuration)

    # Kept outside the try block so the HTTPError handler can always report it
    target_url = ""

    # Small debug aid to see whether the CSRF token is bound to the form
    try:
        if hasattr(form, 'csrf_token'):
            current_app.logger.debug(
                f"[add_url] csrf bound? data_present={bool(form.csrf_token.data)} field_name={getattr(form.csrf_token, 'name', None)}"
            )
    except Exception:
        pass

    if not form.validate_on_submit():
        if request.method == 'POST':
            try:
                current_app.logger.warning(
                    f"[add_url] form validation failed. errors={getattr(form, 'errors', {})}"
                )
            except Exception:
                pass
        form_validation_failed(request, form)
        return render_template('document/add_url.html', form=form)

    try:
        tenant_id = session['tenant']['id']
        target_url = form.url.data
        target_url = clean_url(target_url)
        file_content, filename, extension = process_url(target_url, tenant_id)
        is_file_type_supported_by_catalog(catalog_id, extension)

        # Collect the dynamic form data for every document-version configuration of the catalog type
        type_config = cache_manager.catalogs_config_cache.get_config(catalog.type)
        catalog_properties = {
            config_name: form.get_dynamic_data(config_name)
            for config_name in type_config['document_version_configurations']
        }

        raw_metadata = form.user_metadata.data
        api_input = {
            'catalog_id': catalog_id,
            'name': form.name.data or filename,
            'sub_file_type': form.sub_file_type.data,
            'url': target_url,
            'language': form.language.data,
            'user_context': form.user_context.data,
            'valid_from': form.valid_from.data,
            'user_metadata': json.loads(raw_metadata) if raw_metadata else None,
            'catalog_properties': catalog_properties,
        }
        new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
        task_id = start_embedding_task(tenant_id, new_doc_vers.id)
        flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
              'success')
        return redirect(prefixed_url_for('document_bp.documents_processing', for_redirect=True))
    except EveAIException as exc:
        current_app.logger.error(f"Error adding document: {str(exc)}")
        flash(str(exc), 'danger')
    except HTTPError as exc:
        current_app.logger.error(f"Server refused download for {target_url}: {str(exc)}")
        flash(f'Server refused download for {target_url}: {str(exc)}', 'danger')
    except Exception as exc:
        current_app.logger.error(f'Error adding document: {str(exc)}')
        flash('An error occurred while adding the document.', 'danger')

    return render_template('document/add_url.html', form=form)
@document_bp.route('/documents', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def documents():
    """Render the list view of documents of the session catalog.

    Without a session catalog the user is warned and sent to the catalogs list.
    """
    catalog_id = session.get('catalog_id', None)
    if catalog_id:
        list_view_config = get_documents_list_view(catalog_id)
        return render_list_view('list_view.html', **list_view_config)

    flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
    return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
@document_bp.route('/documents_processing', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def documents_processing():
    """Render the processing list view for documents of the session catalog.

    Without a session catalog the user is warned and sent to the catalogs list.
    """
    catalog_id = session.get('catalog_id', None)
    if catalog_id:
        list_view_config = get_documents_processing_list_view(catalog_id)
        return render_list_view('list_view.html', **list_view_config)

    flash('You need to set a Session Catalog before adding Documents or URLs', 'warning')
    return redirect(prefixed_url_for('document_bp.catalogs', for_redirect=True))
@document_bp.route('/handle_document_selection', methods=['POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_document_selection():
document_identification = request.form['selected_row']
if isinstance(document_identification, int) or document_identification.isdigit():
doc_id = int(document_identification)
else:
# If it's not an integer, assume it's a string representation of a dictionary
try:
doc_id = ast.literal_eval(document_identification).get('value')
except (ValueError, AttributeError):
flash('Invalid document selection.', 'danger')
return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
action = request.form['action']
match action:
case 'edit_document':
return redirect(prefixed_url_for('document_bp.edit_document', document_id=doc_id, for_redirect=True))
case 'refresh':
refresh_document_view(doc_id)
return redirect(prefixed_url_for('document_bp.documents', document_id=doc_id, for_redirect=True))
case 're_process':
document = Document.query.get_or_404(doc_id)
doc_vers_id = document.latest_version.id
process_version(doc_vers_id)
case 'view_document_markdown':
document = Document.query.get_or_404(doc_id)
doc_vers_id = document.latest_version.id
return redirect(prefixed_url_for('document_bp.view_document_version_markdown',
document_version_id=doc_vers_id, for_redirect=True))
case 'edit_document_version':
document = Document.query.get_or_404(doc_id)
doc_vers_id = document.latest_version.id
return redirect(prefixed_url_for('document_bp.edit_document_version', document_version_id=doc_vers_id, for_redirect=True))
# Add more conditions for other actions
return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
@document_bp.route('/edit_document/<int:document_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_document(document_id):
    """Edit the name and validity period of a document.

    Responds with 404 when the document (joined with its catalog) is not found.
    After a successful update the user is redirected to the documents list;
    otherwise the edit form is rendered (again), together with the catalog name.
    """
    # Alias the Catalog to avoid column name conflicts in the joined query
    catalog_alias = aliased(Catalog)
    doc, catalog_name = (
        db.session.query(Document, catalog_alias.name.label('catalog_name'))
        .join(catalog_alias, Document.catalog_id == catalog_alias.id)
        .filter(Document.id == document_id)
        .first_or_404()
    )

    form = EditDocumentForm(obj=doc)
    if request.method == 'GET':
        # Populate form with current values
        form.name.data = doc.name
        form.valid_from.data = doc.valid_from
        form.valid_to.data = doc.valid_to

    if not form.validate_on_submit():
        form_validation_failed(request, form)
    else:
        tenant_id = session.get('tenant').get('id', 0)
        updated_doc, error = util_edit_document(
            tenant_id,
            document_id,
            form.name.data,
            form.valid_from.data,
            form.valid_to.data
        )
        if updated_doc:
            flash(f'Document {updated_doc.id} updated successfully', 'success')
            return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
        flash(f'Error updating document: {error}', 'danger')

    return render_template('document/edit_document.html', form=form, document_id=document_id,
                           catalog_name=catalog_name)
@document_bp.route('/edit_document_version/<int:document_version_id>', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def edit_document_version(document_version_id):
    """Edit the user context and tagging fields of a document version.

    Responds with 404 when the document version or its catalog does not exist.
    When the catalog has a configuration, the form is extended with the dynamic
    'tagging_fields', pre-filled from the version's stored catalog properties
    if those are present. After a successful update the user is redirected to
    the documents list; otherwise the edit form is rendered (again).
    """
    doc_vers = DocumentVersion.query.get_or_404(document_version_id)
    form = EditDocumentVersionForm(request.form, obj=doc_vers)
    catalog = Catalog.query.get_or_404(doc_vers.document.catalog_id)
    current_app.logger.debug(f"Catalog Configuration: {catalog.configuration}")
    if catalog.configuration and len(catalog.configuration) > 0:
        current_app.logger.debug(f"Document Version Catalog Properties: {doc_vers.catalog_properties}")
        # Stored properties may be absent or lack the 'tagging_fields' entry;
        # in that case add the fields without initial values instead of failing.
        stored_properties = doc_vers.catalog_properties or {}
        tagging_values = stored_properties.get("tagging_fields")
        if tagging_values is None:
            form.add_dynamic_fields("tagging_fields", catalog.configuration)
        else:
            form.add_dynamic_fields("tagging_fields", catalog.configuration, tagging_values)

    if form.validate_on_submit():
        catalog_properties = {"tagging_fields": form.get_dynamic_data("tagging_fields")}
        current_app.logger.debug(f"New Document Version Catalog Properties: {catalog_properties}")
        updated_version, error = util_edit_document_version(
            session.get('tenant').get('id', 0),
            document_version_id,
            form.user_context.data,
            catalog_properties,
        )
        if updated_version:
            flash(f'Document Version {updated_version.id} updated successfully', 'success')
            return redirect(prefixed_url_for('document_bp.documents', document_id=updated_version.doc_id,
                                             for_redirect=True))
        else:
            flash(f'Error updating document version: {error}', 'danger')
    else:
        form_validation_failed(request, form)

    return render_template('document/edit_document_version.html', form=form,
                           document_version_id=document_version_id,
                           doc_details=f'Document {doc_vers.document.name}')
@document_bp.route('/view_document_version_markdown/<int:document_version_id>', methods=['GET'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def view_document_version_markdown(document_version_id):
    """Show the processed markdown of a document version.

    The markdown file named '<version id>.md' is downloaded from object
    storage, decoded as UTF-8 and rendered. Responds with 404 when the
    document version does not exist. When the file cannot be retrieved or
    decoded, the error is logged and flashed and the user is redirected to
    the documents list.
    """
    # Retrieve document version
    document_version = DocumentVersion.query.get_or_404(document_version_id)
    # Retrieve tenant information
    tenant_id = session.get('tenant').get('id')
    try:
        # Generate markdown filename
        markdown_filename = f"{document_version.id}.md"
        markdown_object_name = minio_client.generate_object_name(tenant_id,
                                                                 document_version.doc_id, document_version.language,
                                                                 document_version.id, markdown_filename)
        # Download actual markdown file
        file_data = minio_client.download_document_file(
            tenant_id,
            document_version.bucket_name,
            markdown_object_name,
        )
        # Decode the binary data to UTF-8 text
        markdown_content = file_data.decode('utf-8')
        # Render the template with the markdown content
        return render_template(
            'document/view_document_version_markdown.html',
            document_version=document_version,
            markdown_content=markdown_content
        )
    except Exception as e:
        current_app.logger.error(f"Error retrieving markdown for document version {document_version_id}: {str(e)}")
        flash(f"Error retrieving processed document: {str(e)}", "danger")
        # This blueprint defines no 'document_versions' endpoint; fall back to the documents list
        return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
# Utilities ---------------------------------------------------------------------------------------
@document_bp.route('/library_operations', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def library_operations():
    """Render the library operations page."""
    template_name = 'document/library_operations.html'
    return render_template(template_name)
@document_bp.route('/handle_library_selection', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Partner Admin', 'Tenant Admin')
def handle_library_selection():
    """Run the library operation named by the form field 'action'.

    Recognised actions are 'create_default_rag_library',
    're_embed_latest_versions' and 'refresh_all_documents'; anything else is
    ignored. The return values of the operations are not used: the user is
    always redirected to the library operations page afterwards.
    """
    action = request.form['action']
    if action == 'create_default_rag_library':
        create_default_rag_library()
    elif action == 're_embed_latest_versions':
        re_embed_latest_versions()
    elif action == 'refresh_all_documents':
        refresh_all_documents()
    return redirect(prefixed_url_for('document_bp.library_operations', for_redirect=True))
def create_default_rag_library():
    """Create a default catalog, HTML processor, RAG retriever and RAG specialist.

    Only allowed when no catalog exists yet; otherwise an error is flashed.
    All objects are created in a single transaction: ids needed for foreign
    keys are obtained with flush(), and one commit at the end persists
    everything. If anything fails, the rollback removes the whole library, so
    the operation can be retried (a half-created library would otherwise block
    a retry because a catalog would already exist).

    Returns:
        A redirect response to the library operations page.
    """
    # Check if no catalog exists. If none exists, no processors, retrievers or specialists can exist
    if Catalog.query.first() is not None:
        flash("Default RAG Library can only be created if no catalogs are defined!", 'danger')
        return redirect(prefixed_url_for('document_bp.library_operations', for_redirect=True))

    timestamp = dt.now(tz=tz.utc)
    try:
        cat = Catalog(
            name='Default RAG Catalog',
            description='Default RAG Catalog',
            type="STANDARD_CATALOG",
            min_chunk_size=1500,
            max_chunk_size=2500,
            embedding_model="mistral.mistral-embed"
        )
        set_logging_information(cat, timestamp)
        db.session.add(cat)
        db.session.flush()  # assigns cat.id without committing

        proc = Processor(
            name='Default HTML Processor',
            description='Default HTML Processor',
            catalog_id=cat.id,
            type="HTML_PROCESSOR",
            configuration={
                "html_tags": "p, h1, h2, h3, h4, h5, h6, li, table, thead, tbody, tr, td",
                "html_end_tags": "p, li, table",
                "html_excluded_classes": "",
                "html_excluded_elements": "header, footer, nav, script",
                "html_included_elements": "article, main"
            }
        )
        set_logging_information(proc, timestamp)

        retr = Retriever(
            name='Default RAG Retriever',
            description='Default RAG Retriever',
            catalog_id=cat.id,
            type="STANDARD_RAG",
            type_version="1.0",
            configuration={
                "es_k": "8",
                "es_similarity_threshold": 0.3
            }
        )
        set_logging_information(retr, timestamp)

        spec = Specialist(
            name='Default RAG Specialist',
            description='Default RAG Specialist',
            type='STANDARD_RAG_SPECIALIST',
            configuration={"temperature": "0.3", "specialist_context": "To be specified"}
        )
        set_logging_information(spec, timestamp)

        db.session.add(proc)
        db.session.add(retr)
        db.session.add(spec)
        db.session.flush()  # assigns retr.id and spec.id for the link row below

        spec_retr = SpecialistRetriever(
            specialist_id=spec.id,
            retriever_id=retr.id,
        )
        db.session.add(spec_retr)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        flash(f'Failed to create Default RAG Library. Error: {e}', 'danger')
        tenant_id = session['tenant']['id']
        current_app.logger.error(f'Failed to create Default RAG Library '
                                 f'for tenant {tenant_id}. Error: {str(e)}')
    return redirect(prefixed_url_for('document_bp.library_operations', for_redirect=True))
def refresh_all_documents():
    """Refresh every document by calling refresh_document for each one.

    The tenant id of the current session is passed along, matching the way
    refresh_document is called for a single document in this module.
    """
    tenant_id = session['tenant']['id']
    for doc in Document.query.all():
        refresh_document(doc.id, tenant_id)
def refresh_document_view(document_id):
    """Refresh one document for the session tenant and flash the outcome.

    Returns:
        A redirect response to the documents list.
    """
    tenant_id = session['tenant']['id']
    new_version, result = refresh_document(document_id, tenant_id)
    if not new_version:
        flash(f'Error refreshing document: {result}', 'danger')
    else:
        flash(f'Document refreshed. New version: {new_version.id}. Task ID: {result}', 'success')
    return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
def re_embed_latest_versions():
    """Retrigger processing for the highest-id version of every document."""
    for doc in Document.query.all():
        versions_newest_first = (DocumentVersion.query
                                 .filter_by(doc_id=doc.id)
                                 .order_by(desc(DocumentVersion.id)))
        latest_doc_version = versions_newest_first.first()
        if not latest_doc_version:
            # Document without any version: nothing to process
            continue
        process_version(latest_doc_version.id)
def process_version(version_id):
    """Send a 'create_embeddings' task for a document version to the 'embeddings' queue.

    Logs who retriggered the task and flashes a confirmation.

    Returns:
        A redirect response to the documents list.
    """
    tenant_id = session['tenant']['id']
    task = current_celery.send_task('create_embeddings',
                                    args=[tenant_id, version_id],
                                    queue='embeddings')
    log_message = (f'Embedding creation retriggered by user {current_user.id}, {current_user.email} '
                   f'for tenant {session["tenant"]["id"]}, '
                   f'Document Version {version_id}. '
                   f'Embedding creation task: {task.id}')
    current_app.logger.info(log_message)
    flash(f'Processing for document version {version_id} retriggered successfully...', 'success')
    return redirect(prefixed_url_for('document_bp.documents', for_redirect=True))
def set_logging_information(obj, timestamp):
    """Stamp a newly created object with creation and update audit fields.

    Sets created_at/updated_at to ``timestamp`` and created_by/updated_by to
    the id of the current user.
    """
    obj.created_at = obj.updated_at = timestamp
    obj.created_by = current_user.id
    obj.updated_by = current_user.id
def update_logging_information(obj, timestamp):
    """Stamp a modified object: updated_at = ``timestamp``, updated_by = current user id."""
    setattr(obj, 'updated_at', timestamp)
    setattr(obj, 'updated_by', current_user.id)
def log_session_state(session, msg=""):
    """Do nothing; placeholder for logging the state of a session.

    The logging statements are disabled (see the commented-out lines below),
    so both arguments are currently unused and None is returned.
    """
    # current_app.logger.info(f"{msg} - Session dirty: {session.dirty}")
    # current_app.logger.info(f"{msg} - Session new: {session.new}")
    return None
def fetch_html(url, timeout=30):
    """Fetch the raw content behind a URL.

    Args:
        url: The URL to download.
        timeout: Seconds to wait for the server before giving up (passed to
            requests.get). Without a timeout a stalled server would block the
            request indefinitely.

    Returns:
        The response body as bytes.

    Raises:
        SSLError: When certificate verification fails outside a DEBUG
            (development) configuration.
        HTTPError: When the server answers with an error status.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except SSLError as e:
        current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. "
                                 f"Error Encountered: {e}")
        if not current_app.config.get('DEBUG'):
            # No insecure retry outside development: propagate the SSL failure
            # instead of continuing without a response object.
            raise
        # Only allowed in a development environment
        current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. "
                                f"Only while in development environment.")
        response = requests.get(url, verify=False, timeout=timeout)  # Disable SSL verification
    response.raise_for_status()  # Will raise an exception for bad requests
    return response.content
def clean_markdown(markdown):
    """Strip a wrapping triple-backtick fence from markdown text.

    Surrounding whitespace is removed first. A leading '```markdown' (or,
    failing that, a leading '```') and a trailing '```' are then removed, each
    followed by another whitespace strip.
    """
    closing_fence = "```"
    text = markdown.strip()
    # Try the more specific opening fence first; at most one opening fence is removed
    for opening_fence in ("```markdown", "```"):
        if text.startswith(opening_fence):
            text = text[len(opening_fence):].strip()
            break
    if text.endswith(closing_fence):
        text = text[:-len(closing_fence)].strip()
    return text