- Improvements on document uploads (accept other files than html-files when entering a URL)

- Introduction of API-functionality (to be continued). Deduplication of document and url uploads between views and api.
- Improvements on document processing - introduction of processor classes to streamline document inputs
- Removed pure Youtube functionality, as Youtube retrieval of documents continuously changes. But added upload of srt, mp3, ogg and mp4
This commit is contained in:
Josako
2024-09-02 12:37:44 +02:00
parent a158655247
commit d3d497fc2c
21 changed files with 1425 additions and 852 deletions

View File

@@ -9,6 +9,7 @@ from flask_socketio import SocketIO
from flask_jwt_extended import JWTManager
from flask_session import Session
from flask_wtf import CSRFProtect
from flask_restful import Api
from .utils.nginx_utils import prefixed_url_for
from .utils.simple_encryption import SimpleEncryption
@@ -27,6 +28,7 @@ cors = CORS()
socketio = SocketIO()
jwt = JWTManager()
session = Session()
api = Api()
# kms_client = JosKMSClient.from_service_account_json('config/gc_sa_eveai.json')

View File

@@ -12,7 +12,7 @@ class Document(db.Model):
# Versioning Information
created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=False)
created_by = db.Column(db.Integer, db.ForeignKey(User.id), nullable=True)
updated_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now(), onupdate=db.func.now())
updated_by = db.Column(db.Integer, db.ForeignKey(User.id))

View File

@@ -0,0 +1,230 @@
from datetime import datetime as dt, timezone as tz
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename
from common.models.document import Document, DocumentVersion
from common.extensions import db, minio_client
from common.utils.celery_utils import current_celery
from flask import current_app
import requests
from urllib.parse import urlparse, unquote
import os
from .eveai_exceptions import EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType, \
EveAIYoutubeError
def create_document_stack(api_input, file, filename, extension, tenant_id):
# Create the Document
new_doc = create_document(api_input, filename, tenant_id)
db.session.add(new_doc)
# Create the DocumentVersion
new_doc_vers = create_version_for_document(new_doc,
api_input.get('url', ''),
api_input.get('language', 'en'),
api_input.get('user_context', '')
)
db.session.add(new_doc_vers)
try:
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error adding document for tenant {tenant_id}: {e}')
db.session.rollback()
raise
current_app.logger.info(f'Document added successfully for tenant {tenant_id}, '
f'Document Version {new_doc.id}')
# Upload file to storage
upload_file_for_version(new_doc_vers, file, extension, tenant_id)
return new_doc, new_doc_vers
def create_document(form, filename, tenant_id):
new_doc = Document()
if form['name'] == '':
new_doc.name = filename.rsplit('.', 1)[0]
else:
new_doc.name = form['name']
if form['valid_from'] and form['valid_from'] != '':
new_doc.valid_from = form['valid_from']
else:
new_doc.valid_from = dt.now(tz.utc)
new_doc.tenant_id = tenant_id
set_logging_information(new_doc, dt.now(tz.utc))
return new_doc
def create_version_for_document(document, url, language, user_context):
new_doc_vers = DocumentVersion()
if url != '':
new_doc_vers.url = url
if language == '':
raise EveAIInvalidLanguageException('Language is required for document creation!')
else:
new_doc_vers.language = language
if user_context != '':
new_doc_vers.user_context = user_context
new_doc_vers.document = document
set_logging_information(new_doc_vers, dt.now(tz.utc))
return new_doc_vers
def upload_file_for_version(doc_vers, file, extension, tenant_id):
doc_vers.file_type = extension
doc_vers.file_name = doc_vers.calc_file_name()
doc_vers.file_location = doc_vers.calc_file_location()
# Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place.
minio_client.create_tenant_bucket(tenant_id)
try:
minio_client.upload_document_file(
tenant_id,
doc_vers.doc_id,
doc_vers.language,
doc_vers.id,
doc_vers.file_name,
file
)
db.session.commit()
current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
f'document version {doc_vers.id} while uploading file.')
except Exception as e:
db.session.rollback()
current_app.logger.error(
f'Error saving document to MinIO for tenant {tenant_id}: {e}')
raise
def set_logging_information(obj, timestamp):
obj.created_at = timestamp
obj.updated_at = timestamp
def update_logging_information(obj, timestamp):
obj.updated_at = timestamp
def get_extension_from_content_type(content_type):
content_type_map = {
'text/html': 'html',
'application/pdf': 'pdf',
'text/plain': 'txt',
'application/msword': 'doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
# Add more mappings as needed
}
return content_type_map.get(content_type, 'html') # Default to 'html' if unknown
def process_url(url, tenant_id):
response = requests.head(url, allow_redirects=True)
content_type = response.headers.get('Content-Type', '').split(';')[0]
# Determine file extension based on Content-Type
extension = get_extension_from_content_type(content_type)
# Generate filename
parsed_url = urlparse(url)
path = unquote(parsed_url.path)
filename = os.path.basename(path)
if not filename or '.' not in filename:
# Use the last part of the path or a default name
filename = path.strip('/').split('/')[-1] or 'document'
filename = secure_filename(f"{filename}.{extension}")
else:
filename = secure_filename(filename)
# Check if a document with this URL already exists
existing_doc = DocumentVersion.query.filter_by(url=url).first()
if existing_doc:
raise EveAIDoubleURLException
# Download the content
response = requests.get(url)
response.raise_for_status()
file_content = response.content
return file_content, filename, extension
def process_multiple_urls(urls, tenant_id, api_input):
results = []
for url in urls:
try:
file_content, filename, extension = process_url(url, tenant_id)
url_input = api_input.copy()
url_input.update({
'url': url,
'name': f"{api_input['name']}-{filename}" if api_input['name'] else filename
})
new_doc, new_doc_vers = create_document_stack(url_input, file_content, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
results.append({
'url': url,
'document_id': new_doc.id,
'document_version_id': new_doc_vers.id,
'task_id': task_id,
'status': 'success'
})
except Exception as e:
current_app.logger.error(f"Error processing URL {url}: {str(e)}")
results.append({
'url': url,
'status': 'error',
'message': str(e)
})
return results
def prepare_youtube_document(url, tenant_id, api_input):
try:
filename = f"placeholder.youtube"
extension = 'youtube'
new_doc = create_document(api_input, filename, tenant_id)
new_doc_vers = create_version_for_document(new_doc, url, api_input['language'], api_input['user_context'])
new_doc_vers.file_type = extension
new_doc_vers.file_name = new_doc_vers.calc_file_name()
new_doc_vers.file_location = new_doc_vers.calc_file_location()
db.session.add(new_doc)
db.session.add(new_doc_vers)
db.session.commit()
return new_doc, new_doc_vers
except Exception as e:
raise EveAIYoutubeError(f"Error preparing YouTube document: {str(e)}")
def start_embedding_task(tenant_id, doc_vers_id):
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
tenant_id,
doc_vers_id,
])
current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
f'Document Version {doc_vers_id}. '
f'Embedding creation task: {task.id}')
return task.id
def validate_file_type(extension):
current_app.logger.debug(f'Validating file type {extension}')
current_app.logger.debug(f'Supported file types: {current_app.config["SUPPORTED_FILE_TYPES"]}')
if extension not in current_app.config['SUPPORTED_FILE_TYPES']:
raise EveAIUnsupportedFileType(f"Filetype {extension} is currently not supported. "
f"Supported filetypes: {', '.join(current_app.config['SUPPORTED_FILE_TYPES'])}")

View File

@@ -0,0 +1,43 @@
class EveAIException(Exception):
"""Base exception class for EveAI API"""
def __init__(self, message, status_code=400, payload=None):
super().__init__()
self.message = message
self.status_code = status_code
self.payload = payload
def to_dict(self):
rv = dict(self.payload or ())
rv['message'] = self.message
return rv
class EveAIInvalidLanguageException(EveAIException):
"""Raised when an invalid language is provided"""
def __init__(self, message="Langage is required", status_code=400, payload=None):
super().__init__(message, status_code, payload)
class EveAIDoubleURLException(EveAIException):
"""Raised when an existing url is provided"""
def __init__(self, message="URL already exists", status_code=400, payload=None):
super().__init__(message, status_code, payload)
class EveAIUnsupportedFileType(EveAIException):
"""Raised when an invalid file type is provided"""
def __init__(self, message="Filetype is not supported", status_code=400, payload=None):
super().__init__(message, status_code, payload)
class EveAIYoutubeError(EveAIException):
"""Raised when adding a Youtube document fails"""
def __init__(self, message="Youtube document creation failed", status_code=400, payload=None):
super().__init__(message, status_code, payload)
# Add more custom exceptions as needed

