- Created a base mail template - Adapted and improved the document API for the use of catalogs and processors - Adapted eveai_sync to the new authentication mechanism and to the use of catalogs and processors
366 lines
12 KiB
Python
366 lines
12 KiB
Python
from datetime import datetime as dt, timezone as tz
|
|
|
|
from sqlalchemy import desc
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from werkzeug.utils import secure_filename
|
|
from common.models.document import Document, DocumentVersion, Catalog
|
|
from common.extensions import db, minio_client
|
|
from common.utils.celery_utils import current_celery
|
|
from flask import current_app
|
|
from flask_security import current_user
|
|
import requests
|
|
from urllib.parse import urlparse, unquote, urlunparse
|
|
import os
|
|
from .eveai_exceptions import (EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
|
|
EveAIInvalidCatalog, EveAIInvalidDocument, EveAIInvalidDocumentVersion)
|
|
from ..models.user import Tenant
|
|
|
|
|
|
def create_document_stack(api_input, file, filename, extension, tenant_id):
    """Create a Document with its first DocumentVersion and upload the file.

    :param api_input: mapping with the document attributes. Keys read here:
        ``catalog_id`` (required), ``url``, ``sub_file_type``, ``language``
        (defaults to ``'en'``), ``user_context``, ``user_metadata`` and
        ``catalog_properties``. ``create_document`` additionally reads
        ``name`` and ``valid_from``.
    :param file: file content, handed unchanged to ``upload_file_for_version``.
    :param filename: original filename, used as fallback document name.
    :param extension: file extension, stored as the version's file type.
    :param tenant_id: id of the tenant that owns the document.
    :returns: tuple ``(document, document_version)``.
    :raises EveAIInvalidCatalog: when ``catalog_id`` is missing, not an
        integer, or does not match an existing catalog.
    :raises SQLAlchemyError: when committing fails (the session is rolled back).
    """
    raw_catalog_id = api_input.get('catalog_id')
    try:
        catalog_id = int(raw_catalog_id)
    except (TypeError, ValueError) as err:
        # A missing / malformed id is an invalid catalog, not a crash.
        raise EveAIInvalidCatalog(tenant_id, raw_catalog_id) from err

    catalog = Catalog.query.get(catalog_id)
    if not catalog:
        raise EveAIInvalidCatalog(tenant_id, catalog_id)

    # Create the Document
    new_doc = create_document(api_input, filename, catalog_id)
    db.session.add(new_doc)

    # A missing key and an explicit None both mean "no URL"; only real URLs
    # are passed through the local-URL rewriting.
    url = api_input.get('url') or ''
    if url:
        url = cope_with_local_url(url)

    # Create the DocumentVersion.
    # NOTE(review): create_version_for_document calls mark_tenant_storage_dirty,
    # which commits the session, so the document and version may already be
    # persisted before the commit below.
    new_doc_vers = create_version_for_document(new_doc, tenant_id,
                                               url,
                                               api_input.get('sub_file_type', ''),
                                               api_input.get('language', 'en'),
                                               api_input.get('user_context', ''),
                                               api_input.get('user_metadata'),
                                               api_input.get('catalog_properties')
                                               )
    db.session.add(new_doc_vers)

    try:
        db.session.commit()
    except SQLAlchemyError as e:
        current_app.logger.error(f'Error adding document for tenant {tenant_id}: {e}')
        db.session.rollback()
        raise

    current_app.logger.info(f'Document {new_doc.id} added successfully for tenant {tenant_id}, '
                            f'Document Version {new_doc_vers.id}')

    # Upload file to storage
    upload_file_for_version(new_doc_vers, file, extension, tenant_id)

    return new_doc, new_doc_vers
