import ast import os from datetime import datetime as dt, timezone as tz import chardet from flask import request, redirect, flash, render_template, Blueprint, session, current_app from flask_security import roles_accepted, current_user from sqlalchemy import desc from sqlalchemy.orm import joinedload from werkzeug.datastructures import FileStorage from werkzeug.utils import secure_filename from sqlalchemy.exc import SQLAlchemyError import requests from requests.exceptions import SSLError from urllib.parse import urlparse import io from common.models.document import Document, DocumentVersion from common.extensions import db from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddYoutubeForm from common.utils.middleware import mw_before_request from common.utils.celery_utils import current_celery from common.utils.nginx_utils import prefixed_url_for from common.utils.view_assistants import form_validation_failed, prepare_table_for_macro, form_to_dict document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @document_bp.before_request def log_before_request(): current_app.logger.debug(f"Before request (document_bp): {request.method} {request.url}") @document_bp.after_request def log_after_request(response): current_app.logger.debug( f"After request (document_bp): {request.method} {request.url} - Status: {response.status}") return response @document_bp.before_request def before_request(): try: mw_before_request() except Exception as e: current_app.logger.error(f'Error switching schema in Document Blueprint: {e}') for role in current_user.roles: current_app.logger.debug(f'User {current_user.email} has role {role.name}') raise @document_bp.route('/add_document', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_document(): form = AddDocumentForm() # If the form is submitted if form.validate_on_submit(): current_app.logger.info(f'Adding document for tenant {session["tenant"]["id"]}') file = form.file.data filename = secure_filename(file.filename) extension = filename.rsplit('.', 1)[1].lower() form_dict = form_to_dict(form) new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_document.html', form=form) @document_bp.route('/add_url', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_url(): form = AddURLForm() # If the form is submitted if form.validate_on_submit(): current_app.logger.info(f'Adding url for tenant {session["tenant"]["id"]}') url = form.url.data doc_vers = DocumentVersion.query.filter_by(url=url).all() if doc_vers: current_app.logger.info(f'A document with url {url} already exists. No new document created.') flash(f'A document with url {url} already exists. No new document created.', 'info') return redirect(prefixed_url_for('document_bp.documents')) # Only when no document with URL exists html = fetch_html(url) file = io.BytesIO(html) parsed_url = urlparse(url) path_parts = parsed_url.path.split('/') filename = path_parts[-1] if filename == '': filename = 'index' if not filename.endswith('.html'): filename += '.html' extension = 'html' form_dict = form_to_dict(form) new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_url.html', form=form) @document_bp.route('/add_youtube', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def add_youtube(): form = AddYoutubeForm() if form.validate_on_submit(): current_app.logger.info(f'Adding Youtube document for tenant {session["tenant"]["id"]}') url = form.url.data current_app.logger.debug(f'Value of language field: {form.language.data}') doc_vers = DocumentVersion.query.filter_by(url=url).all() if doc_vers: current_app.logger.info(f'A document with url {url} already exists. No new document created.') flash(f'A document with url {url} already exists. No new document created.', 'info') return redirect(prefixed_url_for('document_bp.documents')) # As downloading a Youtube document can take quite some time, we offload this downloading to the worker # We just pass a simple file to get things conform file = "Youtube placeholder file" filename = 'placeholder.youtube' extension = 'youtube' form_dict = form_to_dict(form) current_app.logger.debug(f'Form data: {form_dict}') new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Processing and Embedding on Youtube document started for tenant ' f'{session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Processing and Embedding Youtube task: {task.id}') flash(f'Processing on Youtube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') return redirect(prefixed_url_for('document_bp.documents')) else: form_validation_failed(request, form) return render_template('document/add_youtube.html', form=form) @document_bp.route('/documents', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def documents(): page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = Document.query.order_by(desc(Document.created_at)) pagination = query.paginate(page=page, per_page=per_page, error_out=False) docs = pagination.items rows = prepare_table_for_macro(docs, [('id', ''), ('name', ''), ('valid_from', ''), ('valid_to', '')]) return render_template('document/documents.html', rows=rows, pagination=pagination) @document_bp.route('/handle_document_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_selection(): document_identification = request.form['selected_row'] doc_id = ast.literal_eval(document_identification).get('value') action = request.form['action'] match action: case 'edit_document': return redirect(prefixed_url_for('document_bp.edit_document', document_id=doc_id)) case 'document_versions': return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 'refresh_document': refresh_document(doc_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_id)) case 're_embed_latest_versions': re_embed_latest_versions() # Add more conditions for other actions return redirect(prefixed_url_for('document_bp.documents')) @document_bp.route('/edit_document/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document(document_id): doc = Document.query.get_or_404(document_id) form = EditDocumentForm(obj=doc) if form.validate_on_submit(): doc.name = form.name.data doc.valid_from = form.valid_from.data doc.valid_to = form.valid_to.data update_logging_information(doc, dt.now(tz.utc)) try: db.session.add(doc) db.session.commit() flash(f'Document {doc.id} updated successfully', 'success') except SQLAlchemyError as e: db.session.rollback() flash(f'Error updating document: {e}', 'danger') current_app.logger.error(f'Error updating document: {e}') else: form_validation_failed(request, form) return render_template('document/edit_document.html', form=form, document_id=document_id) @document_bp.route('/edit_document_version/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def edit_document_version(document_version_id): doc_vers = DocumentVersion.query.get_or_404(document_version_id) form = EditDocumentVersionForm(obj=doc_vers) if form.validate_on_submit(): doc_vers.user_context = form.user_context.data update_logging_information(doc_vers, dt.now(tz.utc)) try: db.session.add(doc_vers) db.session.commit() flash(f'Document Version {doc_vers.id} updated successfully', 'success') except SQLAlchemyError as e: db.session.rollback() flash(f'Error updating document version: {e}', 'danger') current_app.logger.error(f'Error updating document version {doc_vers.id} ' f'for tenant {session['tenant']['id']}: {e}') else: form_validation_failed(request, form) return render_template('document/edit_document_version.html', form=form, document_version_id=document_version_id, doc_details=f'Document {doc_vers.document.name}') @document_bp.route('/document_versions/', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def document_versions(document_id): doc_vers = DocumentVersion.query.get_or_404(document_id) doc_desc = f'Document {doc_vers.document.name}, Language {doc_vers.language}' page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) query = (DocumentVersion.query.filter_by(doc_id=document_id) .order_by(DocumentVersion.language) .order_by(desc(DocumentVersion.id))) pagination = query.paginate(page=page, per_page=per_page, error_out=False) doc_langs = pagination.items rows = prepare_table_for_macro(doc_langs, [('id', ''), ('url', ''), ('file_location', ''), ('file_name', ''), ('file_type', ''), ('processing', ''), ('processing_started_at', ''), ('processing_finished_at', ''), ('processing_error', '')]) return render_template('document/document_versions.html', rows=rows, pagination=pagination, document=doc_desc) @document_bp.route('/handle_document_version_selection', methods=['POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_document_version_selection(): document_version_identification = request.form['selected_row'] doc_vers_id = ast.literal_eval(document_version_identification).get('value') action = request.form['action'] match action: case 'edit_document_version': return redirect(prefixed_url_for('document_bp.edit_document_version', document_version_id=doc_vers_id)) case 'process_document_version': process_version(doc_vers_id) # Add more conditions for other actions doc_vers = DocumentVersion.query.get_or_404(doc_vers_id) return redirect(prefixed_url_for('document_bp.document_versions', document_id=doc_vers.doc_id)) @document_bp.route('/library_operations', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def library_operations(): return render_template('document/library_operations.html') @document_bp.route('/handle_library_selection', methods=['GET', 'POST']) @roles_accepted('Super User', 'Tenant Admin') def handle_library_selection(): action = request.form['action'] match action: case 're_embed_latest_versions': re_embed_latest_versions() case 'refresh_all_documents': refresh_all_documents() return redirect(prefixed_url_for('document_bp.library_operations')) def refresh_all_documents(): for doc in Document.query.all(): refresh_document(doc.id) def refresh_document(doc_id): doc = Document.query.get_or_404(doc_id) doc_vers = DocumentVersion.query.filter_by(doc_id=doc_id).order_by(desc(DocumentVersion.id)).first() if not doc_vers.url: current_app.logger.info(f'Document {doc_id} has no URL, skipping refresh') flash(f'This document has no URL. I can only refresh documents with a URL. skipping refresh', 'alert') return new_doc_vers = create_version_for_document(doc, doc_vers.url, doc_vers.language, doc_vers.user_context) try: db.session.add(new_doc_vers) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error refreshing document {doc_id} for tenant {session["tenant"]["id"]}: {e}') flash('Error refreshing document.', 'alert') db.session.rollback() error = e.args raise except Exception as e: current_app.logger.error('Unknown error') raise html = fetch_html(new_doc_vers.url) file = io.BytesIO(html) parsed_url = urlparse(new_doc_vers.url) path_parts = parsed_url.path.split('/') filename = path_parts[-1] if filename == '': filename = 'index' if not filename.endswith('.html'): filename += '.html' extension = 'html' current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}') upload_file_for_version(new_doc_vers, file, extension) task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], new_doc_vers.id, ]) current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc_vers.id}. ' f'Embedding creation task: {task.id}') flash(f'Processing on document {doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.', 'success') def re_embed_latest_versions(): docs = Document.query.all() for doc in docs: latest_doc_version = DocumentVersion.query.filter_by(doc_id=doc.id).order_by(desc(DocumentVersion.id)).first() if latest_doc_version: process_version(latest_doc_version.id) def process_version(version_id): task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ session['tenant']['id'], version_id, ]) current_app.logger.info(f'Embedding creation retriggered by user {current_user.id}, {current_user.email} ' f'for tenant {session["tenant"]["id"]}, ' f'Document Version {version_id}. ' f'Embedding creation task: {task.id}') flash(f'Processing for document version {version_id} retriggered successfully...', 'success') return redirect(prefixed_url_for('document_bp.documents')) def set_logging_information(obj, timestamp): obj.created_at = timestamp obj.updated_at = timestamp obj.created_by = current_user.id obj.updated_by = current_user.id def update_logging_information(obj, timestamp): obj.updated_at = timestamp obj.updated_by = current_user.id def create_document_stack(form, file, filename, extension): # Create the Document new_doc = create_document(form, filename) # Create the DocumentVersion new_doc_vers = create_version_for_document(new_doc, form.get('url', ''), form.get('language', 'en'), form.get('user_context', '') ) try: db.session.add(new_doc) db.session.add(new_doc_vers) db.session.commit() except SQLAlchemyError as e: current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}') flash('Error adding document.', 'alert') db.session.rollback() error = e.args raise except Exception as e: current_app.logger.error('Unknown error') raise current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, ' f'Document Version {new_doc.id}') upload_file_for_version(new_doc_vers, file, extension) return new_doc, new_doc_vers def log_session_state(session, msg=""): current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}") current_app.logger.debug(f"{msg} - Session new: {session.new}") def create_document(form, filename): new_doc = Document() if form['name'] == '': new_doc.name = filename.rsplit('.', 1)[0] else: new_doc.name = form['name'] if form['valid_from'] and form['valid_from'] != '': new_doc.valid_from = form['valid_from'] else: new_doc.valid_from = dt.now(tz.utc) new_doc.tenant_id = session['tenant']['id'] set_logging_information(new_doc, dt.now(tz.utc)) return new_doc def create_version_for_document(document, url, language, user_context): new_doc_vers = DocumentVersion() if url != '': new_doc_vers.url = url if language == '': new_doc_vers.language = session['default_language'] else: new_doc_vers.language = language if user_context != '': new_doc_vers.user_context = user_context new_doc_vers.document = document set_logging_information(new_doc_vers, dt.now(tz.utc)) return new_doc_vers def upload_file_for_version(doc_vers, file, extension): doc_vers.file_type = extension doc_vers.file_name = doc_vers.calc_file_name() doc_vers.file_location = doc_vers.calc_file_location() upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], doc_vers.file_location) if not os.path.exists(upload_path): os.makedirs(upload_path, exist_ok=True) if isinstance(file, FileStorage): file.save(os.path.join(upload_path, doc_vers.file_name)) elif isinstance(file, io.BytesIO): # It's a BytesIO object, handle accordingly # Example: write content to a file manually with open(os.path.join(upload_path, doc_vers.file_name), 'wb') as f: f.write(file.getvalue()) elif isinstance(file, str): # It's a string, handle accordingly with open(os.path.join(upload_path, doc_vers.file_name), 'w') as f: f.write(file) else: raise TypeError('Unsupported file type.') try: db.session.commit() except SQLAlchemyError as e: db.session.rollback() flash('Error saving document.', 'error') current_app.logger.error( f'Error saving document for tenant {session["tenant"]["id"]} while uploading file: {e}') current_app.logger.info(f'Succesfully saved document for tenant {session['tenant']['id']} for ' f'document version {doc_vers.id} while uploading file.') def fetch_html(url): # Fetches HTML content from a URL try: response = requests.get(url) except SSLError as e: current_app.logger.error(f"Error fetching HTML from {url} for tenant {session['tenant']['id']}. " f"Error Encountered: {e}") if current_app.config.get('DEBUG'): # only allow when in a development environment current_app.logger.info(f"Skipping SSL verification for {url} for tenant {session['tenant']['id']}. " f"Only while in development environment.") response = requests.get(url, verify=False) # Disable SSL verification else: response = None response.raise_for_status() # Will raise an exception for bad requests return response.content def prepare_document_data(docs): rows = [] for doc in docs: doc_row = [{'value': doc.name, 'class': '', 'type': 'text'}, {'value': doc.created_at.strftime("%Y-%m-%d %H:%M:%S"), 'class': '', 'type': 'text'}] # Document basic details if doc.valid_from: doc_row.append({'value': doc.valid_from.strftime("%Y-%m-%d"), 'class': '', 'type': 'text'}) else: doc_row.append({'value': '', 'class': '', 'type': 'text'}) # Nested languages and versions languages_rows = [] for lang in doc.languages: lang_row = [{'value': lang.language, 'class': '', 'type': 'text'}] # Latest version details if available (should be available ;-) ) if lang.latest_version: lang_row.append({'value': lang.latest_version.created_at.strftime("%Y-%m-%d %H:%M:%S"), 'class': '', 'type': 'text'}) if lang.latest_version.url: lang_row.append({'value': lang.latest_version.url, 'class': '', 'type': 'link', 'href': lang.latest_version.url}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) if lang.latest_version.file_name: lang_row.append({'value': lang.latest_version.file_name, 'class': '', 'type': 'text'}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) if lang.latest_version.file_type: lang_row.append({'value': lang.latest_version.file_type, 'class': '', 'type': 'text'}) else: lang_row.append({'value': '', 'class': '', 'type': 'text'}) # Include other details as necessary languages_rows.append(lang_row) doc_row.append({'is_group': True, 'colspan': '5', 'headers': ['Language', 'Latest Version', 'URL', 'File Name', 'Type'], 'sub_rows': languages_rows}) rows.append(doc_row) return rows