View File

@@ -136,6 +136,10 @@ class Config(object):
MAIL_PASSWORD = environ.get('MAIL_PASSWORD')
MAIL_DEFAULT_SENDER = ('eveAI Admin', MAIL_USERNAME)
SUPPORTED_FILE_TYPES = ['pdf', 'html', 'md', 'txt', 'mp3', 'mp4', 'ogg', 'srt']
class DevConfig(Config):
DEVELOPMENT = True

View File

@@ -60,6 +60,14 @@ LOGGING = {
'backupCount': 10,
'formatter': 'standard',
},
'file_api': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': 'logs/eveai_api.log',
'maxBytes': 1024 * 1024 * 5, # 5MB
'backupCount': 10,
'formatter': 'standard',
},
'file_sqlalchemy': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
@@ -146,6 +154,11 @@ LOGGING = {
'level': 'DEBUG',
'propagate': False
},
'eveai_api': { # logger for the eveai_chat_workers
'handlers': ['file_api', 'graylog', ] if env == 'production' else ['file_api', ],
'level': 'DEBUG',
'propagate': False
},
'sqlalchemy.engine': { # logger for the sqlalchemy
'handlers': ['file_sqlalchemy', 'graylog', ] if env == 'production' else ['file_sqlalchemy', ],
'level': 'DEBUG',

View File

@@ -1,4 +1,72 @@
# from flask import Blueprint, request
#
# public_api_bp = Blueprint("public", __name__, url_prefix="/api/v1")
# tenant_api_bp = Blueprint("tenant", __name__, url_prefix="/api/v1/tenant")
from flask import Flask, jsonify
from flask_jwt_extended import get_jwt_identity
from common.extensions import db, api, jwt, minio_client
import os
import logging.config
from common.utils.database import Database
from config.logging_config import LOGGING
from .api.document_api import AddDocumentResource
from .api.auth import TokenResource
from config.config import get_config
from common.utils.celery_utils import make_celery, init_celery
from common.utils.eveai_exceptions import EveAIException
def create_app(config_file=None):
app = Flask(__name__)
environment = os.getenv('FLASK_ENV', 'development')
match environment:
case 'development':
app.config.from_object(get_config('dev'))
case 'production':
app.config.from_object(get_config('prod'))
case _:
app.config.from_object(get_config('dev'))
app.config['SESSION_KEY_PREFIX'] = 'eveai_api_'
app.celery = make_celery(app.name, app.config)
init_celery(app.celery, app)
logging.config.dictConfig(LOGGING)
logger = logging.getLogger(__name__)
logger.info("eveai_api starting up")
# Register Necessary Extensions
register_extensions(app)
# Error handler for the API
@app.errorhandler(EveAIException)
def handle_eveai_exception(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response
@api.before_request
def before_request():
# Extract tenant_id from the JWT token
tenant_id = get_jwt_identity()
# Switch to the correct schema
Database(tenant_id).switch_schema()
# Register resources
register_api_resources()
return app
def register_extensions(app):
db.init_app(app)
api.init_app(app)
jwt.init_app(app)
minio_client.init_app(app)
def register_api_resources():
api.add_resource(AddDocumentResource, '/api/v1/documents/add_document')
api.add_resource(TokenResource, '/api/v1/token')

24
eveai_api/api/auth.py Normal file
View File

@@ -0,0 +1,24 @@
from flask_restful import Resource, reqparse
from flask_jwt_extended import create_access_token
from common.models.user import Tenant
from common.extensions import simple_encryption
from flask import current_app
class TokenResource(Resource):
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('tenant_id', type=int, required=True)
parser.add_argument('api_key', type=str, required=True)
args = parser.parse_args()
tenant = Tenant.query.get(args['tenant_id'])
if not tenant:
return {'message': 'Tenant not found'}, 404
decrypted_api_key = simple_encryption.decrypt_api_key(tenant.encrypted_api_key)
if args['api_key'] != decrypted_api_key:
return {'message': 'Invalid API key'}, 401
access_token = create_access_token(identity={'tenant_id': tenant.id})
return {'access_token': access_token}, 200

View File

@@ -0,0 +1,178 @@
from flask_restful import Resource, reqparse
from flask import current_app
from flask_jwt_extended import jwt_required, get_jwt_identity
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from common.utils.document_utils import (
create_document_stack, process_url, start_embedding_task,
validate_file_type, EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
process_multiple_urls, prepare_youtube_document
)
from common.utils.eveai_exceptions import EveAIYoutubeError
class AddDocumentResource(Resource):
@jwt_required()
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('file', type=FileStorage, location='files', required=True)
parser.add_argument('name', type=str, required=False)
parser.add_argument('language', type=str, required=True)
parser.add_argument('user_context', type=str, required=False)
parser.add_argument('valid_from', type=str, required=False)
args = parser.parse_args()
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding document for tenant {tenant_id}')
try:
file = args['file']
filename = secure_filename(file.filename)
extension = filename.rsplit('.', 1)[1].lower()
validate_file_type(extension)
api_input = {
'name': args['name'] or filename,
'language': args['language'],
'user_context': args['user_context'],
'valid_from': args['valid_from']
}
new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
return {
'message': f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'document_id': new_doc.id,
'document_version_id': new_doc_vers.id,
'task_id': task_id
}, 201
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
return {'message': str(e)}, 400
except Exception as e:
current_app.logger.error(f'Error adding document: {str(e)}')
return {'message': 'Error adding document'}, 500
class AddURLResource(Resource):
@jwt_required()
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('url', type=str, required=True)
parser.add_argument('name', type=str, required=False)
parser.add_argument('language', type=str, required=True)
parser.add_argument('user_context', type=str, required=False)
parser.add_argument('valid_from', type=str, required=False)
args = parser.parse_args()
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding document from URL for tenant {tenant_id}')
try:
file_content, filename, extension = process_url(args['url'], tenant_id)
api_input = {
'url': args['url'],
'name': args['name'] or filename,
'language': args['language'],
'user_context': args['user_context'],
'valid_from': args['valid_from']
}
new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
return {
'message': f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'document_id': new_doc.id,
'document_version_id': new_doc_vers.id,
'task_id': task_id
}, 201
except EveAIDoubleURLException:
return {'message': f'A document with URL {args["url"]} already exists.'}, 400
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
return {'message': str(e)}, 400
except Exception as e:
current_app.logger.error(f'Error adding document from URL: {str(e)}')
return {'message': 'Error adding document from URL'}, 500
class AddMultipleURLsResource(Resource):
@jwt_required()
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('urls', type=str, action='append', required=True)
parser.add_argument('name', type=str, required=False)
parser.add_argument('language', type=str, required=True)
parser.add_argument('user_context', type=str, required=False)
parser.add_argument('valid_from', type=str, required=False)
args = parser.parse_args()
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding multiple documents from URLs for tenant {tenant_id}')
try:
api_input = {
'name': args['name'],
'language': args['language'],
'user_context': args['user_context'],
'valid_from': args['valid_from']
}
results = process_multiple_urls(args['urls'], tenant_id, api_input)
return {
'message': 'Processing of multiple URLs completed',
'results': results
}, 201
except Exception as e:
current_app.logger.error(f'Error adding documents from URLs: {str(e)}')
return {'message': 'Error adding documents from URLs'}, 500
class AddYoutubeResource(Resource):
@jwt_required()
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('url', type=str, required=True)
parser.add_argument('name', type=str, required=False)
parser.add_argument('language', type=str, required=True)
parser.add_argument('user_context', type=str, required=False)
parser.add_argument('valid_from', type=str, required=False)
args = parser.parse_args()
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding YouTube document for tenant {tenant_id}')
try:
api_input = {
'name': args['name'],
'language': args['language'],
'user_context': args['user_context'],
'valid_from': args['valid_from']
}
new_doc, new_doc_vers = prepare_youtube_document(args['url'], tenant_id, api_input)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
return {
'message': f'Processing on YouTube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'document_id': new_doc.id,
'document_version_id': new_doc_vers.id,
'task_id': task_id
}, 201
except EveAIYoutubeError as e:
return {'message': str(e)}, 400
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
return {'message': str(e)}, 400
except Exception as e:
current_app.logger.error(f'Error adding YouTube document: {str(e)}')
return {'message': 'Error adding YouTube document'}, 500
# You can add more API resources here as needed

View File

@@ -1,7 +0,0 @@
from flask import request
from flask.views import MethodView
class RegisterAPI(MethodView):
def post(self):
username = request.json['username']

View File

@@ -27,7 +27,6 @@ def create_app(config_file=None):
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
environment = os.getenv('FLASK_ENV', 'development')
print(environment)
match environment:
case 'development':
@@ -49,8 +48,6 @@ def create_app(config_file=None):
logger = logging.getLogger(__name__)
logger.info("eveai_app starting up")
logger.debug("start config")
logger.debug(app.config)
# Register extensions
@@ -95,14 +92,11 @@ def create_app(config_file=None):
}
return jsonify(response), 500
@app.before_request
def before_request():
# app.logger.debug(f"Before request - Session ID: {session.sid}")
app.logger.debug(f"Before request - Session data: {session}")
app.logger.debug(f"Before request - Request headers: {request.headers}")
# Register API
register_api(app)
# @app.before_request
# def before_request():
# # app.logger.debug(f"Before request - Session ID: {session.sid}")
# app.logger.debug(f"Before request - Session data: {session}")
# app.logger.debug(f"Before request - Request headers: {request.headers}")
# Register template filters
register_filters(app)
@@ -138,9 +132,3 @@ def register_blueprints(app):
app.register_blueprint(security_bp)
from .views.interaction_views import interaction_bp
app.register_blueprint(interaction_bp)
def register_api(app):
pass
# from . import api
# app.register_blueprint(api.bp, url_prefix='/api')

View File

@@ -84,7 +84,6 @@
{'name': 'Add Document', 'url': '/document/add_document', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'Add URL', 'url': '/document/add_url', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'Add a list of URLs', 'url': '/document/add_urls', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'Add Youtube Document' , 'url': '/document/add_youtube', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'All Documents', 'url': '/document/documents', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'All Document Versions', 'url': '/document/document_versions_list', 'roles': ['Super User', 'Tenant Admin']},
{'name': 'Library Operations', 'url': '/document/library_operations', 'roles': ['Super User', 'Tenant Admin']},

View File

@@ -1,14 +1,21 @@
from flask import session
from flask import session, current_app
from flask_wtf import FlaskForm
from wtforms import (StringField, BooleanField, SubmitField, DateField,
SelectField, FieldList, FormField, TextAreaField, URLField)
from wtforms.validators import DataRequired, Length, Optional, URL
from wtforms.validators import DataRequired, Length, Optional, URL, ValidationError
from flask_wtf.file import FileField, FileAllowed, FileRequired
def allowed_file(form, field):
if field.data:
filename = field.data.filename
allowed_extensions = current_app.config.get('SUPPORTED_FILE_TYPES', [])
if not ('.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions):
raise ValidationError('Unsupported file type.')
class AddDocumentForm(FlaskForm):
file = FileField('File', validators=[FileAllowed(['pdf', 'txt', 'html']),
FileRequired()])
file = FileField('File', validators=[FileRequired(), allowed_file])
name = StringField('Name', validators=[Length(max=100)])
language = SelectField('Language', choices=[], validators=[Optional()])
user_context = TextAreaField('User Context', validators=[Optional()])

View File

@@ -18,6 +18,10 @@ from minio.error import S3Error
from common.models.document import Document, DocumentVersion
from common.extensions import db, minio_client
from common.utils.document_utils import validate_file_type, create_document_stack, start_embedding_task, process_url, \
process_multiple_urls, prepare_youtube_document, create_version_for_document, upload_file_for_version
from common.utils.eveai_exceptions import EveAIInvalidLanguageException, EveAIUnsupportedFileType, \
EveAIDoubleURLException, EveAIYoutubeError
from .document_forms import AddDocumentForm, AddURLForm, EditDocumentForm, EditDocumentVersionForm, AddYoutubeForm, \
AddURLsForm
from common.utils.middleware import mw_before_request
@@ -56,30 +60,37 @@ def before_request():
@roles_accepted('Super User', 'Tenant Admin')
def add_document():
form = AddDocumentForm()
current_app.logger.debug('Adding document')
# If the form is submitted
if form.validate_on_submit():
current_app.logger.info(f'Adding document for tenant {session["tenant"]["id"]}')
file = form.file.data
filename = secure_filename(file.filename)
extension = filename.rsplit('.', 1)[1].lower()
form_dict = form_to_dict(form)
try:
current_app.logger.debug('Validating file type')
tenant_id = session['tenant']['id']
file = form.file.data
filename = secure_filename(file.filename)
extension = filename.rsplit('.', 1)[1].lower()
new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension)
validate_file_type(extension)
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
])
current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}. '
f'Embedding creation task: {task.id}')
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
'success')
api_input = {
'name': form.name.data,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}
return redirect(prefixed_url_for('document_bp.documents'))
else:
form_validation_failed(request, form)
new_doc, new_doc_vers = create_document_stack(api_input, file, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
flash(str(e), 'error')
except Exception as e:
current_app.logger.error(f'Error adding document: {str(e)}')
flash('An error occurred while adding the document.', 'error')
return render_template('document/add_document.html', form=form)
@@ -90,189 +101,107 @@ def add_url():
form = AddURLForm()
if form.validate_on_submit():
current_app.logger.info(f'Adding url for tenant {session["tenant"]["id"]}')
url = form.url.data
try:
response = requests.head(url, allow_redirects=True)
content_type = response.headers.get('Content-Type', '').split(';')[0]
tenant_id = session['tenant']['id']
url = form.url.data
# Determine file extension based on Content-Type
extension = get_extension_from_content_type(content_type)
file_content, filename, extension = process_url(url, tenant_id)
# Generate filename
parsed_url = urlparse(url)
path = unquote(parsed_url.path)
filename = os.path.basename(path)
api_input = {
'name': form.name.data or filename,
'url': url,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}
if not filename or '.' not in filename:
# Use the last part of the path or a default name
filename = path.strip('/').split('/')[-1] or 'document'
filename = secure_filename(f"{filename}.{extension}")
else:
filename = secure_filename(filename)
new_doc, new_doc_vers = create_document_stack(api_input, file_content, filename, extension, tenant_id)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
# Check if a document with this URL already exists
existing_doc = DocumentVersion.query.filter_by(url=url).first()
if existing_doc:
flash(f'A document with URL {url} already exists. No new document created.', 'info')
return redirect(prefixed_url_for('document_bp.documents'))
# Download the content
response = requests.get(url)
response.raise_for_status()
file_content = response.content
# Create document and document version
form_dict = form_to_dict(form)
new_doc, new_doc_vers = create_document_stack(form_dict, file_content, filename, extension)
# Upload file to storage
minio_client.upload_document_file(
session['tenant']['id'],
new_doc_vers.doc_id,
new_doc_vers.language,
new_doc_vers.id,
filename,
file_content
)
# Start embedding task
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
])
current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}. '
f'Embedding creation task: {task.id}')
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
except requests.RequestException as e:
current_app.logger.error(f'Error fetching URL {url}: {str(e)}')
flash(f'Error fetching URL: {str(e)}', 'danger')
except SQLAlchemyError as e:
current_app.logger.error(f'Database error: {str(e)}')
flash('An error occurred while saving the document.', 'danger')
except EveAIDoubleURLException:
flash(f'A document with url {url} already exists. No new document created.', 'info')
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
flash(str(e), 'error')
except Exception as e:
current_app.logger.error(f'Unexpected error: {str(e)}')
flash('An unexpected error occurred.', 'danger')
current_app.logger.error(f'Error adding document: {str(e)}')
flash('An error occurred while adding the document.', 'error')
return render_template('document/add_url.html', form=form)
def get_extension_from_content_type(content_type):
content_type_map = {
'text/html': 'html',
'application/pdf': 'pdf',
'text/plain': 'txt',
'application/msword': 'doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
# Add more mappings as needed
}
return content_type_map.get(content_type, 'html') # Default to 'html' if unknown
@document_bp.route('/add_urls', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_urls():
form = AddURLsForm()
if form.validate_on_submit():
urls = form.urls.data.split('\n')
urls = [url.strip() for url in urls if url.strip()]
try:
tenant_id = session['tenant']['id']
urls = form.urls.data.split('\n')
urls = [url.strip() for url in urls if url.strip()]
for i, url in enumerate(urls):
try:
doc_vers = DocumentVersion.query.filter_by(url=url).all()
if doc_vers:
current_app.logger.info(f'A document with url {url} already exists. No new document created.')
flash(f'A document with url {url} already exists. No new document created.', 'info')
continue
api_input = {
'name': form.name.data,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}
html = fetch_html(url)
file = io.BytesIO(html)
results = process_multiple_urls(urls, tenant_id, api_input)
parsed_url = urlparse(url)
path_parts = parsed_url.path.split('/')
filename = path_parts[-1] if path_parts[-1] else 'index'
if not filename.endswith('.html'):
filename += '.html'
for result in results:
if result['status'] == 'success':
flash(
f"Processed URL: {result['url']} - Document ID: {result['document_id']}, Version ID: {result['document_version_id']}",
'success')
else:
flash(f"Error processing URL: {result['url']} - {result['message']}", 'error')
# Use the name prefix if provided, otherwise use the filename
doc_name = f"{form.name.data}-{filename}" if form.name.data else filename
return redirect(prefixed_url_for('document_bp.documents'))
new_doc, new_doc_vers = create_document_stack({
'name': doc_name,
'url': url,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}, file, filename, 'html')
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
])
current_app.logger.info(f'Embedding creation started for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}. '
f'Embedding creation task: {task.id}')
flash(f'Processing on document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
'success')
except Exception as e:
current_app.logger.error(f"Error processing URL {url}: {str(e)}")
flash(f'Error processing URL {url}: {str(e)}', 'danger')
return redirect(prefixed_url_for('document_bp.documents'))
else:
form_validation_failed(request, form)
except Exception as e:
current_app.logger.error(f'Error adding multiple URLs: {str(e)}')
flash('An error occurred while adding the URLs.', 'error')
return render_template('document/add_urls.html', form=form)
@document_bp.route('/add_youtube', methods=['GET', 'POST'])
@roles_accepted('Super User', 'Tenant Admin')
def add_youtube():
form = AddYoutubeForm()
if form.validate_on_submit():
current_app.logger.info(f'Adding Youtube document for tenant {session["tenant"]["id"]}')
url = form.url.data
current_app.logger.debug(f'Value of language field: {form.language.data}')
try:
tenant_id = session['tenant']['id']
url = form.url.data
doc_vers = DocumentVersion.query.filter_by(url=url).all()
if doc_vers:
current_app.logger.info(f'A document with url {url} already exists. No new document created.')
flash(f'A document with url {url} already exists. No new document created.', 'info')
api_input = {
'name': form.name.data,
'language': form.language.data,
'user_context': form.user_context.data,
'valid_from': form.valid_from.data
}
new_doc, new_doc_vers = prepare_youtube_document(url, tenant_id, api_input)
task_id = start_embedding_task(tenant_id, new_doc_vers.id)
flash(
f'Processing on YouTube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task_id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
# As downloading a Youtube document can take quite some time, we offload this downloading to the worker
# We just pass a simple file to get things conform
file = "Youtube placeholder file"
filename = 'placeholder.youtube'
extension = 'youtube'
form_dict = form_to_dict(form)
current_app.logger.debug(f'Form data: {form_dict}')
new_doc, new_doc_vers = create_document_stack(form_dict, file, filename, extension)
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
new_doc_vers.id,
])
current_app.logger.info(f'Processing and Embedding on Youtube document started for tenant '
f'{session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}. '
f'Processing and Embedding Youtube task: {task.id}')
flash(f'Processing on Youtube document {new_doc.name}, version {new_doc_vers.id} started. Task ID: {task.id}.',
'success')
return redirect(prefixed_url_for('document_bp.documents'))
else:
form_validation_failed(request, form)
except EveAIYoutubeError as e:
flash(str(e), 'error')
except (EveAIInvalidLanguageException, EveAIUnsupportedFileType) as e:
flash(str(e), 'error')
except Exception as e:
current_app.logger.error(f'Error adding YouTube document: {str(e)}')
flash('An error occurred while adding the YouTube document.', 'error')
return render_template('document/add_youtube.html', form=form)
@@ -487,7 +416,7 @@ def refresh_document(doc_id):
current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc_vers.id}')
upload_file_for_version(new_doc_vers, file, extension)
upload_file_for_version(new_doc_vers, file, extension, session["tenant"]["id"])
task = current_celery.send_task('create_embeddings', queue='embeddings', args=[
session['tenant']['id'],
@@ -535,116 +464,11 @@ def update_logging_information(obj, timestamp):
obj.updated_by = current_user.id
def create_document_stack(form, file, filename, extension):
# Create the Document
new_doc = create_document(form, filename)
# Create the DocumentVersion
new_doc_vers = create_version_for_document(new_doc,
form.get('url', ''),
form.get('language', 'en'),
form.get('user_context', '')
)
try:
db.session.add(new_doc)
db.session.add(new_doc_vers)
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {e}')
flash('Error adding document.', 'alert')
db.session.rollback()
error = e.args
raise
except Exception as e:
current_app.logger.error('Unknown error')
raise
current_app.logger.info(f'Document added successfully for tenant {session["tenant"]["id"]}, '
f'Document Version {new_doc.id}')
upload_file_for_version(new_doc_vers, file, extension)
return new_doc, new_doc_vers
def log_session_state(session, msg=""):
current_app.logger.debug(f"{msg} - Session dirty: {session.dirty}")
current_app.logger.debug(f"{msg} - Session new: {session.new}")
def create_document(form, filename):
new_doc = Document()
if form['name'] == '':
new_doc.name = filename.rsplit('.', 1)[0]
else:
new_doc.name = form['name']
if form['valid_from'] and form['valid_from'] != '':
new_doc.valid_from = form['valid_from']
else:
new_doc.valid_from = dt.now(tz.utc)
new_doc.tenant_id = session['tenant']['id']
set_logging_information(new_doc, dt.now(tz.utc))
return new_doc
def create_version_for_document(document, url, language, user_context):
new_doc_vers = DocumentVersion()
if url != '':
new_doc_vers.url = url
if language == '':
new_doc_vers.language = session['default_language']
else:
new_doc_vers.language = language
if user_context != '':
new_doc_vers.user_context = user_context
new_doc_vers.document = document
set_logging_information(new_doc_vers, dt.now(tz.utc))
return new_doc_vers
def upload_file_for_version(doc_vers, file, extension):
doc_vers.file_type = extension
doc_vers.file_name = doc_vers.calc_file_name()
doc_vers.file_location = doc_vers.calc_file_location()
# Normally, the tenant bucket should exist. But let's be on the safe side if a migration took place.
tenant_id = session['tenant']['id']
minio_client.create_tenant_bucket(tenant_id)
try:
minio_client.upload_document_file(
tenant_id,
doc_vers.doc_id,
doc_vers.language,
doc_vers.id,
doc_vers.file_name,
file
)
db.session.commit()
current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
f'document version {doc_vers.id} while uploading file.')
except S3Error as e:
db.session.rollback()
flash('Error saving document to MinIO.', 'error')
current_app.logger.error(
f'Error saving document to MinIO for tenant {tenant_id}: {e}')
raise
except SQLAlchemyError as e:
db.session.rollback()
flash('Error saving document metadata.', 'error')
current_app.logger.error(
f'Error saving document metadata for tenant {tenant_id}: {e}')
raise
def fetch_html(url):
# Fetches HTML content from a URL
try:

View File

@@ -0,0 +1,187 @@
import io
import os
from pydub import AudioSegment
import tempfile
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.extensions import minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor
import subprocess
class AudioProcessor(Processor):
def __init__(self, tenant, model_variables, document_version):
super().__init__(tenant, model_variables, document_version)
self.transcription_client = model_variables['transcription_client']
self.transcription_model = model_variables['transcription_model']
self.ffmpeg_path = 'ffmpeg'
def process(self):
self._log("Starting Audio processing")
try:
file_data = minio_client.download_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
self.document_version.file_name
)
compressed_audio = self._compress_audio(file_data)
transcription = self._transcribe_audio(compressed_audio)
markdown, title = self._generate_markdown_from_transcription(transcription)
self._save_markdown(markdown)
self._log("Finished processing Audio")
return markdown, title
except Exception as e:
self._log(f"Error processing Audio: {str(e)}", level='error')
raise
def _compress_audio(self, audio_data):
self._log("Compressing audio")
with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{self.document_version.file_type}') as temp_input:
temp_input.write(audio_data)
temp_input.flush()
# Use a unique filename for the output to avoid conflicts
output_filename = f'compressed_{os.urandom(8).hex()}.mp3'
output_path = os.path.join(tempfile.gettempdir(), output_filename)
try:
result = subprocess.run(
[self.ffmpeg_path, '-y', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', output_path],
capture_output=True,
text=True,
check=True
)
with open(output_path, 'rb') as f:
compressed_data = f.read()
# Save compressed audio to MinIO
compressed_filename = f"{self.document_version.id}_compressed.mp3"
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
compressed_filename,
compressed_data
)
self._log(f"Saved compressed audio to MinIO: {compressed_filename}")
return compressed_data
except subprocess.CalledProcessError as e:
error_message = f"Compression failed: {e.stderr}"
self._log(error_message, level='error')
raise Exception(error_message)
finally:
# Clean up temporary files
os.unlink(temp_input.name)
if os.path.exists(output_path):
os.unlink(output_path)
def _transcribe_audio(self, audio_data):
self._log("Starting audio transcription")
audio = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds
transcriptions = []
for i, chunk in enumerate(audio[::segment_length]):
self._log(f'Processing chunk {i + 1} of {len(audio) // segment_length + 1}')
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
chunk.export(temp_audio.name, format="mp3")
temp_audio.flush()
try:
file_size = os.path.getsize(temp_audio.name)
self._log(f"Temporary audio file size: {file_size} bytes")
with open(temp_audio.name, 'rb') as audio_file:
file_start = audio_file.read(100)
self._log(f"First 100 bytes of audio file: {file_start}")
audio_file.seek(0) # Reset file pointer to the beginning
self._log("Calling transcription API")
transcription = self.transcription_client.audio.transcriptions.create(
file=audio_file,
model=self.transcription_model,
language=self.document_version.language,
response_format='verbose_json',
)
self._log("Transcription API call completed")
if transcription:
# Handle the transcription result based on its type
if isinstance(transcription, str):
self._log(f"Transcription result (string): {transcription[:100]}...")
transcriptions.append(transcription)
elif hasattr(transcription, 'text'):
self._log(
f"Transcription result (object with 'text' attribute): {transcription.text[:100]}...")
transcriptions.append(transcription.text)
else:
self._log(f"Transcription result (unknown type): {str(transcription)[:100]}...")
transcriptions.append(str(transcription))
else:
self._log("Warning: Received empty transcription", level='warning')
except Exception as e:
self._log(f"Error during transcription: {str(e)}", level='error')
finally:
os.unlink(temp_audio.name)
full_transcription = " ".join(filter(None, transcriptions))
if not full_transcription:
self._log("Warning: No transcription was generated", level='warning')
full_transcription = "No transcription available."
# Save transcription to MinIO
transcription_filename = f"{self.document_version.id}_transcription.txt"
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
transcription_filename,
full_transcription.encode('utf-8')
)
self._log(f"Saved transcription to MinIO: {transcription_filename}")
return full_transcription
def _generate_markdown_from_transcription(self, transcription):
self._log("Generating markdown from transcription")
llm = self.model_variables['llm']
template = self.model_variables['transcript_template']
language_template = create_language_template(template, self.document_version.language)
transcript_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | transcript_prompt | llm | output_parser
input_transcript = {'transcript': transcription}
markdown = chain.invoke(input_transcript)
# Extract title from the markdown
title = self._extract_title_from_markdown(markdown)
return markdown, title
def _extract_title_from_markdown(self, markdown):
# Simple extraction of the first header as the title
lines = markdown.split('\n')
for line in lines:
if line.startswith('# '):
return line[2:].strip()
return "Untitled Audio Transcription"

