Merge pull request 'feat(extraction): חילוץ-מטא של פסיקה דרך Gemini Flash + drainer מתוזמן' (#138) from worktree-gemini-metadata into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m30s

This commit was merged in pull request #138.
This commit is contained in:
2026-06-08 05:14:29 +00:00
7 changed files with 202 additions and 9 deletions

View File

@@ -92,12 +92,14 @@ NCSC/JTC — *AI in Courts* (verifiable citation) | סטטוס: verified
**אכיפה:** `proofreader.verify_quote` בעת חילוץ → `quote_verified`. **אכיפה:** `proofreader.verify_quote` בעת חילוץ → `quote_verified`.
**הפרה ידועה:** — (קיים; ה-flag נכתב, אך אין חיווי ב-UI — ראה [X6 INV-UI6](X6-ui-api-contract.md)). **הפרה ידועה:** — (קיים; ה-flag נכתב, אך אין חיווי ב-UI — ראה [X6 INV-UI6](X6-ui-api-contract.md)).
### INV-FP5: חילוץ אסינכרוני דרך claude_session מקומי ### INV-FP5: חילוץ אסינכרוני, מתור, צד-מארח (לא מהקונטיינר)
**כלל:** חילוץ-LLM (מטא, הלכות) רץ **אסינכרוני, מתור**, דרך `claude_session` **מקומי בלבד** — לא חוסם את **כלל:** חילוץ-LLM (מטא, הלכות) רץ **אסינכרוני, מתור, מצד-המארח** — לא חוסם את ה-web ולא קורא ל-LLM
ה-web, ולא קורא ל-LLM מהקונטיינר. מופע של [G2](00-constitution.md#inv-g2-מקור-אמת-יחיד--אין-מסלולים-מקבילים-מתפצלים) מהקונטיינר. **בחירת-מנוע לפי אופי-המשימה (לא מסלול מקביל):** חילוץ-מטא הוא משימה *תחומה* (טקסט→JSON)
(מסלול-LLM קנוני יחיד). **פרויקטלי-תפעולי.** תואם זיכרון `feedback_claude_session_local_only`. ולכן רץ על **Gemini Flash** (`gemini_session`, structured JSON) — ה-claude CLI ה-agentic פגע ב-
**מקור-סמכות:** [ingest.py](../../mcp-server/src/legal_mcp/services/ingest.py) (queue בצעד 12 → `process_pending_extractions`); [legal-ai/CLAUDE.md](../../CLAUDE.md) (claude_session local-only). `error_max_turns`; חילוץ-הלכות (רגיש-קול/agentic) נשאר על **`claude_session`** (CLI מקומי, מנוי דפנה).
**אכיפה:** queue + `precedent_process_pending`; קריאות-LLM רק מ-MCP מקומי. שני המנועים מתנקזים לתור-החילוץ הקנוני היחיד ([G2](00-constitution.md#inv-g2-מקור-אמת-יחיד--אין-מסלולים-מקבילים-מתפצלים)). **פרויקטלי-תפעולי.**
**מקור-סמכות:** [ingest.py](../../mcp-server/src/legal_mcp/services/ingest.py) (queue → `process_pending_extractions`); [gemini_session.py](../../mcp-server/src/legal_mcp/services/gemini_session.py) (מטא); [legal-ai/CLAUDE.md](../../CLAUDE.md) (claude_session local-only להלכות). `GEMINI_API_KEY` בצד-המארח בלבד — לא בקונטיינר (תואם `feedback_claude_session_local_only`: אין קריאות-LLM מהקונטיינר).
**אכיפה:** queue + `precedent_process_pending` + drainers מתוזמנים (`legal-metadata-drain`/CEO); קריאות-LLM רק מצד-המארח.
**הפרה ידועה:** תור-החילוץ **סמוי** (אין הבחנה pending-initial מול pending-review; אין extraction-job table) ([gap-audit GAP-45](gap-audit.md); [X9](X9-mcp-tool-contract.md)). **הפרה ידועה:** תור-החילוץ **סמוי** (אין הבחנה pending-initial מול pending-review; אין extraction-job table) ([gap-audit GAP-45](gap-audit.md); [X9](X9-mcp-tool-contract.md)).
--- ---

View File

@@ -0,0 +1,97 @@
"""Gemini structured-output helper — a drop-in for ``claude_session.query_json``
for BOUNDED extraction tasks (text → JSON).
Why a second LLM path: metadata extraction is a single structured call (fill
case_name/summary/headnote/tags from a verdict's text), not an agentic loop. The
``claude -p`` CLI behind ``claude_session`` is agentic — it reaches for tools and
hits ``error_max_turns`` on a task that should be one shot — so it was slow and
flaky for the precedent metadata queue. Gemini Flash with JSON mode
(``responseMimeType: application/json``) is the right tool: one call, schema-
clean JSON, fast, and ~$0.10/1M tokens (negligible for this volume).
Scope: **bounded extraction only** (precedent metadata). The agentic, voice-
sensitive work — decision writing, analysis, halacha extraction — stays on
``claude_session`` (Daphna's subscription, zero API cost). This is a deliberate
per-task provider choice, not a wholesale move off Claude.
Key: ``GEMINI_API_KEY`` (host ~/.env; SoT Infisical nautilus:/external-apis/gemini
as ``GOOGLE_GEMINI_API_KEY``). Model: ``GEMINI_MODEL`` (default gemini-2.5-flash).
Direct REST via httpx — no extra SDK dependency.
"""
from __future__ import annotations
import json
import logging
import os
import httpx
logger = logging.getLogger(__name__)
_BASE = "https://generativelanguage.googleapis.com/v1beta"
_DEFAULT_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
_DEFAULT_TIMEOUT = float(os.environ.get("GEMINI_TIMEOUT_S", "120"))
class GeminiError(RuntimeError):
"""Gemini API call failed or returned an unexpected shape."""
def _api_key() -> str:
key = os.environ.get("GEMINI_API_KEY", "").strip()
if not key:
raise GeminiError(
"GEMINI_API_KEY אינו מוגדר (host ~/.env / Infisical "
"nautilus:/external-apis/gemini)."
)
return key
async def query_json(
prompt: str,
timeout: float | int = _DEFAULT_TIMEOUT,
*,
system: str | None = None,
model: str | None = None,
# Accepted for drop-in parity with claude_session.query_json; ignored here.
effort: str | None = None,
tools: str | None = None,
) -> dict | list | None:
"""Single structured-output call → parsed JSON. Drop-in for
``claude_session.query_json``. Raises ``GeminiError`` on failure (the caller
treats that like any extraction failure — recorded, never silently wrong).
"""
model = model or _DEFAULT_MODEL
body: dict = {
"contents": [{"role": "user", "parts": [{"text": prompt}]}],
"generationConfig": {
"responseMimeType": "application/json",
"temperature": 0,
},
}
if system:
body["system_instruction"] = {"parts": [{"text": system}]}
url = f"{_BASE}/models/{model}:generateContent"
try:
async with httpx.AsyncClient(timeout=float(timeout)) as client:
resp = await client.post(url, params={"key": _api_key()}, json=body)
except httpx.HTTPError as e:
raise GeminiError(f"Gemini request failed: {e}") from e
if resp.status_code != 200:
raise GeminiError(f"Gemini HTTP {resp.status_code}: {resp.text[:200]}")
data = resp.json()
# Surface an explicit safety/finish block rather than returning empty.
cand = (data.get("candidates") or [{}])[0]
if cand.get("finishReason") in ("SAFETY", "RECITATION", "PROHIBITED_CONTENT"):
raise GeminiError(f"Gemini blocked output: finishReason={cand['finishReason']}")
try:
text = cand["content"]["parts"][0]["text"]
except (KeyError, IndexError, TypeError) as e:
raise GeminiError(f"Gemini unexpected response: {str(data)[:200]}") from e
try:
return json.loads(text)
except json.JSONDecodeError as e:
raise GeminiError(f"Gemini returned non-JSON: {text[:200]}") from e

View File

@@ -15,6 +15,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os
from pathlib import Path from pathlib import Path
from typing import Awaitable, Callable from typing import Awaitable, Callable
from uuid import UUID from uuid import UUID
@@ -179,6 +180,9 @@ async def reextract_halachot(
# precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9 # precedent into a 429 storm. Observed 2026-05-03: 1110/20 succeeded with 9
# halachot, 317/10 immediately after returned silent no_halachot. # halachot, 317/10 immediately after returned silent no_halachot.
INTER_PRECEDENT_COOLDOWN_SEC = 30 INTER_PRECEDENT_COOLDOWN_SEC = 30
# Metadata extraction is on Gemini (fast, high rate limits) — a brief spacer is
# enough; the 30s above is for the Claude-backed halacha path.
METADATA_COOLDOWN_SEC = float(os.environ.get("METADATA_COOLDOWN_SEC", "2"))
# How many times to retry a precedent that came back as 'extraction_failed' # How many times to retry a precedent that came back as 'extraction_failed'
# (i.e. >50% chunks crashed). Each retry uses a longer cooldown. # (i.e. >50% chunks crashed). Each retry uses a longer cooldown.
@@ -226,11 +230,14 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -
cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT, cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT,
) )
# Metadata extraction runs on Gemini (high rate limits, fast) — the long
# cooldown is only needed for halacha (Claude/Anthropic rate limits).
cooldown = METADATA_COOLDOWN_SEC if kind == "metadata" else INTER_PRECEDENT_COOLDOWN_SEC
results: list[dict] = [] results: list[dict] = []
processed = 0 processed = 0
for idx, row in enumerate(pending): for idx, row in enumerate(pending):
if idx > 0: if idx > 0:
await asyncio.sleep(INTER_PRECEDENT_COOLDOWN_SEC) await asyncio.sleep(cooldown)
cid = UUID(str(row["id"])) cid = UUID(str(row["id"]))
attempts = 0 attempts = 0
result: dict = {} result: dict = {}

View File

@@ -19,7 +19,7 @@ from datetime import date as date_type
from uuid import UUID from uuid import UUID
from legal_mcp.config import parse_llm_json from legal_mcp.config import parse_llm_json
from legal_mcp.services import claude_session, db from legal_mcp.services import db, gemini_session
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -150,7 +150,10 @@ async def extract_metadata(case_law_id: UUID | str) -> dict:
) )
try: try:
result = await claude_session.query_json( # Bounded structured extraction → Gemini Flash (JSON mode). The agentic
# claude CLI hit error_max_turns on this single-shot task; see
# gemini_session.py. Voice-sensitive/agentic work stays on claude_session.
result = await gemini_session.query_json(
user_msg, system=METADATA_EXTRACTION_PROMPT, user_msg, system=METADATA_EXTRACTION_PROMPT,
) )
except Exception as e: except Exception as e:

View File

@@ -24,6 +24,8 @@
| `legal-reaper.config.cjs` | pm2/js | **דמון pm2 ל-`reap_orphan_procs.py --loop`** (ברירת-מחדל 180ש', `REAP_INTERVAL_S` לעקיפה). `max_memory_restart 100M` (ה-reaper עצמו לא ידלוף). התקנה: `pm2 start scripts/legal-reaper.config.cjs && pm2 save`. לוגים: `pm2 logs legal-reaper`. | pm2 (host-side) | | `legal-reaper.config.cjs` | pm2/js | **דמון pm2 ל-`reap_orphan_procs.py --loop`** (ברירת-מחדל 180ש', `REAP_INTERVAL_S` לעקיפה). `max_memory_restart 100M` (ה-reaper עצמו לא ידלוף). התקנה: `pm2 start scripts/legal-reaper.config.cjs && pm2 save`. לוגים: `pm2 logs legal-reaper`. | pm2 (host-side) |
| `drain_court_fetch.py` | python | **ריקון תור-אחזור הפסיקה (X13)** — קורא ל-`court_fetch_orchestrator.drain_pending(limit)` שמוריד+קולט כל job ממתין שהיומונים מילאו, וקושר חזרה ליומון. מקומי בלבד (ingest = claude CLI). no-op מהיר כשהתור ריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_court_fetch.py [limit]`. | דרך `legal-court-fetch-drain.config.cjs` (pm2 cron) | | `drain_court_fetch.py` | python | **ריקון תור-אחזור הפסיקה (X13)** — קורא ל-`court_fetch_orchestrator.drain_pending(limit)` שמוריד+קולט כל job ממתין שהיומונים מילאו, וקושר חזרה ליומון. מקומי בלבד (ingest = claude CLI). no-op מהיר כשהתור ריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_court_fetch.py [limit]`. | דרך `legal-court-fetch-drain.config.cjs` (pm2 cron) |
| `legal-court-fetch-drain.config.cjs` | pm2/js | **תזמון שעתי של `drain_court_fetch.py`** (cron `17 * * * *`, `COURT_FETCH_DRAIN_CRON` לעקיפה) — הופך את לולאת יומון→אחזור→קליטה ל-fully-autonomous. `autorestart:false` (one-shot per tick). דורש `legal-court-fetch-service` רץ. התקנה: `pm2 start scripts/legal-court-fetch-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | | `legal-court-fetch-drain.config.cjs` | pm2/js | **תזמון שעתי של `drain_court_fetch.py`** (cron `17 * * * *`, `COURT_FETCH_DRAIN_CRON` לעקיפה) — הופך את לולאת יומון→אחזור→קליטה ל-fully-autonomous. `autorestart:false` (one-shot per tick). דורש `legal-court-fetch-service` רץ. התקנה: `pm2 start scripts/legal-court-fetch-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
| `drain_metadata_queue.py` | python | **ריקון תור חילוץ-המטא של הפסיקה**`process_pending_extractions(kind='metadata')` ב-batches עד ריק. רץ על **Gemini Flash** (structured JSON, `gemini_session`) — מהיר ואמין, במקום ה-claude CLI ה-agentic שפגע ב-`error_max_turns`. no-op מהיר כשריק. הרצה ידנית: `mcp-server/.venv/bin/python scripts/drain_metadata_queue.py [batch]`. | דרך `legal-metadata-drain.config.cjs` (pm2 cron) |
| `legal-metadata-drain.config.cjs` | pm2/js | **תזמון כל 15 דק' של `drain_metadata_queue.py`** (cron `*/15 * * * *`, `METADATA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-המטא ב-/precedents. דורש `GEMINI_API_KEY` ב-`~/.env`. התקנה: `pm2 start scripts/legal-metadata-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
| `auto-sync-cases.sh` | bash | סנכרון תיקי ערר ל-Gitea — רץ כל דקה | `* * * * *` (cron) | | `auto-sync-cases.sh` | bash | סנכרון תיקי ערר ל-Gitea — רץ כל דקה | `* * * * *` (cron) |
| `backup-db.sh` | bash | גיבוי PostgreSQL יומי ל-`data/backups/` (gzip) | לתזמן: `0 2 * * *` | | `backup-db.sh` | bash | גיבוי PostgreSQL יומי ל-`data/backups/` (gzip) | לתזמן: `0 2 * * *` |
| `restore-db.sh` | bash | שחזור DB מגיבוי (companion ל-backup-db.sh) | ידני | | `restore-db.sh` | bash | שחזור DB מגיבוי (companion ל-backup-db.sh) | ידני |

View File

@@ -0,0 +1,48 @@
"""Drain the precedent metadata-extraction queue.
Calls ``process_pending_extractions(kind='metadata')`` in batches until the
queue is empty (two consecutive zero-progress rounds). Metadata extraction runs
on **Gemini Flash** (structured JSON) — fast and reliable, unlike the agentic
claude CLI which hit ``error_max_turns`` on this bounded task. A no-op (fast)
when the queue is empty.
Host-only (reads GEMINI_API_KEY + POSTGRES_URL from ~/.env via legal_mcp.config).
Scheduled by ``legal-metadata-drain`` (pm2 cron); also runnable by hand:
mcp-server/.venv/bin/python scripts/drain_metadata_queue.py [batch]
"""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import precedent_library as pl
async def main() -> int:
batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10
total = 0
empty_rounds = 0
rnd = 0
while empty_rounds < 2:
rnd += 1
out = await pl.process_pending_extractions(kind="metadata", limit=batch)
processed = out.get("processed", 0)
total += processed
print(f"[round {rnd}] processed={processed} total_pending={out.get('total_pending', 0)} "
f"status={out.get('status')}", flush=True)
for r in out.get("results", []):
print(f" {str(r.get('case_number',''))[:42]}: {r.get('status')}", flush=True)
if processed == 0:
empty_rounds += 1
await asyncio.sleep(3)
else:
empty_rounds = 0
print(f"===DONE=== metadata extracted (cumulative cases handled={total})", flush=True)
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))

