"""Blueprint with the views and helpers for adding and listing documents.

Routes are registered on ``document_bp`` under the ``/document`` URL prefix.
"""
import io
import os
from datetime import datetime as dt, timezone as tz
from urllib.parse import urlparse

import requests
from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app
from flask_security import roles_accepted, current_user
from requests.exceptions import SSLError
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import joinedload
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename

from common.models.document import Document, DocumentLanguage, DocumentVersion
from common.extensions import db
from .document_forms import AddDocumentForm, AddURLForm
from common.utils.middleware import mw_before_request
from common.utils.celery_utils import current_celery

# Upper bound (seconds) for each outgoing HTTP request in fetch_html; requests
# applies no timeout by default, which could block a worker indefinitely.
_FETCH_TIMEOUT_SECONDS = 30

document_bp = Blueprint('document_bp', __name__, url_prefix='/document')


@document_bp.before_request
def before_request():
    """Run the shared request middleware before every view of this blueprint."""
    mw_before_request()


def _start_embedding_task(doc_version):
    """Queue the 'create_embeddings' Celery task for *doc_version* and log it.

    Returns the task handle returned by ``current_celery.send_task``.
    """
    tenant_id = session['tenant']['id']
    task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
        tenant_id,
        doc_version.id,
    ])
    current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
                            f'Document Version {doc_version.id}. '
                            f'Embedding creation task: {task.id}')
    return task


