Files
eveAI/common/utils/minio_utils.py
Josako 6ccba7d1e3 - Add test environment to __init__.py for all eveai services
- Add postgresql certificate to secrets for secure communication in staging and production environments
- Adapt for TLS communication with PostgreSQL
- Adapt tasks to handle invalid connections from the connection pool
- Migrate to psycopg3 for connection to PostgreSQL
2025-09-10 11:40:38 +02:00

158 lines
7.2 KiB
Python

from minio import Minio
from minio.error import S3Error
from flask import Flask, current_app
import io
from werkzeug.datastructures import FileStorage
MIB_CONVERTOR = 1_048_576  # bytes per mebibyte (MiB); divide a byte count by this to express it in MiB
class MinioClient:
    """Tenant-aware wrapper around the MinIO object-storage client.

    Two storage layouts are supported, selected by the Flask config key
    ``OBJECT_STORAGE_TENANT_BASE``:

    * ``'bucket'`` (default): each tenant gets a dedicated bucket named
      ``tenant-<tenant_id>-bucket``.
    * ``'folder'``: all tenants share one bucket
      (``OBJECT_STORAGE_BUCKET_NAME``) and are separated by a
      ``tenant-<tenant_id>/`` object-key prefix.

    The underlying client is created lazily via ``init_app`` (standard
    Flask-extension pattern), so an instance can be constructed at import
    time and bound to the app later.
    """

    def __init__(self):
        # Populated by init_app(); all other methods assume it is set.
        self.client = None

    def init_app(self, app: Flask):
        """Build the underlying ``Minio`` client from the app configuration.

        Reads MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY and the
        optional MINIO_USE_HTTPS flag (defaults to False).
        """
        app.logger.debug(f"Initializing MinIO client with endpoint: {app.config['MINIO_ENDPOINT']} and secure: {app.config.get('MINIO_USE_HTTPS', False)}")
        self.client = Minio(
            app.config['MINIO_ENDPOINT'],
            access_key=app.config['MINIO_ACCESS_KEY'],
            secret_key=app.config['MINIO_SECRET_KEY'],
            secure=app.config.get('MINIO_USE_HTTPS', False)
        )
        app.logger.info(f"MinIO client initialized with endpoint: {app.config['MINIO_ENDPOINT']}")

    @staticmethod
    def _tenant_base():
        # Single point of truth for the layout switch so every naming
        # method agrees on the default ('bucket').
        return current_app.config.get('OBJECT_STORAGE_TENANT_BASE', 'bucket')

    @staticmethod
    def _coerce_to_bytes(file_data):
        """Normalize an upload payload to raw ``bytes``.

        Accepts FileStorage (Werkzeug upload), io.BytesIO, str (encoded as
        UTF-8) or bytes.

        :raises TypeError: for any other payload type.
        """
        if isinstance(file_data, FileStorage):
            return file_data.read()
        if isinstance(file_data, io.BytesIO):
            return file_data.getvalue()
        if isinstance(file_data, str):
            return file_data.encode('utf-8')
        if isinstance(file_data, bytes):
            return file_data
        raise TypeError('Unsupported file type. Expected FileStorage, BytesIO, str, or bytes.')

    def generate_bucket_name(self, tenant_id):
        """Return the bucket that stores this tenant's objects.

        :raises ValueError: if OBJECT_STORAGE_TENANT_BASE is neither
            'bucket' nor 'folder'.
        """
        tenant_base = self._tenant_base()
        if tenant_base == 'bucket':
            return f"tenant-{tenant_id}-bucket"
        elif tenant_base == 'folder':
            return current_app.config.get('OBJECT_STORAGE_BUCKET_NAME')
        else:
            raise ValueError(f"Invalid OBJECT_STORAGE_TENANT_BASE value: {tenant_base}")

    def create_tenant_bucket(self, tenant_id):
        """Ensure the tenant's bucket exists and return its name.

        In 'folder' mode the shared bucket is assumed to exist already and
        its configured name is returned as-is.
        """
        tenant_base = self._tenant_base()
        if tenant_base == 'bucket':
            bucket_name = self.generate_bucket_name(tenant_id)
            try:
                # Idempotent: only create when missing.
                if not self.client.bucket_exists(bucket_name):
                    self.client.make_bucket(bucket_name)
                return bucket_name
            except S3Error as err:
                raise Exception(f"Error occurred while creating bucket: {err}") from err
        elif tenant_base == 'folder':  # In this case, we are working within a predefined bucket
            return current_app.config.get('OBJECT_STORAGE_BUCKET_NAME')
        else:
            raise ValueError(f"Invalid OBJECT_STORAGE_TENANT_BASE value: {tenant_base}")

    def generate_object_name(self, tenant_id, document_id, language, version_id, filename):
        """Return the object key for a document file.

        BUG FIX: the ``filename`` parameter was previously dropped from the
        key (a literal placeholder appeared in its place), so every file of
        a given document/language/version mapped to the same object. The
        filename is now the final key segment.
        """
        tenant_base = self._tenant_base()
        if tenant_base == 'bucket':
            return f"{document_id}/{language}/{version_id}/{filename}"
        elif tenant_base == 'folder':
            return f"tenant-{tenant_id}/documents/{document_id}/{language}/{version_id}/{filename}"
        else:
            raise ValueError(f"Invalid OBJECT_STORAGE_TENANT_BASE value: {tenant_base}")

    def generate_asset_name(self, tenant_id, asset_id, asset_type, content_type):
        """Return the object key for a tenant asset.

        NOTE(review): ``content_type`` is used as a file extension here
        (e.g. 'png'), not a MIME type — confirm against callers.
        """
        tenant_base = self._tenant_base()
        if tenant_base == 'bucket':
            return f"assets/{asset_type}/{asset_id}.{content_type}"
        elif tenant_base == 'folder':
            return f"tenant-{tenant_id}/assets/{asset_type}/{asset_id}.{content_type}"
        else:
            raise ValueError(f"Invalid OBJECT_STORAGE_TENANT_BASE value: {tenant_base}")

    def upload_document_file(self, tenant_id, document_id, language, version_id, filename, file_data):
        """Upload a document file and return (bucket_name, object_name, size_bytes).

        :raises TypeError: for unsupported ``file_data`` types.
        """
        bucket_name = self.generate_bucket_name(tenant_id)
        object_name = self.generate_object_name(tenant_id, document_id, language, version_id, filename)
        payload = self._coerce_to_bytes(file_data)
        try:
            self.client.put_object(
                bucket_name, object_name, io.BytesIO(payload), len(payload)
            )
            return bucket_name, object_name, len(payload)
        except S3Error as err:
            raise Exception(f"Error occurred while uploading file: {err}") from err

    def upload_asset_file(self, tenant_id: int, asset_id: int, asset_type: str, file_type: str,
                          file_data: bytes | FileStorage | io.BytesIO | str, ) -> tuple[str, str, int]:
        """Upload a tenant asset and return (bucket_name, object_name, size_bytes).

        :raises TypeError: for unsupported ``file_data`` types.
        """
        bucket_name = self.generate_bucket_name(tenant_id)
        object_name = self.generate_asset_name(tenant_id, asset_id, asset_type, file_type)
        payload = self._coerce_to_bytes(file_data)
        try:
            self.client.put_object(
                bucket_name, object_name, io.BytesIO(payload), len(payload)
            )
            return bucket_name, object_name, len(payload)
        except S3Error as err:
            raise Exception(f"Error occurred while uploading asset: {err}") from err

    def download_document_file(self, tenant_id, bucket_name, object_name):
        """Fetch an object's full contents as bytes.

        The HTTP response is closed and its connection released back to the
        pool (the MinIO SDK requires this explicitly to avoid leaking
        pooled connections).
        """
        try:
            response = self.client.get_object(bucket_name, object_name)
            try:
                return response.read()
            finally:
                response.close()
                response.release_conn()
        except S3Error as err:
            raise Exception(f"Error occurred while downloading file: {err}") from err

    def download_asset_file(self, tenant_id, bucket_name, object_name):
        """Fetch an asset object's full contents as bytes (see download_document_file)."""
        try:
            response = self.client.get_object(bucket_name, object_name)
            try:
                return response.read()
            finally:
                response.close()
                response.release_conn()
        except S3Error as err:
            raise Exception(f"Error occurred while downloading asset: {err}") from err

    def list_document_files(self, tenant_id, document_id, language=None, version_id=None):
        """List object keys under a document, optionally narrowed by language/version.

        NOTE: version_id only narrows the prefix when language is also
        given, because the key layout is document/language/version/file.
        """
        bucket_name = self.generate_bucket_name(tenant_id)
        prefix = f"{document_id}/"
        if language:
            prefix += f"{language}/"
        if version_id:
            prefix += f"{version_id}/"
        try:
            objects = self.client.list_objects(bucket_name, prefix=prefix, recursive=True)
            return [obj.object_name for obj in objects]
        except S3Error as err:
            raise Exception(f"Error occurred while listing files: {err}") from err

    def delete_document_file(self, tenant_id, document_id, language, version_id, filename):
        """Delete a single document file; returns True on success."""
        bucket_name = self.generate_bucket_name(tenant_id)
        object_name = self.generate_object_name(tenant_id, document_id, language, version_id, filename)
        try:
            self.client.remove_object(bucket_name, object_name)
            return True
        except S3Error as err:
            raise Exception(f"Error occurred while deleting file: {err}") from err

    def delete_object(self, bucket_name, object_name):
        """Delete an arbitrary object by explicit bucket and key."""
        try:
            self.client.remove_object(bucket_name, object_name)
        except S3Error as err:
            raise Exception(f"Error occurred while deleting object: {err}") from err

    def get_bucket_size(self, tenant_id: int) -> int:
        """Return the total size in bytes of all objects in the tenant's bucket."""
        bucket_name = self.generate_bucket_name(tenant_id)
        return sum(obj.size for obj in self.client.list_objects(bucket_name, recursive=True))