diff --git a/scripts/drain_digests.py b/scripts/drain_digests.py index c824a98..fafe704 100644 --- a/scripts/drain_digests.py +++ b/scripts/drain_digests.py @@ -50,6 +50,14 @@ async def main() -> int: ) if healed and healed != "UPDATE 0": print(f"self-heal: reset failed-empty digests → pending ({healed})", flush=True) + # Self-heal stale 'processing': flock guarantees a single drainer, so at the + # start of THIS run any row left 'processing' is from a previous run that was + # killed mid-row (session/quota cutoff). Reset to 'pending' so it is retried. + stale = await pool.execute( + "UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'" + ) + if stale and stale != "UPDATE 0": + print(f"self-heal: reset stale processing → pending ({stale})", flush=True) rows = await pool.fetch( "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at" )