@document_bp.route('/add_document', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_document():
    """Show the upload form; on a valid POST store the file and queue embeddings."""
    form = AddDocumentForm()

    # If the form is submitted
    if request.method == 'POST' and form.validate_on_submit():
        current_app.logger.info(f'Adding document for tenant {session["tenant"]["id"]}')
        file = form.file.data
        filename = secure_filename(file.filename)

        # A name without a dot has no extension to derive the file type from;
        # report it to the user instead of failing with an IndexError.
        if '.' not in filename:
            flash('Uploaded file has no file extension.', 'error')
            return render_template('document/add_document.html', form=form)
        extension = filename.rsplit('.', 1)[1].lower()

        new_doc, new_doc_lang, new_doc_vers = create_document_stack(form, file, filename, extension)
        _start_embedding_task(new_doc_vers)

        return redirect(url_for('document_bp.documents'))

    return render_template('document/add_document.html', form=form)


@document_bp.route('/add_url', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_url():
    """Show the URL form; on a valid POST fetch the page, store it and queue embeddings."""
    form = AddURLForm()

    # If the form is submitted
    if request.method == 'POST' and form.validate_on_submit():
        tenant_id = session['tenant']['id']
        current_app.logger.info(f'Adding document for tenant {tenant_id}')
        url = form.url.data

        # NOTE(review): the URL is fetched server-side as submitted; confirm the
        # form restricts it to hosts this service is allowed to reach.
        try:
            html = fetch_html(url)
        except requests.RequestException as e:
            # An unreachable or failing URL is a user-input problem, not a server fault.
            current_app.logger.error(f'Error fetching {url} for tenant {tenant_id}: {e}')
            flash('Error fetching the URL.', 'error')
            return render_template('document/add_url.html', form=form)
        file = io.StringIO(html)

        # Derive a file name from the last path segment of the URL.
        filename = urlparse(url).path.split('/')[-1]
        if filename == '':
            filename = 'index'
        if not filename.endswith('.html'):
            filename += '.html'
        extension = 'html'

        new_doc, new_doc_lang, new_doc_vers = create_document_stack(form, file, filename, extension)
        _start_embedding_task(new_doc_vers)

        return redirect(url_for('document_bp.documents'))

    return render_template('document/add_url.html', form=form)


@document_bp.route('/documents', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def documents():
    """Render a paginated list of documents, newest first.

    Reads the ``page`` (default 1) and ``per_page`` (default 10) query arguments.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # Eager-load languages and their versions together with the documents.
    query = Document.query.order_by(desc(Document.created_at)).options(
        joinedload(Document.languages).joinedload(DocumentLanguage.versions))

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    docs = pagination.items

    rows = prepare_document_data(docs)

    return render_template('document/documents.html', rows=rows, pagination=pagination)


# The view takes ``version_id``, so the rule must declare it as a URL variable.
@document_bp.route('/process_version/<int:version_id>', methods=['POST'])
@roles_accepted('Super User', 'Tenant Admin')
def process_version(version_id):
    """Placeholder view for (re)processing a document version.

    Responds with 404 when the version does not exist, otherwise redirects to
    the document list.
    """
    version = DocumentVersion.query.get_or_404(version_id)
    if not version.processing:
        print(f'Placeholder for processing version: {version_id}')
    # Endpoints of a blueprint are addressed with the blueprint name as prefix.
    return redirect(url_for('document_bp.documents'))


def set_logging_information(obj, timestamp):
    """Stamp *obj* with *timestamp* and the current user as creator and updater."""
    obj.created_at = timestamp
    obj.updated_at = timestamp
    obj.created_by = current_user.id
    obj.updated_by = current_user.id


def _abort_document_write(exc):
    """Log *exc*, flash a user-facing error and roll back the database session."""
    current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {exc}')
    flash('Error adding document.', 'error')
    db.session.rollback()


def create_document_stack(form, file, filename, extension):
    """Create and persist a Document, its DocumentLanguage and a DocumentVersion.

    The three objects are committed first; the language is then pointed at the
    new version as its latest version and committed again. Finally the file is
    stored through ``upload_file_for_version``.

    Returns the tuple ``(document, document_language, document_version)``.
    Any exception raised while persisting is logged, the session is rolled
    back, and the exception is re-raised.
    """
    # Create the Document
    new_doc = create_document(form, filename)

    # Create the DocumentLanguage
    new_doc_lang = create_language_for_document(new_doc, form.language.data, form.user_context.data)

    # Create the DocumentVersion
    new_doc_vers = DocumentVersion()
    new_doc_vers.document_language = new_doc_lang
    set_logging_information(new_doc_vers, dt.now(tz.utc))

    try:
        db.session.add(new_doc)
        db.session.add(new_doc_lang)
        db.session.add(new_doc_vers)
        db.session.commit()
    except Exception as e:
        # Roll back on every failure so the session stays usable afterwards.
        _abort_document_write(e)
        raise

    try:
        new_doc_lang = db.session.merge(new_doc_lang)
        new_doc_vers = db.session.merge(new_doc_vers)
        # The version id is only known after the first commit.
        new_doc_lang.latest_version_id = new_doc_vers.id
        db.session.commit()
    except Exception as e:
        _abort_document_write(e)
        raise

    current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
                            f'Document {new_doc.id}, Document Version {new_doc_vers.id}')

    upload_file_for_version(new_doc_vers, file, extension)

    return new_doc, new_doc_lang, new_doc_vers


def log_session_state(session, msg=""):
    """Log the ``dirty`` and ``new`` collections of *session* at debug level.

    Note that the *session* parameter shadows the Flask ``session`` imported
    by this module; callers pass the session object to inspect.
    """
    current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}")
    current_app.logger.debug(f"{msg} - Session new: {session.new}")


def create_document(form, filename):
    """Build (without persisting) a Document from *form* for the current tenant.

    Falls back to *filename* without its extension when no name was entered,
    and to the current UTC time when no ``valid_from`` was entered.
    """
    new_doc = Document()
    # Treat both '' and None as "not provided".
    if form.name.data:
        new_doc.name = form.name.data
    else:
        new_doc.name = filename.rsplit('.', 1)[0]

    if form.valid_from.data:
        new_doc.valid_from = form.valid_from.data
    else:
        new_doc.valid_from = dt.now(tz.utc)

    new_doc.tenant_id = session['tenant']['id']
    set_logging_information(new_doc, dt.now(tz.utc))

    return new_doc


def create_language_for_document(document, language, user_context):
    """Build (without persisting) a DocumentLanguage attached to *document*.

    Uses the session's ``default_language`` when *language* is empty; sets
    ``user_context`` only when one was provided.
    """
    new_doc_lang = DocumentLanguage()
    # Treat both '' and None as "not provided".
    new_doc_lang.language = language if language else session['default_language']

    if user_context:
        new_doc_lang.user_context = user_context

    new_doc_lang.document = document

    set_logging_information(new_doc_lang, dt.now(tz.utc))

    return new_doc_lang


def upload_file_for_version(doc_vers, file, extension):
    """Store *file* below the configured UPLOAD_FOLDER and commit the file metadata.

    *file* must be a werkzeug ``FileStorage`` or an ``io.StringIO``.

    Raises:
        TypeError: for any other kind of *file*.
        SQLAlchemyError: when committing fails; the session is rolled back first.
    """
    doc_vers.file_type = extension
    doc_vers.file_name = doc_vers.calc_file_name()
    doc_vers.file_location = doc_vers.calc_file_location()

    upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], doc_vers.file_location)
    os.makedirs(upload_path, exist_ok=True)
    target_path = os.path.join(upload_path, doc_vers.file_name)

    if isinstance(file, FileStorage):
        file.save(target_path)
    elif isinstance(file, io.StringIO):
        # In-memory text (e.g. fetched HTML) is written out manually.
        with open(target_path, 'w', encoding='utf-8') as target:
            target.write(file.getvalue())
    else:
        raise TypeError('Unsupported file type.')

    tenant_id = session['tenant']['id']
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        flash('Error saving document.', 'error')
        current_app.logger.error(
            f'Error saving document for tenant {tenant_id} while uploading file: {e}')
        # Do not report success or continue processing after a failed commit.
        raise

    current_app.logger.info(f'Succesfully saved document for tenant {tenant_id} for '
                            f'document version {doc_vers.id} while uploading file.')


def fetch_html(url):
    """Fetch *url* and return the response body as text.

    When SSL verification fails the request is retried without verification,
    but only if the app's DEBUG setting is enabled; otherwise the SSLError is
    re-raised.

    Raises:
        requests.RequestException: on connection errors, timeouts, SSL errors
            or an HTTP error status.
    """
    tenant_id = session['tenant']['id']
    try:
        response = requests.get(url, timeout=_FETCH_TIMEOUT_SECONDS)
    except SSLError as e:
        current_app.logger.error(f"Error fetching HTML from {url} for tenant {tenant_id}. "
                                 f"Error Encountered: {e}")
        if not current_app.config.get('DEBUG'):
            # Never skip certificate verification outside development.
            raise
        current_app.logger.info(f"Skipping SSL verification for {url} for tenant {tenant_id}. "
                                f"Only while in development environment.")
        response = requests.get(url, verify=False, timeout=_FETCH_TIMEOUT_SECONDS)

    response.raise_for_status()  # Will raise an exception for bad requests

    return response.text


# Sample code for adding or updating versions and ensuring latest_version is set in DocumentLanguage
# def add_or_update_version(language_id, version_data):
#     new_version = Version(language_id=language_id, **version_data)
#     db.session.add(new_version)
#     db.session.flush()  # Ensures new_version gets an ID assigned if it's new
#
#     # Assuming we always call this when we know it's the latest
#     language = Language.query.get(language_id)
#     language.latest_version_id = new_version.id
#     db.session.commit()

# sample code for using latest_version in the application
# @app.route('/language/')
# def show_language(language_id):
#     language = Language.query.get_or_404(language_id)
#     latest_version = language.latest_version  # This is now a direct, efficient database access
#     return render_template('language_details.html', language=language, latest_version=latest_version)


def _text_cell(value):
    """Return a plain text table cell; falsy values render as an empty cell."""
    return {'value': value if value else '', 'class': '', 'type': 'text'}


def prepare_document_data(docs):
    """Convert *docs* into the nested row structure rendered by the documents table.

    Each document yields a list of cells: name, creation timestamp, valid-from
    date and a group cell whose ``sub_rows`` hold one 5-cell row per language
    (language, latest version timestamp, URL, file name, file type).
    """
    rows = []
    for doc in docs:
        # Document basic details
        doc_row = [
            _text_cell(doc.name),
            _text_cell(doc.created_at.strftime("%Y-%m-%d %H:%M:%S")),
            _text_cell(doc.valid_from.strftime("%Y-%m-%d") if doc.valid_from else ''),
        ]

        # Nested languages and versions
        languages_rows = []
        for lang in doc.languages:
            lang_row = [_text_cell(lang.language)]
            latest = lang.latest_version

            # Latest version details if available (should be available ;-) )
            if latest:
                lang_row.append(_text_cell(latest.created_at.strftime("%Y-%m-%d %H:%M:%S")))
                if latest.url:
                    lang_row.append({'value': latest.url, 'class': '', 'type': 'link', 'href': latest.url})
                else:
                    lang_row.append(_text_cell(''))
                lang_row.append(_text_cell(latest.file_name))
                lang_row.append(_text_cell(latest.file_type))
            else:
                # Keep the row aligned with the five group headers below.
                lang_row.extend(_text_cell('') for _ in range(4))
            # Include other details as necessary

            languages_rows.append(lang_row)

        doc_row.append({'is_group': True, 'colspan': '5',
                        'headers': ['Language', 'Latest Version', 'URL', 'File Name', 'Type'],
                        'sub_rows': languages_rows})
        rows.append(doc_row)
    return rows