|
|
|
|
|
|
def create_document(form, filename, catalog_id):
    """Build (without committing) a Document for the given catalog.

    :param form: mapping providing ``name`` and ``valid_from``. Missing,
        empty or ``None`` values fall back to defaults.
    :param filename: original filename. With its last extension removed it
        becomes the document name when ``form`` provides none.
    :param catalog_id: id of the catalog the document belongs to.
    :returns: a new ``Document`` instance with audit fields set.
    """
    new_doc = Document()

    name = form.get('name')
    # Fall back to the filename without its (last) extension.
    new_doc.name = name if name else filename.rsplit('.', 1)[0]

    valid_from = form.get('valid_from')
    new_doc.valid_from = valid_from if valid_from else dt.now(tz.utc)

    new_doc.catalog_id = catalog_id
    set_logging_information(new_doc, dt.now(tz.utc))

    return new_doc
|
|
|
|
|
|
def create_version_for_document(document, tenant_id, url, sub_file_type, language, user_context, user_metadata,
                                catalog_properties):
    """Create a new DocumentVersion linked to ``document``.

    Optional fields (``url``, ``user_context``, ``sub_file_type``,
    ``user_metadata``, ``catalog_properties``) are only set when they are
    neither ``None`` nor ``''``.

    Side effect: calls ``mark_tenant_storage_dirty``, which commits the
    current session.

    :raises EveAIInvalidLanguageException: when ``language`` is empty or ``None``.
    :returns: the new ``DocumentVersion``.
    """
    new_doc_vers = DocumentVersion()

    if url:
        new_doc_vers.url = url

    # Reject None as well as '' - a version without language is invalid.
    if not language:
        raise EveAIInvalidLanguageException('Language is required for document creation!')
    new_doc_vers.language = language

    if user_context:
        new_doc_vers.user_context = user_context

    # Structured values: only None / '' count as "not provided" (an empty
    # dict is still stored, as before).
    if user_metadata is not None and user_metadata != '':
        new_doc_vers.user_metadata = user_metadata

    if catalog_properties is not None and catalog_properties != '':
        new_doc_vers.catalog_properties = catalog_properties

    if sub_file_type:
        new_doc_vers.sub_file_type = sub_file_type

    new_doc_vers.document = document

    set_logging_information(new_doc_vers, dt.now(tz.utc))

    mark_tenant_storage_dirty(tenant_id)

    return new_doc_vers
|
|
|
|
|
|
def upload_file_for_version(doc_vers, file, extension, tenant_id):
    """Upload ``file`` to MinIO for ``doc_vers`` and record where it was stored.

    Sets ``file_type``, ``bucket_name``, ``object_name`` and ``file_size``
    (in MB) on the version and commits. On any error the session is rolled
    back, the error is logged and the exception is re-raised.
    """
    doc_vers.file_type = extension

    # The tenant bucket should already exist; (re)creating it covers buckets
    # missing after a migration.
    minio_client.create_tenant_bucket(tenant_id)

    try:
        bucket, object_name, size_in_bytes = minio_client.upload_document_file(
            tenant_id, doc_vers.doc_id, doc_vers.language, doc_vers.id,
            f"{doc_vers.id}.{extension}", file,
        )
        doc_vers.bucket_name = bucket
        doc_vers.object_name = object_name
        doc_vers.file_size = size_in_bytes / 1048576  # bytes -> MB
        db.session.commit()
        current_app.logger.info(f'Successfully saved document to MinIO for tenant {tenant_id} for '
                                f'document version {doc_vers.id} while uploading file.')
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f'Error saving document to MinIO for tenant {tenant_id}: {e}')
        raise
|
|
|
|
|
|
def set_logging_information(obj, timestamp):
    """Stamp the creation and update audit fields of ``obj``.

    ``created_at`` and ``updated_at`` both receive ``timestamp``. The
    ``created_by`` / ``updated_by`` fields are only filled when
    ``get_current_user_id`` returns a truthy user id.
    """
    obj.created_at = obj.updated_at = timestamp

    user_id = get_current_user_id()
    if not user_id:
        return
    obj.created_by = obj.updated_by = user_id
