fix(db): serialise schema migrations with an advisory lock + stagger drain crons
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 6s

legal-halacha-drain crashed 29× with asyncpg DeadlockDetectedError. Root cause:
every short-lived cron drain re-runs the idempotent schema migrations on startup
(get_pool → _run_schema_migrations), and three jobs (metadata-drain, halacha-drain,
halacha-supervisor) all fired on the same minute (*/15 / top-of-hour). Two
processes running the DDL concurrently took AccessExclusiveLock in opposite order
→ Postgres killed one with a deadlock.

Two-layer fix:
- Root cause: wrap _run_schema_migrations in a session-level pg_advisory_lock so
  only one process applies DDL at a time; concurrent migrators wait instead of
  deadlocking. DDL body extracted to _apply_schema_ddl. Idempotent, schema
  unchanged.
- Defence-in-depth: give each cron drain a distinct firing minute —
  metadata :00, supervisor :05, halacha-drain :10, digest :12, court-fetch :17 —
  so siblings no longer start at the same instant. SCRIPTS.md updated to match.

Invariants: G1 (fix at source — the single migration path — not the symptom);
G2 (no parallel control path introduced).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-13 08:08:19 +00:00
parent 387cd37255
commit 49acde591e
5 changed files with 35 additions and 8 deletions

View File

@@ -1512,8 +1512,27 @@ ALTER TABLE drain_controls ADD COLUMN IF NOT EXISTS burst_until TIMESTAMPTZ;
"""
# Stable, arbitrary key for the session-level advisory lock that serialises
# schema DDL across processes. Every short-lived process (cron drains, services)
# re-runs the idempotent migrations on startup; without this lock two processes
# that fire at the same minute race on AccessExclusiveLock and Postgres kills one
# with DeadlockDetectedError. The lock makes a concurrent migrator wait instead.
_MIGRATION_LOCK_KEY = 778899001
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
# Serialise DDL across processes: block until any sibling migrator
# finishes, then run against the (now up-to-date, idempotent) schema.
await conn.execute("SELECT pg_advisory_lock($1)", _MIGRATION_LOCK_KEY)
try:
await _apply_schema_ddl(conn)
finally:
await conn.execute("SELECT pg_advisory_unlock($1)", _MIGRATION_LOCK_KEY)
logger.info("Database schema initialized (v1-v37)")
async def _apply_schema_ddl(conn: asyncpg.Connection) -> None:
await conn.execute(SCHEMA_SQL)
await conn.execute(MIGRATIONS_SQL)
await conn.execute(SCHEMA_V2_SQL)
@@ -1552,7 +1571,6 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V35_SQL)
await conn.execute(SCHEMA_V36_SQL)
await conn.execute(SCHEMA_V37_SQL)
logger.info("Database schema initialized (v1-v37)")
async def init_schema() -> None: