fix(digests): self-heal stale 'processing' rows in drain (fully unattended)

drain_digests רץ תחת flock (drainer יחיד), אז כל שורה 'processing' בתחילת ריצה
היא שריד מריצה קודמת שנקטעה באמצע-שורה (סשן/מכסה). מאפסים אותה ל-'pending'
לריצה חוזרת — סוגר את הפער האחרון ל-resume אוטומטי מלא ללא התערבות.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 04:52:55 +00:00
parent 2c75666d26
commit b5f7b60fb5

View File

@@ -50,6 +50,14 @@ async def main() -> int:
) )
if healed and healed != "UPDATE 0": if healed and healed != "UPDATE 0":
print(f"self-heal: reset failed-empty digests → pending ({healed})", flush=True) print(f"self-heal: reset failed-empty digests → pending ({healed})", flush=True)
# Self-heal stale 'processing': flock guarantees a single drainer, so at the
# start of THIS run any row left 'processing' is from a previous run that was
# killed mid-row (session/quota cutoff). Reset to 'pending' so it is retried.
stale = await pool.execute(
"UPDATE digests SET extraction_status = 'pending' WHERE extraction_status = 'processing'"
)
if stale and stale != "UPDATE 0":
print(f"self-heal: reset stale processing → pending ({stale})", flush=True)
rows = await pool.fetch( rows = await pool.fetch(
"SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at" "SELECT id FROM digests WHERE extraction_status = 'pending' ORDER BY created_at"
) )