|
|
|
|
|
|
def update_logging_information(obj, timestamp):
    """Stamp the update audit fields of ``obj``.

    ``updated_at`` always receives ``timestamp``. ``updated_by`` is only set
    when ``get_current_user_id`` returns a truthy user id.
    """
    obj.updated_at = timestamp

    user_id = get_current_user_id()
    if not user_id:
        return
    obj.updated_by = user_id
|
|
|
|
|
|
def get_current_user_id():
    """Return the id of the authenticated user, or ``None``.

    ``None`` is returned when nobody is logged in, and also when accessing
    ``current_user`` raises (e.g. in an API or non-request context).
    """
    try:
        if not current_user or not current_user.is_authenticated:
            return None
        return current_user.id
    except Exception:
        # current_user may be unavailable outside a login-managed request.
        return None
|
|
|
|
|
|
def get_extension_from_content_type(content_type):
    """Map a MIME content type to a file extension.

    Only the media type is considered: parameters such as ``; charset=utf-8``
    and surrounding whitespace are ignored. Matching is case-insensitive,
    since MIME type/subtype names are case-insensitive.

    :param content_type: value of a ``Content-Type`` header (may be ``None``).
    :returns: the matching extension, or ``'html'`` for unknown/empty types.
    """
    content_type_map = {
        'text/html': 'html',
        'application/pdf': 'pdf',
        'text/plain': 'txt',
        'application/msword': 'doc',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        # Add more mappings as needed
    }
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    return content_type_map.get(media_type, 'html')  # Default to 'html' if unknown
|
|
|
|
|
|
def process_url(url, tenant_id, timeout=30):
    """Download the document behind ``url`` and derive a safe filename for it.

    :param url: URL of the document. It is first passed through
        ``cope_with_local_url``.
    :param tenant_id: id of the tenant (not used by this function).
    :param timeout: seconds ``requests`` waits for the server. Without a
        timeout ``requests`` may wait indefinitely.
    :returns: tuple ``(file_content, filename, extension)``.
    :raises EveAIDoubleURLException: when a document version with this
        (rewritten) URL already exists.
    :raises requests.RequestException: on network errors, timeouts or an HTTP
        error status.
    """
    url = cope_with_local_url(url)

    # Check if a document with this URL already exists - before any network I/O.
    existing_doc = DocumentVersion.query.filter_by(url=url).first()
    if existing_doc:
        raise EveAIDoubleURLException

    # A single GET both fetches the content and reports its actual type;
    # a separate HEAD request is not supported by every server.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    file_content = response.content

    # Determine file extension based on Content-Type
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    extension = get_extension_from_content_type(content_type)

    # Generate filename
    path = unquote(urlparse(url).path)
    filename = os.path.basename(path)
    if not filename or '.' not in filename:
        # Use the last part of the path or a default name
        filename = path.strip('/').split('/')[-1] or 'document'
        filename = secure_filename(f"{filename}.{extension}")
    else:
        filename = secure_filename(filename)

    return file_content, filename, extension
|
|
|
|
|
|
def start_embedding_task(tenant_id, doc_vers_id):
    """Queue the ``create_embeddings`` Celery task for a document version.

    :returns: the id of the dispatched task.
    """
    task_args = [tenant_id, doc_vers_id]
    task = current_celery.send_task('create_embeddings', args=task_args, queue='embeddings')
    task_id = task.id

    current_app.logger.info(f'Embedding creation started for tenant {tenant_id}, '
                            f'Document Version {doc_vers_id}. '
                            f'Embedding creation task: {task_id}')
    return task_id
|
|
|
|
|
|
def validate_file_type(extension):
    """Raise ``EveAIUnsupportedFileType`` unless ``extension`` is listed in ``SUPPORTED_FILE_TYPES``."""
    supported_types = current_app.config['SUPPORTED_FILE_TYPES']
    if extension in supported_types:
        return
    raise EveAIUnsupportedFileType(f"Filetype {extension} is currently not supported. "
                                   f"Supported filetypes: {', '.join(supported_types)}")
