diff --git a/common/utils/minio_utils.py b/common/utils/minio_utils.py index f950532..667bf07 100644 --- a/common/utils/minio_utils.py +++ b/common/utils/minio_utils.py @@ -12,6 +12,7 @@ class MinioClient: self.client = None def init_app(self, app: Flask): + app.logger.debug(f"Initializing MinIO client with endpoint: {app.config['MINIO_ENDPOINT']} and secure: {app.config.get('MINIO_USE_HTTPS', False)}") self.client = Minio( app.config['MINIO_ENDPOINT'], access_key=app.config['MINIO_ACCESS_KEY'], diff --git a/config/config.py b/config/config.py index 6260cde..995467b 100644 --- a/config/config.py +++ b/config/config.py @@ -23,9 +23,40 @@ class Config(object): DB_PASS = environ.get('DB_PASS') DB_NAME = environ.get('DB_NAME') DB_PORT = environ.get('DB_PORT') - SQLALCHEMY_DATABASE_URI = f'postgresql+pg8000://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}' + SQLALCHEMY_DATABASE_URI = f'postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}' SQLALCHEMY_BINDS = {'public': SQLALCHEMY_DATABASE_URI} + # Database Engine Options (health checks and keepalives) + PGSQL_CERT_DATA = environ.get('PGSQL_CERT') + PGSQL_CA_CERT_PATH = None + if PGSQL_CERT_DATA: + _tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') + _tmp.write(PGSQL_CERT_DATA) + _tmp.flush() + _tmp.close() + PGSQL_CA_CERT_PATH = _tmp.name + + # Psycopg3 connect args (libpq parameters) + _CONNECT_ARGS = { + 'connect_timeout': 5, + 'keepalives': 1, + 'keepalives_idle': 60, + 'keepalives_interval': 30, + 'keepalives_count': 5, + } + if PGSQL_CA_CERT_PATH: + _CONNECT_ARGS.update({ + 'sslmode': 'require', + 'sslrootcert': PGSQL_CA_CERT_PATH, + }) + + SQLALCHEMY_ENGINE_OPTIONS = { + 'pool_pre_ping': True, + 'pool_recycle': 180, + 'pool_use_lifo': True, + 'connect_args': _CONNECT_ARGS, + } + # Redis Settings ------------------------------------------------------------------------------ REDIS_URL = environ.get('REDIS_URL') REDIS_PORT = environ.get('REDIS_PORT', '6379') @@ -342,9 +373,38 @@ class DevConfig(Config): # PATH settings ffmpeg_path = '/usr/bin/ffmpeg' + # OBJECT STORAGE + OBJECT_STORAGE_TYPE = 'MINIO' + OBJECT_STORAGE_TENANT_BASE = 'folder' + OBJECT_STORAGE_BUCKET_NAME = ('eveai-tenants') + # MINIO + MINIO_ENDPOINT = 'minio:9000' + MINIO_ACCESS_KEY = 'minioadmin' + MINIO_SECRET_KEY = 'minioadmin' + MINIO_USE_HTTPS = False + + +class TestConfig(Config): + DEVELOPMENT = True + DEBUG = True + FLASK_DEBUG = True + EXPLAIN_TEMPLATE_LOADING = False + + # Define the nginx prefix used for the specific apps + EVEAI_APP_LOCATION_PREFIX = '' + EVEAI_CHAT_LOCATION_PREFIX = '/chat' + CHAT_CLIENT_PREFIX = 'chat-client/chat/' + + # Define the static path + STATIC_URL = 'https://evie-staging-static.askeveai.com' + + # PATH settings + ffmpeg_path = '/usr/bin/ffmpeg' + # OBJECT STORAGE OBJECT_STORAGE_TYPE = 'MINIO' OBJECT_STORAGE_TENANT_BASE = 'bucket' + OBJECT_STORAGE_BUCKET_NAME = 'main' # MINIO MINIO_ENDPOINT = 'minio:9000' MINIO_ACCESS_KEY = 'minioadmin' @@ -411,6 +471,7 @@ class ProdConfig(Config): def get_config(config_name='dev'): configs = { 'dev': DevConfig, + 'test': TestConfig, 'staging': StagingConfig, 'prod': ProdConfig, 'default': DevConfig, diff --git a/eveai_api/__init__.py b/eveai_api/__init__.py index 5d5bf55..18016a8 100644 --- a/eveai_api/__init__.py +++ b/eveai_api/__init__.py @@ -30,6 +30,8 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) case 'staging': app.config.from_object(get_config('staging')) case 'production': diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index 5089939..905eafc 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -32,6 +32,8 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) case 'staging': app.config.from_object(get_config('staging')) case 'production': diff --git a/eveai_chat_client/__init__.py b/eveai_chat_client/__init__.py index b20c7b3..db933e9 100644 --- a/eveai_chat_client/__init__.py +++ b/eveai_chat_client/__init__.py @@ -26,6 +26,10 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) + case 'staging': + app.config.from_object(get_config('staging')) case 'production': app.config.from_object(get_config('prod')) case _: diff --git a/eveai_chat_workers/__init__.py b/eveai_chat_workers/__init__.py index 7bfeb96..e2dd4fc 100644 --- a/eveai_chat_workers/__init__.py +++ b/eveai_chat_workers/__init__.py @@ -17,6 +17,10 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) + case 'staging': + app.config.from_object(get_config('staging')) case 'production': app.config.from_object(get_config('prod')) case _: diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index 380cee8..3140101 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -3,7 +3,8 @@ from typing import Dict, Any, Optional import traceback from flask import current_app -from sqlalchemy.exc import SQLAlchemyError +from celery import states +from sqlalchemy.exc import SQLAlchemyError, InterfaceError, OperationalError from common.utils.config_field_types import TaggingFields from common.utils.database import Database @@ -214,7 +215,7 @@ def prepare_arguments(specialist: Any, arguments: Dict[str, Any]) -> Dict[str, A raise ArgumentPreparationError(str(e)) -@current_celery.task(name='execute_specialist', queue='llm_interactions', bind=True) +@current_celery.task(bind=True, name='execute_specialist', queue='llm_interactions', autoretry_for=(InterfaceError, OperationalError), retry_backoff=True, retry_jitter=True, max_retries=5) def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict[str, Any], session_id: str, user_timezone: str) -> dict: """ @@ -356,6 +357,7 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict stacktrace = traceback.format_exc() current_app.logger.error(f'execute_specialist: Error updating interaction: {e}\n{stacktrace}') + self.update_state(state=states.FAILURE) raise diff --git a/eveai_entitlements/__init__.py b/eveai_entitlements/__init__.py index f599e1b..ea1fd2b 100644 --- a/eveai_entitlements/__init__.py +++ b/eveai_entitlements/__init__.py @@ -17,6 +17,10 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) + case 'staging': + app.config.from_object(get_config('staging')) case 'production': app.config.from_object(get_config('prod')) case _: diff --git a/eveai_entitlements/tasks.py b/eveai_entitlements/tasks.py index 598bd94..2d7bf1f 100644 --- a/eveai_entitlements/tasks.py +++ b/eveai_entitlements/tasks.py @@ -3,8 +3,9 @@ import os from datetime import datetime as dt, timezone as tz, datetime from flask import current_app +from celery import states from sqlalchemy import or_, and_, text -from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.exc import SQLAlchemyError, InterfaceError, OperationalError from common.extensions import db from common.models.user import Tenant from common.models.entitlements import BusinessEventLog, LicenseUsage, License @@ -20,8 +21,8 @@ def ping(): return 'pong' -@current_celery.task(name='persist_business_events', queue='entitlements') -def persist_business_events(log_entries): +@current_celery.task(bind=True, name='persist_business_events', queue='entitlements', autoretry_for=(InterfaceError, OperationalError), retry_backoff=True, retry_jitter=True, max_retries=5) +def persist_business_events(self, log_entries): """ Persist multiple business event logs to the database in a single transaction diff --git a/eveai_ops/__init__.py b/eveai_ops/__init__.py index 49a6f1d..bae5b69 100644 --- a/eveai_ops/__init__.py +++ b/eveai_ops/__init__.py @@ -26,6 +26,8 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) case 'staging': app.config.from_object(get_config('staging')) case 'production': diff --git a/eveai_ops/healthz.py b/eveai_ops/healthz.py new file mode 100644 index 0000000..78966fd --- /dev/null +++ b/eveai_ops/healthz.py @@ -0,0 +1,13 @@ +from flask import Blueprint, jsonify + +healthz_bp = Blueprint('ops_healthz', __name__) + +@healthz_bp.route('/healthz/live', methods=['GET']) +def live(): + # Minimal liveness: process is running + return jsonify(status='ok'), 200 + +@healthz_bp.route('/healthz/ready', methods=['GET']) +def ready(): + # Minimal readiness for now (no external checks) + return jsonify(status='ready'), 200 diff --git a/eveai_workers/__init__.py b/eveai_workers/__init__.py index 1ce9ad1..bd857e8 100644 --- a/eveai_workers/__init__.py +++ b/eveai_workers/__init__.py @@ -17,6 +17,10 @@ def create_app(config_file=None): match environment: case 'development': app.config.from_object(get_config('dev')) + case 'test': + app.config.from_object(get_config('test')) + case 'staging': + app.config.from_object(get_config('staging')) case 'production': app.config.from_object(get_config('prod')) case _: diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index 89ddf95..3ec8bd1 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -10,7 +10,7 @@ from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from sqlalchemy import or_ -from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.exc import SQLAlchemyError, InterfaceError, OperationalError import traceback from common.extensions import db, cache_manager @@ -37,8 +37,10 @@ def ping(): return 'pong' -@current_celery.task(name='create_embeddings', queue='embeddings') -def create_embeddings(tenant_id, document_version_id): +@current_celery.task(bind=True, name='create_embeddings', queue='embeddings', + autoretry_for=(InterfaceError, OperationalError), + retry_backoff=True, retry_jitter=True, max_retries=5) +def create_embeddings(self, tenant_id, document_version_id): document_version = None try: # Retrieve Tenant for which we are processing @@ -127,7 +129,7 @@ def create_embeddings(tenant_id, document_version_id): document_version.processing_finished_at = dt.now(tz.utc) document_version.processing_error = str(e)[:255] db.session.commit() - create_embeddings.update_state(state=states.FAILURE) + self.update_state(state=states.FAILURE) raise diff --git a/requirements.txt b/requirements.txt index 6e18641..803aa54 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,7 +34,7 @@ langchain-text-splitters~=0.3.10 langcodes~=3.4.0 langdetect~=1.0.9 openai~=1.102.0 -pg8000~=1.31.2 +psycopg[binary]~=3.2 pgvector~=0.2.5 pycryptodome~=3.20.0 pydantic>=2.10.3,<3 diff --git a/scaleway/manifests/base/secrets/eveai-external-secrets.yaml b/scaleway/manifests/base/secrets/eveai-external-secrets.yaml index 6852c8b..16da196 100644 --- a/scaleway/manifests/base/secrets/eveai-external-secrets.yaml +++ b/scaleway/manifests/base/secrets/eveai-external-secrets.yaml @@ -39,3 +39,6 @@ spec: - secretKey: REDIS_CERT remoteRef: key: name:eveai-redis-certificate + - secretKey: PGSQL_CERT + remoteRef: + key: name:eveai-postgresql-certificate