From 914c265afe8767300641ef5435e26135a65d5250 Mon Sep 17 00:00:00 2001
From: Josako
Date: Mon, 2 Sep 2024 12:37:44 +0200
Subject: [PATCH] - Improvements to document uploads (accept files other than
 HTML when a URL is entered)
- Introduction of API functionality (to be continued); document and URL
  uploads are deduplicated between the views and the API.
- Improvements to document processing: processor classes introduced to
  streamline document inputs.
- Removed the dedicated YouTube functionality, as YouTube's document
  retrieval changes continuously; added uploads of srt, mp3, ogg and mp4
  files instead.

---
 common/extensions.py                        |   2 +
 common/models/document.py                   |   2 +-
 common/utils/document_utils.py              | 230 +++++
 common/utils/eveai_exceptions.py            |  43 +
 config/config.py                            |   4 +
 config/logging_config.py                    |  13 +
 eveai_api/__init__.py                       |  76 +-
 eveai_api/api/auth.py                       |  24 +
 eveai_api/api/document_api.py               | 178 ++++
 eveai_api/auth.py                           |   7 -
 eveai_app/__init__.py                       |  22 +-
 eveai_app/templates/navbar.html             |   1 -
 eveai_app/views/document_forms.py           |  15 +-
 eveai_app/views/document_views.py           | 368 +++-----
 eveai_workers/Processors/audio_processor.py | 187 ++++
 eveai_workers/Processors/html_processor.py  | 142 ++++
 .../{PDF_Processor.py => pdf_processor.py}  |  42 +-
 eveai_workers/Processors/processor.py       |  42 +
 eveai_workers/Processors/srt_processor.py   |  80 ++
 eveai_workers/tasks.py                      | 795 +++++++-----------
 requirements.txt                            |   4 +-
 21 files changed, 1425 insertions(+), 852 deletions(-)
 create mode 100644 common/utils/document_utils.py
 create mode 100644 common/utils/eveai_exceptions.py
 create mode 100644 eveai_api/api/auth.py
 create mode 100644 eveai_api/api/document_api.py
 delete mode 100644 eveai_api/auth.py
 create mode 100644 eveai_workers/Processors/audio_processor.py
 create mode 100644 eveai_workers/Processors/html_processor.py
 rename eveai_workers/Processors/{PDF_Processor.py => pdf_processor.py} (88%)
 create mode 100644 eveai_workers/Processors/processor.py
 create mode 100644 eveai_workers/Processors/srt_processor.py

diff --git a/common/extensions.py b/common/extensions.py
index deeafdd..7c3f49a 100644
--- a/common/extensions.py
+++ b/common/extensions.py
@@ -9,6 +9,7 @@ from flask_socketio import SocketIO
 from flask_jwt_extended import JWTManager
 from flask_session import Session
 from flask_wtf import CSRFProtect
+from flask_restful import Api
 from .utils.nginx_utils import prefixed_url_for
 from .utils.simple_encryption import SimpleEncryption
@@ -27,6 +28,7 @@ cors = CORS()
 socketio = SocketIO()
 jwt = JWTManager()
 session = Session()
+api = Api()
 # kms_client = JosKMSClient.from_service_account_json('config/gc_sa_eveai.json')
diff --git a/common/models/document.py b/common/models/document.py
index aeed9a3..6923463 100644
--- a/common/models/document.py
+++ b/common/models/document.py
@@ -12,7 +12,7 @@ class Document(db.Model):
     # Versioning Information
     created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
-    created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
+    created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=True)
     updated_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now(), onupdate=db.func.now())
     updated_by = db.Column(db.Integer, db.ForeignKey(User.id))
diff --git a/common/utils/document_utils.py b/common/utils/document_utils.py
new file mode 100644
index 0000000..7a11fdb
--- /dev/null
+++ b/common/utils/document_utils.py
@@ -0,0 +1,230 @@
+from datetime import datetime as dt, timezone as tz
+from sqlalchemy.exc import SQLAlchemyError
+from werkzeug.utils import secure_filename
+from common.models.document import Document, DocumentVersion
+from common.extensions import db, minio_client
+from common.utils.celery_utils import current_celery
+from flask import current_app
+import requests
+from urllib.parse import urlparse, unquote
+import os
+from .eveai_exceptions import EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType, \
+    EveAIYoutubeError
+
+
+def create_document_stack(api_input, file, filename, extension, tenant_id):
+    # Create the Document
+    new_doc = create_document(api_input, filename, tenant_id)
+    db.session.add(new_doc)
+
+    # Create the DocumentVersion
+    new_doc_vers = create_version_for_document(new_doc,
+                                               api_input.get('url', ''),
+                                               api_input.get('language', 'en'),
+                                               api_input.get('user_context', '')
+                                               )
+    db.session.add(new_doc_vers)
+
+    try:
+        db.session.commit()
+    except SQLAlchemyError as e:
+        current_app.logger.error(f'Error adding document for tenant {tenant_id}: {e}')
+        db.session.rollback()
+        raise
+
+    current_app.logger.info(f'Document added successfully for tenant {tenant_id}, '
+                            f'Document Version {new_doc_vers.id}')
+
+    # Upload file to storage
+    upload_file_for_version(new_doc_vers, file, extension, tenant_id)
+
+    return new_doc, new_doc_vers
+
+
+def create_document(form, filename, tenant_id):
+    new_doc = Document()
+    if form['name'] == '':
+        new_doc.name = filename.rsplit('.', 1)[0]
+    else:
+        new_doc.name = form['name']
+
+    if form['valid_from'] and form['valid_from'] != '':
+        new_doc.valid_from = form['valid_from']
+    else:
+        new_doc.valid_from = dt.now(tz.utc)
+    new_doc.tenant_id = tenant_id
+    set_logging_information(new_doc, dt.now(tz.utc))
+
+    return new_doc
+
+
+def create_version_for_document(document, url, language, user_context):
+    new_doc_vers = DocumentVersion()
+    if url != '':
+        new_doc_vers.url = url
+
+    if language == '':
+        raise EveAIInvalidLanguageException('Language is required for document creation!')
+    else:
+        new_doc_vers.language = language
+
+    if user_context != '':
+        new_doc_vers.user_context = user_context
+
+    new_doc_vers.document = document
+
+    set_logging_information(new_doc_vers, dt.now(tz.utc))
+
+    return new_doc_vers
+
+
+def upload_file_for_version(doc_vers, file, extension, tenant_id):
+    doc_vers.file_type = extension
+    doc_vers.file_name = doc_vers.calc_file_name()
+    doc_vers.file_location = doc_vers.calc_file_location()
+
+    # Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place.
+    minio_client.create_tenant_bucket(tenant_id)
+
+    try:
+        minio_client.upload_document_file(
+            tenant_id,
+            doc_vers.doc_id,
+            doc_vers.language,
+            doc_vers.id,
+            doc_vers.file_name,
+            file
+        )
+        db.session.commit()
+        current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
+                                f'document version {doc_vers.id} while uploading file.')
+    except Exception as e:
+        db.session.rollback()
+        current_app.logger.error(
+            f'Error saving document to MinIO for tenant {tenant_id}: {e}')
+        raise
+
+
+def set_logging_information(obj, timestamp):
+    obj.created_at = timestamp
+    obj.updated_at = timestamp
+
+
+def update_logging_information(obj, timestamp):
+    obj.updated_at = timestamp
+
+
+def get_extension_from_content_type(content_type):
+    content_type_map = {
+        'text/html': 'html',
+        'application/pdf': 'pdf',
+        'text/plain': 'txt',
+        'application/msword': 'doc',
+        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
+        # Add more mappings as needed
+    }
+    return content_type_map.get(content_type, 'html')  # Default to 'html' if unknown
+
+
+def process_url(url, tenant_id):
+    response = requests.head(url, allow_redirects=True)
+    content_type = response.headers.get('Content-Type', '').split(';')[0]
+
+    # Determine file extension based on Content-Type
+    extension = get_extension_from_content_type(content_type)
+
+    # Generate filename
+    parsed_url = urlparse(url)
+    path = unquote(parsed_url.path)
+    filename = os.path.basename(path)
+
+    if not filename or '.' not in filename:
+        # Use the last part of the path or a default name
+        filename = path.strip('/').split('/')[-1] or 'document'
+        filename = secure_filename(f"{filename}.{extension}")
+    else:
+        filename = secure_filename(filename)
+
+    # Check if a document with this URL already exists
+    existing_doc = DocumentVersion.query.filter_by(url=url).first()
+    if existing_doc:
+        raise EveAIDoubleURLException
+
+    # Download the content
+    response = requests.get(url)
+    response.raise_for_status()
+    file_content = response.content
+
+    return file_content, filename, extension
+
+
+def process_multiple_urls(urls, tenant_id, api_input):
+    results = []
+    for url in urls:
+        try:
+            file_content, filename, extension = process_url(url, tenant_id)
+
+            url_input = api_input.copy()
+            url_input.update({
+                'url': url,
+                'name': f"{api_input['name']}-{filename}" if api_input['name'] else filename
+            })
+
+            new_doc, new_doc_vers = create_document_stack(url_input, file_content, filename, extension, tenant_id)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            results.append({
+                'url': url,
+                'document_id': new_doc.id,
+                'document_version_id': new_doc_vers.id,
+                'task_id': task_id,
+                'status': 'success'
+            })
+        except Exception as e:
+            current_app.logger.error(f"Error processing URL {url}: {str(e)}")
+            results.append({
+                'url': url,
+                'status': 'error',
+                'message': str(e)
+            })
+    return results
+
+
+def prepare_youtube_document(url, tenant_id, api_input):
+    try:
+        filename = "placeholder.youtube"
+        extension = 'youtube'
+
+        new_doc = create_document(api_input, filename, tenant_id)
+        new_doc_vers = create_version_for_document(new_doc, url, api_input['language'], api_input['user_context'])
+
+        new_doc_vers.file_type = extension
+        new_doc_vers.file_name = new_doc_vers.calc_file_name()
+        new_doc_vers.file_location = new_doc_vers.calc_file_location()
+
+        db.session.add(new_doc)
+        db.session.add(new_doc_vers)
+        db.session.commit()
+
+        return new_doc, new_doc_vers
+    except Exception as e:
+        raise EveAIYoutubeError(f"Error preparing YouTube document: {str(e)}")
EveAIYoutubeError(f"Error preparing YouTube document: {str(e)}") + + +def start_embedding_task(tenant_id, doc_vers_id): + task = current_celery.send_task('create_embeddings', queue='embeddings', args=[ + tenant_id, + doc_vers_id, + ]) + current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, ' + f'Document Version {doc_vers_id}. ' + f'Embedding creation task: {task.id}') + return task.id + + +def validate_file_type(extension): + current_app.logger.debug(f'Validating file type {extension}') + current_app.logger.debug(f'Supported file types: {current_app.config["SUPPORTED_FILE_TYPES"]}') + if extension not in current_app.config['SUPPORTED_FILE_TYPES']: + raise EveAIUnsupportedFileType(f"Filetype {extension} is currently not supported. " + f"Supported filetypes: {', '.join(current_app.config['SUPPORTED_FILE_TYPES'])}") diff --git a/common/utils/eveai_exceptions.py b/common/utils/eveai_exceptions.py new file mode 100644 index 0000000..c1e2bb6 --- /dev/null +++ b/common/utils/eveai_exceptions.py @@ -0,0 +1,43 @@ +class EveAIException(Exception): + """Base exception class for EveAI API""" + + def __init__(self, message, status_code=400, payload=None): + super().__init__() + self.message = message + self.status_code = status_code + self.payload = payload + + def to_dict(self): + rv = dict(self.payload or ()) + rv['message'] = self.message + return rv + + +class EveAIInvalidLanguageException(EveAIException): + """Raised when an invalid language is provided""" + + def __init__(self, message="Langage is required", status_code=400, payload=None): + super().__init__(message, status_code, payload) + + +class EveAIDoubleURLException(EveAIException): + """Raised when an existing url is provided""" + + def __init__(self, message="URL already exists", status_code=400, payload=None): + super().__init__(message, status_code, payload) + + +class EveAIUnsupportedFileType(EveAIException): + """Raised when an invalid file type is provided""" + + def __init__(self, message="Filetype is not supported", status_code=400, payload=None): + super().__init__(message, status_code, payload) + + +class EveAIYoutubeError(EveAIException): + """Raised when adding a Youtube document fails""" + + def __init__(self, message="Youtube document creation failed", status_code=400, payload=None): + super().__init__(message, status_code, payload) + +# Add more custom exceptions as needed diff --git a/config/config.py b/config/config.py index fb0fa72..6433b50 100644 --- a/config/config.py +++ b/config/config.py @@ -136,6 +136,10 @@ class Config(object): MAIL_PASSWORD = environ.get('MAIL_PASSWORD') MAIL_DEFAULT_SENDER = ('eveAI Admin', MAIL_USERNAME) + SUPPORTED_FILE_TYPES = ['pdf', 'html', 'md', 'txt', 'mp3', 'mp4', 'ogg', 'srt'] + + + class DevConfig(Config): DEVELOPMENT = True diff --git a/config/logging_config.py b/config/logging_config.py index 790351e..e767138 100644 --- a/config/logging_config.py +++ b/config/logging_config.py @@ -60,6 +60,14 @@ LOGGING = { 'backupCount': 10, 'formatter': 'standard', }, + 'file_api': { + 'level': 'DEBUG', + 'class': 'logging.handlers.RotatingFileHandler', + 'filename': 'logs/eveai_api.log', + 'maxBytes': 1024 * 1024 * 5, # 5MB + 'backupCount': 10, + 'formatter': 'standard', + }, 'file_sqlalchemy': { 'level': 'DEBUG', 'class': 'logging.handlers.RotatingFileHandler', @@ -146,6 +154,11 @@ LOGGING = { 'level': 'DEBUG', 'propagate': False }, + 'eveai_api': { # logger for the eveai_chat_workers + 'handlers': ['file_api', 'graylog', ] if env == 'production' else ['file_api', ], + 
diff --git a/config/config.py b/config/config.py
index fb0fa72..6433b50 100644
--- a/config/config.py
+++ b/config/config.py
@@ -136,6 +136,10 @@ class Config(object):
     MAIL_PASSWORD = environ.get('MAIL_PASSWORD')
     MAIL_DEFAULT_SENDER = ('eveAI Admin', MAIL_USERNAME)
 
+    SUPPORTED_FILE_TYPES = ['pdf', 'html', 'md', 'txt', 'mp3', 'mp4', 'ogg', 'srt']
+
+
+
 class DevConfig(Config):
     DEVELOPMENT = True
diff --git a/config/logging_config.py b/config/logging_config.py
index 790351e..e767138 100644
--- a/config/logging_config.py
+++ b/config/logging_config.py
@@ -60,6 +60,14 @@ LOGGING = {
             'backupCount': 10,
             'formatter': 'standard',
         },
+        'file_api': {
+            'level': 'DEBUG',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'logs/eveai_api.log',
+            'maxBytes': 1024 * 1024 * 5,  # 5MB
+            'backupCount': 10,
+            'formatter': 'standard',
+        },
         'file_sqlalchemy': {
             'level': 'DEBUG',
             'class': 'logging.handlers.RotatingFileHandler',
@@ -146,6 +154,11 @@ LOGGING = {
             'level': 'DEBUG',
             'propagate': False
         },
+        'eveai_api': {  # logger for the eveai_api service
+            'handlers': ['file_api', 'graylog', ] if env == 'production' else ['file_api', ],
+            'level': 'DEBUG',
+            'propagate': False
+        },
         'sqlalchemy.engine': {  # logger for the sqlalchemy
             'handlers': ['file_sqlalchemy', 'graylog', ] if env == 'production' else ['file_sqlalchemy', ],
             'level': 'DEBUG',
diff --git a/eveai_api/__init__.py b/eveai_api/__init__.py
index 90695a6..8dbc73d 100644
--- a/eveai_api/__init__.py
+++ b/eveai_api/__init__.py
@@ -1,4 +1,72 @@
-# from flask import Blueprint, request
-#
-# public_api_bp = Blueprint("public", __name__, url_prefix="/api/v1")
-# tenant_api_bp = Blueprint("tenant", __name__, url_prefix="/api/v1/tenant")
+from flask import Flask, jsonify
+from flask_jwt_extended import get_jwt_identity, verify_jwt_in_request
+from common.extensions import db, api, jwt, minio_client
+import os
+import logging.config
+
+from common.utils.database import Database
+from config.logging_config import LOGGING
+from .api.document_api import AddDocumentResource
+from .api.auth import TokenResource
+from config.config import get_config
+from common.utils.celery_utils import make_celery, init_celery
+from common.utils.eveai_exceptions import EveAIException
+
+
+def create_app(config_file=None):
+    app = Flask(__name__)
+
+    environment = os.getenv('FLASK_ENV', 'development')
+
+    match environment:
+        case 'development':
+            app.config.from_object(get_config('dev'))
+        case 'production':
+            app.config.from_object(get_config('prod'))
+        case _:
+            app.config.from_object(get_config('dev'))
+
+    app.config['SESSION_KEY_PREFIX'] = 'eveai_api_'
+
+    app.celery = make_celery(app.name, app.config)
+    init_celery(app.celery, app)
+
+    logging.config.dictConfig(LOGGING)
+    logger = logging.getLogger(__name__)
+
+    logger.info("eveai_api starting up")
+
+    # Register Necessary Extensions
+    register_extensions(app)
+
+    # Error handler for the API
+    @app.errorhandler(EveAIException)
+    def handle_eveai_exception(error):
+        response = jsonify(error.to_dict())
+        response.status_code = error.status_code
+        return response
+
+    @app.before_request
+    def before_request():
+        # Extract the tenant_id from the JWT token, if one was sent
+        verify_jwt_in_request(optional=True)
+        tenant_id = get_jwt_identity()
+
+        if tenant_id:
+            # Switch to the correct schema
+            Database(tenant_id).switch_schema()
+
+    # Register resources
+    register_api_resources()
+
+    return app
+
+
+def register_extensions(app):
+    db.init_app(app)
+    api.init_app(app)
+    jwt.init_app(app)
+    minio_client.init_app(app)
+
+
+def register_api_resources():
+    api.add_resource(AddDocumentResource, '/api/v1/documents/add_document')
+    api.add_resource(TokenResource, '/api/v1/token')
diff --git a/eveai_api/api/auth.py b/eveai_api/api/auth.py
new file mode 100644
index 0000000..9b6c9ef
--- /dev/null
+++ b/eveai_api/api/auth.py
@@ -0,0 +1,24 @@
+from flask_restful import Resource, reqparse
+from flask_jwt_extended import create_access_token
+from common.models.user import Tenant
+from common.extensions import simple_encryption
+from flask import current_app
+
+
+class TokenResource(Resource):
+    def post(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('tenant_id', type=int, required=True)
+        parser.add_argument('api_key', type=str, required=True)
+        args = parser.parse_args()
+
+        tenant = Tenant.query.get(args['tenant_id'])
+        if not tenant:
+            return {'message': 'Tenant not found'}, 404
+
+        decrypted_api_key = simple_encryption.decrypt_api_key(tenant.encrypted_api_key)
+        if args['api_key'] != decrypted_api_key:
+            return {'message': 'Invalid API key'}, 401
+
+        # Use the tenant id itself as the identity so get_jwt_identity() returns it directly
+        access_token = create_access_token(identity=tenant.id)
+        return {'access_token': access_token}, 200
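For reference, a client could exercise the two registered endpoints roughly as follows. This is a sketch: the base URL, tenant id and API key are placeholder values, while the routes and field names come from register_api_resources, TokenResource and AddDocumentResource above.

    import requests

    BASE_URL = 'https://eveai.example.com'  # placeholder deployment root

    # 1. Exchange tenant id + API key for a JWT (TokenResource)
    resp = requests.post(f'{BASE_URL}/api/v1/token',
                         json={'tenant_id': 1, 'api_key': 'my-secret-key'})
    token = resp.json()['access_token']

    # 2. Upload a document as multipart/form-data (AddDocumentResource)
    with open('manual.pdf', 'rb') as f:
        resp = requests.post(f'{BASE_URL}/api/v1/documents/add_document',
                             headers={'Authorization': f'Bearer {token}'},
                             files={'file': f},
                             data={'name': 'Product manual', 'language': 'en'})
    print(resp.json())  # contains document_id, document_version_id and task_id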
diff --git a/eveai_api/api/document_api.py b/eveai_api/api/document_api.py
new file mode 100644
index 0000000..c105c3e
--- /dev/null
+++ b/eveai_api/api/document_api.py
@@ -0,0 +1,178 @@
+from flask_restful import Resource, reqparse
+from flask import current_app
+from flask_jwt_extended import jwt_required, get_jwt_identity
+from werkzeug.datastructures import FileStorage
+from werkzeug.utils import secure_filename
+from common.utils.document_utils import (
+    create_document_stack, process_url, start_embedding_task,
+    validate_file_type, EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
+    process_multiple_urls, prepare_youtube_document
+)
+from common.utils.eveai_exceptions import EveAIYoutubeError
+
+
+class AddDocumentResource(Resource):
+    @jwt_required()
+    def post(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('file', type=FileStorage, location='files', required=True)
+        parser.add_argument('name', type=str, required=False)
+        parser.add_argument('language', type=str, required=True)
+        parser.add_argument('user_context', type=str, required=False)
+        parser.add_argument('valid_from', type=str, required=False)
+        args = parser.parse_args()
+
+        tenant_id = get_jwt_identity()
+        current_app.logger.info(f'Adding document for tenant {tenant_id}')
+
+        try:
+            file = args['file']
+            filename = secure_filename(file.filename)
+            extension = filename.rsplit('.', 1)[1].lower()
+
+            validate_file_type(extension)
+
+            api_input = {
+                'name': args['name'] or filename,
+                'language': args['language'],
+                'user_context': args['user_context'],
+                'valid_from': args['valid_from']
+            }
+
+            new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            return {
+                'message': f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
+                'document_id': new_doc.id,
+                'document_version_id': new_doc_vers.id,
+                'task_id': task_id
+            }, 201
+
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            return {'message': str(e)}, 400
+        except Exception as e:
+            current_app.logger.error(f'Error adding document: {str(e)}')
+            return {'message': 'Error adding document'}, 500
+
+
+class AddURLResource(Resource):
+    @jwt_required()
+    def post(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('url', type=str, required=True)
+        parser.add_argument('name', type=str, required=False)
+        parser.add_argument('language', type=str, required=True)
+        parser.add_argument('user_context', type=str, required=False)
+        parser.add_argument('valid_from', type=str, required=False)
+        args = parser.parse_args()
+
+        tenant_id = get_jwt_identity()
+        current_app.logger.info(f'Adding document from URL for tenant {tenant_id}')
+
+        try:
+            file_content, filename, extension = process_url(args['url'], tenant_id)
+
+            api_input = {
+                'url': args['url'],
+                'name': args['name'] or filename,
+                'language': args['language'],
+                'user_context': args['user_context'],
+                'valid_from': args['valid_from']
+            }
+
+            new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            return {
+                'message': f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
+                'document_id': new_doc.id,
+                'document_version_id': new_doc_vers.id,
+                'task_id': task_id
+            }, 201
+
+        except EveAIDoubleURLException:
+            return {'message': f'A document with URL {args["url"]} already exists.'}, 400
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            return {'message': str(e)}, 400
+        except Exception as e:
+            current_app.logger.error(f'Error adding document from URL: {str(e)}')
+            return {'message': 'Error adding document from URL'}, 500
+
+
+class AddMultipleURLsResource(Resource):
+    @jwt_required()
+    def post(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('urls', type=str, action='append', required=True)
+        parser.add_argument('name', type=str, required=False)
+        parser.add_argument('language', type=str, required=True)
+        parser.add_argument('user_context', type=str, required=False)
+        parser.add_argument('valid_from', type=str, required=False)
+        args = parser.parse_args()
+
+        tenant_id = get_jwt_identity()
+        current_app.logger.info(f'Adding multiple documents from URLs for tenant {tenant_id}')
+
+        try:
+            api_input = {
+                'name': args['name'],
+                'language': args['language'],
+                'user_context': args['user_context'],
+                'valid_from': args['valid_from']
+            }
+
+            results = process_multiple_urls(args['urls'], tenant_id, api_input)
+
+            return {
+                'message': 'Processing of multiple URLs completed',
+                'results': results
+            }, 201
+
+        except Exception as e:
+            current_app.logger.error(f'Error adding documents from URLs: {str(e)}')
+            return {'message': 'Error adding documents from URLs'}, 500
+
+
+class AddYoutubeResource(Resource):
+    @jwt_required()
+    def post(self):
+        parser = reqparse.RequestParser()
+        parser.add_argument('url', type=str, required=True)
+        parser.add_argument('name', type=str, required=False)
+        parser.add_argument('language', type=str, required=True)
+        parser.add_argument('user_context', type=str, required=False)
+        parser.add_argument('valid_from', type=str, required=False)
+        args = parser.parse_args()
+
+        tenant_id = get_jwt_identity()
+        current_app.logger.info(f'Adding YouTube document for tenant {tenant_id}')
+
+        try:
+            api_input = {
+                'name': args['name'],
+                'language': args['language'],
+                'user_context': args['user_context'],
+                'valid_from': args['valid_from']
+            }
+
+            new_doc, new_doc_vers = prepare_youtube_document(args['url'], tenant_id, api_input)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            return {
+                'message': f'Processing on YouTube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
+                'document_id': new_doc.id,
+                'document_version_id': new_doc_vers.id,
+                'task_id': task_id
+            }, 201
+
+        except EveAIYoutubeError as e:
+            return {'message': str(e)}, 400
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            return {'message': str(e)}, 400
+        except Exception as e:
+            current_app.logger.error(f'Error adding YouTube document: {str(e)}')
+            return {'message': 'Error adding YouTube document'}, 500
+
+
+# You can add more API resources here as needed
diff --git a/eveai_api/auth.py b/eveai_api/auth.py
deleted file mode 100644
index 7fb1d25..0000000
--- a/eveai_api/auth.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from flask import request
-from flask.views import MethodView
-
-class RegisterAPI(MethodView):
-    def post(self):
-        username = request.json['username']
-
diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py
index e095c5b..2704d38 100644
--- a/eveai_app/__init__.py
+++ b/eveai_app/__init__.py
@@ -27,7 +27,6 @@ def create_app(config_file=None):
     app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
 
     environment = os.getenv('FLASK_ENV', 'development')
-    print(environment)
 
     match environment:
         case 'development':
@@ -49,8 +48,6 @@ def create_app(config_file=None):
     logger = logging.getLogger(__name__)
 
     logger.info("eveai_app starting up")
-    logger.debug("start config")
-    logger.debug(app.config)
 
     # Register extensions
@@ -95,14 +92,11 @@ def create_app(config_file=None):
         }
         return jsonify(response), 500
 
-    @app.before_request
-    def before_request():
-        # app.logger.debug(f"Before request - Session ID: {session.sid}")
-        app.logger.debug(f"Before request - Session data: {session}")
-        app.logger.debug(f"Before request - Request headers: {request.headers}")
-
-    # Register API
-    register_api(app)
+    # @app.before_request
+    # def before_request():
+    #     # app.logger.debug(f"Before request - Session ID: {session.sid}")
+    #     app.logger.debug(f"Before request - Session data: {session}")
+    #     app.logger.debug(f"Before request - Request headers: {request.headers}")
 
     # Register template filters
     register_filters(app)
@@ -138,9 +132,3 @@ def register_blueprints(app):
     app.register_blueprint(security_bp)
     from .views.interaction_views import interaction_bp
     app.register_blueprint(interaction_bp)
-
-
-def register_api(app):
-    pass
-    # from . import api
-    # app.register_blueprint(api.bp, url_prefix='/api')
diff --git a/eveai_app/templates/navbar.html b/eveai_app/templates/navbar.html
index fde2711..8602420 100644
--- a/eveai_app/templates/navbar.html
+++ b/eveai_app/templates/navbar.html
@@ -84,7 +84,6 @@
     {'name': 'Add Document', 'url': '/document/add_document', 'roles': ['Super User', 'Tenant Admin']},
     {'name': 'Add URL', 'url': '/document/add_url', 'roles': ['Super User', 'Tenant Admin']},
     {'name': 'Add a list of URLs', 'url': '/document/add_urls', 'roles': ['Super User', 'Tenant Admin']},
-    {'name': 'Add Youtube Document', 'url': '/document/add_youtube', 'roles': ['Super User', 'Tenant Admin']},
     {'name': 'All Documents', 'url': '/document/documents', 'roles': ['Super User', 'Tenant Admin']},
     {'name': 'All Document Versions', 'url': '/document/document_versions_list', 'roles': ['Super User', 'Tenant Admin']},
     {'name': 'Library Operations', 'url': '/document/library_operations', 'roles': ['Super User', 'Tenant Admin']},
diff --git a/eveai_app/views/document_forms.py b/eveai_app/views/document_forms.py
index caef422..4094a12 100644
--- a/eveai_app/views/document_forms.py
+++ b/eveai_app/views/document_forms.py
@@ -1,14 +1,21 @@
-from flask import session
+from flask import session, current_app
 from flask_wtf import FlaskForm
 from wtforms import (StringField, BooleanField, SubmitField, DateField, SelectField, FieldList, FormField,
                      TextAreaField, URLField)
-from wtforms.validators import DataRequired, Length, Optional, URL
+from wtforms.validators import DataRequired, Length, Optional, URL, ValidationError
 from flask_wtf.file import FileField, FileAllowed, FileRequired
 
 
+def allowed_file(form, field):
+    if field.data:
+        filename = field.data.filename
+        allowed_extensions = current_app.config.get('SUPPORTED_FILE_TYPES', [])
+        if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions):
+            raise ValidationError('Unsupported file type.')
+
+
 class AddDocumentForm(FlaskForm):
-    file = FileField('File', validators=[FileAllowed(['pdf', 'txt', 'html']),
-                                         FileRequired()])
+    file = FileField('File', validators=[FileRequired(), allowed_file])
     name = StringField('Name', validators=[Length(max=100)])
     language = SelectField('Language', choices=[], validators=[Optional()])
     user_context = TextAreaField('User Context', validators=[Optional()])
diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py
index ec87f9c..7a66ce1 100644
--- a/eveai_app/views/document_views.py
+++ b/eveai_app/views/document_views.py
@@ -18,6 +18,10 @@ from minio.error import S3Error
 
 from common.models.document import Document, DocumentVersion
 from common.extensions import db, minio_client
+from common.utils.document_utils import validate_file_type, create_document_stack, start_embedding_task, process_url, \
+    process_multiple_urls, prepare_youtube_document, create_version_for_document, upload_file_for_version
+from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \
+    EveAIDoubleURLException, EveAIYoutubeError
 from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddYoutubeForm, \
     AddURLsForm
 from common.utils.middleware import mw_before_request
@@ -56,30 +60,37 @@ def before_request():
 @roles_accepted('Super User', 'Tenant Admin')
 def add_document():
     form = AddDocumentForm()
+    current_app.logger.debug('Adding document')
 
-    # If the form is submitted
     if form.validate_on_submit():
-        current_app.logger.info(f'Adding document for tenant {session["tenant"]["id"]}')
-        file = form.file.data
-        filename = secure_filename(file.filename)
-        extension = filename.rsplit('.', 1)[1].lower()
-        form_dict = form_to_dict(form)
+        try:
+            current_app.logger.debug('Validating file type')
+            tenant_id = session['tenant']['id']
+            file = form.file.data
+            filename = secure_filename(file.filename)
+            extension = filename.rsplit('.', 1)[1].lower()
 
-        new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension)
+            validate_file_type(extension)
 
-        task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
-            session['tenant']['id'],
-            new_doc_vers.id,
-        ])
-        current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
-                                f'Document Version {new_doc_vers.id}. '
-                                f'Embedding creation task: {task.id}')
-        flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
-              'success')
+            api_input = {
+                'name': form.name.data,
+                'language': form.language.data,
+                'user_context': form.user_context.data,
+                'valid_from': form.valid_from.data
+            }
 
-        return redirect(prefixed_url_for('document_bp.documents'))
-    else:
-        form_validation_failed(request, form)
+            new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
+                  'success')
+            return redirect(prefixed_url_for('document_bp.documents'))
+
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            flash(str(e), 'error')
+        except Exception as e:
+            current_app.logger.error(f'Error adding document: {str(e)}')
+            flash('An error occurred while adding the document.', 'error')
 
     return render_template('document/add_document.html', form=form)
@@ -90,189 +101,107 @@ def add_url():
     form = AddURLForm()
 
     if form.validate_on_submit():
-        current_app.logger.info(f'Adding url for tenant {session["tenant"]["id"]}')
-        url = form.url.data
-
         try:
-            response = requests.head(url, allow_redirects=True)
-            content_type = response.headers.get('Content-Type', '').split(';')[0]
+            tenant_id = session['tenant']['id']
+            url = form.url.data
 
-            # Determine file extension based on Content-Type
-            extension = get_extension_from_content_type(content_type)
+            file_content, filename, extension = process_url(url, tenant_id)
 
-            # Generate filename
-            parsed_url = urlparse(url)
-            path = unquote(parsed_url.path)
-            filename = os.path.basename(path)
+            api_input = {
+                'name': form.name.data or filename,
+                'url': url,
+                'language': form.language.data,
+                'user_context': form.user_context.data,
+                'valid_from': form.valid_from.data
+            }
 
-            if not filename or '.' not in filename:
-                # Use the last part of the path or a default name
-                filename = path.strip('/').split('/')[-1] or 'document'
-                filename = secure_filename(f"{filename}.{extension}")
-            else:
-                filename = secure_filename(filename)
+            new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
 
-            # Check if a document with this URL already exists
-            existing_doc = DocumentVersion.query.filter_by(url=url).first()
-            if existing_doc:
-                flash(f'A document with URL {url} already exists. No new document created.', 'info')
-                return redirect(prefixed_url_for('document_bp.documents'))
-
-            # Download the content
-            response = requests.get(url)
-            response.raise_for_status()
-            file_content = response.content
-
-            # Create document and document version
-            form_dict = form_to_dict(form)
-            new_doc, new_doc_vers = create_document_stack(form_dict, file_content, filename, extension)
-
-            # Upload file to storage
-            minio_client.upload_document_file(
-                session['tenant']['id'],
-                new_doc_vers.doc_id,
-                new_doc_vers.language,
-                new_doc_vers.id,
-                filename,
-                file_content
-            )
-
-            # Start embedding task
-            task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
-                session['tenant']['id'],
-                new_doc_vers.id,
-            ])
-
-            current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
-                                    f'Document Version {new_doc_vers.id}. '
-                                    f'Embedding creation task: {task.id}')
-            flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
+            flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
                   'success')
 
             return redirect(prefixed_url_for('document_bp.documents'))
 
-        except requests.RequestException as e:
-            current_app.logger.error(f'Error fetching URL {url}: {str(e)}')
-            flash(f'Error fetching URL: {str(e)}', 'danger')
-        except SQLAlchemyError as e:
-            current_app.logger.error(f'Database error: {str(e)}')
-            flash('An error occurred while saving the document.', 'danger')
+        except EveAIDoubleURLException:
+            flash(f'A document with url {url} already exists. No new document created.', 'info')
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            flash(str(e), 'error')
         except Exception as e:
-            current_app.logger.error(f'Unexpected error: {str(e)}')
-            flash('An unexpected error occurred.', 'danger')
+            current_app.logger.error(f'Error adding document: {str(e)}')
+            flash('An error occurred while adding the document.', 'error')
 
     return render_template('document/add_url.html', form=form)
 
 
-def get_extension_from_content_type(content_type):
-    content_type_map = {
-        'text/html': 'html',
-        'application/pdf': 'pdf',
-        'text/plain': 'txt',
-        'application/msword': 'doc',
-        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
-        # Add more mappings as needed
-    }
-    return content_type_map.get(content_type, 'html')  # Default to 'html' if unknown
-
-
 @document_bp.route('/add_urls', methods=['GET', 'POST'])
 @roles_accepted('Super User', 'Tenant Admin')
 def add_urls():
     form = AddURLsForm()
 
     if form.validate_on_submit():
-        urls = form.urls.data.split('\n')
-        urls = [url.strip() for url in urls if url.strip()]
+        try:
+            tenant_id = session['tenant']['id']
+            urls = form.urls.data.split('\n')
+            urls = [url.strip() for url in urls if url.strip()]
 
-        for i, url in enumerate(urls):
-            try:
-                doc_vers = DocumentVersion.query.filter_by(url=url).all()
-                if doc_vers:
-                    current_app.logger.info(f'A document with url {url} already exists. No new document created.')
-                    flash(f'A document with url {url} already exists. No new document created.', 'info')
-                    continue
+            api_input = {
+                'name': form.name.data,
+                'language': form.language.data,
+                'user_context': form.user_context.data,
+                'valid_from': form.valid_from.data
+            }
 
-                html = fetch_html(url)
-                file = io.BytesIO(html)
+            results = process_multiple_urls(urls, tenant_id, api_input)
 
-                parsed_url = urlparse(url)
-                path_parts = parsed_url.path.split('/')
-                filename = path_parts[-1] if path_parts[-1] else 'index'
-                if not filename.endswith('.html'):
-                    filename += '.html'
+            for result in results:
+                if result['status'] == 'success':
+                    flash(
+                        f"Processed URL: {result['url']} - Document ID: {result['document_id']}, Version ID: {result['document_version_id']}",
+                        'success')
+                else:
+                    flash(f"Error processing URL: {result['url']} - {result['message']}", 'error')
 
-                # Use the name prefix if provided, otherwise use the filename
-                doc_name = f"{form.name.data}-{filename}" if form.name.data else filename
+            return redirect(prefixed_url_for('document_bp.documents'))
 
-                new_doc, new_doc_vers = create_document_stack({
-                    'name': doc_name,
-                    'url': url,
-                    'language': form.language.data,
-                    'user_context': form.user_context.data,
-                    'valid_from': form.valid_from.data
-                }, file, filename, 'html')
-
-                task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
-                    session['tenant']['id'],
-                    new_doc_vers.id,
-                ])
-                current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
-                                        f'Document Version {new_doc_vers.id}. '
-                                        f'Embedding creation task: {task.id}')
-                flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
-                      'success')
-
-            except Exception as e:
-                current_app.logger.error(f"Error processing URL {url}: {str(e)}")
-                flash(f'Error processing URL {url}: {str(e)}', 'danger')
-
-        return redirect(prefixed_url_for('document_bp.documents'))
-    else:
-        form_validation_failed(request, form)
+        except Exception as e:
+            current_app.logger.error(f'Error adding multiple URLs: {str(e)}')
+            flash('An error occurred while adding the URLs.', 'error')
 
     return render_template('document/add_urls.html', form=form)
 
 
+
 @document_bp.route('/add_youtube', methods=['GET', 'POST'])
 @roles_accepted('Super User', 'Tenant Admin')
 def add_youtube():
     form = AddYoutubeForm()
 
     if form.validate_on_submit():
-        current_app.logger.info(f'Adding Youtube document for tenant {session["tenant"]["id"]}')
-        url = form.url.data
-        current_app.logger.debug(f'Value of language field: {form.language.data}')
+        try:
+            tenant_id = session['tenant']['id']
+            url = form.url.data
 
-        doc_vers = DocumentVersion.query.filter_by(url=url).all()
-        if doc_vers:
-            current_app.logger.info(f'A document with url {url} already exists. No new document created.')
-            flash(f'A document with url {url} already exists. No new document created.', 'info')
+            api_input = {
+                'name': form.name.data,
+                'language': form.language.data,
+                'user_context': form.user_context.data,
+                'valid_from': form.valid_from.data
+            }
+
+            new_doc, new_doc_vers = prepare_youtube_document(url, tenant_id, api_input)
+            task_id = start_embedding_task(tenant_id, new_doc_vers.id)
+
+            flash(
+                f'Processing on YouTube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
+                'success')
             return redirect(prefixed_url_for('document_bp.documents'))
 
-        # As downloading a Youtube document can take quite some time, we offload this downloading to the worker
-        # We just pass a simple file to get things conform
-        file = "Youtube placeholder file"
-        filename = 'placeholder.youtube'
-        extension = 'youtube'
-        form_dict = form_to_dict(form)
-        current_app.logger.debug(f'Form data: {form_dict}')
-
-        new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension)
-
-        task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
-            session['tenant']['id'],
-            new_doc_vers.id,
-        ])
-        current_app.logger.info(f'Processing and Embedding on Youtube document started for tenant '
-                                f'{session["tenant"]["id"]}, '
-                                f'Document Version {new_doc_vers.id}. '
-                                f'Processing and Embedding Youtube task: {task.id}')
-        flash(f'Processing on Youtube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
-              'success')
-
-        return redirect(prefixed_url_for('document_bp.documents'))
-    else:
-        form_validation_failed(request, form)
+        except EveAIYoutubeError as e:
+            flash(str(e), 'error')
+        except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
+            flash(str(e), 'error')
+        except Exception as e:
+            current_app.logger.error(f'Error adding YouTube document: {str(e)}')
+            flash('An error occurred while adding the YouTube document.', 'error')
 
     return render_template('document/add_youtube.html', form=form)
@@ -487,7 +416,7 @@ def refresh_document(doc_id):
         current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
                                 f'Document Version {new_doc_vers.id}')
 
-        upload_file_for_version(new_doc_vers, file, extension)
+        upload_file_for_version(new_doc_vers, file, extension, session["tenant"]["id"])
 
         task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
             session['tenant']['id'],
@@ -535,116 +464,11 @@ def update_logging_information(obj, timestamp):
         obj.updated_by = current_user.id
 
 
-def create_document_stack(form, file, filename, extension):
-    # Create the Document
-    new_doc = create_document(form, filename)
-
-    # Create the DocumentVersion
-    new_doc_vers = create_version_for_document(new_doc,
-                                               form.get('url', ''),
-                                               form.get('language', 'en'),
-                                               form.get('user_context', '')
-                                               )
-
-    try:
-        db.session.add(new_doc)
-        db.session.add(new_doc_vers)
-        db.session.commit()
-    except SQLAlchemyError as e:
-        current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}')
-        flash('Error adding document.', 'alert')
-        db.session.rollback()
-        error = e.args
-        raise
-    except Exception as e:
-        current_app.logger.error('Unknown error')
-        raise
-
-    current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
-                            f'Document Version {new_doc.id}')
-
-    upload_file_for_version(new_doc_vers, file, extension)
-
-    return new_doc, new_doc_vers
-
-
 def log_session_state(session, msg=""):
     current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}")
     current_app.logger.debug(f"{msg} - Session new: {session.new}")
 
 
-def create_document(form, filename):
-    new_doc = Document()
-    if form['name'] == '':
-        new_doc.name = filename.rsplit('.', 1)[0]
-    else:
-        new_doc.name = form['name']
-
-    if form['valid_from'] and form['valid_from'] != '':
-        new_doc.valid_from = form['valid_from']
-    else:
-        new_doc.valid_from = dt.now(tz.utc)
-    new_doc.tenant_id = session['tenant']['id']
-    set_logging_information(new_doc, dt.now(tz.utc))
-
-    return new_doc
-
-
-def create_version_for_document(document, url, language, user_context):
-    new_doc_vers = DocumentVersion()
-    if url != '':
-        new_doc_vers.url = url
-
-    if language == '':
-        new_doc_vers.language = session['default_language']
-    else:
-        new_doc_vers.language = language
-
-    if user_context != '':
-        new_doc_vers.user_context = user_context
-
-    new_doc_vers.document = document
-
-    set_logging_information(new_doc_vers, dt.now(tz.utc))
-
-    return new_doc_vers
-
-
-def upload_file_for_version(doc_vers, file, extension):
-    doc_vers.file_type = extension
-    doc_vers.file_name = doc_vers.calc_file_name()
-    doc_vers.file_location = doc_vers.calc_file_location()
-
-    # Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place.
-    tenant_id = session['tenant']['id']
-    minio_client.create_tenant_bucket(tenant_id)
-
-    try:
-        minio_client.upload_document_file(
-            tenant_id,
-            doc_vers.doc_id,
-            doc_vers.language,
-            doc_vers.id,
-            doc_vers.file_name,
-            file
-        )
-        db.session.commit()
-        current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
-                                f'document version {doc_vers.id} while uploading file.')
-    except S3Error as e:
-        db.session.rollback()
-        flash('Error saving document to MinIO.', 'error')
-        current_app.logger.error(
-            f'Error saving document to MinIO for tenant {tenant_id}: {e}')
-        raise
-    except SQLAlchemyError as e:
-        db.session.rollback()
-        flash('Error saving document metadata.', 'error')
-        current_app.logger.error(
-            f'Error saving document metadata for tenant {tenant_id}: {e}')
-        raise
-
-
 def fetch_html(url):
     # Fetches HTML content from a URL
     try:
diff --git a/eveai_workers/Processors/audio_processor.py b/eveai_workers/Processors/audio_processor.py
new file mode 100644
index 0000000..61813b8
--- /dev/null
+++ b/eveai_workers/Processors/audio_processor.py
@@ -0,0 +1,187 @@
+import io
+import os
+from pydub import AudioSegment
+import tempfile
+from langchain_core.output_parsers import StrOutputParser
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.runnables import RunnablePassthrough
+from common.extensions import minio_client
+from common.utils.model_utils import create_language_template
+from .processor import Processor
+import subprocess
+
+
+class AudioProcessor(Processor):
+    def __init__(self, tenant, model_variables, document_version):
+        super().__init__(tenant, model_variables, document_version)
+        self.transcription_client = model_variables['transcription_client']
+        self.transcription_model = model_variables['transcription_model']
+        self.ffmpeg_path = 'ffmpeg'
+
+    def process(self):
+        self._log("Starting Audio processing")
+        try:
+            file_data = minio_client.download_document_file(
+                self.tenant.id,
+                self.document_version.doc_id,
+                self.document_version.language,
+                self.document_version.id,
+                self.document_version.file_name
+            )
+
+            compressed_audio = self._compress_audio(file_data)
+            transcription = self._transcribe_audio(compressed_audio)
+            markdown, title = self._generate_markdown_from_transcription(transcription)
+
+            self._save_markdown(markdown)
+            self._log("Finished processing Audio")
+            return markdown, title
+        except Exception as e:
+            self._log(f"Error processing Audio: {str(e)}", level='error')
+            raise
+
+    def _compress_audio(self, audio_data):
+        self._log("Compressing audio")
+        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{self.document_version.file_type}') as temp_input:
+            temp_input.write(audio_data)
+            temp_input.flush()
+
+            # Use a unique filename for the output to avoid conflicts
+            output_filename = f'compressed_{os.urandom(8).hex()}.mp3'
+            output_path = os.path.join(tempfile.gettempdir(), output_filename)
+
+            try:
+                result = subprocess.run(
+                    [self.ffmpeg_path, '-y', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', output_path],
+                    capture_output=True,
+                    text=True,
+                    check=True
+                )
+
+                with open(output_path, 'rb') as f:
+                    compressed_data = f.read()
+
+                # Save compressed audio to MinIO
+                compressed_filename = f"{self.document_version.id}_compressed.mp3"
+                minio_client.upload_document_file(
+                    self.tenant.id,
+                    self.document_version.doc_id,
+                    self.document_version.language,
+                    self.document_version.id,
+                    compressed_filename,
+                    compressed_data
+                )
+                self._log(f"Saved compressed audio to MinIO: {compressed_filename}")
{compressed_filename}") + + return compressed_data + + except subprocess.CalledProcessError as e: + error_message = f"Compression failed: {e.stderr}" + self._log(error_message, level='error') + raise Exception(error_message) + + finally: + # Clean up temporary files + os.unlink(temp_input.name) + if os.path.exists(output_path): + os.unlink(output_path) + + def _transcribe_audio(self, audio_data): + self._log("Starting audio transcription") + audio = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3") + + segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds + transcriptions = [] + + for i, chunk in enumerate(audio[::segment_length]): + self._log(f'Processing chunk {i + 1} of {len(audio) // segment_length + 1}') + + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio: + chunk.export(temp_audio.name, format="mp3") + temp_audio.flush() + + try: + file_size = os.path.getsize(temp_audio.name) + self._log(f"Temporary audio file size: {file_size} bytes") + + with open(temp_audio.name, 'rb') as audio_file: + file_start = audio_file.read(100) + self._log(f"First 100 bytes of audio file: {file_start}") + audio_file.seek(0) # Reset file pointer to the beginning + + self._log("Calling transcription API") + transcription = self.transcription_client.audio.transcriptions.create( + file=audio_file, + model=self.transcription_model, + language=self.document_version.language, + response_format='verbose_json', + ) + self._log("Transcription API call completed") + + if transcription: + # Handle the transcription result based on its type + if isinstance(transcription, str): + self._log(f"Transcription result (string): {transcription[:100]}...") + transcriptions.append(transcription) + elif hasattr(transcription, 'text'): + self._log( + f"Transcription result (object with 'text' attribute): {transcription.text[:100]}...") + transcriptions.append(transcription.text) + else: + self._log(f"Transcription result (unknown type): {str(transcription)[:100]}...") + transcriptions.append(str(transcription)) + else: + self._log("Warning: Received empty transcription", level='warning') + + except Exception as e: + self._log(f"Error during transcription: {str(e)}", level='error') + finally: + os.unlink(temp_audio.name) + + full_transcription = " ".join(filter(None, transcriptions)) + + if not full_transcription: + self._log("Warning: No transcription was generated", level='warning') + full_transcription = "No transcription available." 
+
+        # Save transcription to MinIO
+        transcription_filename = f"{self.document_version.id}_transcription.txt"
+        minio_client.upload_document_file(
+            self.tenant.id,
+            self.document_version.doc_id,
+            self.document_version.language,
+            self.document_version.id,
+            transcription_filename,
+            full_transcription.encode('utf-8')
+        )
+        self._log(f"Saved transcription to MinIO: {transcription_filename}")
+
+        return full_transcription
+
+    def _generate_markdown_from_transcription(self, transcription):
+        self._log("Generating markdown from transcription")
+        llm = self.model_variables['llm']
+        template = self.model_variables['transcript_template']
+        language_template = create_language_template(template, self.document_version.language)
+        transcript_prompt = ChatPromptTemplate.from_template(language_template)
+        setup = RunnablePassthrough()
+        output_parser = StrOutputParser()
+
+        chain = setup | transcript_prompt | llm | output_parser
+
+        input_transcript = {'transcript': transcription}
+        markdown = chain.invoke(input_transcript)
+
+        # Extract title from the markdown
+        title = self._extract_title_from_markdown(markdown)
+
+        return markdown, title
+
+    def _extract_title_from_markdown(self, markdown):
+        # Simple extraction of the first header as the title
+        lines = markdown.split('\n')
+        for line in lines:
+            if line.startswith('# '):
+                return line[2:].strip()
+        return "Untitled Audio Transcription"
diff --git a/eveai_workers/Processors/html_processor.py b/eveai_workers/Processors/html_processor.py
new file mode 100644
index 0000000..db866fc
--- /dev/null
+++ b/eveai_workers/Processors/html_processor.py
@@ -0,0 +1,142 @@
+from bs4 import BeautifulSoup
+from langchain_core.output_parsers import StrOutputParser
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.runnables import RunnablePassthrough
+from common.extensions import db, minio_client
+from common.utils.model_utils import create_language_template
+from .processor import Processor
+
+
+class HTMLProcessor(Processor):
+    def __init__(self, tenant, model_variables, document_version):
+        super().__init__(tenant, model_variables, document_version)
+        self.html_tags = model_variables['html_tags']
+        self.html_end_tags = model_variables['html_end_tags']
+        self.html_included_elements = model_variables['html_included_elements']
+        self.html_excluded_elements = model_variables['html_excluded_elements']
+
+    def process(self):
+        self._log("Starting HTML processing")
+        try:
+            file_data = minio_client.download_document_file(
+                self.tenant.id,
+                self.document_version.doc_id,
+                self.document_version.language,
+                self.document_version.id,
+                self.document_version.file_name
+            )
+            html_content = file_data.decode('utf-8')
+
+            extracted_html, title = self._parse_html(html_content)
+            markdown = self._generate_markdown_from_html(extracted_html)
+
+            self._save_markdown(markdown)
+            self._log("Finished processing HTML")
+            return markdown, title
+        except Exception as e:
+            self._log(f"Error processing HTML: {str(e)}", level='error')
+            raise
+
+    def _parse_html(self, html_content):
+        self._log(f'Parsing HTML for tenant {self.tenant.id}')
+        soup = BeautifulSoup(html_content, 'html.parser')
+        extracted_html = ''
+        excluded_classes = self._parse_excluded_classes(self.tenant.html_excluded_classes)
+
+        if self.html_included_elements:
+            elements_to_parse = soup.find_all(self.html_included_elements)
+        else:
+            elements_to_parse = [soup]
+
+        for element in elements_to_parse:
+            for sub_element in element.find_all(self.html_tags):
+                if self._should_exclude_element(sub_element, excluded_classes):
+                    continue
+                extracted_html += self._extract_element_content(sub_element)
+
+        title = soup.find('title').get_text(strip=True) if soup.find('title') else ''
+
+        self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
+        return extracted_html, title
+
+    def _generate_markdown_from_html(self, html_content):
+        self._log(f'Generating markdown from HTML for tenant {self.tenant.id}')
+
+        llm = self.model_variables['llm']
+        template = self.model_variables['html_parse_template']
+        parse_prompt = ChatPromptTemplate.from_template(template)
+        setup = RunnablePassthrough()
+        output_parser = StrOutputParser()
+        chain = setup | parse_prompt | llm | output_parser
+
+        soup = BeautifulSoup(html_content, 'lxml')
+        chunks = self._split_content(soup)
+
+        markdown_chunks = []
+        for chunk in chunks:
+            if self.embed_tuning:
+                self._log(f'Processing chunk: \n{chunk}\n')
+            input_html = {"html": chunk}
+            markdown_chunk = chain.invoke(input_html)
+            markdown_chunks.append(markdown_chunk)
+            if self.embed_tuning:
+                self._log(f'Processed markdown chunk: \n{markdown_chunk}\n')
+
+        markdown = "\n\n".join(markdown_chunks)
+        self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
+        return markdown
+
+    def _split_content(self, soup, max_size=20000):
+        chunks = []
+        current_chunk = []
+        current_size = 0
+
+        for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']):
+            element_html = str(element)
+            element_size = len(element_html)
+
+            if current_size + element_size > max_size and current_chunk:
+                chunks.append(''.join(map(str, current_chunk)))
+                current_chunk = []
+                current_size = 0
+
+            current_chunk.append(element)
+            current_size += element_size
+
+            if element.name in ['h1', 'h2', 'h3'] and current_size > max_size:
+                chunks.append(''.join(map(str, current_chunk)))
+                current_chunk = []
+                current_size = 0
+
+        if current_chunk:
+            chunks.append(''.join(map(str, current_chunk)))
+
+        return chunks
+
+    def _parse_excluded_classes(self, excluded_classes):
+        parsed = {}
+        for rule in excluded_classes:
+            element, cls = rule.split('.', 1)
+            parsed.setdefault(element, set()).add(cls)
+        return parsed
+
+    def _should_exclude_element(self, element, excluded_classes):
+        if self.html_excluded_elements and element.find_parent(self.html_excluded_elements):
+            return True
+        return self._is_element_excluded_by_class(element, excluded_classes)
+
+    def _is_element_excluded_by_class(self, element, excluded_classes):
+        for parent in element.parents:
+            if self._element_matches_exclusion(parent, excluded_classes):
+                return True
+        return self._element_matches_exclusion(element, excluded_classes)
+
+    def _element_matches_exclusion(self, element, excluded_classes):
+        if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element.get('class', [])):
+            return True
+        return element.name in excluded_classes and \
+            any(cls in excluded_classes[element.name] for cls in element.get('class', []))
+
+    def _extract_element_content(self, element):
+        content = ' '.join(child.strip() for child in element.stripped_strings)
+        return f'<{element.name}>{content}\n'
diff --git a/eveai_workers/Processors/PDF_Processor.py b/eveai_workers/Processors/pdf_processor.py
similarity index 88%
rename from eveai_workers/Processors/PDF_Processor.py
rename to eveai_workers/Processors/pdf_processor.py
index 75e40be..7c65085 100644
--- a/eveai_workers/Processors/PDF_Processor.py
+++ b/eveai_workers/Processors/pdf_processor.py
@@ -5,29 +5,23 @@
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain_core.output_parsers import StrOutputParser
 from langchain_core.prompts import ChatPromptTemplate
 import re
-
 from langchain_core.runnables import RunnablePassthrough
 from common.extensions import minio_client
 from common.utils.model_utils import create_language_template
+from .processor import Processor
 
 
-class PDFProcessor:
+class PDFProcessor(Processor):
     def __init__(self, tenant, model_variables, document_version):
-        self.tenant = tenant
-        self.model_variables = model_variables
-        self.document_version = document_version
-
-        # Configuration parameters from model_variables
+        super().__init__(tenant, model_variables, document_version)
+        # PDF-specific initialization
         self.chunk_size = model_variables['PDF_chunk_size']
         self.chunk_overlap = model_variables['PDF_chunk_overlap']
         self.min_chunk_size = model_variables['PDF_min_chunk_size']
         self.max_chunk_size = model_variables['PDF_max_chunk_size']
 
-        # Set tuning variable for easy use
-        self.embed_tuning = model_variables['embed_tuning']
-
-    def process_pdf(self):
+    def process(self):
         self._log("Starting PDF processing")
         try:
             file_data = minio_client.download_document_file(
@@ -51,11 +45,6 @@ class PDFProcessor:
             self._log(f"Error processing PDF: {str(e)}", level='error')
             raise
 
-    def _log(self, message, level='debug'):
-        logger = current_app.logger
-        log_method = getattr(logger, level)
-        log_method(f"PDFProcessor - Tenant {self.tenant.id}, Document {self.document_version.id}: {message}")
-
     def _extract_content(self, file_data):
         extracted_content = []
         with pdfplumber.open(io.BytesIO(file_data)) as pdf:
@@ -248,24 +237,3 @@ class PDFProcessor:
             markdown_chunks.append(result)
 
         return "\n\n".join(markdown_chunks)
-
-    def _save_markdown(self, markdown):
-        markdown_filename = f"{self.document_version.id}.md"
-        minio_client.upload_document_file(
-            self.tenant.id,
-            self.document_version.doc_id,
-            self.document_version.language,
-            self.document_version.id,
-            markdown_filename,
-            markdown.encode('utf-8')
-        )
-
-    def _save_intermediate(self, content, filename):
-        minio_client.upload_document_file(
-            self.tenant.id,
-            self.document_version.doc_id,
-            self.document_version.language,
-            self.document_version.id,
-            filename,
-            content.encode('utf-8')
-        )
diff --git a/eveai_workers/Processors/processor.py b/eveai_workers/Processors/processor.py
new file mode 100644
index 0000000..207fd7b
--- /dev/null
+++ b/eveai_workers/Processors/processor.py
@@ -0,0 +1,42 @@
+from abc import ABC, abstractmethod
+from flask import current_app
+from common.extensions import minio_client
+
+
+class Processor(ABC):
+    def __init__(self, tenant, model_variables, document_version):
+        self.tenant = tenant
+        self.model_variables = model_variables
+        self.document_version = document_version
+        self.embed_tuning = model_variables['embed_tuning']
+
+    @abstractmethod
+    def process(self):
+        pass
+
+    def _save_markdown(self, markdown):
+        markdown_filename = f"{self.document_version.id}.md"
+        minio_client.upload_document_file(
+            self.tenant.id,
+            self.document_version.doc_id,
+            self.document_version.language,
+            self.document_version.id,
+            markdown_filename,
+            markdown.encode('utf-8')
+        )
+
+    def _log(self, message, level='debug'):
+        logger = current_app.logger
+        log_method = getattr(logger, level)
+        log_method(
+            f"{self.__class__.__name__} - Tenant {self.tenant.id}, Document {self.document_version.id}: {message}")
+
+    def _save_intermediate(self, content, filename):
+        minio_client.upload_document_file(
+            self.tenant.id,
+            self.document_version.doc_id,
+            self.document_version.language,
+            self.document_version.id,
+            filename,
+            content.encode('utf-8')
+        )
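The Processor base class above is the whole contract a new file type needs: subclass it and implement process(). As an illustrative sketch (a hypothetical TxtProcessor, not part of this patch), a plain-text input could be wired in like this:

    # Hypothetical example, not part of this patch: a minimal plain-text processor
    from common.extensions import minio_client
    from .processor import Processor


    class TxtProcessor(Processor):
        def process(self):
            self._log("Starting TXT processing")
            # Fetch the stored original from MinIO, as the other processors do
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name
            )
            # Plain text needs no conversion; store it directly as the markdown artifact
            markdown = file_data.decode('utf-8')
            self._save_markdown(markdown)
            self._log("Finished processing TXT")
            return markdown, self.document_version.file_name

A matching case 'txt' branch in the create_embeddings match statement (see eveai_workers/tasks.py below) would then dispatch to it.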
self.tenant.id, + self.document_version.doc_id, + self.document_version.language, + self.document_version.id, + filename, + content.encode('utf-8') + ) diff --git a/eveai_workers/Processors/srt_processor.py b/eveai_workers/Processors/srt_processor.py new file mode 100644 index 0000000..41085d0 --- /dev/null +++ b/eveai_workers/Processors/srt_processor.py @@ -0,0 +1,80 @@ +import re +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.runnables import RunnablePassthrough +from common.extensions import minio_client +from common.utils.model_utils import create_language_template +from .processor import Processor + + +class SRTProcessor(Processor): + def __init__(self, tenant, model_variables, document_version): + super().__init__(tenant, model_variables, document_version) + + def process(self): + self._log("Starting SRT processing") + try: + file_data = minio_client.download_document_file( + self.tenant.id, + self.document_version.doc_id, + self.document_version.language, + self.document_version.id, + self.document_version.file_name + ) + + srt_content = file_data.decode('utf-8') + cleaned_transcription = self._clean_srt(srt_content) + markdown, title = self._generate_markdown_from_transcription(cleaned_transcription) + + self._save_markdown(markdown) + self._log("Finished processing SRT") + return markdown, title + except Exception as e: + self._log(f"Error processing SRT: {str(e)}", level='error') + raise + + def _clean_srt(self, srt_content): + # Remove timecodes and subtitle numbers + cleaned_lines = [] + for line in srt_content.split('\n'): + # Skip empty lines, subtitle numbers, and timecodes + if line.strip() and not line.strip().isdigit() and not re.match( + r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', line): + cleaned_lines.append(line.strip()) + + # Join the cleaned lines + cleaned_text = ' '.join(cleaned_lines) + + # Remove any extra spaces + cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip() + + return cleaned_text + + def _generate_markdown_from_transcription(self, transcription): + self._log("Generating markdown from transcription") + llm = self.model_variables['llm'] + template = self.model_variables['transcript_template'] + language_template = create_language_template(template, self.document_version.language) + transcript_prompt = ChatPromptTemplate.from_template(language_template) + setup = RunnablePassthrough() + output_parser = StrOutputParser() + + chain = setup | transcript_prompt | llm | output_parser + + input_transcript = {'transcript': transcription} + markdown = chain.invoke(input_transcript) + + # Extract title from the markdown + title = self._extract_title_from_markdown(markdown) + + return markdown, title + + def _extract_title_from_markdown(self, markdown): + # Simple extraction of the first header as the title + lines = markdown.split('\n') + for line in lines: + if line.startswith('# '): + return line[2:].strip() + return "Untitled SRT Transcription" + + diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 3ad0126..8f56544 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -1,26 +1,16 @@ import io import os from datetime import datetime as dt, timezone as tz -import subprocess -import gevent -from bs4 import BeautifulSoup -import html from celery import states from flask import current_app # OpenAI imports -from langchain.chains.summarize import load_summarize_chain -from langchain.text_splitter import CharacterTextSplitter, 
MarkdownHeaderTextSplitter -from langchain_core.documents import Document +from langchain.text_splitter import MarkdownHeaderTextSplitter from langchain_core.exceptions import LangChainException from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from sqlalchemy.exc import SQLAlchemyError -from pytube import YouTube -import PyPDF2 -from pydub import AudioSegment -import tempfile from common.extensions import db, minio_client from common.models.document import DocumentVersion, Embedding @@ -29,7 +19,10 @@ from common.utils.celery_utils import current_celery from common.utils.database import Database from common.utils.model_utils import select_model_variables, create_language_template from common.utils.os_utils import safe_remove, sync_folder -from eveai_workers.Processors.PDF_Processor import PDFProcessor +from eveai_workers.Processors.audio_processor import AudioProcessor +from eveai_workers.Processors.html_processor import HTMLProcessor +from eveai_workers.Processors.pdf_processor import PDFProcessor +from eveai_workers.Processors.srt_processor import SRTProcessor @current_celery.task(name='create_embeddings', queue='embeddings') @@ -85,8 +78,10 @@ def create_embeddings(tenant_id, document_version_id): process_pdf(tenant, model_variables, document_version) case 'html': process_html(tenant, model_variables, document_version) - case 'youtube': - process_youtube(tenant, model_variables, document_version) + case 'srt': + process_srt(tenant, model_variables, document_version) + case 'mp4' | 'mp3' | 'ogg': + process_audio(tenant, model_variables, document_version) case _: raise Exception(f'No functionality defined for file type {document_version.file_type} ' f'for tenant {tenant_id} ' @@ -104,83 +99,6 @@ def create_embeddings(tenant_id, document_version_id): raise -# def process_pdf(tenant, model_variables, document_version): -# file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language, -# document_version.id, document_version.file_name) -# -# pdf_text = '' -# pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data)) -# for page in pdf_reader.pages: -# pdf_text += page.extract_text() -# -# markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text) -# markdown_file_name = f'{document_version.id}.md' -# minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, -# document_version.id, -# markdown_file_name, markdown.encode()) -# -# potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) -# chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], -# model_variables['max_chunk_size']) -# -# if len(chunks) > 1: -# summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) -# document_version.system_context = f'Summary: {summary}\n' -# else: -# document_version.system_context = '' -# -# enriched_chunks = enrich_chunks(tenant, document_version, chunks) -# embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) -# -# try: -# db.session.add(document_version) -# document_version.processing_finished_at = dt.now(tz.utc) -# document_version.processing = False -# db.session.add_all(embeddings) -# db.session.commit() -# except SQLAlchemyError as e: -# current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' -# f'on 
HTML, document version {document_version.id}' -# f'error: {e}') -# raise -# -# current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' -# f'on document version {document_version.id} :-)') - -def process_pdf(tenant, model_variables, document_version): - processor = PDFProcessor(tenant, model_variables, document_version) - markdown, title = processor.process_pdf() - - # Create potential chunks for embedding - potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md") - - # Combine chunks for embedding - chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], - model_variables['max_chunk_size']) - - # Enrich chunks - enriched_chunks = enrich_chunks(tenant, document_version, title, chunks) - - # Create embeddings - embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) - - # Update document version and save embeddings - try: - db.session.add(document_version) - document_version.processing_finished_at = dt.now(tz.utc) - document_version.processing = False - db.session.add_all(embeddings) - db.session.commit() - except SQLAlchemyError as e: - current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' - f'on PDF, document version {document_version.id}' - f'error: {e}') - raise - - current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' - f'on document version {document_version.id} :-)') - - def delete_embeddings_for_document_version(document_version): embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all() for embedding in embeddings_to_delete: @@ -193,45 +111,53 @@ def delete_embeddings_for_document_version(document_version): raise +def process_pdf(tenant, model_variables, document_version): + processor = PDFProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() + + # Process markdown and embed + embed_markdown(tenant, model_variables, document_version, markdown, title) + + def process_html(tenant, model_variables, document_version): - file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, document_version.file_name) - html_content = file_data.decode('utf-8') + processor = HTMLProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() - # The tags to be considered can be dependent on the tenant - html_tags = model_variables['html_tags'] - html_end_tags = model_variables['html_end_tags'] - html_included_elements = model_variables['html_included_elements'] - html_excluded_elements = model_variables['html_excluded_elements'] + # Process markdown and embed + embed_markdown(tenant, model_variables, document_version, markdown, title) - extracted_html, title = parse_html(tenant, html_content, html_tags, included_elements=html_included_elements, - excluded_elements=html_excluded_elements) - extracted_file_name = f'{document_version.id}-extracted.html' - minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, - extracted_file_name, extracted_html.encode()) +def process_audio(tenant, model_variables, document_version): + processor = AudioProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() - markdown = generate_markdown_from_html(tenant, model_variables, document_version, extracted_html) - markdown_file_name = 
f'{document_version.id}.md' - minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, - markdown_file_name, markdown.encode()) + # Process markdown and embed + embed_markdown(tenant, model_variables, document_version, markdown, title) - potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) + +def process_srt(tenant, model_variables, document_version): + processor = SRTProcessor(tenant, model_variables, document_version) + markdown, title = processor.process() + + # Process markdown and embed + embed_markdown(tenant, model_variables, document_version, markdown, title) + + +def embed_markdown(tenant, model_variables, document_version, markdown, title): + # Create potential chunks + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md") + + # Combine chunks for embedding chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], model_variables['max_chunk_size']) - if len(chunks) > 1: - summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) - document_version.system_context = (f'Title: {title}\n' - f'Summary: {summary}\n') - else: - document_version.system_context = (f'Title: {title}\n') + # Enrich chunks + enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks) - enriched_chunks = enrich_chunks(tenant, document_version, title, chunks) + # Create embeddings embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) + # Update document version and save embeddings try: db.session.add(document_version) document_version.processing_finished_at = dt.now(tz.utc) @@ -248,12 +174,18 @@ def process_html(tenant, model_variables, document_version): f'on document version {document_version.id} :-)') -def enrich_chunks(tenant, document_version, title, chunks): +def enrich_chunks(tenant, model_variables, document_version, title, chunks): current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} ' f'on document version {document_version.id}') - current_app.logger.debug(f'Nr of chunks: {len(chunks)}') + + summary = '' + if len(chunks) > 1: + summary = summarize_chunk(tenant, model_variables, document_version, chunks[0]) + chunk_total_context = (f'Filename: {document_version.file_name}\n' f'User Context:\n{document_version.user_context}\n\n' + f'Title: {title}\n' + f'{summary}\n' f'{document_version.system_context}\n\n') enriched_chunks = [] initial_chunk = (f'Filename: {document_version.file_name}\n' @@ -272,95 +204,6 @@ def enrich_chunks(tenant, document_version, title, chunks): return enriched_chunks -def generate_markdown_from_html(tenant, model_variables, document_version, html_content): - current_app.logger.debug(f'Generating markdown from HTML for tenant {tenant.id} ' - f'on document version {document_version.id}') - - llm = model_variables['llm'] - template = model_variables['html_parse_template'] - parse_prompt = ChatPromptTemplate.from_template(template) - setup = RunnablePassthrough() - output_parser = StrOutputParser() - chain = setup | parse_prompt | llm | output_parser - - soup = BeautifulSoup(html_content, 'lxml') - - def split_content(soup, max_size=20000): - chunks = [] - current_chunk = [] - current_size = 0 - - for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']): - element_html = str(element) - element_size = len(element_html) - - if current_size + 
element_size > max_size and current_chunk: - chunks.append(''.join(map(str, current_chunk))) - current_chunk = [] - current_size = 0 - - current_chunk.append(element) - current_size += element_size - - if element.name in ['h1', 'h2', 'h3'] and current_size > max_size: - chunks.append(''.join(map(str, current_chunk))) - current_chunk = [] - current_size = 0 - - if current_chunk: - chunks.append(''.join(map(str, current_chunk))) - - return chunks - - chunks = split_content(soup) - - markdown_chunks = [] - - for chunk in chunks: - current_app.logger.debug(f'Processing chunk to generate markdown from HTML for tenant {tenant.id} ' - f'on document version {document_version.id}') - if tenant.embed_tuning: - current_app.embed_tuning_logger.debug(f'Processing chunk: \n ' - f'------------------\n' - f'{chunk}\n' - f'------------------\n') - input_html = {"html": chunk} - markdown_chunk = chain.invoke(input_html) - markdown_chunks.append(markdown_chunk) - if tenant.embed_tuning: - current_app.embed_tuning_logger.debug(f'Processed markdown chunk: \n ' - f'-------------------------\n' - f'{markdown_chunk}\n' - f'-------------------------\n') - current_app.logger.debug(f'Finished processing chunk to generate markdown from HTML for tenant {tenant.id} ' - f'on document version {document_version.id}') - - # Combine all markdown chunks - markdown = "\n\n".join(markdown_chunks) - - current_app.logger.debug(f'Finished generating markdown from HTML for tenant {tenant.id} ' - f'on document version {document_version.id}') - - return markdown - - -def generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_content): - current_app.logger.debug(f'Generating Markdown from PDF for tenant {tenant.id} ' - f'on document version {document_version.id}') - llm = model_variables['llm'] - template = model_variables['pdf_parse_template'] - parse_prompt = ChatPromptTemplate.from_template(template) - setup = RunnablePassthrough() - output_parser = StrOutputParser() - - chain = setup | parse_prompt | llm | output_parser - input_pdf = {"pdf_content": pdf_content} - - markdown = chain.invoke(input_pdf) - - return markdown - - def summarize_chunk(tenant, model_variables, document_version, chunk): current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} ' f'on document version {document_version.id}') @@ -415,65 +258,6 @@ def embed_chunks(tenant, model_variables, document_version, chunks): return new_embeddings -def parse_html(tenant, html_content, tags, included_elements=None, excluded_elements=None): - current_app.logger.debug(f'Parsing HTML for tenant {tenant.id}') - soup = BeautifulSoup(html_content, 'html.parser') - extracted_html = '' - excluded_classes = parse_excluded_classes(tenant.html_excluded_classes) - - if included_elements: - elements_to_parse = soup.find_all(included_elements) - else: - elements_to_parse = [soup] - - log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse) - - for element in elements_to_parse: - for sub_element in element.find_all(tags): - if should_exclude_element(sub_element, excluded_elements, excluded_classes): - continue - extracted_html += extract_element_content(sub_element) - - title = soup.find('title').get_text(strip=True) if soup.find('title') else '' - - current_app.logger.debug(f'Finished parsing HTML for tenant {tenant.id}') - - return extracted_html, title - - -def parse_excluded_classes(excluded_classes): - parsed = {} - for rule in excluded_classes: - element, cls = rule.split('.', 1) - 
parsed.setdefault(element, set()).add(cls) - return parsed - - -def should_exclude_element(element, excluded_elements, excluded_classes): - if excluded_elements and element.find_parent(excluded_elements): - return True - return is_element_excluded_by_class(element, excluded_classes) - - -def is_element_excluded_by_class(element, excluded_classes): - for parent in element.parents: - if element_matches_exclusion(parent, excluded_classes): - return True - return element_matches_exclusion(element, excluded_classes) - - -def element_matches_exclusion(element, excluded_classes): - if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element.get('class', [])): - return True - return element.name in excluded_classes and \ - any(cls in excluded_classes[element.name] for cls in element.get('class', [])) - - -def extract_element_content(element): - content = ' '.join(child.strip() for child in element.stripped_strings) - return f'<{element.name}>{content}\n' - - def log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse): if tenant.embed_tuning: current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}') @@ -484,244 +268,242 @@ def log_parsing_info(tenant, tags, included_elements, excluded_elements, exclude current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}') -def process_youtube(tenant, model_variables, document_version): - base_path = os.path.join(current_app.config['UPLOAD_FOLDER'], - document_version.file_location) - download_file_name = f'{document_version.id}.mp4' - compressed_file_name = f'{document_version.id}.mp3' - transcription_file_name = f'{document_version.id}.txt' - markdown_file_name = f'{document_version.id}.md' - - # Remove existing files (in case of a re-processing of the file - minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, download_file_name) - minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, compressed_file_name) - minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, transcription_file_name) - minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language, - document_version.id, markdown_file_name) - - of, title, description, author = download_youtube(document_version.url, tenant.id, document_version, - download_file_name) - document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}' - compress_audio(tenant.id, document_version, download_file_name, compressed_file_name) - transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables) - annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables) - - potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name) - actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'], - model_variables['max_chunk_size']) - - enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks) - embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks) - - try: - db.session.add(document_version) - document_version.processing_finished_at = dt.now(tz.utc) - document_version.processing = False - db.session.add_all(embeddings) - db.session.commit() - 
except SQLAlchemyError as e: - current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} ' - f'on Youtube document version {document_version.id}' - f'error: {e}') - raise - - current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} ' - f'on Youtube document version {document_version.id} :-)') - - -def download_youtube(url, tenant_id, document_version, file_name): - try: - current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}') - yt = YouTube(url) - stream = yt.streams.get_audio_only() - - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - stream.download(output_path=temp_file.name) - with open(temp_file.name, 'rb') as f: - file_data = f.read() - - minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, - document_version.id, - file_name, file_data) - - current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}') - return file_name, yt.title, yt.description, yt.author - except Exception as e: - current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}') - raise - - -def compress_audio(tenant_id, document_version, input_file, output_file): - try: - current_app.logger.info(f'Compressing audio for tenant: {tenant_id}') - - input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, - document_version.id, input_file) - - with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input: - temp_input.write(input_data) - temp_input.flush() - - with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output: - result = subprocess.run( - ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name], - capture_output=True, - text=True - ) - - if result.returncode != 0: - raise Exception(f"Compression failed: {result.stderr}") - - with open(temp_output.name, 'rb') as f: - compressed_data = f.read() - - minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, - document_version.id, - output_file, compressed_data) - - current_app.logger.info(f'Compressed audio for tenant: {tenant_id}') - except Exception as e: - current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}') - raise - - -def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables): - try: - current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}') - client = model_variables['transcription_client'] - model = model_variables['transcription_model'] - - # Download the audio file from MinIO - audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, - document_version.id, input_file) - - # Load the audio data into pydub - audio = AudioSegment.from_mp3(io.BytesIO(audio_data)) - - # Define segment length (e.g., 10 minutes) - segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds - - transcriptions = [] - - # Split audio into segments and transcribe each - for i, chunk in enumerate(audio[::segment_length]): - current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}') - - with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio: - chunk.export(temp_audio.name, format="mp3") - - with open(temp_audio.name, 'rb') as audio_segment: - transcription = client.audio.transcriptions.create( - file=audio_segment, - model=model, - 
language=document_version.language, - response_format='verbose_json', - ) - - transcriptions.append(transcription.text) - - os.unlink(temp_audio.name) # Delete the temporary file - - # Combine all transcriptions - full_transcription = " ".join(transcriptions) - - # Upload the full transcription to MinIO - minio_client.upload_document_file( - tenant_id, - document_version.doc_id, - document_version.language, - document_version.id, - output_file, - full_transcription.encode('utf-8') - ) - - current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}') - except Exception as e: - current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}') - raise - - -def annotate_transcription(tenant, document_version, input_file, output_file, model_variables): - try: - current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}') - - char_splitter = CharacterTextSplitter(separator='.', - chunk_size=model_variables['annotation_chunk_length'], - chunk_overlap=0) - - headers_to_split_on = [ - ("#", "Header 1"), - ("##", "Header 2"), - ] - markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) - - llm = model_variables['llm'] - template = model_variables['transcript_template'] - language_template = create_language_template(template, document_version.language) - transcript_prompt = ChatPromptTemplate.from_template(language_template) - setup = RunnablePassthrough() - output_parser = StrOutputParser() - - # Download the transcription file from MinIO - transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id, - document_version.language, document_version.id, - input_file) - transcript = transcript_data.decode('utf-8') - - chain = setup | transcript_prompt | llm | output_parser - - chunks = char_splitter.split_text(transcript) - all_markdown_chunks = [] - last_markdown_chunk = '' - for chunk in chunks: - current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}') - full_input = last_markdown_chunk + '\n' + chunk - if tenant.embed_tuning: - current_app.embed_tuning_logger.debug(f'Annotating chunk: \n ' - f'------------------\n' - f'{full_input}\n' - f'------------------\n') - input_transcript = {'transcript': full_input} - markdown = chain.invoke(input_transcript) - # GPT-4o returns some kind of content description: ```markdown ``` - if markdown.startswith("```markdown"): - markdown = "\n".join(markdown.strip().split("\n")[1:-1]) - if tenant.embed_tuning: - current_app.embed_tuning_logger.debug(f'Markdown Received: \n ' - f'------------------\n' - f'{markdown}\n' - f'------------------\n') - md_header_splits = markdown_splitter.split_text(markdown) - markdown_chunks = [doc.page_content for doc in md_header_splits] - # claude-3.5-sonnet returns introductory text - if not markdown_chunks[0].startswith('#'): - markdown_chunks.pop(0) - last_markdown_chunk = markdown_chunks[-1] - last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:]) - markdown_chunks.pop() - all_markdown_chunks += markdown_chunks - - all_markdown_chunks += [last_markdown_chunk] - - annotated_transcript = '\n'.join(all_markdown_chunks) - - # Upload the annotated transcript to MinIO - minio_client.upload_document_file( - tenant.id, - document_version.doc_id, - document_version.language, - document_version.id, - output_file, - annotated_transcript.encode('utf-8') - ) - - current_app.logger.info(f'Annotated transcription for tenant {tenant.id}') - except Exception as e: - 
current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
-        raise
+# def process_youtube(tenant, model_variables, document_version):
+#     download_file_name = f'{document_version.id}.mp4'
+#     compressed_file_name = f'{document_version.id}.mp3'
+#     transcription_file_name = f'{document_version.id}.txt'
+#     markdown_file_name = f'{document_version.id}.md'
+#
+#     # Remove existing files (in case the file is re-processed)
+#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
+#                                       document_version.id, download_file_name)
+#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
+#                                       document_version.id, compressed_file_name)
+#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
+#                                       document_version.id, transcription_file_name)
+#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
+#                                       document_version.id, markdown_file_name)
+#
+#     of, title, description, author = download_youtube(document_version.url, tenant.id, document_version,
+#                                                       download_file_name)
+#     document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'
+#     compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
+#     transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
+#     annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)
+#
+#     potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
+#     actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
+#                                                 model_variables['max_chunk_size'])
+#
+#     enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks)
+#     embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
+#
+#     try:
+#         db.session.add(document_version)
+#         document_version.processing_finished_at = dt.now(tz.utc)
+#         document_version.processing = False
+#         db.session.add_all(embeddings)
+#         db.session.commit()
+#     except SQLAlchemyError as e:
+#         current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
+#                                  f'on Youtube document version {document_version.id} '
+#                                  f'error: {e}')
+#         raise
+#
+#     current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
+#                             f'on Youtube document version {document_version.id} :-)')
+#
+#
+# def download_youtube(url, tenant_id, document_version, file_name):
+#     try:
+#         current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
+#         yt = YouTube(url)
+#         stream = yt.streams.get_audio_only()
+#
+#         with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+#             stream.download(output_path=temp_file.name)
+#             with open(temp_file.name, 'rb') as f:
+#                 file_data = f.read()
+#
+#         minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
+#                                           document_version.id,
+#                                           file_name, file_data)
+#
+#         current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
+#         return file_name, yt.title, yt.description, yt.author
+#     except Exception as e:
+#         current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
+#         raise
+#
+#
+# def compress_audio(tenant_id, document_version, input_file, output_file):
+#     try:
+#
current_app.logger.info(f'Compressing audio for tenant: {tenant_id}') +# +# input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, +# document_version.id, input_file) +# +# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input: +# temp_input.write(input_data) +# temp_input.flush() +# +# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output: +# result = subprocess.run( +# ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name], +# capture_output=True, +# text=True +# ) +# +# if result.returncode != 0: +# raise Exception(f"Compression failed: {result.stderr}") +# +# with open(temp_output.name, 'rb') as f: +# compressed_data = f.read() +# +# minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language, +# document_version.id, +# output_file, compressed_data) +# +# current_app.logger.info(f'Compressed audio for tenant: {tenant_id}') +# except Exception as e: +# current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}') +# raise +# +# +# def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables): +# try: +# current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}') +# client = model_variables['transcription_client'] +# model = model_variables['transcription_model'] +# +# # Download the audio file from MinIO +# audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language, +# document_version.id, input_file) +# +# # Load the audio data into pydub +# audio = AudioSegment.from_mp3(io.BytesIO(audio_data)) +# +# # Define segment length (e.g., 10 minutes) +# segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds +# +# transcriptions = [] +# +# # Split audio into segments and transcribe each +# for i, chunk in enumerate(audio[::segment_length]): +# current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}') +# +# with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio: +# chunk.export(temp_audio.name, format="mp3") +# +# with open(temp_audio.name, 'rb') as audio_segment: +# transcription = client.audio.transcriptions.create( +# file=audio_segment, +# model=model, +# language=document_version.language, +# response_format='verbose_json', +# ) +# +# transcriptions.append(transcription.text) +# +# os.unlink(temp_audio.name) # Delete the temporary file +# +# # Combine all transcriptions +# full_transcription = " ".join(transcriptions) +# +# # Upload the full transcription to MinIO +# minio_client.upload_document_file( +# tenant_id, +# document_version.doc_id, +# document_version.language, +# document_version.id, +# output_file, +# full_transcription.encode('utf-8') +# ) +# +# current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}') +# except Exception as e: +# current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}') +# raise +# +# +# def annotate_transcription(tenant, document_version, input_file, output_file, model_variables): +# try: +# current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}') +# +# char_splitter = CharacterTextSplitter(separator='.', +# chunk_size=model_variables['annotation_chunk_length'], +# chunk_overlap=0) +# +# headers_to_split_on = [ +# ("#", "Header 1"), +# ("##", "Header 2"), +# ] +# markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, 
strip_headers=False) +# +# llm = model_variables['llm'] +# template = model_variables['transcript_template'] +# language_template = create_language_template(template, document_version.language) +# transcript_prompt = ChatPromptTemplate.from_template(language_template) +# setup = RunnablePassthrough() +# output_parser = StrOutputParser() +# +# # Download the transcription file from MinIO +# transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id, +# document_version.language, document_version.id, +# input_file) +# transcript = transcript_data.decode('utf-8') +# +# chain = setup | transcript_prompt | llm | output_parser +# +# chunks = char_splitter.split_text(transcript) +# all_markdown_chunks = [] +# last_markdown_chunk = '' +# for chunk in chunks: +# current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}') +# full_input = last_markdown_chunk + '\n' + chunk +# if tenant.embed_tuning: +# current_app.embed_tuning_logger.debug(f'Annotating chunk: \n ' +# f'------------------\n' +# f'{full_input}\n' +# f'------------------\n') +# input_transcript = {'transcript': full_input} +# markdown = chain.invoke(input_transcript) +# # GPT-4o returns some kind of content description: ```markdown ``` +# if markdown.startswith("```markdown"): +# markdown = "\n".join(markdown.strip().split("\n")[1:-1]) +# if tenant.embed_tuning: +# current_app.embed_tuning_logger.debug(f'Markdown Received: \n ' +# f'------------------\n' +# f'{markdown}\n' +# f'------------------\n') +# md_header_splits = markdown_splitter.split_text(markdown) +# markdown_chunks = [doc.page_content for doc in md_header_splits] +# # claude-3.5-sonnet returns introductory text +# if not markdown_chunks[0].startswith('#'): +# markdown_chunks.pop(0) +# last_markdown_chunk = markdown_chunks[-1] +# last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:]) +# markdown_chunks.pop() +# all_markdown_chunks += markdown_chunks +# +# all_markdown_chunks += [last_markdown_chunk] +# +# annotated_transcript = '\n'.join(all_markdown_chunks) +# +# # Upload the annotated transcript to MinIO +# minio_client.upload_document_file( +# tenant.id, +# document_version.doc_id, +# document_version.language, +# document_version.id, +# output_file, +# annotated_transcript.encode('utf-8') +# ) +# +# current_app.logger.info(f'Annotated transcription for tenant {tenant.id}') +# except Exception as e: +# current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}') +# raise def create_potential_chunks_for_markdown(tenant_id, document_version, input_file): @@ -779,4 +561,3 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars): actual_chunks.append(current_chunk) return actual_chunks - diff --git a/requirements.txt b/requirements.txt index 355a274..d64048e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -60,7 +60,6 @@ urllib3~=2.2.2 WTForms~=3.1.2 wtforms-html5~=0.6.1 zxcvbn~=4.4.28 -pytube~=15.0.0 groq~=0.9.0 pydub~=0.25.1 argparse~=1.4.0 @@ -73,4 +72,5 @@ graypy~=2.1.0 lxml~=5.3.0 pillow~=10.4.0 pdfplumber~=0.11.4 -PyPDF2~=3.0.1 \ No newline at end of file +PyPDF2~=3.0.1 +Flask-RESTful~=0.3.10 \ No newline at end of file
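
The processor classes introduced above share a single contract: process() pulls the stored source file from MinIO, returns a (markdown, title) tuple, and persists the markdown through _save_markdown(), after which embed_markdown() in tasks.py re-reads '{document_version.id}.md' and runs the shared chunk/enrich/embed pipeline. Below is a minimal sketch of how a further file type could be added under that contract. It assumes only the interfaces defined in this patch; TXTProcessor, process_txt and the 'txt' match arm are hypothetical names, not part of this change.

from common.extensions import minio_client
from eveai_workers.Processors.processor import Processor


class TXTProcessor(Processor):
    """Hypothetical processor for plain-text uploads (illustration only)."""

    def process(self):
        self._log("Starting TXT processing")
        # Download the uploaded file, mirroring SRTProcessor.process()
        file_data = minio_client.download_document_file(
            self.tenant.id,
            self.document_version.doc_id,
            self.document_version.language,
            self.document_version.id,
            self.document_version.file_name
        )
        markdown = file_data.decode('utf-8')
        title = self.document_version.file_name.rsplit('.', 1)[0]
        # Persist the markdown so embed_markdown() can re-read it as '{id}.md'
        self._save_markdown(markdown)
        self._log("Finished processing TXT")
        return markdown, title

Wiring it in would mirror process_srt(): one extra arm in the create_embeddings() match statement (case 'txt': process_txt(tenant, model_variables, document_version)) plus a small helper that forwards the processor's result to embed_markdown(). Keeping the per-type logic inside Processor subclasses and the embedding pipeline inside embed_markdown() is what lets the four current file families (pdf, html, srt, audio) share one code path after extraction.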