Merge pull request 'feat(digests): digest_kind classification — robust extraction for all issue types (X12)' (#141) from worktree-digest-kind into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m58s

This commit was merged in pull request #141.
This commit is contained in:
2026-06-08 06:02:36 +00:00
5 changed files with 67 additions and 21 deletions

View File

@@ -44,6 +44,13 @@
`underlying_date` (מתן הפסק) שונה מ-`digest_date` (גיליון היומון) — מקור-באגים נפוץ; חילוץ-המטא-דאטה
מבחין ביניהם מפורשות.
**`digest_kind` (סיווג-גיליון, V32):** רוב הגיליונות הם `decision` (סיכום פס"ד → `underlying_citation`),
אך חלקם `announcement` — עדכון/הודעה ללא הכרעה (חקיקה, נוהל, ברכת-שנה) שאין לו מראה-מקום. החילוץ
מסווג כל גיליון ותמיד מחלץ `concept_tag`/`headline`/`summary` (קיימים לכל סוג); `underlying_citation`
רק ל-`decision`. **שימוש קריטי:** הגדרת-"כשל" של ה-drain self-heal היא `completed` **עם
`digest_kind=''`** (מעולם לא סווג) — כך הודעה (kind=`announcement`, בלי citation) **אינה** נחשבת כשל
ואינה מנוסה-מחדש לנצח. ההיוריסטיקה הישנה ("שני השדות ריקים") טיפלה בהודעות בטעות כ-retry אינסופי.
---
## 3. למה זה לא קורפוס-ציטוט רביעי (הקושיה המרכזית — G2)

View File

@@ -1383,6 +1383,24 @@ CREATE INDEX IF NOT EXISTS idx_court_fetch_jobs_digest ON court_fetch_jobs(diges
WHERE digest_id IS NOT NULL;
"""
SCHEMA_V32_SQL = """
-- digest_kind (X12): classify each "כל יום" issue. Most are decision-summaries
-- (point at a ruling → underlying_citation set), but some are non-decision
-- ANNOUNCEMENTS (legislative/planning updates, new-year notices) that legitimately
-- have no ruling. Classifying explicitly lets enrich treat an announcement as a
-- SUCCESS (concept+summary, no citation) instead of a both-empty "failure" that
-- the drain self-heal would retry forever. '' = not yet classified (= a genuine
-- extraction failure once enriched).
ALTER TABLE digests ADD COLUMN IF NOT EXISTS digest_kind TEXT NOT NULL DEFAULT '';
-- Backfill legacy rows cheaply (no LLM): a row with a citation is a decision,
-- otherwise an announcement. MUST run before the new self-heal keys on
-- digest_kind='' (else it would reset every legacy row). Idempotent.
UPDATE digests SET digest_kind =
CASE WHEN coalesce(underlying_citation,'') <> '' THEN 'decision' ELSE 'announcement' END
WHERE coalesce(digest_kind,'') = '' AND extraction_status = 'completed';
CREATE INDEX IF NOT EXISTS idx_digests_kind ON digests(digest_kind);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
@@ -1418,7 +1436,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V29_SQL)
await conn.execute(SCHEMA_V30_SQL)
await conn.execute(SCHEMA_V31_SQL)
logger.info("Database schema initialized (v1-v31)")
await conn.execute(SCHEMA_V32_SQL)
logger.info("Database schema initialized (v1-v32)")
async def init_schema() -> None:
@@ -3600,7 +3619,7 @@ _DIGEST_COLS = (
"headline_holding, analysis_text, summary, underlying_citation, "
"underlying_court, underlying_date, underlying_judge, practice_area, "
"appeal_subtype, subject_tags, linked_case_law_id, source_document_path, "
"content_hash, extraction_status, created_at, updated_at"
"content_hash, extraction_status, digest_kind, created_at, updated_at"
)
_DIGEST_UPDATE_ALLOWED = {
@@ -3608,7 +3627,7 @@ _DIGEST_UPDATE_ALLOWED = {
"headline_holding", "analysis_text", "summary", "underlying_citation",
"underlying_court", "underlying_date", "underlying_judge", "practice_area",
"appeal_subtype", "subject_tags", "source_document_path", "content_hash",
"extraction_status",
"extraction_status", "digest_kind",
}

View File

@@ -211,6 +211,16 @@ async def enrich_digest(digest_id: UUID | str, progress: ProgressCb | None = Non
fields["underlying_date"] = extracted["underlying_date"]
if not (row.get("subject_tags") or []) and extracted.get("subject_tags"):
fields["subject_tags"] = extracted["subject_tags"]
# digest_kind classifies the issue (decision vs announcement). A successful
# extraction (any field returned) must end with a non-empty kind — that is the
# signal the drain self-heal uses to tell "enriched" from "failed". If the
# model omitted it, infer: a ruling citation → decision, else announcement.
if extracted and not (row.get("digest_kind") or "").strip():
kind = extracted.get("digest_kind")
if kind not in ("decision", "announcement", "other"):
cite = fields.get("underlying_citation") or row.get("underlying_citation") or ""
kind = "decision" if cite.strip() else "announcement"
fields["digest_kind"] = kind
if fields:
try:

View File

@@ -33,7 +33,7 @@ _VALID_PRACTICE_AREAS = {"", "rishuy_uvniya", "betterment_levy", "compensation_1
# below contains '{' / '}' which str.format would treat as placeholders and
# crash (same trap documented in precedent_metadata_extractor).
DIGEST_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. לפניך "יומון" — סיכום עמוד-אחד של משרד עפר טויסטר (עלון "כל יום")
על פסק דין/החלטה אחת בתחום תכנון ובנייה / היטל השבחה / פיצויים (ס' 197). חלץ ממנו מטא-דאטה לקטלוג.
על **החלטה/פסק דין** אחד, או על **עדכון/הודעה** (חקיקה, נוהל, הודעת-תכנון, ברכת-שנה) בתחום תכנון ובנייה / היטל השבחה / פיצויים (ס' 197). חלץ ממנו מטא-דאטה לקטלוג.
**אל תמציא** — שדה שלא מופיע בטקסט → השאר ריק (מחרוזת ריקה / מערך ריק).
@@ -41,12 +41,13 @@ DIGEST_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. לפניך
החזר JSON אחד (object — לא array), ללא markdown וללא הסברים:
{
"digest_kind": "סווג את הגיליון: 'decision' = סיכום פסק דין/החלטה (יש מראה-מקום בתחתית) · 'announcement' = עדכון/הודעה ללא הכרעה (חקיקה, נוהל, הודעת-תכנון, ברכה) · 'other' = אחר. **חובה למלא תמיד.**",
"yomon_number": "מספר היומון מהכותרת ('יומון מס' 5163''5163'). ספרות בלבד. אם אין — ריק.",
"digest_date_iso": "YYYY-MM-DD — תאריך גיליון היומון (בכותרת, למשל '7 ביוני 2026''2026-06-07').",
"concept_tag": "תג-המושג שבמרכאות בראש העמוד (למשל 'שיקול הדעת המצומצם', 'Cherry-picking'). ביטוי קצר אחד.",
"headline_holding": "כותרת-ההלכה המודגשת מתחת לתג — משפט אחד שמסכם מה נקבע (למשל 'ביהמ\\"ש - שיקול דעת הוועדה המחוזית אינו מצומצם לטעות חמורה').",
"summary": "תקציר ניטרלי 2-3 משפטים בגוף שלישי: מה הייתה השאלה ומה הוכרע. בלי שיפוט.",
"underlying_citation": "מראה-המקום של פסק הדין/ההחלטה המקורי, כפי שמופיע בתחתית היומון, מילה במילה (למשל 'עת\\"מ 46111-12-22 יכין-אפק בע\\"מ נ' הוועדה המחוזית'). זהו השדה הקריטי — חלץ אותו במלואו ובדיוק.",
"concept_tag": "תג-המושג/הכותרת בראש העמוד (למשל 'שיקול הדעת המצומצם', 'Cherry-picking', או 'עדכונים לשנה החדשה' בעדכון). ביטוי קצר אחד. **חלץ תמיד — קיים לכל סוג גיליון.**",
"headline_holding": "הכותרת המודגשת מתחת לתג — משפט אחד שמסכם את עיקר הגיליון (מה נקבע בהחלטה, או נושא העדכון). **חלץ תמיד.**",
"summary": "תקציר ניטרלי 2-3 משפטים בגוף שלישי: בהחלטה — מה הייתה השאלה ומה הוכרע; בעדכון — מה תוכן/משמעות העדכון. בלי שיפוט. **חלץ תמיד.**",
"underlying_citation": "**רק ל-decision** — מראה-המקום של פסק הדין/ההחלטה המקורי, כפי שמופיע בתחתית היומון, מילה במילה (למשל 'עת\\"מ 46111-12-22 יכין-אפק בע\\"מ נ' הוועדה המחוזית'). בעדכון/הודעה — ריק. זהו השדה הקריטי ל-decision — חלץ אותו במלואו ובדיוק.",
"underlying_court": "הערכאה שנתנה את פסק הדין המקורי (למשל 'בית המשפט לעניינים מנהליים מרכז-לוד', 'ועדת הערר מחוז ירושלים').",
"underlying_date_iso": "YYYY-MM-DD — תאריך מתן פסק הדין/ההחלטה המקורי (לרוב 'ניתן ביום DD.M.YY' בתחתית). שים לב: זה שונה מתאריך גיליון היומון!",
"underlying_judge": "שם השופט/ת או יו\\"ר ההרכב שנתן את פסק הדין המקורי (למשל 'יעל טויסטר ישראלי'). בלי תארים ('עו\\"ד', 'כב' השופט').",
@@ -56,11 +57,12 @@ DIGEST_EXTRACTION_PROMPT = """אתה מסייע משפטי בכיר. לפניך
}
## כללי איכות
1. **underlying_citation** — השדה החשוב ביותר; הוא הגשר לפסק הדין המקורי. חלץ מההערות/התחתית, מילה במילה.
2. **הבחן בין שני התאריכים**: digest_date_iso = תאריך גיליון היומון (בכותרת); underlying_date_iso = מועד מתן פסק הדין (בתחתית, 'ניתן ביום...'). אל תבלבל.
3. **summary** — ניטרלי, גוף שלישי, בלי מילות שיפוט.
4. **subject_tags** — snake_case, תחום ועדת ערר תכנון ובנייה בלבד.
5. אם רכיב לא מופיע בבירור — השאר את אותו שדה ריק. אל תנחש.
1. **digest_kind** — חובה. אם יש מראה-מקום של פסק דין/החלטה בתחתית → 'decision'. אם זה עדכון/הודעה/נוהל/ברכה ללא הכרעה → 'announcement'.
2. **concept_tag / headline_holding / summary** — חלץ **תמיד**, לכל סוג גיליון (גם עדכון). אלה לא ייחודיים להחלטות.
3. **underlying_citation** — רק ל-decision; הוא הגשר לפסק הדין. בעדכון — השאר ריק (זה תקין, לא חוסר).
4. **הבחן בין שני התאריכים**: digest_date_iso = תאריך גיליון היומון (בכותרת); underlying_date_iso = מועד מתן פסק הדין (בתחתית, 'ניתן ביום...'). אל תבלבל.
5. **subject_tags** — snake_case, תחום ועדת ערר תכנון ובנייה בלבד.
6. אם רכיב לא מופיע בבירור — השאר את אותו שדה ריק. אל תנחש.
"""
@@ -125,6 +127,10 @@ async def extract(raw_text: str, model: str | None = None) -> dict:
if s:
out[key] = s
kind = _norm_str(result, "digest_kind").lower()
if kind in ("decision", "announcement", "other"):
out["digest_kind"] = kind
dd = _norm_date(result, "digest_date_iso")
if dd is not None:
out["digest_date"] = dd

View File

@@ -36,20 +36,24 @@ CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3"))
async def main() -> int:
pool = await db.get_pool()
# Self-heal: an enrich that failed mid-LLM (e.g. the local claude
# subscription window was exhausted) can leave a row 'completed' with no
# concept_tag AND no underlying_citation — a real digest always extracts at
# least a citation, so "both empty" means the extraction never landed. Reset
# those to 'pending' so the next run retries (idempotent auto-resume). Safe:
# successfully-enriched rows always have a concept_tag or citation.
# get_pool() runs schema migrations first — incl. the V32 digest_kind backfill
# that classifies legacy rows — so the failure check below is safe from the
# very first run (no legacy row has digest_kind='').
#
# Self-heal: a successful enrich ALWAYS sets digest_kind (decision/announcement
# /other). So a 'completed' row with digest_kind='' means the extraction never
# landed (e.g. the local claude subscription window was exhausted) — reset to
# 'pending' to retry (idempotent auto-resume). This correctly does NOT touch
# announcements (digest_kind='announcement', legitimately no citation), which
# the old "both fields empty" heuristic wrongly retried forever.
healed = await pool.execute(
"UPDATE digests SET extraction_status = 'pending' "
"WHERE extraction_status = 'completed' "
"AND coalesce(concept_tag,'') = '' AND coalesce(underlying_citation,'') = '' "
"AND coalesce(digest_kind,'') = '' "
"AND coalesce(analysis_text,'') <> ''"
)
if healed and healed != "UPDATE 0":
print(f"self-heal: reset failed-empty digests → pending ({healed})", flush=True)
print(f"self-heal: reset unclassified (failed) digests → pending ({healed})", flush=True)
# Self-heal stale 'processing': flock guarantees a single drainer, so at the
# start of THIS run any row left 'processing' is from a previous run that was
# killed mid-row (session/quota cutoff). Reset to 'pending' so it is retried.