fix(extraction): סינון cited_only מתור/מוני החילוץ (#140)
31 שורות case_law עם source_kind='cited_only' (ציטוט-בלבד, ללא full_text/chunks) נושאות halacha_extraction_status='pending' רק כברירת-מחדל ומזהמות את מונה ה-pending ובמתזמר/בדף-התפעול — אין להן מה לחלץ. תיקון (G1 — תיקון-במקור, G2 — מסנן יחיד משותף): - db.EXTRACTION_ELIGIBLE_PREDICATE — מקור-אמת יחיד ל"שורה ברת-חילוץ" (source_kind <> 'cited_only' AND יש precedent_chunks). מוחל ב-list_pending_extraction_requests; #139 יעשה בו שימוש-חוזר ל-reconcile (אותו כלל, לא כפול). - מוני-snapshot מסננים cited_only: halacha_drain_supervisor.db_snapshot, web/app.py meta+hal_ext (GROUP BY status). - reconcile_metadata_status.py מורחב לכסות גם את תור-ההלכות: cited_only→'skipped' (אותו terminal-state כמו צד-המטא, תור-תאום, G2). בוצע על ה-DB החי: 31 הועברו ל-'skipped' (metadata כבר היה מיושב — אידמפוטנטי). התפלגות-אחרי: halacha pending=9 (עבודה אמיתית), skipped=31, completed=309. בדיקות: test_extraction_queue_eligibility (predicate + list_pending מחיל אותו, שני ה-kinds). כל 345 בדיקות mcp עוברות. guards נקיים. Invariants: G1 (terminal-state אמיתי במקור), G2 (predicate יחיד, ללא תור מקביל), INV-DM1 (stub לא-searchable אינו מועמד-חילוץ), G12 (leak-guard נקי). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6746,6 +6746,19 @@ async def request_halacha_extraction(case_law_id: UUID) -> bool:
|
||||
return result == "UPDATE 1"
|
||||
|
||||
|
||||
# Single source of truth for "this row can actually be extracted" (#140, G2):
|
||||
# a real precedent whose text was chunked into precedent_chunks. ``cited_only``
|
||||
# stubs are citation-only (no full_text, no chunks) and can NEVER yield an
|
||||
# extraction, so they must never enter the work queue or get proactively
|
||||
# re-queued. Shared by the queue reader (list_pending_extraction_requests) and
|
||||
# the orphan-reconcile job (#139) so the eligibility rule lives in ONE place.
|
||||
# Assumes the surrounding query exposes the table as ``case_law``.
|
||||
EXTRACTION_ELIGIBLE_PREDICATE = (
|
||||
"case_law.source_kind <> 'cited_only' "
|
||||
"AND EXISTS (SELECT 1 FROM precedent_chunks pc WHERE pc.case_law_id = case_law.id)"
|
||||
)
|
||||
|
||||
|
||||
async def list_pending_extraction_requests(
|
||||
kind: str = "metadata", # 'metadata' | 'halacha'
|
||||
limit: int = 20,
|
||||
@@ -6764,11 +6777,14 @@ async def list_pending_extraction_requests(
|
||||
# internal_committee rows could be stamped (we opened that gate in
|
||||
# request_metadata_extraction / request_halacha_extraction) but stayed
|
||||
# invisible to the worker forever.
|
||||
# Exclude ineligible rows (cited_only / chunkless) so a stub can never sit in
|
||||
# the work queue — same predicate the reconcile job uses (#140, G2).
|
||||
rows = await pool.fetch(
|
||||
f"""SELECT id, case_number, case_name, court, date,
|
||||
practice_area, is_binding, {col} AS requested_at
|
||||
FROM case_law
|
||||
WHERE {col} IS NOT NULL
|
||||
AND {EXTRACTION_ELIGIBLE_PREDICATE}
|
||||
ORDER BY {col} ASC
|
||||
LIMIT $1""",
|
||||
limit,
|
||||
|
||||
66
mcp-server/tests/test_extraction_queue_eligibility.py
Normal file
66
mcp-server/tests/test_extraction_queue_eligibility.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Regression test for #140 — cited_only stubs must never enter the extraction
|
||||
work queue.
|
||||
|
||||
``list_pending_extraction_requests`` must apply ``EXTRACTION_ELIGIBLE_PREDICATE``
|
||||
so a citation-only stub (no full_text, no precedent_chunks) is excluded even if
|
||||
it carries a stamped ``*_extraction_requested_at`` and a default 'pending'
|
||||
status. The predicate is the single shared eligibility rule (#139 reuses it).
|
||||
|
||||
Runs OFFLINE — a fake pool captures the SQL and asserts the predicate is wired
|
||||
into the WHERE clause (same style as test_halacha_reextract_preserves_approved).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from legal_mcp.services import db
|
||||
|
||||
|
||||
class _FakePool:
|
||||
def __init__(self) -> None:
|
||||
self.fetched: list[str] = []
|
||||
|
||||
async def fetch(self, sql: str, *args): # noqa: ANN002
|
||||
self.fetched.append(sql)
|
||||
return []
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def fake_pool(monkeypatch: pytest.MonkeyPatch) -> _FakePool:
|
||||
pool = _FakePool()
|
||||
|
||||
async def _get_pool() -> _FakePool:
|
||||
return pool
|
||||
|
||||
monkeypatch.setattr(db, "get_pool", _get_pool)
|
||||
return pool
|
||||
|
||||
|
||||
def _norm(sql: str) -> str:
|
||||
return " ".join(sql.split())
|
||||
|
||||
|
||||
def test_predicate_excludes_cited_only_and_requires_chunks() -> None:
|
||||
pred = _norm(db.EXTRACTION_ELIGIBLE_PREDICATE)
|
||||
assert "source_kind <> 'cited_only'" in pred
|
||||
assert "precedent_chunks" in pred and "EXISTS" in pred.upper()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("kind", ["metadata", "halacha"])
|
||||
def test_list_pending_applies_eligibility_predicate(fake_pool: _FakePool, kind: str) -> None:
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
loop.run_until_complete(db.list_pending_extraction_requests(kind=kind))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
assert fake_pool.fetched, "expected a queue query"
|
||||
sql = _norm(fake_pool.fetched[0])
|
||||
# The eligibility predicate must be ANDed into the queue WHERE clause.
|
||||
assert _norm(db.EXTRACTION_ELIGIBLE_PREDICATE) in sql, sql
|
||||
# ...alongside the requested_at gate, for the correct kind.
|
||||
col = "metadata_extraction_requested_at" if kind == "metadata" else "halacha_extraction_requested_at"
|
||||
assert f"{col} IS NOT NULL" in sql, sql
|
||||
@@ -32,7 +32,7 @@
|
||||
| `legal-court-fetch-drain.config.cjs` | pm2/js | **תזמון שעתי של `drain_court_fetch.py`** (cron `17 * * * *`, `COURT_FETCH_DRAIN_CRON` לעקיפה) — הופך את לולאת יומון→אחזור→קליטה ל-fully-autonomous. `autorestart:false` (one-shot per tick). דורש `legal-court-fetch-service` רץ. התקנה: `pm2 start scripts/legal-court-fetch-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `drain_metadata_queue.py` | python | **ריקון תור חילוץ-המטא של הפסיקה** — `process_pending_extractions(kind='metadata')` ב-batches עד ריק. רץ על **Gemini Flash** (structured JSON, `gemini_session`) — מהיר ואמין, במקום ה-claude CLI ה-agentic שפגע ב-`error_max_turns`. no-op מהיר כשריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_metadata_queue.py [batch]`. | דרך `legal-metadata-drain.config.cjs` (pm2 cron) |
|
||||
| `legal-metadata-drain.config.cjs` | pm2/js | **תזמון כל 15 דק' של `drain_metadata_queue.py`** (cron `*/15 * * * *`, `METADATA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-המטא ב-/precedents. דורש `GEMINI_API_KEY` ב-`~/.env`. התקנה: `pm2 start scripts/legal-metadata-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
|
||||
| `reconcile_metadata_status.py` | python | **נרמול `metadata_extraction_status` תקוע (G1)** — שורות עם ברירת-המחדל `'pending'` שאינן בצנרת-Gemini נערמות כ-backlog-רפאים שהדריינר (סורק `*_requested_at IS NOT NULL`) לעולם לא מנקה ומנפח את מונה "ממתין" ב-/operations. מיישב כל שורה למצב-אמת במקור: `internal_committee`→`completed` (מטא דטרמיניסטי, מחוץ ל-Gemini), `external_upload` מלא→`completed`, `external_upload` עם טקסט וחסר שם/תקציר→חותם `requested_at` (הדריינר יטפל), `cited_only` (אין טקסט)→`skipped`. אידמפוטנטי. תיקון-המקור הנלווה ב-`db.create_internal_committee_decision`. הרצה: `mcp-server/.venv/bin/python scripts/reconcile_metadata_status.py`. | חד-פעמי / re-runnable כהגנת-drift |
|
||||
| `reconcile_metadata_status.py` | python | **נרמול `metadata_extraction_status` תקוע (G1)** — שורות עם ברירת-המחדל `'pending'` שאינן בצנרת-Gemini נערמות כ-backlog-רפאים שהדריינר (סורק `*_requested_at IS NOT NULL`) לעולם לא מנקה ומנפח את מונה "ממתין" ב-/operations. מיישב כל שורה למצב-אמת במקור: `internal_committee`→`completed` (מטא דטרמיניסטי, מחוץ ל-Gemini), `external_upload` מלא→`completed`, `external_upload` עם טקסט וחסר שם/תקציר→חותם `requested_at` (הדריינר יטפל), `cited_only` (אין טקסט)→`skipped`. **מכסה את שני התורים (#140):** אותו `cited_only→skipped` מוחל גם על `halacha_extraction_status` (תור-תאום, G2). אידמפוטנטי. תיקון-המקור הנלווה ב-`db.create_internal_committee_decision` + מסנן `EXTRACTION_ELIGIBLE_PREDICATE` ב-`list_pending_extraction_requests`. הרצה: `mcp-server/.venv/bin/python scripts/reconcile_metadata_status.py`. | חד-פעמי / re-runnable כהגנת-drift |
|
||||
| `backfill_plans_registry.py` | python | **ייבוא מרשם-התכניות (V38) מקורפוס-ההחלטות** — סורק `data/cases/*/drafts/decision.md` + `data/training/cmp/*.md`, מאתר פסקאות-תוקף ("פורסמה למתן תוקף"), מחלץ רשומת-תכנית מובנית (`plans_extractor`, claude CLI מקומי) ועושה `upsert_plan(review_status='pending_review')` עם provenance. ה-SSOT לזהות+תוקף של תכנית, פעם-אחת במקום גזירה-מחדש מהשומות בכל תיק (G2). idempotent על plan_number מנורמל (G1/G3). `--dry-run` (ברירת-מחדל, כלום לא נכתב) / `--apply` / `--glob` (תת-קבוצה). אחרי הרצה: אישור-יו"ר ב-`plan_review`/תור-האישור (G10). הרץ: `mcp-server/.venv/bin/python scripts/backfill_plans_registry.py`. | ידני (חד-פעמי + לפי-צורך כשנוספות החלטות) |
|
||||
| `backfill_precedent_citations.py` | python | **#145** — backfill ל-`citation_formatted` (מראה-מקום) ברשומות `case_law` ריקות, באמצעות `db.format_precedent_citation` הדטרמיניסטי (X1 §3 / INV-ID2 — שדה-תצוגה נגזר, לא מעוצב ע"י LLM ש-הפיל אותו, #145). שני מעברים לכל שורה: (1) **ללא-LLM** — הרכבה מהשדות השמורים (ממלא שורות-ועדה עם parties+docket+date); (2) **LLM** — אם (1) נמנע ויש full_text, מריץ את מחלץ-המטא (extract_and_apply) שמחלץ רכיבים (parties, citation_prefix) ואז מרכיב — זה ממלא את 171 פסקי-בתי-המשפט מהכותרת. שורות בלי רובריקה (אין צדדים) נשארות ריקות ומדווחות, לא מנוחשות (INV-AH). idempotent — רק שדה ריק (G3). `--apply` / `--limit N` / `--no-llm`. הרץ: `HOME=/home/chaim mcp-server/.venv/bin/python scripts/backfill_precedent_citations.py`. | ידני (חד-פעמי + לפי-צורך) |
|
||||
| `auto-sync-cases.sh` | bash | סנכרון תיקי ערר ל-Gitea — רץ כל דקה | `* * * * *` (cron) |
|
||||
|
||||
@@ -247,7 +247,7 @@ def db_snapshot() -> dict:
|
||||
"async def m():\n"
|
||||
" pool=await db.get_pool()\n"
|
||||
" async with pool.acquire() as c:\n"
|
||||
" st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law GROUP BY 1\")}\n"
|
||||
" st={r['halacha_extraction_status'] or 'unknown':r['n'] for r in await c.fetch(\"SELECT halacha_extraction_status,count(*) n FROM case_law WHERE source_kind <> 'cited_only' GROUP BY 1\")}\n"
|
||||
" procs=[r['case_number'] for r in await c.fetch(\"SELECT case_number FROM case_law WHERE halacha_extraction_status='processing' ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5\")]\n"
|
||||
" hal=await c.fetchval('SELECT count(*) FROM halachot')\n"
|
||||
" ck=await c.fetchval('SELECT count(*) FROM precedent_chunks WHERE halacha_extracted_at IS NOT NULL')\n"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Reconcile stale ``metadata_extraction_status='pending'`` rows (G1).
|
||||
"""Reconcile stale ``*_extraction_status='pending'`` rows (G1).
|
||||
|
||||
The column defaults to 'pending', but only ``source_kind='external_upload'``
|
||||
rows with extractable text genuinely need the Gemini metadata drain. Internal
|
||||
@@ -13,6 +13,10 @@ This settles each row to a truthful terminal state at the source:
|
||||
- external_upload w/ text but missing name/summary → stamp requested_at (real work → drain picks it up)
|
||||
- cited_only (no text) → 'skipped' (terminal; nothing to extract)
|
||||
|
||||
Covers BOTH extraction queues (#140): the cited_only→'skipped' settle is applied
|
||||
to ``halacha_extraction_status`` as well as ``metadata_extraction_status`` — same
|
||||
phantom-backlog fix on the twin queue, one reconcile script (G2).
|
||||
|
||||
Idempotent and re-runnable (a healthy DB reports all-zero). The companion source
|
||||
fix lives in db.create_internal_committee_decision (inserts 'completed' directly)
|
||||
so internal rows never re-enter this state.
|
||||
@@ -57,6 +61,15 @@ async def main() -> int:
|
||||
"WHERE source_kind = 'cited_only' "
|
||||
"AND metadata_extraction_status = 'pending'"
|
||||
)
|
||||
# Halacha side (#140): cited_only stubs inherit DEFAULT 'pending' but have no
|
||||
# text/chunks to extract holdings from — settle them to the same terminal
|
||||
# 'skipped' the metadata side uses, so they stop inflating the halacha
|
||||
# pending counter / supervisor snapshot. Same source-fix, one reconcile (G2).
|
||||
cited_hal = await pool.execute(
|
||||
"UPDATE case_law SET halacha_extraction_status = 'skipped' "
|
||||
"WHERE source_kind = 'cited_only' "
|
||||
"AND halacha_extraction_status = 'pending'"
|
||||
)
|
||||
|
||||
def n(tag: str) -> str:
|
||||
try:
|
||||
@@ -67,7 +80,8 @@ async def main() -> int:
|
||||
print(f"internal_committee → completed : {n(internal)}")
|
||||
print(f"external_upload → completed : {n(external_done)}")
|
||||
print(f"external_upload → requeued : {n(external_requeued)}")
|
||||
print(f"cited_only → skipped : {n(cited)}")
|
||||
print(f"cited_only metadata → skipped : {n(cited)}")
|
||||
print(f"cited_only halacha → skipped : {n(cited_hal)}")
|
||||
|
||||
rows = await pool.fetch(
|
||||
"SELECT coalesce(metadata_extraction_status,'NULL') s, count(*) n "
|
||||
@@ -76,6 +90,13 @@ async def main() -> int:
|
||||
print("\nresulting metadata_extraction_status distribution:")
|
||||
for r in rows:
|
||||
print(f" {r['s']:<12} {r['n']}")
|
||||
hal_rows = await pool.fetch(
|
||||
"SELECT coalesce(halacha_extraction_status,'NULL') s, count(*) n "
|
||||
"FROM case_law GROUP BY 1 ORDER BY 2 DESC"
|
||||
)
|
||||
print("\nresulting halacha_extraction_status distribution:")
|
||||
for r in hal_rows:
|
||||
print(f" {r['s']:<12} {r['n']}")
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -6626,9 +6626,11 @@ async def operations_snapshot():
|
||||
"SELECT coalesce(nullif(citation_raw,''), case_number_norm) "
|
||||
"FROM court_fetch_jobs WHERE status = 'running' ORDER BY updated_at LIMIT 5"
|
||||
)
|
||||
# Exclude cited_only stubs — citation-only rows with no text can never be
|
||||
# extracted, so their default 'pending' must not inflate the counter (#140).
|
||||
meta = await counts(
|
||||
"SELECT coalesce(metadata_extraction_status,'unknown'), count(*) "
|
||||
"FROM case_law GROUP BY 1"
|
||||
"FROM case_law WHERE source_kind <> 'cited_only' GROUP BY 1"
|
||||
)
|
||||
meta_queued = await conn.fetchval(
|
||||
"SELECT count(*) FROM case_law WHERE metadata_extraction_requested_at IS NOT NULL"
|
||||
@@ -6639,7 +6641,7 @@ async def operations_snapshot():
|
||||
)
|
||||
hal_ext = await counts(
|
||||
"SELECT coalesce(halacha_extraction_status,'unknown'), count(*) "
|
||||
"FROM case_law GROUP BY 1"
|
||||
"FROM case_law WHERE source_kind <> 'cited_only' GROUP BY 1"
|
||||
)
|
||||
hal_queued = await conn.fetchval(
|
||||
"SELECT count(*) FROM case_law WHERE halacha_extraction_requested_at IS NOT NULL"
|
||||
|
||||
Reference in New Issue
Block a user