View File

@@ -0,0 +1,34 @@
/**
* pm2 ecosystem entry for legal-metadata-drain — scheduled (every 15 min) drain
* of the precedent metadata-extraction queue (Gemini Flash). Keeps the
* /precedents metadata queue from clogging (the prior agentic claude-CLI path
* hit error_max_turns and nothing drained it autonomously).
*
* Pattern: cron_restart fires the script on schedule; autorestart:false → runs
* once and exits (pm2 shows "stopped" between ticks — expected). Cheap no-op
* when the queue is empty; Gemini Flash ≈ $0.10/1M tokens.
*
* Requires (host ~/.env via legal_mcp.config): GEMINI_API_KEY, POSTGRES_URL.
*
* Install (once):
* pm2 start /home/chaim/legal-ai/scripts/legal-metadata-drain.config.cjs
* pm2 save
* Run now (manual): mcp-server/.venv/bin/python scripts/drain_metadata_queue.py
* Schedule override: METADATA_DRAIN_CRON (default every 15 min).
*/
const cron = process.env.METADATA_DRAIN_CRON || "*/15 * * * *";
module.exports = {
apps: [
{
name: "legal-metadata-drain",
cwd: "/home/chaim/legal-ai",
script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python",
args: "scripts/drain_metadata_queue.py 10",
env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" },
autorestart: false, // one-shot per cron tick
cron_restart: cron,
max_memory_restart: "500M",
},
],
};