From d95a36f31043762cb01c1ab372e9491a8e91c429 Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 05:13:49 +0000 Subject: [PATCH] feat(extraction): precedent metadata via Gemini Flash + scheduled drainer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /precedents metadata queue was stuck — 24 rows requested, nothing draining them — and the agentic claude CLI hit error_max_turns on what is a single structured text→JSON task (slow + flaky). Metadata extraction is bounded extraction, the wrong fit for an agentic loop. - gemini_session.py: query_json drop-in (gemini-2.5-flash, JSON mode, httpx — no new SDK dep). Reads GEMINI_API_KEY (~/.env; SoT Infisical nautilus:/external-apis/gemini). Host-side only — no LLM from the container. - precedent_metadata_extractor: claude_session.query_json → gemini_session. Validated live: rich, accurate fields (case_name/summary/appeal_subtype/tags). - process_pending_extractions: kind-aware cooldown — metadata 2s (Gemini, fast), halacha keeps 30s (Claude rate limits). - drain_metadata_queue.py + legal-metadata-drain.config.cjs (pm2 cron */15) so the queue never clogs again. SCRIPTS.md. - X8 INV-FP5 updated: per-task engine choice (Gemini=bounded metadata, claude_session=agentic halacha), both host-side, single canonical queue (G2). Agentic/voice-sensitive work (writing, analysis, halacha) stays on claude_session (Daphna's subscription). Gemini cost ≈ $0.10/1M tokens — negligible. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/spec/X8-field-provenance.md | 14 +-- .../src/legal_mcp/services/gemini_session.py | 97 +++++++++++++++++++ .../legal_mcp/services/precedent_library.py | 9 +- .../services/precedent_metadata_extractor.py | 7 +- scripts/SCRIPTS.md | 2 + scripts/drain_metadata_queue.py | 48 +++++++++ scripts/legal-metadata-drain.config.cjs | 34 +++++++ 7 files changed, 202 insertions(+), 9 deletions(-) create mode 100644 mcp-server/src/legal_mcp/services/gemini_session.py create mode 100644 scripts/drain_metadata_queue.py create mode 100644 scripts/legal-metadata-drain.config.cjs diff --git a/docs/spec/X8-field-provenance.md b/docs/spec/X8-field-provenance.md index 6286c24..31158d9 100644 --- a/docs/spec/X8-field-provenance.md +++ b/docs/spec/X8-field-provenance.md @@ -92,12 +92,14 @@ NCSC/JTC — *AI in Courts* (verifiable citation) | סטטוס: verified **אכיפה:** `proofreader.verify_quote` בעת חילוץ → `quote_verified`. **הפרה ידועה:** — (קיים; ה-flag נכתב, אך אין חיווי ב-UI — ראה [X6 INV-UI6](X6-ui-api-contract.md)). -### INV-FP5: חילוץ אסינכרוני דרך claude_session מקומי -**כלל:** חילוץ-LLM (מטא, הלכות) רץ **אסינכרוני, מתור**, דרך `claude_session` **מקומי בלבד** — לא חוסם את -ה-web, ולא קורא ל-LLM מהקונטיינר. מופע של [G2](00-constitution.md#inv-g2-מקור-אמת-יחיד--אין-מסלולים-מקבילים-מתפצלים) -(מסלול-LLM קנוני יחיד). **פרויקטלי-תפעולי.** תואם זיכרון `feedback_claude_session_local_only`. -**מקור-סמכות:** [ingest.py](../../mcp-server/src/legal_mcp/services/ingest.py) (queue בצעד 12 → `process_pending_extractions`); [legal-ai/CLAUDE.md](../../CLAUDE.md) (claude_session local-only). -**אכיפה:** queue + `precedent_process_pending`; קריאות-LLM רק מ-MCP מקומי. +### INV-FP5: חילוץ אסינכרוני, מתור, צד-מארח (לא מהקונטיינר) +**כלל:** חילוץ-LLM (מטא, הלכות) רץ **אסינכרוני, מתור, מצד-המארח** — לא חוסם את ה-web ולא קורא ל-LLM +מהקונטיינר. **בחירת-מנוע לפי אופי-המשימה (לא מסלול מקביל):** חילוץ-מטא הוא משימה *תחומה* (טקסט→JSON) +ולכן רץ על **Gemini Flash** (`gemini_session`, structured JSON) — ה-claude CLI ה-agentic פגע ב- +`error_max_turns`; חילוץ-הלכות (רגיש-קול/agentic) נשאר על **`claude_session`** (CLI מקומי, מנוי דפנה). +שני המנועים מתנקזים לתור-החילוץ הקנוני היחיד ([G2](00-constitution.md#inv-g2-מקור-אמת-יחיד--אין-מסלולים-מקבילים-מתפצלים)). **פרויקטלי-תפעולי.** +**מקור-סמכות:** [ingest.py](../../mcp-server/src/legal_mcp/services/ingest.py) (queue → `process_pending_extractions`); [gemini_session.py](../../mcp-server/src/legal_mcp/services/gemini_session.py) (מטא); [legal-ai/CLAUDE.md](../../CLAUDE.md) (claude_session local-only להלכות). `GEMINI_API_KEY` בצד-המארח בלבד — לא בקונטיינר (תואם `feedback_claude_session_local_only`: אין קריאות-LLM מהקונטיינר). +**אכיפה:** queue + `precedent_process_pending` + drainers מתוזמנים (`legal-metadata-drain`/CEO); קריאות-LLM רק מצד-המארח. **הפרה ידועה:** תור-החילוץ **סמוי** (אין הבחנה pending-initial מול pending-review; אין extraction-job table) ([gap-audit GAP-45](gap-audit.md); [X9](X9-mcp-tool-contract.md)). --- diff --git a/mcp-server/src/legal_mcp/services/gemini_session.py b/mcp-server/src/legal_mcp/services/gemini_session.py new file mode 100644 index 0000000..088a9bb --- /dev/null +++ b/mcp-server/src/legal_mcp/services/gemini_session.py @@ -0,0 +1,97 @@ +"""Gemini structured-output helper — a drop-in for ``claude_session.query_json`` +for BOUNDED extraction tasks (text → JSON). + +Why a second LLM path: metadata extraction is a single structured call (fill +case_name/summary/headnote/tags from a verdict's text), not an agentic loop. The +``claude -p`` CLI behind ``claude_session`` is agentic — it reaches for tools and +hits ``error_max_turns`` on a task that should be one shot — so it was slow and +flaky for the precedent metadata queue. Gemini Flash with JSON mode +(``responseMimeType: application/json``) is the right tool: one call, schema- +clean JSON, fast, and ~$0.10/1M tokens (negligible for this volume). + +Scope: **bounded extraction only** (precedent metadata). The agentic, voice- +sensitive work — decision writing, analysis, halacha extraction — stays on +``claude_session`` (Daphna's subscription, zero API cost). This is a deliberate +per-task provider choice, not a wholesale move off Claude. + +Key: ``GEMINI_API_KEY`` (host ~/.env; SoT Infisical nautilus:/external-apis/gemini +as ``GOOGLE_GEMINI_API_KEY``). Model: ``GEMINI_MODEL`` (default gemini-2.5-flash). +Direct REST via httpx — no extra SDK dependency. +""" + +from __future__ import annotations + +import json +import logging +import os + +import httpx + +logger = logging.getLogger(__name__) + +_BASE = "https://generativelanguage.googleapis.com/v1beta" +_DEFAULT_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") +_DEFAULT_TIMEOUT = float(os.environ.get("GEMINI_TIMEOUT_S", "120")) + + +class GeminiError(RuntimeError): + """Gemini API call failed or returned an unexpected shape.""" + + +def _api_key() -> str: + key = os.environ.get("GEMINI_API_KEY", "").strip() + if not key: + raise GeminiError( + "GEMINI_API_KEY אינו מוגדר (host ~/.env / Infisical " + "nautilus:/external-apis/gemini)." + ) + return key + + +async def query_json( + prompt: str, + timeout: float | int = _DEFAULT_TIMEOUT, + *, + system: str | None = None, + model: str | None = None, + # Accepted for drop-in parity with claude_session.query_json; ignored here. + effort: str | None = None, + tools: str | None = None, +) -> dict | list | None: + """Single structured-output call → parsed JSON. Drop-in for + ``claude_session.query_json``. Raises ``GeminiError`` on failure (the caller + treats that like any extraction failure — recorded, never silently wrong). + """ + model = model or _DEFAULT_MODEL + body: dict = { + "contents": [{"role": "user", "parts": [{"text": prompt}]}], + "generationConfig": { + "responseMimeType": "application/json", + "temperature": 0, + }, + } + if system: + body["system_instruction"] = {"parts": [{"text": system}]} + + url = f"{_BASE}/models/{model}:generateContent" + try: + async with httpx.AsyncClient(timeout=float(timeout)) as client: + resp = await client.post(url, params={"key": _api_key()}, json=body) + except httpx.HTTPError as e: + raise GeminiError(f"Gemini request failed: {e}") from e + if resp.status_code != 200: + raise GeminiError(f"Gemini HTTP {resp.status_code}: {resp.text[:200]}") + + data = resp.json() + # Surface an explicit safety/finish block rather than returning empty. + cand = (data.get("candidates") or [{}])[0] + if cand.get("finishReason") in ("SAFETY", "RECITATION", "PROHIBITED_CONTENT"): + raise GeminiError(f"Gemini blocked output: finishReason={cand['finishReason']}") + try: + text = cand["content"]["parts"][0]["text"] + except (KeyError, IndexError, TypeError) as e: + raise GeminiError(f"Gemini unexpected response: {str(data)[:200]}") from e + try: + return json.loads(text) + except json.JSONDecodeError as e: + raise GeminiError(f"Gemini returned non-JSON: {text[:200]}") from e diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 053bbe1..e5219ea 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -15,6 +15,7 @@ from __future__ import annotations import asyncio import logging +import os from pathlib import Path from typing import Awaitable, Callable from uuid import UUID @@ -179,6 +180,9 @@ async def reextract_halachot( # precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9 # halachot, 317/10 immediately after returned silent no_halachot. INTER_PRECEDENT_COOLDOWN_SEC = 30 +# Metadata extraction is on Gemini (fast, high rate limits) — a brief spacer is +# enough; the 30s above is for the Claude-backed halacha path. +METADATA_COOLDOWN_SEC = float(os.environ.get("METADATA_COOLDOWN_SEC", "2")) # How many times to retry a precedent that came back as 'extraction_failed' # (i.e. >50% chunks crashed). Each retry uses a longer cooldown. @@ -226,11 +230,14 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) - cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT, ) + # Metadata extraction runs on Gemini (high rate limits, fast) — the long + # cooldown is only needed for halacha (Claude/Anthropic rate limits). + cooldown = METADATA_COOLDOWN_SEC if kind == "metadata" else INTER_PRECEDENT_COOLDOWN_SEC results: list[dict] = [] processed = 0 for idx, row in enumerate(pending): if idx > 0: - await asyncio.sleep(INTER_PRECEDENT_COOLDOWN_SEC) + await asyncio.sleep(cooldown) cid = UUID(str(row["id"])) attempts = 0 result: dict = {} diff --git a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py index a30e2ea..7209c59 100644 --- a/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py +++ b/mcp-server/src/legal_mcp/services/precedent_metadata_extractor.py @@ -19,7 +19,7 @@ from datetime import date as date_type from uuid import UUID from legal_mcp.config import parse_llm_json -from legal_mcp.services import claude_session, db +from legal_mcp.services import db, gemini_session logger = logging.getLogger(__name__) @@ -150,7 +150,10 @@ async def extract_metadata(case_law_id: UUID | str) -> dict: ) try: - result = await claude_session.query_json( + # Bounded structured extraction → Gemini Flash (JSON mode). The agentic + # claude CLI hit error_max_turns on this single-shot task; see + # gemini_session.py. Voice-sensitive/agentic work stays on claude_session. + result = await gemini_session.query_json( user_msg, system=METADATA_EXTRACTION_PROMPT, ) except Exception as e: diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 33237c4..86fc7a5 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -24,6 +24,8 @@ | `legal-reaper.config.cjs` | pm2/js | **דמון pm2 ל-`reap_orphan_procs.py --loop`** (ברירת-מחדל 180ש', `REAP_INTERVAL_S` לעקיפה). `max_memory_restart 100M` (ה-reaper עצמו לא ידלוף). התקנה: `pm2 start scripts/legal-reaper.config.cjs && pm2 save`. לוגים: `pm2 logs legal-reaper`. | pm2 (host-side) | | `drain_court_fetch.py` | python | **ריקון תור-אחזור הפסיקה (X13)** — קורא ל-`court_fetch_orchestrator.drain_pending(limit)` שמוריד+קולט כל job ממתין שהיומונים מילאו, וקושר חזרה ליומון. מקומי בלבד (ingest = claude CLI). no-op מהיר כשהתור ריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_court_fetch.py [limit]`. | דרך `legal-court-fetch-drain.config.cjs` (pm2 cron) | | `legal-court-fetch-drain.config.cjs` | pm2/js | **תזמון שעתי של `drain_court_fetch.py`** (cron `17 * * * *`, `COURT_FETCH_DRAIN_CRON` לעקיפה) — הופך את לולאת יומון→אחזור→קליטה ל-fully-autonomous. `autorestart:false` (one-shot per tick). דורש `legal-court-fetch-service` רץ. התקנה: `pm2 start scripts/legal-court-fetch-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | +| `drain_metadata_queue.py` | python | **ריקון תור חילוץ-המטא של הפסיקה** — `process_pending_extractions(kind='metadata')` ב-batches עד ריק. רץ על **Gemini Flash** (structured JSON, `gemini_session`) — מהיר ואמין, במקום ה-claude CLI ה-agentic שפגע ב-`error_max_turns`. no-op מהיר כשריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_metadata_queue.py [batch]`. | דרך `legal-metadata-drain.config.cjs` (pm2 cron) | +| `legal-metadata-drain.config.cjs` | pm2/js | **תזמון כל 15 דק' של `drain_metadata_queue.py`** (cron `*/15 * * * *`, `METADATA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-המטא ב-/precedents. דורש `GEMINI_API_KEY` ב-`~/.env`. התקנה: `pm2 start scripts/legal-metadata-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | | `auto-sync-cases.sh` | bash | סנכרון תיקי ערר ל-Gitea — רץ כל דקה | `* * * * *` (cron) | | `backup-db.sh` | bash | גיבוי PostgreSQL יומי ל-`data/backups/` (gzip) | לתזמן: `0 2 * * *` | | `restore-db.sh` | bash | שחזור DB מגיבוי (companion ל-backup-db.sh) | ידני | diff --git a/scripts/drain_metadata_queue.py b/scripts/drain_metadata_queue.py new file mode 100644 index 0000000..bd836f4 --- /dev/null +++ b/scripts/drain_metadata_queue.py @@ -0,0 +1,48 @@ +"""Drain the precedent metadata-extraction queue. + +Calls ``process_pending_extractions(kind='metadata')`` in batches until the +queue is empty (two consecutive zero-progress rounds). Metadata extraction runs +on **Gemini Flash** (structured JSON) — fast and reliable, unlike the agentic +claude CLI which hit ``error_max_turns`` on this bounded task. A no-op (fast) +when the queue is empty. + +Host-only (reads GEMINI_API_KEY + POSTGRES_URL from ~/.env via legal_mcp.config). +Scheduled by ``legal-metadata-drain`` (pm2 cron); also runnable by hand: + + mcp-server/.venv/bin/python scripts/drain_metadata_queue.py [batch] +""" + +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) + +from legal_mcp.services import precedent_library as pl + + +async def main() -> int: + batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10 + total = 0 + empty_rounds = 0 + rnd = 0 + while empty_rounds < 2: + rnd += 1 + out = await pl.process_pending_extractions(kind="metadata", limit=batch) + processed = out.get("processed", 0) + total += processed + print(f"[round {rnd}] processed={processed} total_pending={out.get('total_pending', 0)} " + f"status={out.get('status')}", flush=True) + for r in out.get("results", []): + print(f" {str(r.get('case_number',''))[:42]}: {r.get('status')}", flush=True) + if processed == 0: + empty_rounds += 1 + await asyncio.sleep(3) + else: + empty_rounds = 0 + print(f"===DONE=== metadata extracted (cumulative cases handled={total})", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/scripts/legal-metadata-drain.config.cjs b/scripts/legal-metadata-drain.config.cjs new file mode 100644 index 0000000..55f351a --- /dev/null +++ b/scripts/legal-metadata-drain.config.cjs @@ -0,0 +1,34 @@ +/** + * pm2 ecosystem entry for legal-metadata-drain — scheduled (every 15 min) drain + * of the precedent metadata-extraction queue (Gemini Flash). Keeps the + * /precedents metadata queue from clogging (the prior agentic claude-CLI path + * hit error_max_turns and nothing drained it autonomously). + * + * Pattern: cron_restart fires the script on schedule; autorestart:false → runs + * once and exits (pm2 shows "stopped" between ticks — expected). Cheap no-op + * when the queue is empty; Gemini Flash ≈ $0.10/1M tokens. + * + * Requires (host ~/.env via legal_mcp.config): GEMINI_API_KEY, POSTGRES_URL. + * + * Install (once): + * pm2 start /home/chaim/legal-ai/scripts/legal-metadata-drain.config.cjs + * pm2 save + * Run now (manual): mcp-server/.venv/bin/python scripts/drain_metadata_queue.py + * Schedule override: METADATA_DRAIN_CRON (default every 15 min). + */ +const cron = process.env.METADATA_DRAIN_CRON || "*/15 * * * *"; + +module.exports = { + apps: [ + { + name: "legal-metadata-drain", + cwd: "/home/chaim/legal-ai", + script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python", + args: "scripts/drain_metadata_queue.py 10", + env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" }, + autorestart: false, // one-shot per cron tick + cron_restart: cron, + max_memory_restart: "500M", + }, + ], +};