View File

@@ -0,0 +1,142 @@
from bs4 import BeautifulSoup
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.extensions import db, minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor
class HTMLProcessor(Processor):
def __init__(self, tenant, model_variables, document_version):
super().__init__(tenant, model_variables, document_version)
self.html_tags = model_variables['html_tags']
self.html_end_tags = model_variables['html_end_tags']
self.html_included_elements = model_variables['html_included_elements']
self.html_excluded_elements = model_variables['html_excluded_elements']
def process(self):
self._log("Starting HTML processing")
try:
file_data = minio_client.download_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
self.document_version.file_name
)
html_content = file_data.decode('utf-8')
extracted_html, title = self._parse_html(html_content)
markdown = self._generate_markdown_from_html(extracted_html)
self._save_markdown(markdown)
self._log("Finished processing HTML")
return markdown, title
except Exception as e:
self._log(f"Error processing HTML: {str(e)}", level='error')
raise
def _parse_html(self, html_content):
self._log(f'Parsing HTML for tenant {self.tenant.id}')
soup = BeautifulSoup(html_content, 'html.parser')
extracted_html = ''
excluded_classes = self._parse_excluded_classes(self.tenant.html_excluded_classes)
if self.html_included_elements:
elements_to_parse = soup.find_all(self.html_included_elements)
else:
elements_to_parse = [soup]
for element in elements_to_parse:
for sub_element in element.find_all(self.html_tags):
if self._should_exclude_element(sub_element, excluded_classes):
continue
extracted_html += self._extract_element_content(sub_element)
title = soup.find('title').get_text(strip=True) if soup.find('title') else ''
self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
return extracted_html, title
def _generate_markdown_from_html(self, html_content):
self._log(f'Generating markdown from HTML for tenant {self.tenant.id}')
llm = self.model_variables['llm']
template = self.model_variables['html_parse_template']
parse_prompt = ChatPromptTemplate.from_template(template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | parse_prompt | llm | output_parser
soup = BeautifulSoup(html_content, 'lxml')
chunks = self._split_content(soup)
markdown_chunks = []
for chunk in chunks:
if self.embed_tuning:
self._log(f'Processing chunk: \n{chunk}\n')
input_html = {"html": chunk}
markdown_chunk = chain.invoke(input_html)
markdown_chunks.append(markdown_chunk)
if self.embed_tuning:
self._log(f'Processed markdown chunk: \n{markdown_chunk}\n')
markdown = "\n\n".join(markdown_chunks)
self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
return markdown
def _split_content(self, soup, max_size=20000):
chunks = []
current_chunk = []
current_size = 0
for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']):
element_html = str(element)
element_size = len(element_html)
if current_size + element_size > max_size and current_chunk:
chunks.append(''.join(map(str, current_chunk)))
current_chunk = []
current_size = 0
current_chunk.append(element)
current_size += element_size
if element.name in ['h1', 'h2', 'h3'] and current_size > max_size:
chunks.append(''.join(map(str, current_chunk)))
current_chunk = []
current_size = 0
if current_chunk:
chunks.append(''.join(map(str, current_chunk)))
return chunks
def _parse_excluded_classes(self, excluded_classes):
parsed = {}
for rule in excluded_classes:
element, cls = rule.split('.', 1)
parsed.setdefault(element, set()).add(cls)
return parsed
def _should_exclude_element(self, element, excluded_classes):
if self.html_excluded_elements and element.find_parent(self.html_excluded_elements):
return True
return self._is_element_excluded_by_class(element, excluded_classes)
def _is_element_excluded_by_class(self, element, excluded_classes):
for parent in element.parents:
if self._element_matches_exclusion(parent, excluded_classes):
return True
return self._element_matches_exclusion(element, excluded_classes)
def _element_matches_exclusion(self, element, excluded_classes):
if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element.get('class', [])):
return True
return element.name in excluded_classes and \
any(cls in excluded_classes[element.name] for cls in element.get('class', []))
def _extract_element_content(self, element):
content = ' '.join(child.strip() for child in element.stripped_strings)
return f'<{element.name}>{content}</{element.name}>\n'