|
|
|
|
|
|
def get_filename_from_url(url):
    """Derive an ``.html`` filename from the last path segment of ``url``.

    An empty last segment (no path, or a path ending in ``/``) becomes
    ``'index'``. ``'.html'`` is appended unless the name already ends with it.
    """
    last_segment = urlparse(url).path.rpartition('/')[2]
    name = last_segment or 'index'
    return name if name.endswith('.html') else f'{name}.html'
|
|
|
|
|
|
def get_documents_list(page, per_page):
    """Return one page of documents, newest (by ``created_at``) first.

    ``error_out=False`` makes an out-of-range page yield an empty page
    instead of aborting with 404.
    """
    newest_first = Document.query.order_by(desc(Document.created_at))
    return newest_first.paginate(page=page, per_page=per_page, error_out=False)
|
|
|
|
|
|
def edit_document(tenant_id, document_id, name, valid_from, valid_to):
    """Update the name and validity window of a document.

    Only truthy arguments are applied; falsy ones leave the field unchanged.

    :returns: ``(document, None)`` on success, ``(None, error_message)`` when
        the commit fails (the session is rolled back).
    :raises EveAIInvalidDocument: when no document with ``document_id`` exists.
    """
    doc = Document.query.get(document_id)
    if not doc:
        raise EveAIInvalidDocument(tenant_id, document_id)

    changes = {'name': name, 'valid_from': valid_from, 'valid_to': valid_to}
    for field_name, value in changes.items():
        if value:
            setattr(doc, field_name, value)
    update_logging_information(doc, dt.now(tz.utc))

    try:
        db.session.add(doc)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)
    return doc, None
|
|
|
|
|
|
def edit_document_version(tenant_id, version_id, user_context, catalog_properties):
    """Overwrite the user context and catalog properties of a document version.

    :returns: ``(version, None)`` on success, ``(None, error_message)`` when
        the commit fails (the session is rolled back).
    :raises EveAIInvalidDocumentVersion: when no version with ``version_id`` exists.
    """
    doc_vers = DocumentVersion.query.get(version_id)
    if not doc_vers:
        raise EveAIInvalidDocumentVersion(tenant_id, version_id)

    doc_vers.user_context, doc_vers.catalog_properties = user_context, catalog_properties
    update_logging_information(doc_vers, dt.now(tz.utc))

    try:
        db.session.add(doc_vers)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)
    return doc_vers, None
|
|
|
|
|
|
def refresh_document_with_info(doc_id, tenant_id, api_input, timeout=30):
    """Create a new version of a URL-based document from freshly downloaded content.

    The URL of the latest version is downloaded again. A new version is then
    created, using values from ``api_input`` (``language``, ``user_context``,
    ``user_metadata``, ``catalog_properties``) with fallback to the latest
    version's values. Finally the file is uploaded and embedding creation is
    queued.

    :param timeout: seconds ``requests`` waits for the remote server.
    :returns: ``(new_version, task_id)`` on success, or ``(None, error_message)``
        when the document has no version / no URL, or saving the version fails.
    :raises EveAIInvalidDocument: when no document with ``doc_id`` exists.
    :raises requests.RequestException: on network errors, timeouts or an HTTP
        error status while downloading.
    """
    doc = Document.query.get(doc_id)
    if not doc:
        raise EveAIInvalidDocument(tenant_id, doc_id)

    old_doc_vers = (DocumentVersion.query.filter_by(doc_id=doc_id)
                    .order_by(desc(DocumentVersion.id)).first())
    if not old_doc_vers:
        return None, "This document has no versions. Only documents with a URL can be refreshed."
    if not old_doc_vers.url:
        return None, "This document has no URL. Only documents with a URL can be refreshed."

    # Download first, so that a failing request does not leave behind a new
    # (already committed) version without a file.
    url = cope_with_local_url(old_doc_vers.url)
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    file_content = response.content
    content_type = response.headers.get('Content-Type', '').split(';')[0]
    extension = get_extension_from_content_type(content_type)

    # create_version_for_document already sets the audit (logging) fields.
    new_doc_vers = create_version_for_document(
        doc, tenant_id,
        old_doc_vers.url,
        old_doc_vers.sub_file_type,
        api_input.get('language', old_doc_vers.language),
        api_input.get('user_context', old_doc_vers.user_context),
        api_input.get('user_metadata', old_doc_vers.user_metadata),
        api_input.get('catalog_properties', old_doc_vers.catalog_properties),
    )

    try:
        db.session.add(new_doc_vers)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        return None, str(e)

    upload_file_for_version(new_doc_vers, file_content, extension, tenant_id)

    task = current_celery.send_task('create_embeddings', args=[tenant_id, new_doc_vers.id, ], queue='embeddings')
    current_app.logger.info(f'Embedding creation started for document {doc_id} on version {new_doc_vers.id} '
                            f'with task id: {task.id}.')

    return new_doc_vers, task.id
