From b3ee2f7ce923d20206f8abbf6f72384d9f31c425 Mon Sep 17 00:00:00 2001 From: Josako Date: Fri, 24 Oct 2025 11:42:50 +0200 Subject: [PATCH] Bug Fix where - in exceptional cases - a connection without correct search path could be used (out of the connection pool). --- common/utils/database.py | 93 ++++++++++++++++++++++-- eveai_chat_workers/chat_session_cache.py | 9 +++ eveai_chat_workers/tasks.py | 25 +++++-- 3 files changed, 115 insertions(+), 12 deletions(-) diff --git a/common/utils/database.py b/common/utils/database.py index 1f14869..bb05d0f 100644 --- a/common/utils/database.py +++ b/common/utils/database.py @@ -1,9 +1,9 @@ """Database related functions""" from os import popen -from sqlalchemy import text +from sqlalchemy import text, event from sqlalchemy.schema import CreateSchema from sqlalchemy.exc import InternalError -from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.orm import sessionmaker, scoped_session, Session as SASession from sqlalchemy.exc import SQLAlchemyError from flask import current_app @@ -16,6 +16,66 @@ class Database: def __init__(self, tenant: str) -> None: self.schema = str(tenant) + # --- Session / Transaction events to ensure correct search_path per transaction --- + @event.listens_for(SASession, "after_begin") + def _set_search_path_per_tx(session, transaction, connection): + """Ensure each transaction sees the right tenant schema, regardless of + which pooled connection is used. Uses SET LOCAL so it is scoped to the tx. + """ + schema = session.info.get("tenant_schema") + if schema: + try: + connection.exec_driver_sql(f'SET LOCAL search_path TO "{schema}", public') + # Optional visibility/logging for debugging + sp = connection.exec_driver_sql("SHOW search_path").scalar() + try: + current_app.logger.info(f"DBCTX tx_begin conn_id={id(connection.connection)} search_path={sp}") + except Exception: + pass + except Exception as e: + try: + current_app.logger.error(f"Failed to SET LOCAL search_path for schema {schema}: {e!r}") + except Exception: + pass + + def _log_db_context(self, origin: str = "") -> None: + """Log key DB context info to diagnose schema/search_path issues. + + Collects and logs in a single structured line: + - current_database() + - inet_server_addr(), inet_server_port() + - SHOW search_path + - current_schema() + - to_regclass('interaction') + - to_regclass('.interaction') + """ + try: + db_name = db.session.execute(text("SELECT current_database()"))\ + .scalar() + host = db.session.execute(text("SELECT inet_server_addr()"))\ + .scalar() + port = db.session.execute(text("SELECT inet_server_port()"))\ + .scalar() + search_path = db.session.execute(text("SHOW search_path"))\ + .scalar() + current_schema = db.session.execute(text("SELECT current_schema()"))\ + .scalar() + reg_unqualified = db.session.execute(text("SELECT to_regclass('interaction')"))\ + .scalar() + qualified = f"{self.schema}.interaction" + reg_qualified = db.session.execute( + text("SELECT to_regclass(:qn)"), + {"qn": qualified} + ).scalar() + current_app.logger.info( + "DBCTX origin=%s db=%s host=%s port=%s search_path=%s current_schema=%s to_regclass(interaction)=%s to_regclass(%s)=%s", + origin, db_name, host, port, search_path, current_schema, reg_unqualified, qualified, reg_qualified + ) + except SQLAlchemyError as e: + current_app.logger.error( + f"DBCTX logging failed at {origin} for schema {self.schema}: {e!r}" + ) + def get_engine(self): """create new schema engine""" return db.engine.execution_options( @@ -52,9 +112,32 @@ class Database: current_app.logger.error(f"💔 Error creating tables for schema {self.schema}: {e.args}") def switch_schema(self): - """switch between tenant/public database schema""" - db.session.execute(text(f'set search_path to "{self.schema}", public')) - db.session.commit() + """switch between tenant/public database schema with diagnostics logging""" + # Record the desired tenant schema on the active Session so events can use it + try: + db.session.info["tenant_schema"] = self.schema + except Exception: + pass + # Log the context before switching + self._log_db_context("before_switch") + try: + db.session.execute(text(f'set search_path to "{self.schema}", public')) + db.session.commit() + except SQLAlchemyError as e: + # Rollback on error to avoid InFailedSqlTransaction and log details + try: + db.session.rollback() + except Exception: + pass + current_app.logger.error( + f"Error switching search_path to {self.schema}: {e!r}" + ) + # Also log context after failure + self._log_db_context("after_switch_failed") + # Re-raise to let caller decide handling if needed + raise + # Log the context after successful switch + self._log_db_context("after_switch") def migrate_tenant_schema(self): """migrate tenant database schema for new tenant""" diff --git a/eveai_chat_workers/chat_session_cache.py b/eveai_chat_workers/chat_session_cache.py index 1e2a58e..6da4c28 100644 --- a/eveai_chat_workers/chat_session_cache.py +++ b/eveai_chat_workers/chat_session_cache.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from flask import current_app from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import joinedload +from sqlalchemy import text from common.extensions import db, cache_manager from common.models.interaction import ChatSession, Interaction @@ -111,6 +112,14 @@ class ChatSessionCacheHandler(CacheHandler[CachedSession]): Note: Only adds the interaction if it has an answer """ + # Log connection context right before any potential lazy load of interaction properties + try: + sp = db.session.execute(text("SHOW search_path")).scalar() + cid = id(db.session.connection().connection) + current_app.logger.info(f"DBCTX before_lazy_load conn_id={cid} search_path={sp}") + except Exception: + pass + if not interaction.specialist_results: return # Skip incomplete interactions diff --git a/eveai_chat_workers/tasks.py b/eveai_chat_workers/tasks.py index b3411ea..596355a 100644 --- a/eveai_chat_workers/tasks.py +++ b/eveai_chat_workers/tasks.py @@ -351,16 +351,27 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict return response except Exception as e: + # Ensure DB session is usable after an error + try: + db.session.rollback() + except Exception: + pass stacktrace = traceback.format_exc() ept.send_update(task_id, "EveAI Specialist Error", {'Error': str(e)}) current_app.logger.error(f'execute_specialist: Error executing specialist: {e}\n{stacktrace}') - new_interaction.processing_error = str(e)[:255] - try: - db.session.add(new_interaction) - db.session.commit() - except SQLAlchemyError as e: - stacktrace = traceback.format_exc() - current_app.logger.error(f'execute_specialist: Error updating interaction: {e}\n{stacktrace}') + if new_interaction is not None: + new_interaction.processing_error = str(e)[:255] + try: + db.session.add(new_interaction) + db.session.commit() + except SQLAlchemyError as e: + # On failure to update, rollback and log + try: + db.session.rollback() + except Exception: + pass + stacktrace = traceback.format_exc() + current_app.logger.error(f'execute_specialist: Error updating interaction: {e}\n{stacktrace}') self.update_state(state=states.FAILURE) raise