View File

@@ -5,29 +5,23 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
import re
from langchain_core.runnables import RunnablePassthrough
from common.extensions import minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor
class PDFProcessor:
class PDFProcessor(Processor):
def __init__(self, tenant, model_variables, document_version):
self.tenant = tenant
self.model_variables = model_variables
self.document_version = document_version
# Configuration parameters from model_variables
super().__init__(tenant, model_variables, document_version)
# PDF-specific initialization
self.chunk_size = model_variables['PDF_chunk_size']
self.chunk_overlap = model_variables['PDF_chunk_overlap']
self.min_chunk_size = model_variables['PDF_min_chunk_size']
self.max_chunk_size = model_variables['PDF_max_chunk_size']
# Set tuning variable for easy use
self.embed_tuning = model_variables['embed_tuning']
def process_pdf(self):
def process(self):
self._log("Starting PDF processing")
try:
file_data = minio_client.download_document_file(
@@ -51,11 +45,6 @@ class PDFProcessor:
self._log(f"Error processing PDF: {str(e)}", level='error')
raise
def _log(self, message, level='debug'):
logger = current_app.logger
log_method = getattr(logger, level)
log_method(f"PDFProcessor - Tenant {self.tenant.id}, Document {self.document_version.id}: {message}")
def _extract_content(self, file_data):
extracted_content = []
with pdfplumber.open(io.BytesIO(file_data)) as pdf:
@@ -248,24 +237,3 @@ class PDFProcessor:
markdown_chunks.append(result)
return "\n\n".join(markdown_chunks)
def _save_markdown(self, markdown):
markdown_filename = f"{self.document_version.id}.md"
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
markdown_filename,
markdown.encode('utf-8')
)
def _save_intermediate(self, content, filename):
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
filename,
content.encode('utf-8')
)

