import ast from datetime import datetime as dt, timezone as tz from flask import request, redirect, flash, render_template, Blueprint, session, current_app from flask_security import roles_accepted, current_user from sqlalchemy import desc from sqlalchemy.orm import aliased from werkzeug.utils import secure_filename from sqlalchemy.exc import SQLAlchemyError import requests from requests.exceptions import SSLError import json from common.models.document import Document, DocumentVersion, Catalog, Retriever, Processor from common.extensions import db, cache_manager, minio_client from common.models.interaction import Specialist, SpecialistRetriever from common.utils.document_utils import create_document_stack, start_embedding_task, process_url, \ edit_document, \ edit_document_version, refresh_document, clean_url from common.utils.dynamic_field_utils import create_default_config_from_type_config from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \ EveAIDoubleURLException, EveAIException from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, \ CatalogForm, EditCatalogForm, RetrieverForm, EditRetrieverForm, ProcessorForm, EditProcessorForm from common.utils.middleware import mw_before_request from common.utils.celery_utils import current_celery from common.utils.nginx_utils import prefixed_url_for from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro from .document_list_view import DocumentListView from .document_version_list_view import DocumentVersionListView document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @document_bp.before_request def log_before_request(): current_app.logger.debug(f'Before request: {request.path} =====================================') @document_bp.after_request def log_after_request(response): return response @document_bp.before_request def before_request(): try: mw_before_request() except Exception as e: current_app.logger.error(f'Error switching schema in Document Blueprint: {e}') raise @document_bp.route('/catalog', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def catalog(): form = CatalogForm() if form.validate_on_submit(): tenant_id = session.get('tenant').get('id') new_catalog = Catalog() form.populate_obj(new_catalog) set_logging_information(new_catalog, dt.now(tz.utc)) try: db.session.add(new_catalog) db.session.commit() flash('Catalog successfully added!', 'success') current_app.logger.info(f'Catalog {new_catalog.name} successfully added for tenant {tenant_id}!') # Enable step 2 of creation of catalog - add configuration of the catalog (dependent on type) return redirect(prefixed_url_for('document_bp.catalog', catalog_id=new_catalog.id)) except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to add catalog. Error: {e}', 'danger') current_app.logger.error(f'Failed to add catalog {new_catalog.name}' f'for tenant {tenant_id}. Error: {str(e)}') return render_template('document/catalog.html', form=form) @document_bp.route('/catalogs', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def catalogs(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Catalog.query.order_by(Catalog.id) pagination = query.paginate(page=page, per_page=per_page) the_catalogs = pagination.items # prepare table data rows = prepare_table_for_macro(the_catalogs, [('id', ''), ('name', ''), ('type', '')]) # Render the catalogs in a template return render_template('document/catalogs.html', rows=rows, pagination=pagination) @document_bp.route('/handle_catalog_selection', methods=['POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_catalog_selection(): action = request.form['action'] if action == 'create_catalog': return redirect(prefixed_url_for('document_bp.catalog')) catalog_identification = request.form.get('selected_row') catalog_id = ast.literal_eval(catalog_identification).get('value') catalog = Catalog.query.get_or_404(catalog_id) if action == 'set_session_catalog': current_app.logger.info(f'Setting session catalog to {catalog.name}') session['catalog_id'] = catalog_id session['catalog_name'] = catalog.name current_app.logger.info(f'Finished setting session catalog to {catalog.name}') elif action == 'edit_catalog': return redirect(prefixed_url_for('document_bp.edit_catalog', catalog_id=catalog_id)) return redirect(prefixed_url_for('document_bp.catalogs')) @document_bp.route('/catalog/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def edit_catalog(catalog_id): catalog = Catalog.query.get_or_404(catalog_id) tenant_id = session.get('tenant').get('id') form = EditCatalogForm(request.form, obj=catalog) full_config = cache_manager.catalogs_config_cache.get_config(catalog.type) form.add_dynamic_fields("configuration", full_config, catalog.configuration) if request.method == 'POST' and form.validate_on_submit(): form.populate_obj(catalog) catalog.configuration = form.get_dynamic_data('configuration') update_logging_information(catalog, dt.now(tz.utc)) try: db.session.add(catalog) db.session.commit() flash('Catalog successfully updated successfully!', 'success') current_app.logger.info(f'Catalog {catalog.name} successfully updated for tenant {tenant_id}') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to update catalog. Error: {e}', 'danger') current_app.logger.error(f'Failed to update catalog {catalog_id} for tenant {tenant_id}. Error: {str(e)}') return redirect(prefixed_url_for('document_bp.catalogs')) else: form_validation_failed(request, form) return render_template('document/edit_catalog.html', form=form, catalog_id=catalog_id) @document_bp.route('/processor', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def processor(): form = ProcessorForm() if form.validate_on_submit(): tenant_id = session.get('tenant').get('id') new_processor = Processor() form.populate_obj(new_processor) new_processor.catalog_id = form.catalog.data.id processor_config = cache_manager.processors_config_cache.get_config(new_processor.type) new_processor.configuration = create_default_config_from_type_config( processor_config["configuration"]) set_logging_information(new_processor, dt.now(tz.utc)) try: db.session.add(new_processor) db.session.commit() flash('Processor successfully added!', 'success') current_app.logger.info(f'Processor {new_processor.name} successfully added for tenant {tenant_id}!') # Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type) return redirect(prefixed_url_for('document_bp.edit_processor', processor_id=new_processor.id)) except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to add processor. Error: {e}', 'danger') current_app.logger.error(f'Failed to add retriever {new_processor.name}' f'for tenant {tenant_id}. Error: {str(e)}') return render_template('document/processor.html', form=form) @document_bp.route('/processor/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def edit_processor(processor_id): """Edit an existing processor configuration.""" # Get the processor or return 404 processor = Processor.query.get_or_404(processor_id) if processor.catalog_id: # If catalog_id is just an ID, fetch the Catalog object processor.catalog = Catalog.query.get(processor.catalog_id) else: processor.catalog = None # Create form instance with the processor form = EditProcessorForm(request.form, obj=processor) full_config = cache_manager.processors_config_cache.get_config(processor.type) form.add_dynamic_fields("configuration", full_config, processor.configuration) if form.validate_on_submit(): # Update basic fields form.populate_obj(processor) processor.configuration = form.get_dynamic_data('configuration') # Update catalog relationship processor.catalog_id = form.catalog.data.id if form.catalog.data else None # Update logging information update_logging_information(processor, dt.now(tz.utc)) # Save changes to database try: db.session.add(processor) db.session.commit() flash('Processor updated successfully!', 'success') current_app.logger.info(f'Processor {processor.id} updated successfully') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to update processor. Error: {str(e)}', 'danger') current_app.logger.error(f'Failed to update processor {processor_id}. Error: {str(e)}') return render_template('document/edit_processor.html', form=form, processor_id=processor_id) return redirect(prefixed_url_for('document_bp.processors')) else: form_validation_failed(request, form) return render_template('document/edit_processor.html', form=form, processor_id=processor_id) @document_bp.route('/processors', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def processors(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Processor.query.order_by(Processor.id) pagination = query.paginate(page=page, per_page=per_page) the_processors = pagination.items # prepare table data rows = prepare_table_for_macro(the_processors, [('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')]) # Render the catalogs in a template return render_template('document/processors.html', rows=rows, pagination=pagination) @document_bp.route('/handle_processor_selection', methods=['POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_processor_selection(): action = request.form['action'] if action == 'create_processor': return redirect(prefixed_url_for('document_bp.processor')) processor_identification = request.form.get('selected_row') processor_id = ast.literal_eval(processor_identification).get('value') if action == 'edit_processor': return redirect(prefixed_url_for('document_bp.edit_processor', processor_id=processor_id)) return redirect(prefixed_url_for('document_bp.processors')) @document_bp.route('/retriever', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def retriever(): form = RetrieverForm() if form.validate_on_submit(): tenant_id = session.get('tenant').get('id') new_retriever = Retriever() form.populate_obj(new_retriever) new_retriever.catalog_id = form.catalog.data.id new_retriever.type_version = cache_manager.retrievers_version_tree_cache.get_latest_version( new_retriever.type) set_logging_information(new_retriever, dt.now(tz.utc)) try: db.session.add(new_retriever) db.session.commit() flash('Retriever successfully added!', 'success') current_app.logger.info(f'Catalog {new_retriever.name} successfully added for tenant {tenant_id}!') # Enable step 2 of creation of retriever - add configuration of the retriever (dependent on type) return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=new_retriever.id)) except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to add retriever. Error: {e}', 'danger') current_app.logger.error(f'Failed to add retriever {new_retriever.name}' f'for tenant {tenant_id}. Error: {str(e)}') return render_template('document/retriever.html', form=form) @document_bp.route('/retriever/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def edit_retriever(retriever_id): """Edit an existing retriever configuration.""" # Get the retriever or return 404 retriever = Retriever.query.get_or_404(retriever_id) if retriever.catalog_id: # If catalog_id is just an ID, fetch the Catalog object retriever.catalog = Catalog.query.get(retriever.catalog_id) else: retriever.catalog = None # Create form instance with the retriever form = EditRetrieverForm(request.form, obj=retriever) retriever_config = cache_manager.retrievers_config_cache.get_config(retriever.type, retriever.type_version) configuration_config = retriever_config.get("configuration") form.add_dynamic_fields("configuration", configuration_config, retriever.configuration) if form.validate_on_submit(): # Update basic fields form.populate_obj(retriever) retriever.configuration = form.get_dynamic_data('configuration') # Update catalog relationship retriever.catalog_id = form.catalog.data.id if form.catalog.data else None # Update logging information update_logging_information(retriever, dt.now(tz.utc)) # Save changes to database try: db.session.add(retriever) db.session.commit() flash('Retriever updated successfully!', 'success') current_app.logger.info(f'Retriever {retriever.id} updated successfully') except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to update retriever. Error: {str(e)}', 'danger') current_app.logger.error(f'Failed to update retriever {retriever_id}. Error: {str(e)}') return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id) return redirect(prefixed_url_for('document_bp.retrievers')) else: form_validation_failed(request, form) return render_template('document/edit_retriever.html', form=form, retriever_id=retriever_id) @document_bp.route('/retrievers', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def retrievers(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Retriever.query.order_by(Retriever.id) pagination = query.paginate(page=page, per_page=per_page) the_retrievers = pagination.items # prepare table data rows = prepare_table_for_macro(the_retrievers, [('id', ''), ('name', ''), ('type', ''), ('catalog_id', '')]) # Render the catalogs in a template return render_template('document/retrievers.html', rows=rows, pagination=pagination) @document_bp.route('/handle_retriever_selection', methods=['POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_retriever_selection(): action = request.form['action'] if action == 'create_retriever': return redirect(prefixed_url_for('document_bp.retriever')) retriever_identification = request.form.get('selected_row') retriever_id = ast.literal_eval(retriever_identification).get('value') if action == 'edit_retriever': return redirect(prefixed_url_for('document_bp.edit_retriever', retriever_id=retriever_id)) return redirect(prefixed_url_for('document_bp.retrievers')) @document_bp.route('/add_document', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def add_document(): form = AddDocumentForm(request.form) catalog_id = session.get('catalog_id', None) if catalog_id is None: flash('You need to set a Session Catalog before adding Documents or URLs', 'warning') return redirect(prefixed_url_for('document_bp.catalogs')) catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: form.add_dynamic_fields("tagging_fields", catalog.configuration) if form.validate_on_submit(): try: current_app.logger.info(f'Adding Document for {catalog_id}') tenant_id = session['tenant']['id'] file = form.file.data sub_file_type = form.sub_file_type.data filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() catalog_properties = form.get_dynamic_data("tagging_fields") api_input = { 'catalog_id': catalog_id, 'name': form.name.data, 'sub_file_type': form.sub_file_type.data, 'language': form.language.data, 'user_context': form.user_context.data, 'valid_from': form.valid_from.data, 'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None, 'catalog_properties': catalog_properties, } new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id) task_id = start_embedding_task(tenant_id, new_doc_vers.id) flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) except EveAIException as e: flash(str(e), 'error') current_app.logger.error(f"Error adding document: {str(e)}") except Exception as e: current_app.logger.error(f'Error adding document: {str(e)}') flash('An error occurred while adding the document.', 'error') return render_template('document/add_document.html', form=form) @document_bp.route('/add_url', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def add_url(): form = AddURLForm(request.form) catalog_id = session.get('catalog_id', None) if catalog_id is None: flash('You need to set a Session Catalog before adding Documents or URLs', 'warning') return redirect(prefixed_url_for('document_bp.catalogs')) catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: form.add_dynamic_fields("tagging_fields", catalog.configuration) if form.validate_on_submit(): try: tenant_id = session['tenant']['id'] url = form.url.data url = clean_url(url) file_content, filename, extension = process_url(url, tenant_id) catalog_properties = {} full_config = cache_manager.catalogs_config_cache.get_config(catalog.type) document_version_configurations = full_config['document_version_configurations'] for config in document_version_configurations: catalog_properties[config] = form.get_dynamic_data(config) api_input = { 'catalog_id': catalog_id, 'name': form.name.data or filename, 'sub_file_type': form.sub_file_type.data, 'url': url, 'language': form.language.data, 'user_context': form.user_context.data, 'valid_from': form.valid_from.data, 'user_metadata': json.loads(form.user_metadata.data) if form.user_metadata.data else None, 'catalog_properties': catalog_properties, } new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id) task_id = start_embedding_task(tenant_id, new_doc_vers.id) flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) except EveAIException as e: current_app.logger.error(f"Error adding document: {str(e)}") flash(str(e), 'error') except Exception as e: current_app.logger.error(f'Error adding document: {str(e)}') flash('An error occurred while adding the document.', 'error') return render_template('document/add_url.html', form=form) @document_bp.route('/documents', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def documents(): view = DocumentListView(Document, 'document/documents.html', per_page=10) return view.get() @document_bp.route('/handle_document_selection', methods=['POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_document_selection(): document_identification = request.form['selected_row'] if isinstance(document_identification, int) or document_identification.isdigit(): doc_id = int(document_identification) else: # If it's not an integer, assume it's a string representation of a dictionary try: doc_id = ast.literal_eval(document_identification).get('value') except (ValueError, AttributeError): flash('Invalid document selection.', 'error') return redirect(prefixed_url_for('document_bp.documents')) action = request.form['action'] match action: case 'edit_document': return redirect(prefixed_url_for('document_bp.edit_document_view', document_id=doc_id)) case 'document_versions': return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 'refresh_document': refresh_document_view(doc_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 're_embed_latest_versions': re_embed_latest_versions() # Add more conditions for other actions return redirect(prefixed_url_for('document_bp.documents')) @document_bp.route('/edit_document/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def edit_document_view(document_id): # Use an alias for the Catalog to avoid column name conflicts CatalogAlias = aliased(Catalog) # Query for the document and its catalog result = db.session.query(Document, CatalogAlias.name.label('catalog_name')) \ .join(CatalogAlias, Document.catalog_id == CatalogAlias.id) \ .filter(Document.id == document_id) \ .first_or_404() doc, catalog_name = result form = EditDocumentForm(obj=doc) if request.method == 'GET': # Populate form with current values form.name.data = doc.name form.valid_from.data = doc.valid_from form.valid_to.data = doc.valid_to if form.validate_on_submit(): updated_doc, error = edit_document( session.get('tenant').get('id', 0), document_id, form.name.data, form.valid_from.data, form.valid_to.data ) if updated_doc: flash(f'Document {updated_doc.id} updated successfully', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: flash(f'Error updating document: {error}', 'danger') else: form_validation_failed(request, form) return render_template('document/edit_document.html', form=form, document_id=document_id, catalog_name=catalog_name) @document_bp.route('/edit_document_version/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def edit_document_version_view(document_version_id): doc_vers = DocumentVersion.query.get_or_404(document_version_id) form = EditDocumentVersionForm(request.form, obj=doc_vers) doc_vers = DocumentVersion.query.get_or_404(document_version_id) catalog_id = doc_vers.document.catalog_id catalog = Catalog.query.get_or_404(catalog_id) if catalog.configuration and len(catalog.configuration) > 0: full_config = cache_manager.catalogs_config_cache.get_config(catalog.type) document_version_configurations = full_config['document_version_configurations'] for config in document_version_configurations: form.add_dynamic_fields(config, full_config, doc_vers.catalog_properties[config]) if form.validate_on_submit(): catalog_properties = {} # Use the full_config variable we already defined for config in document_version_configurations: catalog_properties[config] = form.get_dynamic_data(config) updated_version, error = edit_document_version( session.get('tenant').get('id', 0), document_version_id, form.user_context.data, catalog_properties, ) if updated_version: flash(f'Document Version {updated_version.id} updated successfully', 'success') return redirect(prefixed_url_for('document_bp.document_versions', document_id=updated_version.doc_id)) else: flash(f'Error updating document version: {error}', 'danger') else: form_validation_failed(request, form) return render_template('document/edit_document_version.html', form=form, document_version_id=document_version_id, doc_details=f'Document {doc_vers.document.name}') @document_bp.route('/document_versions/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def document_versions(document_id): doc = Document.query.get_or_404(document_id) doc_desc = f'Document {doc.name}' page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = (DocumentVersion.query.filter_by(doc_id=document_id) .order_by(DocumentVersion.language) .order_by(desc(DocumentVersion.id))) pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), ('object_name', ''), ('file_type', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) return render_template('document/document_versions.html', rows=rows, pagination=pagination, document=doc_desc) @document_bp.route('/handle_document_version_selection', methods=['POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_document_version_selection(): document_version_identification = request.form['selected_row'] if isinstance(document_version_identification, int) or document_version_identification.isdigit(): doc_vers_id = int(document_version_identification) else: # If it's not an integer, assume it's a string representation of a dictionary try: doc_vers_id = ast.literal_eval(document_version_identification).get('value') except (ValueError, AttributeError): flash('Invalid document version selection.', 'error') return redirect(prefixed_url_for('document_bp.document_versions_list')) action = request.form['action'] current_app.logger.debug(f'Action: {action}') match action: case 'edit_document_version': return redirect(prefixed_url_for('document_bp.edit_document_version_view', document_version_id=doc_vers_id)) case 'process_document_version': process_version(doc_vers_id) # Add more conditions for other actions case 'view_document_version_markdown': return redirect(prefixed_url_for('document_bp.view_document_version_markdown', document_version_id=doc_vers_id)) doc_vers = DocumentVersion.query.get_or_404(doc_vers_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_vers.doc_id)) @document_bp.route('/library_operations', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def library_operations(): return render_template('document/library_operations.html') @document_bp.route('/handle_library_selection', methods=['GET', 'POST']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def handle_library_selection(): action = request.form['action'] match action: case 'create_default_rag_library': create_default_rag_library() case 're_embed_latest_versions': re_embed_latest_versions() case 'refresh_all_documents': refresh_all_documents() return redirect(prefixed_url_for('document_bp.library_operations')) def create_default_rag_library(): # Check if no catalog exists. If non exists, no processors, retrievers or specialist can exists catalogs = Catalog.query.all() if catalogs: flash("Default RAG Library can only be created if no catalogs are defined!", 'danger') return redirect(prefixed_url_for('document_bp.library_operations')) timestamp = dt.now(tz=tz.utc) try: cat = Catalog( name='Default RAG Catalog', description='Default RAG Catalog', type="STANDARD_CATALOG", min_chunk_size=1500, max_chunk_size=2500, embedding_model="mistral.mistral-embed" ) set_logging_information(cat, timestamp) db.session.add(cat) db.session.commit() proc = Processor( name='Default HTML Processor', description='Default HTML Processor', catalog_id=cat.id, type="HTML_PROCESSOR", configuration={ "html_tags": "p, h1, h2, h3, h4, h5, h6, li, table, thead, tbody, tr, td", "html_end_tags": "p, li, table", "html_excluded_classes": "", "html_excluded_elements": "header, footer, nav, script", "html_included_elements": "article, main" } ) set_logging_information(proc, timestamp) retr = Retriever( name='Default RAG Retriever', description='Default RAG Retriever', catalog_id=cat.id, type="STANDARD_RAG", type_version="1.0", configuration={ "es_k": "8", "es_similarity_threshold": 0.3 } ) set_logging_information(retr, timestamp) db.session.add(proc) db.session.add(retr) db.session.commit() spec = Specialist( name='Default RAG Specialist', description='Default RAG Specialist', type='STANDARD_RAG_SPECIALIST', configuration={"temperature": "0.3", "specialist_context": "To be specified"} ) set_logging_information(spec, timestamp) db.session.add(spec) db.session.commit() spec_retr = SpecialistRetriever( specialist_id=spec.id, retriever_id=retr.id, ) db.session.add(spec_retr) db.session.commit() except SQLAlchemyError as e: db.session.rollback() flash(f'Failed to create Default RAG Library. Error: {e}', 'danger') current_app.logger.error(f'Failed to create Default RAG Library' f'for tenant {session['tenant']['id']}. Error: {str(e)}') return redirect(prefixed_url_for('document_bp.library_operations')) @document_bp.route('/document_versions_list', methods=['GET']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def document_versions_list(): view = DocumentVersionListView(DocumentVersion, 'document/document_versions_list_view.html', per_page=20) return view.get() @document_bp.route('/view_document_version_markdown/', methods=['GET']) @roles_accepted('Super User', 'Partner Admin', 'Tenant Admin') def view_document_version_markdown(document_version_id): current_app.logger.debug(f'Viewing document version markdown {document_version_id}') # Retrieve document version document_version = DocumentVersion.query.get_or_404(document_version_id) # retrieve tenant information tenant_id = session.get('tenant').get('id') try: # Generate markdown filename markdown_filename = f"{document_version.id}.md" markdown_object_name = minio_client.generate_object_name(document_version.doc_id, document_version.language, document_version.id, markdown_filename) current_app.logger.debug(f'Markdown object name: {markdown_object_name}') # Download actual markdown file file_data = minio_client.download_document_file( tenant_id, document_version.bucket_name, markdown_object_name, ) # Decodeer de binaire data naar UTF-8 tekst markdown_content = file_data.decode('utf-8') current_app.logger.debug(f'Markdown content: {markdown_content}') # Render de template met de markdown inhoud return render_template( 'document/view_document_version_markdown.html', document_version=document_version, markdown_content=markdown_content ) except Exception as e: current_app.logger.error(f"Error retrieving markdown for document version {document_version_id}: {str(e)}") flash(f"Error retrieving processed document: {str(e)}", "danger") return redirect(prefixed_url_for('document_bp.document_versions')) def refresh_all_documents(): for doc in Document.query.all(): refresh_document(doc.id) def refresh_document_view(document_id): new_version, result = refresh_document(document_id, session['tenant']['id']) if new_version: flash(f'Document refreshed. New version: {new_version.id}. Task ID: {result}', 'success') else: flash(f'Error refreshing document: {result}', 'danger') return redirect(prefixed_url_for('document_bp.documents')) def re_embed_latest_versions(): docs = Document.query.all() for doc in docs: latest_doc_version = DocumentVersion.query.filter_by(doc_id=doc.id).order_by(desc(DocumentVersion.id)).first() if latest_doc_version: process_version(latest_doc_version.id) def process_version(version_id): task = current_celery.send_task('create_embeddings', args=[session['tenant']['id'], version_id,], queue='embeddings') current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} ' f'for tenant {session["tenant"]["id"]}, ' f'Document Version {version_id}. ' f'Embedding creation task: {task.id}') flash(f'Processing for document version {version_id} retriggered successfully...', 'success') return redirect(prefixed_url_for('document_bp.documents')) def set_logging_information(obj, timestamp): obj.created_at = timestamp obj.updated_at = timestamp obj.created_by = current_user.id obj.updated_by = current_user.id def update_logging_information(obj, timestamp): obj.updated_at = timestamp obj.updated_by = current_user.id def log_session_state(session, msg=""): pass # current_app.logger.info(f"{msg} - Session dirty: {session.dirty}") # current_app.logger.info(f"{msg} - Session new: {session.new}") def fetch_html(url): # Fetches HTML content from a URL try: response = requests.get(url) except SSLError as e: current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. " f"Error Encountered: {e}") if current_app.config.get('DEBUG'): # only allow when in a development environment current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. " f"Only while in development environment.") response = requests.get(url, verify=False) # Disable SSL verification else: response = None response.raise_for_status() # Will raise an exception for bad requests return response.content def clean_markdown(markdown): """Functie die triple backticks uit markdown verwijdert""" markdown = markdown.strip() if markdown.startswith("```markdown"): markdown = markdown[len("```markdown"):].strip() elif markdown.startswith("```"): markdown = markdown[3:].strip() if markdown.endswith("```"): markdown = markdown[:-3].strip() return markdown