Files
eveAI/eveai_app/views/healthz_views.py
2025-09-07 14:45:47 +02:00

108 lines
2.9 KiB
Python

from flask import Blueprint, current_app, request, jsonify
from flask_healthz import HealthError
from sqlalchemy.exc import SQLAlchemyError
from celery.exceptions import TimeoutError as CeleryTimeoutError
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
import time
from common.extensions import db, metrics, minio_client
from common.utils.celery_utils import current_celery
# Blueprint that groups the health and metrics routes under /healthz.
healthz_bp = Blueprint(
    'healthz',
    __name__,
    url_prefix='/healthz',
)

# Prometheus instruments updated by this blueprint's request hooks:
# a per-(method, endpoint) request counter and an unlabelled latency histogram.
api_request_counter = Counter(
    'api_request_count',
    'API Request Count',
    ['method', 'endpoint'],
)
api_request_latency = Histogram(
    'api_request_latency_seconds',
    'API Request latency',
)
def liveness():
    """
    Liveness check: report that the process is able to run Python code.

    Returns:
        bool: Always True. Being able to execute this function at all is the
        only condition that is checked.
    """
    # The previous try/except wrapped a bare ``return True``, which cannot
    # raise, so its HealthError branch was unreachable and has been removed.
    return True
def readiness():
    """
    Readiness check: run every dependency probe and fail if any reports False.

    All probes are executed (database, then celery, then minio) before the
    results are evaluated, so one failing probe does not prevent the others
    from running and logging.

    Raises:
        HealthError: If at least one probe returned a falsy result.
    """
    database_ok = check_database()
    celery_ok = check_celery()
    minio_ok = check_minio()

    outcomes = {
        "database": database_ok,
        "celery": celery_ok,
        "minio": minio_ok,
        # Add more checks as needed
    }

    if any(not healthy for healthy in outcomes.values()):
        raise HealthError("Readiness check failed")
def check_database():
    """
    Report whether the database answers a trivial query.

    Returns:
        bool: True when ``SELECT 1`` executes on ``db.session``; False when
        SQLAlchemy raises, in which case the error is logged with a traceback.
    """
    # Imported locally so this function carries its own dependency on text().
    from sqlalchemy import text

    try:
        # Raw SQL must be wrapped in text(): SQLAlchemy 2.x refuses a plain
        # str with a SQLAlchemyError subclass, which the handler below would
        # swallow and report as a permanently failing database.
        db.session.execute(text("SELECT 1"))
        return True
    except SQLAlchemyError:
        current_app.logger.error("Database check failed", exc_info=True)
        return False
def check_celery():
    """
    Report whether a Celery worker answers a ``ping`` task.

    A task named ``ping`` is sent to the ``embeddings`` queue and its result
    is awaited for at most 10 seconds.

    Returns:
        bool: True only if the task result equals ``'pong'``. False on a
        timeout or on any other exception; both cases are logged with a
        traceback.
    """
    try:
        ping_task = current_celery.send_task('ping', queue='embeddings')
        # Block until the worker replies or the 10 second budget runs out.
        reply = ping_task.get(timeout=10)
        return reply == 'pong'
    except CeleryTimeoutError:
        # Logged separately so a slow or absent worker is distinguishable
        # from other failures.
        current_app.logger.exception("Celery check timed out")
        return False
    except Exception as exc:
        current_app.logger.exception(f"Celery check failed: {exc!s}")
        return False
def check_minio():
    """
    Report whether MinIO is reachable by listing its buckets.

    Returns:
        bool: True when ``minio_client.list_buckets()`` completes without
        raising; False otherwise, with the error logged with a traceback.
    """
    try:
        # The bucket list itself is discarded; only success matters.
        minio_client.list_buckets()
    except Exception as exc:
        current_app.logger.exception(f"MinIO check failed: {exc!s}")
        return False
    return True
@healthz_bp.route('/ready')
def ready():
    """
    Readiness probe endpoint; responds with a JSON body {"status": "ok"}.

    NOTE(review): this handler does not call readiness(), so it answers "ok"
    without running the dependency checks. Confirm that is intended.
    """
    payload = {"status": "ok"}
    return jsonify(payload)
@healthz_bp.route('/live')
def live():
    """
    Liveness probe endpoint; responds with a JSON body {"status": "ok"}.
    """
    payload = {"status": "ok"}
    return jsonify(payload)
@healthz_bp.route('/metrics')
@metrics.do_not_track()
def prometheus_metrics():
    """
    Serve the output of prometheus_client's generate_latest().

    Returns:
        tuple: (body, 200, headers), where the Content-Type header is set to
        CONTENT_TYPE_LATEST.
    """
    # presumably do_not_track() excludes this route from the exporter's own
    # request metrics; verify against the `metrics` extension in use.
    body = generate_latest()
    headers = {'Content-Type': CONTENT_TYPE_LATEST}
    return body, 200, headers
# Custom metrics example
@healthz_bp.before_request
def before_request():
    """
    Stamp the request with its start time and count it.

    Stores ``time.time()`` on ``request.start_time`` and increments
    ``api_request_counter`` for the request's method and endpoint.
    """
    request.start_time = time.time()
    labelled_counter = api_request_counter.labels(
        method=request.method,
        endpoint=request.endpoint,
    )
    labelled_counter.inc()
@healthz_bp.after_request
def after_request(response):
    """
    Record how long the current request took in the latency histogram.

    Args:
        response: The response object handed to this hook.

    Returns:
        The same ``response`` object, unmodified.
    """
    # start_time is set by this blueprint's before_request hook. Flask skips
    # that hook when an earlier before_request handler already returned a
    # response, yet still runs after_request hooks, so the attribute can be
    # missing. Skip the observation rather than fail the request.
    started_at = getattr(request, "start_time", None)
    if started_at is not None:
        api_request_latency.observe(time.time() - started_at)
    return response