import ast import os from datetime import datetime as dt, timezone as tz import chardet from flask import request, redirect, flash, render_template, Blueprint, session, current_app from flask_security import roles_accepted, current_user from sqlalchemy import desc from sqlalchemy.orm import joinedload from werkzeug.datastructures import FileStorage from werkzeug.utils import secure_filename from sqlalchemy.exc import SQLAlchemyError import requests from requests.exceptions import SSLError from urllib.parse import urlparse import io from minio.error import S3Error from common.models.document import Document, DocumentVersion from common.extensions import db, minio_client from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddYoutubeForm, \ AddURLsForm from common.utils.middleware import mw_before_request from common.utils.celery_utils import current_celery from common.utils.nginx_utils import prefixed_url_for from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro, form_to_dict document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @document_bp.before_request def log_before_request(): current_app.logger.debug(f"Before request (document_bp): {request.method} {request.url}") @document_bp.after_request def log_after_request(response): current_app.logger.debug( f"After request (document_bp): {request.method} {request.url} - Status: {response.status}") return response @document_bp.before_request def before_request(): try: mw_before_request() except Exception as e: current_app.logger.error(f'Error switching schema in Document Blueprint: {e}') for role in current_user.roles: current_app.logger.debug(f'User {current_user.email} has role {role.name}') raise @document_bp.route('/add_document', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_document(): form = AddDocumentForm() # If the form is submitted if form.validate_on_submit(): current_app.logger.info(f'Adding document for tenant {session["tenant"]["id"]}') file = form.file.data filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() form_dict = form_to_dict(form) new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_document.html', form=form) @document_bp.route('/add_url', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_url(): form = AddURLForm() # If the form is submitted if form.validate_on_submit(): current_app.logger.info(f'Adding url for tenant {session["tenant"]["id"]}') url = form.url.data doc_vers = DocumentVersion.query.filter_by(url=url).all() if doc_vers: current_app.logger.info(f'A document with url {url} already exists. No new document created.') flash(f'A document with url {url} already exists. No new document created.', 'info') return redirect(prefixed_url_for('document_bp.documents')) # Only when no document with URL exists html = fetch_html(url) file = io.BytesIO(html) parsed_url = urlparse(url) path_parts = parsed_url.path.split('/') filename = path_parts[-1] if filename == '': filename = 'index' if not filename.endswith('.html'): filename += '.html' extension = 'html' form_dict = form_to_dict(form) new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_url.html', form=form) @document_bp.route('/add_urls', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_urls(): form = AddURLsForm() if form.validate_on_submit(): urls = form.urls.data.split('\n') urls = [url.strip() for url in urls if url.strip()] for i, url in enumerate(urls): try: doc_vers = DocumentVersion.query.filter_by(url=url).all() if doc_vers: current_app.logger.info(f'A document with url {url} already exists. No new document created.') flash(f'A document with url {url} already exists. No new document created.', 'info') continue html = fetch_html(url) file = io.BytesIO(html) parsed_url = urlparse(url) path_parts = parsed_url.path.split('/') filename = path_parts[-1] if path_parts[-1] else 'index' if not filename.endswith('.html'): filename += '.html' # Use the name prefix if provided, otherwise use the filename doc_name = f"{form.name.data}-{filename}" if form.name.data else filename new_doc, new_doc_vers = create_document_stack({ 'name': doc_name, 'url': url, 'language': form.language.data, 'user_context': form.user_context.data, 'valid_from': form.valid_from.data }, file, filename, 'html') task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') except Exception as e: current_app.logger.error(f"Error processing URL {url}: {str(e)}") flash(f'Error processing URL {url}: {str(e)}', 'danger') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_urls.html', form=form) @document_bp.route('/add_youtube', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_youtube(): form = AddYoutubeForm() if form.validate_on_submit(): current_app.logger.info(f'Adding Youtube document for tenant {session["tenant"]["id"]}') url = form.url.data current_app.logger.debug(f'Value of language field: {form.language.data}') doc_vers = DocumentVersion.query.filter_by(url=url).all() if doc_vers: current_app.logger.info(f'A document with url {url} already exists. No new document created.') flash(f'A document with url {url} already exists. No new document created.', 'info') return redirect(prefixed_url_for('document_bp.documents')) # As downloading a Youtube document can take quite some time, we offload this downloading to the worker # We just pass a simple file to get things conform file = "Youtube placeholder file" filename = 'placeholder.youtube' extension = 'youtube' form_dict = form_to_dict(form) current_app.logger.debug(f'Form data: {form_dict}') new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Processing and Embedding on Youtube document started for tenant ' f'{session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Processing and Embedding Youtube task: {task.id}') flash(f'Processing on Youtube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_youtube.html', form=form) @document_bp.route('/documents', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def documents(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Document.query.order_by(desc(Document.created_at)) pagination = query.paginate(page=page, per_page=per_page, error_out=False) docs = pagination.items rows = prepare_table_for_macro(docs, [('id', ''), ('name', ''), ('valid_from', ''), ('valid_to', '')]) return render_template('document/documents.html', rows=rows, pagination=pagination) @document_bp.route('/handle_document_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_selection(): document_identification = request.form['selected_row'] doc_id = ast.literal_eval(document_identification).get('value') action = request.form['action'] match action: case 'edit_document': return redirect(prefixed_url_for('document_bp.edit_document', document_id=doc_id)) case 'document_versions': return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 'refresh_document': refresh_document(doc_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 're_embed_latest_versions': re_embed_latest_versions() # Add more conditions for other actions return redirect(prefixed_url_for('document_bp.documents')) @document_bp.route('/edit_document/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document(document_id): doc = Document.query.get_or_404(document_id) form = EditDocumentForm(obj=doc) if form.validate_on_submit(): doc.name = form.name.data doc.valid_from = form.valid_from.data doc.valid_to = form.valid_to.data update_logging_information(doc, dt.now(tz.utc)) try: db.session.add(doc) db.session.commit() flash(f'Document {doc.id} updated successfully', 'success') except SQLAlchemyError as e: db.session.rollback() flash(f'Error updating document: {e}', 'danger') current_app.logger.error(f'Error updating document: {e}') else: form_validation_failed(request, form) return render_template('document/edit_document.html', form=form, document_id=document_id) @document_bp.route('/edit_document_version/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document_version(document_version_id): doc_vers = DocumentVersion.query.get_or_404(document_version_id) form = EditDocumentVersionForm(obj=doc_vers) if form.validate_on_submit(): doc_vers.user_context = form.user_context.data update_logging_information(doc_vers, dt.now(tz.utc)) try: db.session.add(doc_vers) db.session.commit() flash(f'Document Version {doc_vers.id} updated successfully', 'success') except SQLAlchemyError as e: db.session.rollback() flash(f'Error updating document version: {e}', 'danger') current_app.logger.error(f'Error updating document version {doc_vers.id} ' f'for tenant {session['tenant']['id']}: {e}') else: form_validation_failed(request, form) return render_template('document/edit_document_version.html', form=form, document_version_id=document_version_id, doc_details=f'Document {doc_vers.document.name}') @document_bp.route('/document_versions/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def document_versions(document_id): doc_vers = DocumentVersion.query.get_or_404(document_id) doc_desc = f'Document {doc_vers.document.name}, Language {doc_vers.language}' page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = (DocumentVersion.query.filter_by(doc_id=document_id) .order_by(DocumentVersion.language) .order_by(desc(DocumentVersion.id))) pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), ('file_location', ''), ('file_name', ''), ('file_type', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) return render_template('document/document_versions.html', rows=rows, pagination=pagination, document=doc_desc) @document_bp.route('/handle_document_version_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_version_selection(): document_version_identification = request.form['selected_row'] doc_vers_id = ast.literal_eval(document_version_identification).get('value') action = request.form['action'] match action: case 'edit_document_version': return redirect(prefixed_url_for('document_bp.edit_document_version', document_version_id=doc_vers_id)) case 'process_document_version': process_version(doc_vers_id) # Add more conditions for other actions doc_vers = DocumentVersion.query.get_or_404(doc_vers_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_vers.doc_id)) @document_bp.route('/library_operations', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def library_operations(): return render_template('document/library_operations.html') @document_bp.route('/handle_library_selection', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_library_selection(): action = request.form['action'] match action: case 're_embed_latest_versions': re_embed_latest_versions() case 'refresh_all_documents': refresh_all_documents() return redirect(prefixed_url_for('document_bp.library_operations')) def refresh_all_documents(): for doc in Document.query.all(): refresh_document(doc.id) def refresh_document(doc_id): doc = Document.query.get_or_404(doc_id) doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first() if not doc_vers.url: current_app.logger.info(f'Document {doc_id} has no URL, skipping refresh') flash(f'This document has no URL. I can only refresh documents with a URL. skipping refresh', 'alert') return new_doc_vers = create_version_for_document(doc, doc_vers.url, doc_vers.language, doc_vers.user_context) try: db.session.add(new_doc_vers) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error refreshing document {doc_id} for tenant {session["tenant"]["id"]}: {e}') flash('Error refreshing document.', 'alert') db.session.rollback() error = e.args raise except Exception as e: current_app.logger.error('Unknown error') raise html = fetch_html(new_doc_vers.url) file = io.BytesIO(html) parsed_url = urlparse(new_doc_vers.url) path_parts = parsed_url.path.split('/') filename = path_parts[-1] if filename == '': filename = 'index' if not filename.endswith('.html'): filename += '.html' extension = 'html' current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}') upload_file_for_version(new_doc_vers, file, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') def re_embed_latest_versions(): docs = Document.query.all() for doc in docs: latest_doc_version = DocumentVersion.query.filter_by(doc_id=doc.id).order_by(desc(DocumentVersion.id)).first() if latest_doc_version: process_version(latest_doc_version.id) def process_version(version_id): task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], version_id, ]) current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} ' f'for tenant {session["tenant"]["id"]}, ' f'Document Version {version_id}. ' f'Embedding creation task: {task.id}') flash(f'Processing for document version {version_id} retriggered successfully...', 'success') return redirect(prefixed_url_for('document_bp.documents')) def set_logging_information(obj, timestamp): obj.created_at = timestamp obj.updated_at = timestamp obj.created_by = current_user.id obj.updated_by = current_user.id def update_logging_information(obj, timestamp): obj.updated_at = timestamp obj.updated_by = current_user.id def create_document_stack(form, file, filename, extension): # Create the Document new_doc = create_document(form, filename) # Create the DocumentVersion new_doc_vers = create_version_for_document(new_doc, form.get('url', ''), form.get('language', 'en'), form.get('user_context', '') ) try: db.session.add(new_doc) db.session.add(new_doc_vers) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}') flash('Error adding document.', 'alert') db.session.rollback() error = e.args raise except Exception as e: current_app.logger.error('Unknown error') raise current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc.id}') upload_file_for_version(new_doc_vers, file, extension) return new_doc, new_doc_vers def log_session_state(session, msg=""): current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}") current_app.logger.debug(f"{msg} - Session new: {session.new}") def create_document(form, filename): new_doc = Document() if form['name'] == '': new_doc.name = filename.rsplit('.', 1)[0] else: new_doc.name = form['name'] if form['valid_from'] and form['valid_from'] != '': new_doc.valid_from = form['valid_from'] else: new_doc.valid_from = dt.now(tz.utc) new_doc.tenant_id = session['tenant']['id'] set_logging_information(new_doc, dt.now(tz.utc)) return new_doc def create_version_for_document(document, url, language, user_context): new_doc_vers = DocumentVersion() if url != '': new_doc_vers.url = url if language == '': new_doc_vers.language = session['default_language'] else: new_doc_vers.language = language if user_context != '': new_doc_vers.user_context = user_context new_doc_vers.document = document set_logging_information(new_doc_vers, dt.now(tz.utc)) return new_doc_vers def upload_file_for_version(doc_vers, file, extension): doc_vers.file_type = extension doc_vers.file_name = doc_vers.calc_file_name() doc_vers.file_location = doc_vers.calc_file_location() # Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place. tenant_id = session['tenant']['id'] minio_client.create_tenant_bucket(tenant_id) try: minio_client.upload_document_file( tenant_id, doc_vers.doc_id, doc_vers.language, doc_vers.id, doc_vers.file_name, file ) db.session.commit() current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for ' f'document version {doc_vers.id} while uploading file.') except S3Error as e: db.session.rollback() flash('Error saving document to MinIO.', 'error') current_app.logger.error( f'Error saving document to MinIO for tenant {tenant_id}: {e}') raise except SQLAlchemyError as e: db.session.rollback() flash('Error saving document metadata.', 'error') current_app.logger.error( f'Error saving document metadata for tenant {tenant_id}: {e}') raise def fetch_html(url): # Fetches HTML content from a URL try: response = requests.get(url) except SSLError as e: current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. " f"Error Encountered: {e}") if current_app.config.get('DEBUG'): # only allow when in a development environment current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. " f"Only while in development environment.") response = requests.get(url, verify=False) # Disable SSL verification else: response = None response.raise_for_status() # Will raise an exception for bad requests return response.content def prepare_document_data(docs): rows = [] for doc in docs: doc_row = [{'value': doc.name, 'class': '', 'type': 'text'}, {'value': doc.created_at.strftime("%Y-%m-%d %H:%M:%S"), 'class': '', 'type': 'text'}] # Document basic details if doc.valid_from: doc_row.append({'value': doc.valid_from.strftime("%Y-%m-%d"), 'class': '', 'type': 'text'}) else: doc_row.append({'value': '', 'class': '', 'type': 'text'}) # Nested languages and versions languages_rows = [] for lang in doc.languages: lang_row = [{'value': lang.language, 'class': '', 'type': 'text'}] # Latest version details if available (should be available ;-) ) if lang.latest_version: lang_row.append({'value': lang.latest_version.created_at.strftime("%Y-%m-%d %H:%M:%S"), 'class': '', 'type': 'text'}) if lang.latest_version.url: lang_row.append({'value': lang.latest_version.url, 'class': '', 'type': 'link', 'href': lang.latest_version.url}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) if lang.latest_version.file_name: lang_row.append({'value': lang.latest_version.file_name, 'class': '', 'type': 'text'}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) if lang.latest_version.file_type: lang_row.append({'value': lang.latest_version.file_type, 'class': '', 'type': 'text'}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) # Include other details as necessary languages_rows.append(lang_row) doc_row.append({'is_group': True, 'colspan': '5', 'headers': ['Language', 'Latest Version', 'URL', 'File Name', 'Type'], 'sub_rows': languages_rows}) rows.append(doc_row) return rows