View File

@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from flask import current_app
from common.extensions import minio_client
class Processor(ABC):
def __init__(self, tenant, model_variables, document_version):
self.tenant = tenant
self.model_variables = model_variables
self.document_version = document_version
self.embed_tuning = model_variables['embed_tuning']
@abstractmethod
def process(self):
pass
def _save_markdown(self, markdown):
markdown_filename = f"{self.document_version.id}.md"
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
markdown_filename,
markdown.encode('utf-8')
)
def _log(self, message, level='debug'):
logger = current_app.logger
log_method = getattr(logger, level)
log_method(
f"{self.__class__.__name__} - Tenant {self.tenant.id}, Document {self.document_version.id}: {message}")
def _save_intermediate(self, content, filename):
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
filename,
content.encode('utf-8')
)

View File

@@ -0,0 +1,80 @@
import re
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.extensions import minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor
class SRTProcessor(Processor):
def __init__(self, tenant, model_variables, document_version):
super().__init__(tenant, model_variables, document_version)
def process(self):
self._log("Starting SRT processing")
try:
file_data = minio_client.download_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
self.document_version.file_name
)
srt_content = file_data.decode('utf-8')
cleaned_transcription = self._clean_srt(srt_content)
markdown, title = self._generate_markdown_from_transcription(cleaned_transcription)
self._save_markdown(markdown)
self._log("Finished processing SRT")
return markdown, title
except Exception as e:
self._log(f"Error processing SRT: {str(e)}", level='error')
raise
def _clean_srt(self, srt_content):
# Remove timecodes and subtitle numbers
cleaned_lines = []
for line in srt_content.split('\n'):
# Skip empty lines, subtitle numbers, and timecodes
if line.strip() and not line.strip().isdigit() and not re.match(
r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', line):
cleaned_lines.append(line.strip())
# Join the cleaned lines
cleaned_text = ' '.join(cleaned_lines)
# Remove any extra spaces
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
return cleaned_text
def _generate_markdown_from_transcription(self, transcription):
self._log("Generating markdown from transcription")
llm = self.model_variables['llm']
template = self.model_variables['transcript_template']
language_template = create_language_template(template, self.document_version.language)
transcript_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | transcript_prompt | llm | output_parser
input_transcript = {'transcript': transcription}
markdown = chain.invoke(input_transcript)
# Extract title from the markdown
title = self._extract_title_from_markdown(markdown)
return markdown, title
def _extract_title_from_markdown(self, markdown):
# Simple extraction of the first header as the title
lines = markdown.split('\n')
for line in lines:
if line.startswith('# '):
return line[2:].strip()
return "Untitled SRT Transcription"

View File

@@ -1,26 +1,16 @@
import io
import os
from datetime import datetime as dt, timezone as tz
import subprocess
import gevent
from bs4 import BeautifulSoup
import html
from celery import states
from flask import current_app
# OpenAI imports
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import CharacterTextSplitter, MarkdownHeaderTextSplitter
from langchain_core.documents import Document
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from sqlalchemy.exc import SQLAlchemyError
from pytube import YouTube
import PyPDF2
from pydub import AudioSegment
import tempfile
from common.extensions import db, minio_client
from common.models.document import DocumentVersion, Embedding
@@ -29,7 +19,10 @@ from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import select_model_variables, create_language_template
from common.utils.os_utils import safe_remove, sync_folder
from eveai_workers.Processors.PDF_Processor import PDFProcessor
from eveai_workers.Processors.audio_processor import AudioProcessor
from eveai_workers.Processors.html_processor import HTMLProcessor
from eveai_workers.Processors.pdf_processor import PDFProcessor
from eveai_workers.Processors.srt_processor import SRTProcessor
@current_celery.task(name='create_embeddings', queue='embeddings')
@@ -85,8 +78,10 @@ def create_embeddings(tenant_id, document_version_id):
process_pdf(tenant, model_variables, document_version)
case 'html':
process_html(tenant, model_variables, document_version)
case 'youtube':
process_youtube(tenant, model_variables, document_version)
case 'srt':
process_srt(tenant, model_variables, document_version)
case 'mp4' | 'mp3' | 'ogg':
process_audio(tenant, model_variables, document_version)
case _:
raise Exception(f'No functionality defined for file type {document_version.file_type} '
f'for tenant {tenant_id} '
@@ -104,83 +99,6 @@ def create_embeddings(tenant_id, document_version_id):
raise
# def process_pdf(tenant, model_variables, document_version):
# file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, document_version.file_name)
#
# pdf_text = ''
# pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
# for page in pdf_reader.pages:
# pdf_text += page.extract_text()
#
# markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text)
# markdown_file_name = f'{document_version.id}.md'
# minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id,
# markdown_file_name, markdown.encode())
#
# potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
# chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
# model_variables['max_chunk_size'])
#
# if len(chunks) > 1:
# summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
# document_version.system_context = f'Summary: {summary}\n'
# else:
# document_version.system_context = ''
#
# enriched_chunks = enrich_chunks(tenant, document_version, chunks)
# embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
#
# try:
# db.session.add(document_version)
# document_version.processing_finished_at = dt.now(tz.utc)
# document_version.processing = False
# db.session.add_all(embeddings)
# db.session.commit()
# except SQLAlchemyError as e:
# current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
# f'on HTML, document version {document_version.id}'
# f'error: {e}')
# raise
#
# current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
# f'on document version {document_version.id} :-)')
def process_pdf(tenant, model_variables, document_version):
processor = PDFProcessor(tenant, model_variables, document_version)
markdown, title = processor.process_pdf()
# Create potential chunks for embedding
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
# Combine chunks for embedding
chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
model_variables['max_chunk_size'])
# Enrich chunks
enriched_chunks = enrich_chunks(tenant, document_version, title, chunks)
# Create embeddings
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
# Update document version and save embeddings
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing = False
db.session.add_all(embeddings)
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
f'on PDF, document version {document_version.id}'
f'error: {e}')
raise
current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
f'on document version {document_version.id} :-)')
def delete_embeddings_for_document_version(document_version):
embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all()
for embedding in embeddings_to_delete:
@@ -193,45 +111,53 @@ def delete_embeddings_for_document_version(document_version):
raise
def process_pdf(tenant, model_variables, document_version):
processor = PDFProcessor(tenant, model_variables, document_version)
markdown, title = processor.process()
# Process markdown and embed
embed_markdown(tenant, model_variables, document_version, markdown, title)
def process_html(tenant, model_variables, document_version):
file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, document_version.file_name)
html_content = file_data.decode('utf-8')
processor = HTMLProcessor(tenant, model_variables, document_version)
markdown, title = processor.process()
# The tags to be considered can be dependent on the tenant
html_tags = model_variables['html_tags']
html_end_tags = model_variables['html_end_tags']
html_included_elements = model_variables['html_included_elements']
html_excluded_elements = model_variables['html_excluded_elements']
# Process markdown and embed
embed_markdown(tenant, model_variables, document_version, markdown, title)
extracted_html, title = parse_html(tenant, html_content, html_tags, included_elements=html_included_elements,
excluded_elements=html_excluded_elements)
extracted_file_name = f'{document_version.id}-extracted.html'
minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id,
extracted_file_name, extracted_html.encode())
def process_audio(tenant, model_variables, document_version):
processor = AudioProcessor(tenant, model_variables, document_version)
markdown, title = processor.process()
markdown = generate_markdown_from_html(tenant, model_variables, document_version, extracted_html)
markdown_file_name = f'{document_version.id}.md'
minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id,
markdown_file_name, markdown.encode())
# Process markdown and embed
embed_markdown(tenant, model_variables, document_version, markdown, title)
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
def process_srt(tenant, model_variables, document_version):
processor = SRTProcessor(tenant, model_variables, document_version)
markdown, title = processor.process()
# Process markdown and embed
embed_markdown(tenant, model_variables, document_version, markdown, title)
def embed_markdown(tenant, model_variables, document_version, markdown, title):
# Create potential chunks
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
# Combine chunks for embedding
chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
model_variables['max_chunk_size'])
if len(chunks) > 1:
summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
document_version.system_context = (f'Title: {title}\n'
f'Summary: {summary}\n')
else:
document_version.system_context = (f'Title: {title}\n')
# Enrich chunks
enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks)
enriched_chunks = enrich_chunks(tenant, document_version, title, chunks)
# Create embeddings
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
# Update document version and save embeddings
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
@@ -248,12 +174,18 @@ def process_html(tenant, model_variables, document_version):
f'on document version {document_version.id} :-)')
def enrich_chunks(tenant, document_version, title, chunks):
def enrich_chunks(tenant, model_variables, document_version, title, chunks):
current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
f'on document version {document_version.id}')
current_app.logger.debug(f'Nr of chunks: {len(chunks)}')
summary = ''
if len(chunks) > 1:
summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
chunk_total_context = (f'Filename: {document_version.file_name}\n'
f'User Context:\n{document_version.user_context}\n\n'
f'Title: {title}\n'
f'{summary}\n'
f'{document_version.system_context}\n\n')
enriched_chunks = []
initial_chunk = (f'Filename: {document_version.file_name}\n'
@@ -272,95 +204,6 @@ def enrich_chunks(tenant, document_version, title, chunks):
return enriched_chunks
def generate_markdown_from_html(tenant, model_variables, document_version, html_content):
current_app.logger.debug(f'Generating markdown from HTML for tenant {tenant.id} '
f'on document version {document_version.id}')
llm = model_variables['llm']
template = model_variables['html_parse_template']
parse_prompt = ChatPromptTemplate.from_template(template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | parse_prompt | llm | output_parser
soup = BeautifulSoup(html_content, 'lxml')
def split_content(soup, max_size=20000):
chunks = []
current_chunk = []
current_size = 0
for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']):
element_html = str(element)
element_size = len(element_html)
if current_size + element_size > max_size and current_chunk:
chunks.append(''.join(map(str, current_chunk)))
current_chunk = []
current_size = 0
current_chunk.append(element)
current_size += element_size
if element.name in ['h1', 'h2', 'h3'] and current_size > max_size:
chunks.append(''.join(map(str, current_chunk)))
current_chunk = []
current_size = 0
if current_chunk:
chunks.append(''.join(map(str, current_chunk)))
return chunks
chunks = split_content(soup)
markdown_chunks = []
for chunk in chunks:
current_app.logger.debug(f'Processing chunk to generate markdown from HTML for tenant {tenant.id} '
f'on document version {document_version.id}')
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Processing chunk: \n '
f'------------------\n'
f'{chunk}\n'
f'------------------\n')
input_html = {"html": chunk}
markdown_chunk = chain.invoke(input_html)
markdown_chunks.append(markdown_chunk)
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Processed markdown chunk: \n '
f'-------------------------\n'
f'{markdown_chunk}\n'
f'-------------------------\n')
current_app.logger.debug(f'Finished processing chunk to generate markdown from HTML for tenant {tenant.id} '
f'on document version {document_version.id}')
# Combine all markdown chunks
markdown = "\n\n".join(markdown_chunks)
current_app.logger.debug(f'Finished generating markdown from HTML for tenant {tenant.id} '
f'on document version {document_version.id}')
return markdown
def generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_content):
current_app.logger.debug(f'Generating Markdown from PDF for tenant {tenant.id} '
f'on document version {document_version.id}')
llm = model_variables['llm']
template = model_variables['pdf_parse_template']
parse_prompt = ChatPromptTemplate.from_template(template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | parse_prompt | llm | output_parser
input_pdf = {"pdf_content": pdf_content}
markdown = chain.invoke(input_pdf)
return markdown
def summarize_chunk(tenant, model_variables, document_version, chunk):
current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
f'on document version {document_version.id}')
@@ -415,65 +258,6 @@ def embed_chunks(tenant, model_variables, document_version, chunks):
return new_embeddings
def parse_html(tenant, html_content, tags, included_elements=None, excluded_elements=None):
current_app.logger.debug(f'Parsing HTML for tenant {tenant.id}')
soup = BeautifulSoup(html_content, 'html.parser')
extracted_html = ''
excluded_classes = parse_excluded_classes(tenant.html_excluded_classes)
if included_elements:
elements_to_parse = soup.find_all(included_elements)
else:
elements_to_parse = [soup]
log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse)
for element in elements_to_parse:
for sub_element in element.find_all(tags):
if should_exclude_element(sub_element, excluded_elements, excluded_classes):
continue
extracted_html += extract_element_content(sub_element)
title = soup.find('title').get_text(strip=True) if soup.find('title') else ''
current_app.logger.debug(f'Finished parsing HTML for tenant {tenant.id}')
return extracted_html, title
def parse_excluded_classes(excluded_classes):
parsed = {}
for rule in excluded_classes:
element, cls = rule.split('.', 1)
parsed.setdefault(element, set()).add(cls)
return parsed
def should_exclude_element(element, excluded_elements, excluded_classes):
if excluded_elements and element.find_parent(excluded_elements):
return True
return is_element_excluded_by_class(element, excluded_classes)
def is_element_excluded_by_class(element, excluded_classes):
for parent in element.parents:
if element_matches_exclusion(parent, excluded_classes):
return True
return element_matches_exclusion(element, excluded_classes)
def element_matches_exclusion(element, excluded_classes):
if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element.get('class', [])):
return True
return element.name in excluded_classes and \
any(cls in excluded_classes[element.name] for cls in element.get('class', []))
def extract_element_content(element):
content = ' '.join(child.strip() for child in element.stripped_strings)
return f'<{element.name}>{content}</{element.name}>\n'
def log_parsing_info(tenant, tags, included_elements, excluded_elements, excluded_classes, elements_to_parse):
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Tags to parse: {tags}')
@@ -484,244 +268,242 @@ def log_parsing_info(tenant, tags, included_elements, excluded_elements, exclude
current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}')
def process_youtube(tenant, model_variables, document_version):
base_path = os.path.join(current_app.config['UPLOAD_FOLDER'],
document_version.file_location)
download_file_name = f'{document_version.id}.mp4'
compressed_file_name = f'{document_version.id}.mp3'
transcription_file_name = f'{document_version.id}.txt'
markdown_file_name = f'{document_version.id}.md'
# Remove existing files (in case of a re-processing of the file
minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, download_file_name)
minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, compressed_file_name)
minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, transcription_file_name)
minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, markdown_file_name)
of, title, description, author = download_youtube(document_version.url, tenant.id, document_version,
download_file_name)
document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'
compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
model_variables['max_chunk_size'])
enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks)
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing = False
db.session.add_all(embeddings)
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
f'on Youtube document version {document_version.id}'
f'error: {e}')
raise
current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
f'on Youtube document version {document_version.id} :-)')
def download_youtube(url, tenant_id, document_version, file_name):
try:
current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
yt = YouTube(url)
stream = yt.streams.get_audio_only()
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
stream.download(output_path=temp_file.name)
with open(temp_file.name, 'rb') as f:
file_data = f.read()
minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
document_version.id,
file_name, file_data)
current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
return file_name, yt.title, yt.description, yt.author
except Exception as e:
current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
raise
def compress_audio(tenant_id, document_version, input_file, output_file):
try:
current_app.logger.info(f'Compressing audio for tenant: {tenant_id}')
input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
document_version.id, input_file)
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input:
temp_input.write(input_data)
temp_input.flush()
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output:
result = subprocess.run(
['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name],
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(f"Compression failed: {result.stderr}")
with open(temp_output.name, 'rb') as f:
compressed_data = f.read()
minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
document_version.id,
output_file, compressed_data)
current_app.logger.info(f'Compressed audio for tenant: {tenant_id}')
except Exception as e:
current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}')
raise
def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables):
try:
current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}')
client = model_variables['transcription_client']
model = model_variables['transcription_model']
# Download the audio file from MinIO
audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
document_version.id, input_file)
# Load the audio data into pydub
audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
# Define segment length (e.g., 10 minutes)
segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds
transcriptions = []
# Split audio into segments and transcribe each
for i, chunk in enumerate(audio[::segment_length]):
current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}')
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
chunk.export(temp_audio.name, format="mp3")
with open(temp_audio.name, 'rb') as audio_segment:
transcription = client.audio.transcriptions.create(
file=audio_segment,
model=model,
language=document_version.language,
response_format='verbose_json',
)
transcriptions.append(transcription.text)
os.unlink(temp_audio.name) # Delete the temporary file
# Combine all transcriptions
full_transcription = " ".join(transcriptions)
# Upload the full transcription to MinIO
minio_client.upload_document_file(
tenant_id,
document_version.doc_id,
document_version.language,
document_version.id,
output_file,
full_transcription.encode('utf-8')
)
current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}')
except Exception as e:
current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}')
raise
def annotate_transcription(tenant, document_version, input_file, output_file, model_variables):
try:
current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}')
char_splitter = CharacterTextSplitter(separator='.',
chunk_size=model_variables['annotation_chunk_length'],
chunk_overlap=0)
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
llm = model_variables['llm']
template = model_variables['transcript_template']
language_template = create_language_template(template, document_version.language)
transcript_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
# Download the transcription file from MinIO
transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id,
document_version.language, document_version.id,
input_file)
transcript = transcript_data.decode('utf-8')
chain = setup | transcript_prompt | llm | output_parser
chunks = char_splitter.split_text(transcript)
all_markdown_chunks = []
last_markdown_chunk = ''
for chunk in chunks:
current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}')
full_input = last_markdown_chunk + '\n' + chunk
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Annotating chunk: \n '
f'------------------\n'
f'{full_input}\n'
f'------------------\n')
input_transcript = {'transcript': full_input}
markdown = chain.invoke(input_transcript)
# GPT-4o returns some kind of content description: ```markdown <text> ```
if markdown.startswith("```markdown"):
markdown = "\n".join(markdown.strip().split("\n")[1:-1])
if tenant.embed_tuning:
current_app.embed_tuning_logger.debug(f'Markdown Received: \n '
f'------------------\n'
f'{markdown}\n'
f'------------------\n')
md_header_splits = markdown_splitter.split_text(markdown)
markdown_chunks = [doc.page_content for doc in md_header_splits]
# claude-3.5-sonnet returns introductory text
if not markdown_chunks[0].startswith('#'):
markdown_chunks.pop(0)
last_markdown_chunk = markdown_chunks[-1]
last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:])
markdown_chunks.pop()
all_markdown_chunks += markdown_chunks
all_markdown_chunks += [last_markdown_chunk]
annotated_transcript = '\n'.join(all_markdown_chunks)
# Upload the annotated transcript to MinIO
minio_client.upload_document_file(
tenant.id,
document_version.doc_id,
document_version.language,
document_version.id,
output_file,
annotated_transcript.encode('utf-8')
)
current_app.logger.info(f'Annotated transcription for tenant {tenant.id}')
except Exception as e:
current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
raise
# def process_youtube(tenant, model_variables, document_version):
# download_file_name = f'{document_version.id}.mp4'
# compressed_file_name = f'{document_version.id}.mp3'
# transcription_file_name = f'{document_version.id}.txt'
# markdown_file_name = f'{document_version.id}.md'
#
# # Remove existing files (in case of a re-processing of the file
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, download_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, compressed_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, transcription_file_name)
# minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, markdown_file_name)
#
# of, title, description, author = download_youtube(document_version.url, tenant.id, document_version,
# download_file_name)
# document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'
# compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
# transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
# annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)
#
# potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
# actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
# model_variables['max_chunk_size'])
#
# enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks)
# embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
#
# try:
# db.session.add(document_version)
# document_version.processing_finished_at = dt.now(tz.utc)
# document_version.processing = False
# db.session.add_all(embeddings)
# db.session.commit()
# except SQLAlchemyError as e:
# current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
# f'on Youtube document version {document_version.id}'
# f'error: {e}')
# raise
#
# current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
# f'on Youtube document version {document_version.id} :-)')
#
#
# def download_youtube(url, tenant_id, document_version, file_name):
# try:
# current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
# yt = YouTube(url)
# stream = yt.streams.get_audio_only()
#
# with tempfile.NamedTemporaryFile(delete=False) as temp_file:
# stream.download(output_path=temp_file.name)
# with open(temp_file.name, 'rb') as f:
# file_data = f.read()
#
# minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id,
# file_name, file_data)
#
# current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
# return file_name, yt.title, yt.description, yt.author
# except Exception as e:
# current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
# raise
#
#
# def compress_audio(tenant_id, document_version, input_file, output_file):
# try:
# current_app.logger.info(f'Compressing audio for tenant: {tenant_id}')
#
# input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id, input_file)
#
# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input:
# temp_input.write(input_data)
# temp_input.flush()
#
# with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output:
# result = subprocess.run(
# ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name],
# capture_output=True,
# text=True
# )
#
# if result.returncode != 0:
# raise Exception(f"Compression failed: {result.stderr}")
#
# with open(temp_output.name, 'rb') as f:
# compressed_data = f.read()
#
# minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id,
# output_file, compressed_data)
#
# current_app.logger.info(f'Compressed audio for tenant: {tenant_id}')
# except Exception as e:
# current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}')
# raise
#
#
# def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables):
# try:
# current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}')
# client = model_variables['transcription_client']
# model = model_variables['transcription_model']
#
# # Download the audio file from MinIO
# audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
# document_version.id, input_file)
#
# # Load the audio data into pydub
# audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
#
# # Define segment length (e.g., 10 minutes)
# segment_length = 10 * 60 * 1000 # 10 minutes in milliseconds
#
# transcriptions = []
#
# # Split audio into segments and transcribe each
# for i, chunk in enumerate(audio[::segment_length]):
# current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}')
#
# with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
# chunk.export(temp_audio.name, format="mp3")
#
# with open(temp_audio.name, 'rb') as audio_segment:
# transcription = client.audio.transcriptions.create(
# file=audio_segment,
# model=model,
# language=document_version.language,
# response_format='verbose_json',
# )
#
# transcriptions.append(transcription.text)
#
# os.unlink(temp_audio.name) # Delete the temporary file
#
# # Combine all transcriptions
# full_transcription = " ".join(transcriptions)
#
# # Upload the full transcription to MinIO
# minio_client.upload_document_file(
# tenant_id,
# document_version.doc_id,
# document_version.language,
# document_version.id,
# output_file,
# full_transcription.encode('utf-8')
# )
#
# current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}')
# except Exception as e:
# current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}')
# raise
#
#
# def annotate_transcription(tenant, document_version, input_file, output_file, model_variables):
# try:
# current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}')
#
# char_splitter = CharacterTextSplitter(separator='.',
# chunk_size=model_variables['annotation_chunk_length'],
# chunk_overlap=0)
#
# headers_to_split_on = [
# ("#", "Header 1"),
# ("##", "Header 2"),
# ]
# markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
#
# llm = model_variables['llm']
# template = model_variables['transcript_template']
# language_template = create_language_template(template, document_version.language)
# transcript_prompt = ChatPromptTemplate.from_template(language_template)
# setup = RunnablePassthrough()
# output_parser = StrOutputParser()
#
# # Download the transcription file from MinIO
# transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id,
# document_version.language, document_version.id,
# input_file)
# transcript = transcript_data.decode('utf-8')
#
# chain = setup | transcript_prompt | llm | output_parser
#
# chunks = char_splitter.split_text(transcript)
# all_markdown_chunks = []
# last_markdown_chunk = ''
# for chunk in chunks:
# current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}')
# full_input = last_markdown_chunk + '\n' + chunk
# if tenant.embed_tuning:
# current_app.embed_tuning_logger.debug(f'Annotating chunk: \n '
# f'------------------\n'
# f'{full_input}\n'
# f'------------------\n')
# input_transcript = {'transcript': full_input}
# markdown = chain.invoke(input_transcript)
# # GPT-4o returns some kind of content description: ```markdown <text> ```
# if markdown.startswith("```markdown"):
# markdown = "\n".join(markdown.strip().split("\n")[1:-1])
# if tenant.embed_tuning:
# current_app.embed_tuning_logger.debug(f'Markdown Received: \n '
# f'------------------\n'
# f'{markdown}\n'
# f'------------------\n')
# md_header_splits = markdown_splitter.split_text(markdown)
# markdown_chunks = [doc.page_content for doc in md_header_splits]
# # claude-3.5-sonnet returns introductory text
# if not markdown_chunks[0].startswith('#'):
# markdown_chunks.pop(0)
# last_markdown_chunk = markdown_chunks[-1]
# last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:])
# markdown_chunks.pop()
# all_markdown_chunks += markdown_chunks
#
# all_markdown_chunks += [last_markdown_chunk]
#
# annotated_transcript = '\n'.join(all_markdown_chunks)
#
# # Upload the annotated transcript to MinIO
# minio_client.upload_document_file(
# tenant.id,
# document_version.doc_id,
# document_version.language,
# document_version.id,
# output_file,
# annotated_transcript.encode('utf-8')
# )
#
# current_app.logger.info(f'Annotated transcription for tenant {tenant.id}')
# except Exception as e:
# current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
# raise
def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
@@ -779,4 +561,3 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
actual_chunks.append(current_chunk)
return actual_chunks

View File

@@ -60,7 +60,6 @@ urllib3~=2.2.2
WTForms~=3.1.2
wtforms-html5~=0.6.1
zxcvbn~=4.4.28
pytube~=15.0.0
groq~=0.9.0
pydub~=0.25.1
argparse~=1.4.0
@@ -73,4 +72,5 @@ graypy~=2.1.0
lxml~=5.3.0
pillow~=10.4.0
pdfplumber~=0.11.4
PyPDF2~=3.0.1
PyPDF2~=3.0.1
Flask-RESTful~=0.3.10