import ast from datetime import datetime as dt, timezone as tz from babel.messages.setuptools_frontend import update_catalog from flask import request, redirect, flash, render_template, Blueprint, session, current_app from flask_security import roles_accepted, current_user from sqlalchemy import desc from sqlalchemy.orm import aliased from werkzeug.utils import secure_filename from sqlalchemy.exc import SQLAlchemyError import requests from requests.exceptions import SSLError from urllib.parse import urlparse, unquote import io import json from common.models.document import Document, DocumentVersion, Catalog, Retriever from common.extensions import db, minio_client from common.utils.document_utils import validate_file_type, create_document_stack, start_embedding_task, process_url, \ process_multiple_urls, get_documents_list, edit_document, \ edit_document_version, refresh_document from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \ EveAIDoubleURLException from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddURLsForm, \ CatalogForm, EditCatalogForm, RetrieverForm, EditRetrieverForm from common.utils.middleware import mw_before_request from common.utils.celery_utils import current_celery from common.utils.nginx_utils import prefixed_url_for from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro, form_to_dict from .document_list_view import DocumentListView from .document_version_list_view import DocumentVersionListView from config.catalog_types import CATALOG_TYPES from config.retriever_types import RETRIEVER_TYPES document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @document_bp.before_request def log_before_request(): current_app.logger.debug(f"Before request (document_bp): {request.method} {request.url}") @document_bp.after_request def log_after_request(response): current_app.logger.debug( f"After request (document_bp): {request.method} {request.url} - Status: {response.status}") return response @document_bp.before_request def before_request(): try: mw_before_request() except Exception as e: current_app.logger.error(f'Error switching schema in Document Blueprint: {e}') for role in current_user.roles: current_app.logger.debug(f'User {current_user.email} has role {role.name}') raise @document_bp.route('/catalog', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def catalog(): form = CatalogForm() if form.validate_on_submit(): tenant_id = session.get('tenant').get('id') new_catalog = Catalog() form.populate_obj(new_catalog) # Handle Embedding Variables new_catalog.html_tags = [tag.strip() for tag in form.html_tags.data.split(',')] if form.html_tags.data else [] new_catalog.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',')] \ if form.html_end_tags.data else [] new_catalog.html_included_elements = [tag.strip() for tag in form.html_included_elements.data.split(',')] \ if form.html_included_elements.data else [] new_catalog.html_excluded_elements = [tag.strip() for tag in form.html_excluded_elements.data.split(',')] \ if form.html_excluded_elements.data else [] new_catalog.html_excluded_classes = [cls.strip() for cls in form.html_excluded_classes.data.split(',')] \ if form.html_excluded_classes.data else [] set_logging_information(new_catalog, dt.now(tz.utc)) try: db.session.add(new_catalog) db.session.commit() flash('Catalog successfully added!', 'success') current_app.logger.info(f'Catalog {new_catalog.name} successfully added for tenant {tenant_id}!') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to add catalog. Error: {e}', 'danger') current_app.logger.error(f'Failed to add catalog {new_catalog.name}' f'for tenant {tenant_id}. Error: {str(e)}') return render_template('document/catalog.html', form=form) @document_bp.route('/catalogs', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def catalogs(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Catalog.query.order_by(Catalog.id) pagination = query.paginate(page=page, per_page=per_page) the_catalogs = pagination.items # prepare table data rows = prepare_table_for_macro(the_catalogs, [('id', ''), ('name', ''), ('type', '')]) # Render the catalogs in a template return render_template('document/catalogs.html', rows=rows, pagination=pagination) @document_bp.route('/handle_catalog_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_catalog_selection(): catalog_identification = request.form.get('selected_row') catalog_id = ast.literal_eval(catalog_identification).get('value') action = request.form['action'] catalog = Catalog.query.get_or_404(catalog_id) if action == 'set_session_catalog': current_app.logger.info(f'Setting session catalog to {catalog.name}') session['catalog_id'] = catalog_id session['catalog_name'] = catalog.name current_app.logger.info(f'Finished setting session catalog to {catalog.name}') elif action == 'edit_catalog': return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=catalog_id)) return redirect(prefixed_url_for('document_bp.catalogs')) @document_bp.route('/catalog/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_catalog(catalog_id): catalog = Catalog.query.get_or_404(catalog_id) tenant_id = session.get('tenant').get('id') form = EditCatalogForm(request.form, obj=catalog) configuration_config = CATALOG_TYPES[catalog.type]["configuration"] form.add_dynamic_fields("configuration", configuration_config, catalog.configuration) # Convert arrays to comma-separated strings for display if request.method == 'GET': form.html_tags.data = ', '.join(catalog.html_tags or '') form.html_end_tags.data = ', '.join(catalog.html_end_tags or '') form.html_included_elements.data = ', '.join(catalog.html_included_elements or '') form.html_excluded_elements.data = ', '.join(catalog.html_excluded_elements or '') form.html_excluded_classes.data = ', '.join(catalog.html_excluded_classes or '') if request.method == 'POST' and form.validate_on_submit(): form.populate_obj(catalog) # Handle Embedding Variables catalog.html_tags = [tag.strip() for tag in form.html_tags.data.split(',')] if form.html_tags.data else [] catalog.html_end_tags = [tag.strip() for tag in form.html_end_tags.data.split(',')] \ if form.html_end_tags.data else [] catalog.html_included_elements = [tag.strip() for tag in form.html_included_elements.data.split(',')] \ if form.html_included_elements.data else [] catalog.html_excluded_elements = [tag.strip() for tag in form.html_excluded_elements.data.split(',')] \ if form.html_excluded_elements.data else [] catalog.html_excluded_classes = [cls.strip() for cls in form.html_excluded_classes.data.split(',')] \ if form.html_excluded_classes.data else [] catalog.configuration = form.get_dynamic_data('configuration') update_logging_information(catalog, dt.now(tz.utc)) try: db.session.add(catalog) db.session.commit() flash('Catalog successfully updated successfully!', 'success') current_app.logger.info(f'Catalog {catalog.name} successfully updated for tenant {tenant_id}') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to update catalog. Error: {e}', 'danger') current_app.logger.error(f'Failed to update catalog {catalog_id} for tenant {tenant_id}. Error: {str(e)}') return redirect(prefixed_url_for('document_bp.catalogs')) else: form_validation_failed(request, form) return render_template('document/edit_catalog.html', form=form, catalog_id=catalog_id) @document_bp.route('/retriever', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def retriever(): form = RetrieverForm() if form.validate_on_submit(): tenant_id = session.get('tenant').get('id') new_retriever = Retriever() form.populate_obj(new_retriever) new_retriever.catalog_id = form.catalog.data.id set_logging_information(new_retriever, dt.now(tz.utc)) try: db.session.add(new_retriever) db.session.commit() flash('Retriever successfully added!', 'success') current_app.logger.info(f'Catalog {new_retriever.name} successfully added for tenant {tenant_id}!') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to add retriever. Error: {e}', 'danger') current_app.logger.error(f'Failed to add retriever {new_retriever.name}' f'for tenant {tenant_id}. Error: {str(e)}') # Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type) return redirect(prefixed_url_for('document_bp.retriever', retriever_id=new_retriever.id)) return render_template('document/retriever.html', form=form) @document_bp.route('/retriever/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_retriever(retriever_id): """Edit an existing retriever configuration.""" # Get the retriever or return 404 retriever = Retriever.query.get_or_404(retriever_id) if retriever.catalog_id: # If catalog_id is just an ID, fetch the Catalog object retriever.catalog = Catalog.query.get(retriever.catalog_id) else: retriever.catalog = None # Create form instance with the retriever form = EditRetrieverForm(request.form, obj=retriever) configuration_config = RETRIEVER_TYPES[retriever.type]["configuration"] form.add_dynamic_fields("configuration", configuration_config, retriever.configuration) if request.method == 'POST': current_app.logger.debug(f'Received POST request with {request.form}') if form.validate_on_submit(): # Update basic fields form.populate_obj(retriever) retriever.configuration = form.get_dynamic_data('configuration') # Update catalog relationship retriever.catalog_id = form.catalog.data.id if form.catalog.data else None # Update logging information update_logging_information(retriever, dt.now(tz.utc)) # Save changes to database try: db.session.add(retriever) db.session.commit() flash('Retriever updated successfully!', 'success') current_app.logger.info(f'Retriever {retriever.id} updated successfully') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to update retriever. Error: {str(e)}', 'danger') current_app.logger.error(f'Failed to update retriever {retriever_id}. Error: {str(e)}') return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id) return redirect(prefixed_url_for('document_bp.retrievers')) else: form_validation_failed(request, form) current_app.logger.debug(f"Rendering Template for {retriever_id}") return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id) @document_bp.route('/retrievers', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def retrievers(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Retriever.query.order_by(Retriever.id) pagination = query.paginate(page=page, per_page=per_page) the_retrievers = pagination.items # prepare table data rows = prepare_table_for_macro(the_retrievers, [('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')]) # Render the catalogs in a template return render_template('document/retrievers.html', rows=rows, pagination=pagination) @document_bp.route('/handle_retriever_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_retriever_selection(): retriever_identification = request.form.get('selected_row') retriever_id = ast.literal_eval(retriever_identification).get('value') action = request.form['action'] if action == 'edit_retriever': return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=retriever_id)) return redirect(prefixed_url_for('document_bp.retrievers')) @document_bp.route('/add_document', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_document(): form = AddDocumentForm(request.form) catalog_id = session.get('catalog_id', None) if catalog_id is None: flash('You need to set a Session Catalog before adding Documents or URLs') return redirect(prefixed_url_for('document_bp.catalogs')) catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: form.add_dynamic_fields(config, catalog.configuration[config]) if form.validate_on_submit(): try: current_app.logger.info(f'Adding Document for {catalog_id}') tenant_id = session['tenant']['id'] file = form.file.data filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() validate_file_type(extension) catalog_properties = {} document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: catalog_properties[config] = form.get_dynamic_data(config) api_input = { 'catalog_id': catalog_id, 'name': form.name.data, 'language': form.language.data, 'user_context': form.user_context.data, 'valid_from': form.valid_from.data, 'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None, 'catalog_properties': catalog_properties, } current_app.logger.debug(f'Creating document stack with input {api_input}') new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id) task_id = start_embedding_task(tenant_id, new_doc_vers.id) flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e: flash(str(e), 'error') except Exception as e: current_app.logger.error(f'Error adding document: {str(e)}') flash('An error occurred while adding the document.', 'error') return render_template('document/add_document.html', form=form) @document_bp.route('/add_url', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_url(): form = AddURLForm(request.form) catalog_id = session.get('catalog_id', None) if catalog_id is None: flash('You need to set a Session Catalog before adding Documents or URLs') return redirect(prefixed_url_for('document_bp.catalogs')) catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: form.add_dynamic_fields(config, catalog.configuration[config]) if form.validate_on_submit(): try: tenant_id = session['tenant']['id'] url = form.url.data file_content, filename, extension = process_url(url, tenant_id) catalog_properties = {} document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: catalog_properties[config] = form.get_dynamic_data(config) api_input = { 'catalog_id': catalog_id, 'name': form.name.data or filename, 'url': url, 'language': form.language.data, 'user_context': form.user_context.data, 'valid_from': form.valid_from.data, 'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None, 'catalog_properties': catalog_properties, } new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id) task_id = start_embedding_task(tenant_id, new_doc_vers.id) flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) except EveAIDoubleURLException: flash(f'A document with url {url} already exists. No new document created.', 'info') except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e: flash(str(e), 'error') except Exception as e: current_app.logger.error(f'Error adding document: {str(e)}') flash('An error occurred while adding the document.', 'error') return render_template('document/add_url.html', form=form) @document_bp.route('/documents', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def documents(): view = DocumentListView(Document, 'document/documents.html', per_page=10) return view.get() @document_bp.route('/handle_document_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_selection(): document_identification = request.form['selected_row'] if isinstance(document_identification, int) or document_identification.isdigit(): doc_id = int(document_identification) else: # If it's not an integer, assume it's a string representation of a dictionary try: doc_id = ast.literal_eval(document_identification).get('value') except (ValueError, AttributeError): flash('Invalid document selection.', 'error') return redirect(prefixed_url_for('document_bp.documents')) action = request.form['action'] match action: case 'edit_document': return redirect(prefixed_url_for('document_bp.edit_document_view', document_id=doc_id)) case 'document_versions': return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 'refresh_document': refresh_document_view(doc_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 're_embed_latest_versions': re_embed_latest_versions() # Add more conditions for other actions return redirect(prefixed_url_for('document_bp.documents')) @document_bp.route('/edit_document/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document_view(document_id): # Use an alias for the Catalog to avoid column name conflicts CatalogAlias = aliased(Catalog) # Query for the document and its catalog result = db.session.query(Document, CatalogAlias.name.label('catalog_name')) \ .join(CatalogAlias, Document.catalog_id == CatalogAlias.id) \ .filter(Document.id == document_id) \ .first_or_404() doc, catalog_name = result form = EditDocumentForm(obj=doc) if request.method == 'GET': # Populate form with current values form.name.data = doc.name form.valid_from.data = doc.valid_from form.valid_to.data = doc.valid_to if form.validate_on_submit(): updated_doc, error = edit_document( document_id, form.name.data, form.valid_from.data, form.valid_to.data ) if updated_doc: flash(f'Document {updated_doc.id} updated successfully', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: flash(f'Error updating document: {error}', 'danger') else: form_validation_failed(request, form) return render_template('document/edit_document.html', form=form, document_id=document_id, catalog_name=catalog_name) @document_bp.route('/edit_document_version/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document_version_view(document_version_id): doc_vers = DocumentVersion.query.get_or_404(document_version_id) form = EditDocumentVersionForm(request.form, obj=doc_vers) catalog_id = session.get('catalog_id', None) if catalog_id is None: flash('You need to set a Session Catalog before adding Documents or URLs') return redirect(prefixed_url_for('document_bp.catalogs')) catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: form.add_dynamic_fields(config, catalog.configuration[config], doc_vers.catalog_properties[config]) if form.validate_on_submit(): catalog_properties = {} document_version_configurations = CATALOG_TYPES[catalog.type]['document_version_configurations'] for config in document_version_configurations: catalog_properties[config] = form.get_dynamic_data(config) updated_version, error = edit_document_version( document_version_id, form.user_context.data, catalog_properties, ) if updated_version: flash(f'Document Version {updated_version.id} updated successfully', 'success') return redirect(prefixed_url_for('document_bp.document_versions', document_id=updated_version.doc_id)) else: flash(f'Error updating document version: {error}', 'danger') else: form_validation_failed(request, form) return render_template('document/edit_document_version.html', form=form, document_version_id=document_version_id, doc_details=f'Document {doc_vers.document.name}') @document_bp.route('/document_versions/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def document_versions(document_id): doc = Document.query.get_or_404(document_id) doc_desc = f'Document {doc.name}' page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = (DocumentVersion.query.filter_by(doc_id=document_id) .order_by(DocumentVersion.language) .order_by(desc(DocumentVersion.id))) pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), ('object_name', ''), ('file_type', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) return render_template('document/document_versions.html', rows=rows, pagination=pagination, document=doc_desc) @document_bp.route('/handle_document_version_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_version_selection(): document_version_identification = request.form['selected_row'] if isinstance(document_version_identification, int) or document_version_identification.isdigit(): doc_vers_id = int(document_version_identification) else: # If it's not an integer, assume it's a string representation of a dictionary try: doc_vers_id = ast.literal_eval(document_version_identification).get('value') except (ValueError, AttributeError): flash('Invalid document version selection.', 'error') return redirect(prefixed_url_for('document_bp.document_versions_list')) action = request.form['action'] current_app.logger.debug(f'Triggered Document Version Action: {action}') match action: case 'edit_document_version': return redirect(prefixed_url_for('document_bp.edit_document_version_view', document_version_id=doc_vers_id)) case 'process_document_version': process_version(doc_vers_id) # Add more conditions for other actions doc_vers = DocumentVersion.query.get_or_404(doc_vers_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_vers.doc_id)) @document_bp.route('/library_operations', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def library_operations(): return render_template('document/library_operations.html') @document_bp.route('/handle_library_selection', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_library_selection(): action = request.form['action'] match action: case 're_embed_latest_versions': re_embed_latest_versions() case 'refresh_all_documents': refresh_all_documents() return redirect(prefixed_url_for('document_bp.library_operations')) @document_bp.route('/document_versions_list', methods=['GET']) @roles_accepted('Super User', 'Tenant Admin') def document_versions_list(): current_app.logger.debug('Getting document versions list') view = DocumentVersionListView(DocumentVersion, 'document/document_versions_list_view.html', per_page=20) current_app.logger.debug('Got document versions list') return view.get() def refresh_all_documents(): for doc in Document.query.all(): refresh_document(doc.id) def refresh_document_view(document_id): new_version, result = refresh_document(document_id, session['tenant']['id']) if new_version: flash(f'Document refreshed. New version: {new_version.id}. Task ID: {result}', 'success') else: flash(f'Error refreshing document: {result}', 'danger') return redirect(prefixed_url_for('document_bp.documents')) def re_embed_latest_versions(): docs = Document.query.all() for doc in docs: latest_doc_version = DocumentVersion.query.filter_by(doc_id=doc.id).order_by(desc(DocumentVersion.id)).first() if latest_doc_version: process_version(latest_doc_version.id) def process_version(version_id): task = current_celery.send_task('create_embeddings', args=[session['tenant']['id'], version_id,], queue='embeddings') current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} ' f'for tenant {session["tenant"]["id"]}, ' f'Document Version {version_id}. ' f'Embedding creation task: {task.id}') flash(f'Processing for document version {version_id} retriggered successfully...', 'success') return redirect(prefixed_url_for('document_bp.documents')) def set_logging_information(obj, timestamp): obj.created_at = timestamp obj.updated_at = timestamp obj.created_by = current_user.id obj.updated_by = current_user.id def update_logging_information(obj, timestamp): obj.updated_at = timestamp obj.updated_by = current_user.id def log_session_state(session, msg=""): current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}") current_app.logger.debug(f"{msg} - Session new: {session.new}") def fetch_html(url): # Fetches HTML content from a URL try: response = requests.get(url) except SSLError as e: current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. " f"Error Encountered: {e}") if current_app.config.get('DEBUG'): # only allow when in a development environment current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. " f"Only while in development environment.") response = requests.get(url, verify=False) # Disable SSL verification else: response = None response.raise_for_status() # Will raise an exception for bad requests return response.content