Files
eveAI/eveai_chat_workers/__init__.py
Josako 6ccba7d1e3 - Add test environment to __init__.py for all eveai services
- Add postgresql certificate to secrets for secure communication in staging and production environments
- Adapt for TLS communication with PostgreSQL
- Adapt tasks to handle invalid connections from the connection pool
- Migrate to psycopg3 for connection to PostgreSQL
2025-09-10 11:40:38 +02:00

66 lines
1.9 KiB
Python

import logging
import logging.config
from flask import Flask
import os
from common.utils.celery_utils import make_celery, init_celery
from common.extensions import db, cache_manager, minio_client
from config.logging_config import configure_logging
from config.config import get_config
def create_app(config_file=None):
    """Build the Flask application and the Celery instance for this service.

    The configuration object is chosen from the ``FLASK_ENV`` environment
    variable (``'development'`` when the variable is unset). Any value that
    is not one of the known environment names falls back to the ``'dev'``
    configuration.

    Args:
        config_file: Not used by this function.

    Returns:
        A ``(app, celery)`` tuple: the Flask application and the Celery
        instance created for it.
    """
    flask_app = Flask(__name__)

    # Maps a FLASK_ENV value to the key passed to get_config().
    config_keys = {
        'development': 'dev',
        'test': 'test',
        'staging': 'staging',
        'production': 'prod',
    }
    env_name = os.getenv('FLASK_ENV', 'development')
    config_key = config_keys.get(env_name, 'dev')
    flask_app.config.from_object(get_config(config_key))

    configure_logging()
    flask_app.logger.info('Starting up eveai_chat_workers...')

    register_extensions(flask_app)

    # These names are not referenced below; presumably the modules are
    # imported for their import-time registration -- TODO confirm.
    from . import specialists, retrievers

    celery_app = make_celery(flask_app.name, flask_app.config)
    init_celery(celery_app, flask_app)

    register_cache_handlers(flask_app)

    # Imported only after Celery has been initialised; the ping result is
    # written to stdout.
    from eveai_chat_workers import tasks
    print(tasks.tasks_ping())

    return flask_app, celery_app
def register_extensions(app):
    """Bind the shared extension objects to the given Flask application.

    Calls ``init_app(app)`` on ``db``, ``cache_manager`` and ``minio_client``,
    in that order.

    Args:
        app: The Flask application the extensions are initialised with.
    """
    for extension in (db, cache_manager, minio_client):
        extension.init_app(app)
def register_cache_handlers(app):
    """Register every cache handler group on the shared ``cache_manager``.

    Each registration function is imported inside this function immediately
    before it is called, and each receives ``cache_manager`` as its only
    argument. The order is: config, specialist, chat session, translation.

    Args:
        app: Not used by this function.
    """
    from common.utils.cache.config_cache import (
        register_config_cache_handlers as attach_config_handlers,
    )
    attach_config_handlers(cache_manager)

    from common.utils.cache.crewai_processed_config_cache import (
        register_specialist_cache_handlers as attach_specialist_handlers,
    )
    attach_specialist_handlers(cache_manager)

    from eveai_chat_workers.chat_session_cache import (
        register_chat_session_cache_handlers as attach_chat_session_handlers,
    )
    attach_chat_session_handlers(cache_manager)

    from common.utils.cache.translation_cache import (
        register_translation_cache_handlers as attach_translation_handlers,
    )
    attach_translation_handlers(cache_manager)
# Module-level Flask application and Celery instance. This statement runs
# when the package is imported, so importing it executes create_app().
app, celery = create_app()