|
|
|
|
|
|
def refresh_document(doc_id, tenant_id):
    """Refresh a document from its URL, reusing the latest version's settings.

    Aborts with 404 (``get_or_404``) when the document does not exist.

    :returns: the result of ``refresh_document_with_info``, or
        ``(None, error_message)`` when the document has no versions.
    """
    current_app.logger.info(f'Refreshing document {doc_id}')
    # Called only for its 404 behaviour; the document object is not needed here.
    Document.query.get_or_404(doc_id)

    old_doc_vers = (DocumentVersion.query.filter_by(doc_id=doc_id)
                    .order_by(desc(DocumentVersion.id)).first())
    if not old_doc_vers:
        return None, "This document has no versions. Only documents with a URL can be refreshed."

    api_input = {
        'language': old_doc_vers.language,
        'user_context': old_doc_vers.user_context,
        'user_metadata': old_doc_vers.user_metadata,
        'catalog_properties': old_doc_vers.catalog_properties,
    }

    return refresh_document_with_info(doc_id, tenant_id, api_input)
|
|
|
|
|
|
def mark_tenant_storage_dirty(tenant_id):
    """Set ``storage_dirty`` on the tenant and commit.

    Called when a document version is created or updated (presumably so that
    the tenant's storage usage gets recalculated - confirm with the consumer
    of ``storage_dirty``).

    NOTE: this commits the whole current session, so any objects the caller
    has pending in the session are persisted as well.

    :raises ValueError: when no tenant with ``tenant_id`` exists.
    :raises SQLAlchemyError: when the commit fails (the session is rolled back).
    """
    tenant = db.session.query(Tenant).filter_by(id=int(tenant_id)).first()
    if tenant is None:
        raise ValueError(f'Tenant {tenant_id} not found; cannot mark its storage as dirty.')

    tenant.storage_dirty = True
    try:
        db.session.commit()
    except SQLAlchemyError:
        # Leave the session usable for the caller.
        db.session.rollback()
        raise
|
|
|
|
|
|
def cope_with_local_url(url):
    """Rewrite public WordPress URLs to the internal WordPress host (testing setup).

    When the URL's netloc equals ``EXTERNAL_WORDPRESS_BASE_URL``, the scheme
    is replaced by ``WORDPRESS_PROTOCOL`` and the netloc by
    ``WORDPRESS_HOST:WORDPRESS_PORT``. Any other URL is returned unchanged.
    """
    logger = current_app.logger
    logger.debug(f'Incomming URL: {url}')

    parts = urlparse(url)
    config = current_app.config
    if parts.netloc != config['EXTERNAL_WORDPRESS_BASE_URL']:
        return url

    scheme = config['WORDPRESS_PROTOCOL']
    netloc = f"{config['WORDPRESS_HOST']}:{config['WORDPRESS_PORT']}"
    url = urlunparse(parts._replace(scheme=scheme, netloc=netloc))
    logger.debug(f'Translated Wordpress URL to: {url}')
    return url
|
|
|
|
|