- Zapier Document Refresh action (create) added

This commit is contained in:
Josako
2024-12-17 16:40:21 +01:00
parent 53c625599a
commit f7cd58ed2a
9 changed files with 381 additions and 194 deletions

View File

@@ -1,14 +1,18 @@
import io
import json
from datetime import datetime
from typing import Tuple, Any
import pytz
import requests
from flask import current_app, request
from flask_restx import Namespace, Resource, fields, reqparse
from flask_jwt_extended import jwt_required, get_jwt_identity
from sqlalchemy import desc
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
from common.models.document import DocumentVersion
from common.utils.document_utils import (
create_document_stack, process_url, start_embedding_task,
EveAIInvalidLanguageException, EveAIDoubleURLException, EveAIUnsupportedFileType,
@@ -37,7 +41,8 @@ document_ns = Namespace('documents', description='Document related operations')
# Define models for request parsing and response serialization
upload_parser = reqparse.RequestParser()
upload_parser.add_argument('catalog_id', location='form', type=int, required=True, help='The catalog to add the file to')
upload_parser.add_argument('catalog_id', location='form', type=int, required=True,
help='The catalog to add the file to')
upload_parser.add_argument('file', location='files', type=FileStorage, required=True, help='The file to upload')
upload_parser.add_argument('name', location='form', type=str, required=False, help='Name of the document')
upload_parser.add_argument('language', location='form', type=str, required=True, help='Language of the document')
@@ -69,7 +74,11 @@ class AddDocument(Resource):
@document_ns.response(500, 'Internal Server Error')
def post(self):
"""
Add a new document by providing the content of a file (Multipart/form-data).
Upload a new document to EveAI by directly providing the file content.
This endpoint accepts multipart/form-data with the file content and metadata. It processes
the file, creates a new document in the specified catalog, and initiates the embedding
process.
"""
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding document for tenant {tenant_id}')
@@ -126,10 +135,11 @@ add_document_through_url = document_ns.model('AddDocumentThroughURL', {
'valid_from': fields.String(required=False, description='Valid from date for the document'),
'user_metadata': fields.String(required=False, description='User metadata for the document'),
'system_metadata': fields.String(required=False, description='System metadata for the document'),
'catalog_properties': fields.String(required=False, description='The catalog configuration to be passed along (JSON '
'format). Validity is against catalog requirements '
'is not checked, and is the responsibility of the '
'calling client.'),
'catalog_properties': fields.String(required=False,
description='The catalog configuration to be passed along (JSON '
'format). Validity is against catalog requirements '
'is not checked, and is the responsibility of the '
'calling client.'),
})
add_document_through_url_response = document_ns.model('AddDocumentThroughURLResponse', {
@@ -139,6 +149,7 @@ add_document_through_url_response = document_ns.model('AddDocumentThroughURLResp
'task_id': fields.String(description='ID of the embedding task')
})
@document_ns.route('/add_document_through_url')
class AddDocumentThroughURL(Resource):
@jwt_required()
@@ -150,8 +161,10 @@ class AddDocumentThroughURL(Resource):
@document_ns.response(500, 'Internal Server Error')
def post(self):
"""
Add a new document using a URL. The URL can be temporary, and will not be stored.
Mainly used for passing temporary URLs like used in e.g. Zapier
Add a new document to EveAI using a temporary URL.
This endpoint is primarily used for integration with services that provide temporary URLs
(like Zapier). The URL content is downloaded and processed as a new document.
"""
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding document through url for tenant {tenant_id}')
@@ -164,29 +177,19 @@ class AddDocumentThroughURL(Resource):
raise
try:
# Step 1: Download from stashed URL
stashed_url = args['temp_url']
current_app.logger.info(f"Downloading stashed file from URL: {stashed_url}")
response = requests.get(stashed_url, stream=True)
response.raise_for_status()
hydration_url = response.text.strip()
current_app.logger.info(f"Downloading actual file from URL: {hydration_url}")
# Step 2: Download from hydration URL
actual_file_response = requests.get(hydration_url, stream=True)
actual_file_response.raise_for_status()
hydrated_file_content = actual_file_response.content
user_metadata = json.loads(args.get('user_metadata', '{}'))
actual_file_content, actual_file_content_type = download_file_content(args['temp_url'], user_metadata)
# Get filename from URL or use provided name
filename = secure_filename(args.get('name'))
extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
# Create FileStorage object from downloaded content
file_content = io.BytesIO(hydrated_file_content)
file_content = io.BytesIO(actual_file_content)
file = FileStorage(
stream=file_content,
filename=filename,
content_type=response.headers.get('content-type', 'application/octet-stream')
content_type=actual_file_content_type
)
current_app.logger.info(f"Successfully downloaded file: {filename}")
@@ -233,10 +236,11 @@ add_url_model = document_ns.model('AddURL', {
'valid_from': fields.String(required=False, description='Valid from date for the document'),
'user_metadata': fields.String(required=False, description='User metadata for the document'),
'system_metadata': fields.String(required=False, description='System metadata for the document'),
'catalog_properties': fields.String(required=False, description='The catalog configuration to be passed along (JSON '
'format). Validity is against catalog requirements '
'is not checked, and is the responsibility of the '
'calling client.'),
'catalog_properties': fields.String(required=False,
description='The catalog configuration to be passed along (JSON '
'format). Validity is against catalog requirements '
'is not checked, and is the responsibility of the '
'calling client.'),
})
add_url_response = document_ns.model('AddURLResponse', {
@@ -257,8 +261,10 @@ class AddURL(Resource):
@document_ns.response(500, 'Internal Server Error')
def post(self):
"""
Add a new document from URL. The URL in this case is stored and can be used to refresh the document.
As a consequence, this must be a permanent and accessible URL.
Add a new document to EveAI from a permanent URL.
This endpoint is used for URLs that will remain accessible. The URL is stored and can
be used to refresh the document's content later.
"""
tenant_id = get_jwt_identity()
current_app.logger.info(f'Adding document from URL for tenant {tenant_id}')
@@ -383,7 +389,8 @@ class DocumentVersionResource(Resource):
"""Edit a document version"""
data = request.json
tenant_id = get_jwt_identity()
updated_version, error = edit_document_version(tenant_id, version_id, data['user_context'], data.get('catalog_properties'))
updated_version, error = edit_document_version(tenant_id, version_id, data['user_context'],
data.get('catalog_properties'))
if updated_version:
return {'message': f'Document Version {updated_version.id} updated successfully'}, 200
else:
@@ -518,58 +525,87 @@ class DocumentLookup(Resource):
return {'message': f'Missing required field: {str(e)}'}, 400
refresh_content_model = document_ns.model('RefreshDocumentContent', {
'file_content': fields.Raw(required=True, description='The new file content'),
refresh_url_model = document_ns.model('RefreshDocumentThroughURL', {
'temp_url': fields.String(required=True, description='Temporary URL of the updated document content'),
'language': fields.String(required=False, description='Language of the document'),
'user_context': fields.String(required=False, description='User context for the document'),
'user_metadata': fields.Raw(required=False, description='Custom metadata fields'),
'catalog_properties': fields.Raw(required=False, description='Catalog-specific properties'),
'trigger_service': fields.String(required=False, description='Service that triggered the update')
})
@document_ns.route('/<int:document_id>/refresh_content')
class RefreshDocumentContent(Resource):
@document_ns.route('/<int:document_id>/refresh_through_url')
class RefreshDocumentThroughURL(Resource):
@jwt_required()
@requires_service('DOCAPI')
@document_ns.expect(refresh_content_model)
@document_ns.expect(refresh_url_model)
@document_ns.response(200, 'Document refreshed successfully')
def post(self, document_id):
"""Refresh a document with new content"""
"""Refresh a document using content from a URL"""
tenant_id = get_jwt_identity()
current_app.logger.info(f'Refreshing document {document_id} through URL for tenant {tenant_id}')
try:
data = request.json
file_content = data['file_content']
# Get filename from the existing version
old_doc_vers = (DocumentVersion.query.filter_by(doc_id=document_id).
order_by(desc(DocumentVersion.id)).first())
filename = f"{old_doc_vers.id}.{old_doc_vers.file_type}"
# Build user_metadata by merging:
# 1. Existing metadata (if any)
# 2. New metadata from request
# 3. Zapier-specific fields
user_metadata = data.get('user_metadata', {})
user_metadata.update({
'source': 'zapier',
'trigger_service': data.get('trigger_service')
})
data['user_metadata'] = user_metadata
args = request.json
user_metadata = json.loads(args.get('user_metadata', '{}'))
# Keep catalog_properties separate
if 'catalog_properties' in data:
# We could add validation here against catalog configuration
data['catalog_properties'] = data['catalog_properties']
try:
actual_file_content, actual_file_content_type = download_file_content(args['temp_url'], user_metadata)
file_content = io.BytesIO(actual_file_content)
file = FileStorage(
stream=file_content,
filename=filename,
content_type=actual_file_content_type
)
new_version, task_id = refresh_document_with_content(
document_id,
tenant_id,
file_content,
data
)
new_version, task_id = refresh_document_with_content(
document_id,
tenant_id,
actual_file_content,
args
)
return {
'message': f'Document refreshed successfully. New version: {new_version.id}. Task ID: {task_id}',
'document_id': document_id,
'document_version_id': new_version.id,
'task_id': task_id
}, 200
return {
'message': f'Document refreshed successfully. New version: {new_version.id}. Task ID: {task_id}',
'document_id': document_id,
'document_version_id': new_version.id,
'task_id': task_id
}, 200
except requests.RequestException as e:
current_app.logger.error(f"Error downloading file: {str(e)}")
return {'message': f'Error downloading file: {str(e)}'}, 422
except EveAIException as e:
return e.to_dict(), e.status_code
def download_file_content(url: str, user_metadata: dict) -> tuple[Any, Any]:
if user_metadata and 'service' in user_metadata and 'Zapier' in user_metadata['service']:
# Zapier uses a system of Stashed URLs
# Step 1: Download from stashed URL
stashed_url = url
current_app.logger.info(f"Downloading stashed file from URL: {stashed_url}")
response = requests.get(stashed_url, stream=True)
response.raise_for_status()
hydration_url = response.text.strip()
current_app.logger.info(f"Downloading actual file from URL: {hydration_url}")
# Step 2: Download from hydration URL
actual_file_response = requests.get(hydration_url, stream=True)
actual_file_response.raise_for_status()
actual_file_content = actual_file_response.content
else:
actual_url = url
actual_file_response = requests.get(actual_url, stream=True)
actual_file_response.raise_for_status()
actual_file_content = actual_file_response.content
actual_file_content_type = actual_file_response.headers.get('content-type', 'application/octet-stream')
return actual_file_content, actual_file_content_type