Bug Fix where - in exceptional cases - a connection without correct search path could be used (out of the connection pool).
This commit is contained in:
@@ -1,9 +1,9 @@
|
|||||||
"""Database related functions"""
|
"""Database related functions"""
|
||||||
from os import popen
|
from os import popen
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text, event
|
||||||
from sqlalchemy.schema import CreateSchema
|
from sqlalchemy.schema import CreateSchema
|
||||||
from sqlalchemy.exc import InternalError
|
from sqlalchemy.exc import InternalError
|
||||||
from sqlalchemy.orm import sessionmaker, scoped_session
|
from sqlalchemy.orm import sessionmaker, scoped_session, Session as SASession
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
|
||||||
@@ -16,6 +16,66 @@ class Database:
|
|||||||
def __init__(self, tenant: str) -> None:
|
def __init__(self, tenant: str) -> None:
|
||||||
self.schema = str(tenant)
|
self.schema = str(tenant)
|
||||||
|
|
||||||
|
# --- Session / Transaction events to ensure correct search_path per transaction ---
|
||||||
|
@event.listens_for(SASession, "after_begin")
|
||||||
|
def _set_search_path_per_tx(session, transaction, connection):
|
||||||
|
"""Ensure each transaction sees the right tenant schema, regardless of
|
||||||
|
which pooled connection is used. Uses SET LOCAL so it is scoped to the tx.
|
||||||
|
"""
|
||||||
|
schema = session.info.get("tenant_schema")
|
||||||
|
if schema:
|
||||||
|
try:
|
||||||
|
connection.exec_driver_sql(f'SET LOCAL search_path TO "{schema}", public')
|
||||||
|
# Optional visibility/logging for debugging
|
||||||
|
sp = connection.exec_driver_sql("SHOW search_path").scalar()
|
||||||
|
try:
|
||||||
|
current_app.logger.info(f"DBCTX tx_begin conn_id={id(connection.connection)} search_path={sp}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
current_app.logger.error(f"Failed to SET LOCAL search_path for schema {schema}: {e!r}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _log_db_context(self, origin: str = "") -> None:
|
||||||
|
"""Log key DB context info to diagnose schema/search_path issues.
|
||||||
|
|
||||||
|
Collects and logs in a single structured line:
|
||||||
|
- current_database()
|
||||||
|
- inet_server_addr(), inet_server_port()
|
||||||
|
- SHOW search_path
|
||||||
|
- current_schema()
|
||||||
|
- to_regclass('interaction')
|
||||||
|
- to_regclass('<tenant>.interaction')
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
db_name = db.session.execute(text("SELECT current_database()"))\
|
||||||
|
.scalar()
|
||||||
|
host = db.session.execute(text("SELECT inet_server_addr()"))\
|
||||||
|
.scalar()
|
||||||
|
port = db.session.execute(text("SELECT inet_server_port()"))\
|
||||||
|
.scalar()
|
||||||
|
search_path = db.session.execute(text("SHOW search_path"))\
|
||||||
|
.scalar()
|
||||||
|
current_schema = db.session.execute(text("SELECT current_schema()"))\
|
||||||
|
.scalar()
|
||||||
|
reg_unqualified = db.session.execute(text("SELECT to_regclass('interaction')"))\
|
||||||
|
.scalar()
|
||||||
|
qualified = f"{self.schema}.interaction"
|
||||||
|
reg_qualified = db.session.execute(
|
||||||
|
text("SELECT to_regclass(:qn)"),
|
||||||
|
{"qn": qualified}
|
||||||
|
).scalar()
|
||||||
|
current_app.logger.info(
|
||||||
|
"DBCTX origin=%s db=%s host=%s port=%s search_path=%s current_schema=%s to_regclass(interaction)=%s to_regclass(%s)=%s",
|
||||||
|
origin, db_name, host, port, search_path, current_schema, reg_unqualified, qualified, reg_qualified
|
||||||
|
)
|
||||||
|
except SQLAlchemyError as e:
|
||||||
|
current_app.logger.error(
|
||||||
|
f"DBCTX logging failed at {origin} for schema {self.schema}: {e!r}"
|
||||||
|
)
|
||||||
|
|
||||||
def get_engine(self):
|
def get_engine(self):
|
||||||
"""create new schema engine"""
|
"""create new schema engine"""
|
||||||
return db.engine.execution_options(
|
return db.engine.execution_options(
|
||||||
@@ -52,9 +112,32 @@ class Database:
|
|||||||
current_app.logger.error(f"💔 Error creating tables for schema {self.schema}: {e.args}")
|
current_app.logger.error(f"💔 Error creating tables for schema {self.schema}: {e.args}")
|
||||||
|
|
||||||
def switch_schema(self):
|
def switch_schema(self):
|
||||||
"""switch between tenant/public database schema"""
|
"""switch between tenant/public database schema with diagnostics logging"""
|
||||||
db.session.execute(text(f'set search_path to "{self.schema}", public'))
|
# Record the desired tenant schema on the active Session so events can use it
|
||||||
db.session.commit()
|
try:
|
||||||
|
db.session.info["tenant_schema"] = self.schema
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Log the context before switching
|
||||||
|
self._log_db_context("before_switch")
|
||||||
|
try:
|
||||||
|
db.session.execute(text(f'set search_path to "{self.schema}", public'))
|
||||||
|
db.session.commit()
|
||||||
|
except SQLAlchemyError as e:
|
||||||
|
# Rollback on error to avoid InFailedSqlTransaction and log details
|
||||||
|
try:
|
||||||
|
db.session.rollback()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
current_app.logger.error(
|
||||||
|
f"Error switching search_path to {self.schema}: {e!r}"
|
||||||
|
)
|
||||||
|
# Also log context after failure
|
||||||
|
self._log_db_context("after_switch_failed")
|
||||||
|
# Re-raise to let caller decide handling if needed
|
||||||
|
raise
|
||||||
|
# Log the context after successful switch
|
||||||
|
self._log_db_context("after_switch")
|
||||||
|
|
||||||
def migrate_tenant_schema(self):
|
def migrate_tenant_schema(self):
|
||||||
"""migrate tenant database schema for new tenant"""
|
"""migrate tenant database schema for new tenant"""
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from dataclasses import dataclass
|
|||||||
from flask import current_app
|
from flask import current_app
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
from sqlalchemy.orm import joinedload
|
from sqlalchemy.orm import joinedload
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
from common.extensions import db, cache_manager
|
from common.extensions import db, cache_manager
|
||||||
from common.models.interaction import ChatSession, Interaction
|
from common.models.interaction import ChatSession, Interaction
|
||||||
@@ -111,6 +112,14 @@ class ChatSessionCacheHandler(CacheHandler[CachedSession]):
|
|||||||
Note:
|
Note:
|
||||||
Only adds the interaction if it has an answer
|
Only adds the interaction if it has an answer
|
||||||
"""
|
"""
|
||||||
|
# Log connection context right before any potential lazy load of interaction properties
|
||||||
|
try:
|
||||||
|
sp = db.session.execute(text("SHOW search_path")).scalar()
|
||||||
|
cid = id(db.session.connection().connection)
|
||||||
|
current_app.logger.info(f"DBCTX before_lazy_load conn_id={cid} search_path={sp}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
if not interaction.specialist_results:
|
if not interaction.specialist_results:
|
||||||
return # Skip incomplete interactions
|
return # Skip incomplete interactions
|
||||||
|
|
||||||
|
|||||||
@@ -351,16 +351,27 @@ def execute_specialist(self, tenant_id: int, specialist_id: int, arguments: Dict
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# Ensure DB session is usable after an error
|
||||||
|
try:
|
||||||
|
db.session.rollback()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
stacktrace = traceback.format_exc()
|
stacktrace = traceback.format_exc()
|
||||||
ept.send_update(task_id, "EveAI Specialist Error", {'Error': str(e)})
|
ept.send_update(task_id, "EveAI Specialist Error", {'Error': str(e)})
|
||||||
current_app.logger.error(f'execute_specialist: Error executing specialist: {e}\n{stacktrace}')
|
current_app.logger.error(f'execute_specialist: Error executing specialist: {e}\n{stacktrace}')
|
||||||
new_interaction.processing_error = str(e)[:255]
|
if new_interaction is not None:
|
||||||
try:
|
new_interaction.processing_error = str(e)[:255]
|
||||||
db.session.add(new_interaction)
|
try:
|
||||||
db.session.commit()
|
db.session.add(new_interaction)
|
||||||
except SQLAlchemyError as e:
|
db.session.commit()
|
||||||
stacktrace = traceback.format_exc()
|
except SQLAlchemyError as e:
|
||||||
current_app.logger.error(f'execute_specialist: Error updating interaction: {e}\n{stacktrace}')
|
# On failure to update, rollback and log
|
||||||
|
try:
|
||||||
|
db.session.rollback()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
stacktrace = traceback.format_exc()
|
||||||
|
current_app.logger.error(f'execute_specialist: Error updating interaction: {e}\n{stacktrace}')
|
||||||
|
|
||||||
self.update_state(state=states.FAILURE)
|
self.update_state(state=states.FAILURE)
|
||||||
raise
|
raise
|
||||||
|
|||||||
Reference in New Issue
Block a user