From b5f7b60fb5d970422a7e4fbd60a7c85eb51d1b43 Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 04:52:55 +0000 Subject: [PATCH] fix(digests): self-heal stale 'processing' rows in drain (fully unattended) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit drain_digests רץ תחת flock (drainer יחיד), אז כל שורה 'processing' בתחילת ריצה היא שריד מריצה קודמת שנקטעה באמצע-שורה (סשן/מכסה). מאפסים אותה ל-'pending' לריצה חוזרת — סוגר את הפער האחרון ל-resume אוטומטי מלא ללא התערבות. Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/drain_digests.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/scripts/drain_digests.py b/scripts/drain_digests.py index c824a98..fafe704 100644 --- a/scripts/drain_digests.py +++ b/scripts/drain_digests.py @@ -50,6 +50,14 @@ async def main() -> int: ) if healed and healed != "UPDATE 0": print(f"self-heal: reset failed-empty digests → pending ({healed})", flush=True) + # Self-heal stale 'processing': flock guarantees a single drainer, so at the + # start of THIS run any row left 'processing' is from a previous run that was + # killed mid-row (session/quota cutoff). Reset to 'pending' so it is retried. + stale = await pool.execute( + "UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'" + ) + if stale and stale != "UPDATE 0": + print(f"self-heal: reset stale processing → pending ({stale})", flush=True) rows = await pool.fetch( "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at" ) -- 2.49.1