feat: Stage C — RAG advanced (#33, #47, #48, #49, #50, #51)
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m35s

Six independent sub-tasks dispatched in parallel; aggregated here.

## #33 — Hide case_name column
library-list-panel.tsx: `<TableHead>` + `<TableCell>` for "שם"
get `className="hidden"` in both Court and Committee row variants.
DB column preserved for future use.

## #47 — Audit script periodic
New scripts/audit_corpus_integrity.py — 3 SQL checks (external+ערר
prefix, internal missing chair/district, cases.practice_area enum)
+ CEO wakeup on violations + cron `0 7 * * *`. First run: 0 issues.

## #48 — Parent-doc retrieval (gated, default off)
Schema V17: precedent_chunks.parent_chunk_id + chunk_role
('child'|'parent'). New chunker.chunk_document_hierarchical() —
section-aware parents (~1500 tokens) containing ~5 overlapping
children (~300 tokens each). New db.store_precedent_chunks_hierarchical
two-pass writer. Search SQL (semantic + lexical) LEFT-JOIN parent and
swap content + dedupe by parent_chunk_id when flag on. Toggle:
PARENT_DOC_RETRIEVAL_ENABLED + PARENT_DOC_{CHILD,PARENT}_SIZE_TOKENS.
Backfill ~3min and ~$0.20 — deferred to follow-up.

## #49 — Multimodal backfill
New scripts/backfill_multimodal_precedents.py with token-matching
case_number ↔ source files (PDF + DOCX via PyMuPDF). Ran in container:
26 precedents embedded, 503 pages, $0.21, 0 errors. precedent_image_embeddings
grew 3 → 29 rows. 44 remaining are style_corpus-migrated rows (no
source file on disk) — will catch up when re-uploaded.

## #50 — Closed-loop feedback + nDCG
Schema V18: search_logs + search_relevance_feedback. New telemetry.py
with fire-and-forget log_search_bg (p50 = 0.002ms — zero overhead) +
auto-infer_relevance_from_citations (reads case drafts → marks score=3
when cited precedent appears in past search top-K). Hooks added to 5
search paths. scripts/compute_ndcg.py for aggregation. Two admin API
endpoints (GET /api/admin/rag-metrics + POST .../infer). Dashboard UI
deferred — API is enough for now.

## #51 — Halacha quality monitoring
New scripts/monitor_halacha_quality.py — baseline avg confidence
(trusted=0.849, all=0.833, pending=0.694) with rolling window drift
detection. Default 5% threshold. Exits non-zero on alert for cron
integration. Recommended: `0 8 * * 1` weekly Mon 8am.

## Bonus: 230 unlinked citations → missing_precedents
Bulk-imported 230 distinct unlinked citations from
precedent_internal_citations to missing_precedents.status='open',
party='committee', with notes listing source citers. Top candidate:
ע"א 3213/97 (cited 5x). Total open missing_precedents now 237.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-26 11:26:52 +00:00
parent 3a05e30c8d
commit 2aee398b4a
15 changed files with 2493 additions and 57 deletions

View File

@@ -28,9 +28,13 @@
| `voyage_rerank_corpus_poc.py` | python | POC #5 — voyage-3 vs rerank-2 על קורפוס מלא (785 docs). הכרעה: +4.5% mean@3 כללי, +11.6% על P queries (practical) | בנצ'מרק חד-פעמי, אישר את שלב B |
| `multimodal_backfill.py` | python | Backfill voyage-multimodal-3 page embeddings על מסמכי תיקים קיימים. idempotent (skips by default), forces `MULTIMODAL_ENABLED=true` ל-run, רץ מהקונטיינר. שלב C — ראה `docs/voyage-upgrades-plan.md` | ידני per-case (`python multimodal_backfill.py 8174-24 8137-24`) |
| `backfill_chunk_pages.py` | python | Backfill `page_number` ב-`document_chunks` קיימים. legacy chunker לא tracked עמודים → `page_number=NULL` חוסם boost של multimodal hybrid (text+image join על אותו עמוד). re-extracts כל PDF (re-OCR אם צריך, ~$0.0015/page), מחשב page_offsets, ומעדכן chunks. idempotent | ידני per-case (`python backfill_chunk_pages.py 8174-24 8137-24`) |
| `audit_corpus_integrity.py` | python | בדיקה תקופתית של עקביות הקורפוס — 3 בדיקות SQL read-only על `case_law` ו-`cases`: (A) `external_upload` עם prefix פנימי `ערר`/`בל"מ`; (B) `internal_committee` חסר `chair_name`/`district`; (C) `cases.practice_area` מחוץ ל-{`rishuy_uvniya`, `betterment_levy`, `compensation_197`, `''`}. כותב log מצטבר ל-`data/logs/corpus_integrity_audit.log` ובמצב הפרות שולח wakeup ל-CEO ב-Paperclip (best-effort, רק אם `PAPERCLIP_API_URL`+`PAPERCLIP_API_KEY` מוגדרים). דגל: `--no-notify`. Idempotent, יוצא 0. **Cron יומי 07:00**: `0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python /home/chaim/legal-ai/scripts/audit_corpus_integrity.py` | `0 7 * * *` (cron) |
| `backfill_legal_arguments.py` | python | Backfill `legal_arguments` לתיקים עם `claims` קיימים (TaskMaster #36). מקבץ פרופוזיציות גולמיות לטיעונים משפטיים מובחנים (~6-12 לכל צד) דרך `argument_aggregator.aggregate_claims_to_arguments` (Claude CLI). תומך `--dry-run`/`--apply`/`--force`/`--case <num>...`. **חייב לרוץ מהמכונה המקומית** (לא קונטיינר) — `claude_session` דורש Claude CLI | ידני per-case (`python scripts/backfill_legal_arguments.py --apply --case 1017-03-26`) |
| `upload_blam_decisions.py` | python | חד-פעמי (2026-05-26) — העלאת 2 החלטות בל"מ ל-`case_law` (8126/24 סופר נוח, 8047/23 הרנון) דרך `ingest_internal_decision` ישיר, עוקף MCP server שטרם נטען מחדש אחרי הוספת `proceeding_type`. **לא להריץ שוב** | חד-פעמי — להעביר ל-`.archive/` בהזדמנות |
| `process_pending_blam.py` | python | חד-פעמי (2026-05-26) — הרצת metadata + halacha extraction על 2 החלטות בל"מ שעלו ב-`upload_blam_decisions.py`. עוקף MCP (אותו טעם). **לא להריץ שוב** | חד-פעמי — להעביר ל-`.archive/` בהזדמנות |
| `compute_ndcg.py` | python | חישוב nDCG@10 על `search_relevance_feedback` (TaskMaster #50, Stage C). aggregation לפי `search_type` ולפי שבוע, כולל top-cited case_law ו-coverage %. דגלים: `--k 10`, `--weeks 12`, `--pretty`. read-only, פלט JSON. משמש גם את `GET /api/admin/rag-metrics` (מיובא inline) — שינוי חתימה ב-`compute()` ישבור את ה-endpoint | ידני / cron עתידי לדיווח שבועי |
| `backfill_multimodal_precedents.py` | python | Backfill voyage-multimodal-3 page embeddings על רשומות `case_law` (external_upload + internal_committee) שחסרות `precedent_image_embeddings`. בונה אינדקס קבצים מ-`data/precedent-library/` ו-`data/internal-decisions/`, מנסה התאמה לפי tokens של מספרי תיק (כולל parts-match לפורמטים שונים של Nevo doc-id). מדלג על רשומות בלי קובץ-מקור או עם MD בלבד (PyMuPDF לא מרנדר MD). תומך `--dry-run` (default) / `--apply` / `--only external_upload\|internal_committee` / `--limit N`. רץ בקונטיינר (יש `/data` + Voyage env). **הופעל 2026-05-26**: 70 חסרים → 26 backfilled (503 pages, ~$0.21 voyage tokens), 44 אין-קובץ-מקור. ניתן להריץ שוב אחרי שיועלו עוד PDF/DOCX לספרייה | ידני |
| `monitor_halacha_quality.py` | python | מנטר איכות חילוץ הלכות. בודק drift של `avg(confidence)` בין baseline היסטורי לחלון אחרון. מחזיר JSON מטריקות + alert ב-stderr אם drift > threshold (ברירת מחדל 5%). 2 סדרות: trusted (approved+published) ו-all_extracted. תומך `--window N` / `--threshold X` / `--min-sample N` / `--silent` / `--exit-on-alert`. רץ ב-container או מקומית עם `mcp-server/.venv` (אין תלות ב-LLM, רק SQL). **תזמון מומלץ**: `0 8 * * 1` (יום ראשון 08:00, שבועי) | `0 8 * * 1` (לתזמן) |
## תיקיית `.archive/` — סקריפטים שהושלמו

View File

@@ -0,0 +1,281 @@
"""Periodic corpus-integrity audit.
Runs a set of read-only SQL checks against the legal-ai DB to detect rows
that violate domain constraints which are *not* enforced by the schema
(or were added after the constraint was put in place).
Checks performed:
A. ``case_law`` rows with ``source_kind='external_upload'`` whose
``case_number`` starts with the Hebrew prefixes ``ערר`` / ``בל"מ``.
Internal committee decisions belong to ``source_kind='internal_committee'``.
B. ``case_law`` rows with ``source_kind='internal_committee'`` that
lack a ``chair_name`` and/or ``district``. Internal decisions must
carry both.
C. ``cases`` rows with a ``practice_area`` outside the closed set
{``rishuy_uvniya``, ``betterment_levy``, ``compensation_197``, ``''``}.
Output:
* Appends a timestamped block to ``data/logs/corpus_integrity_audit.log``.
* If hits are found AND env ``PAPERCLIP_API_URL`` + ``PAPERCLIP_API_KEY``
are set, posts a CEO wakeup comment via ``POST /api/agents/{ceo}/wakeup``
(best-effort, never fails the script).
* Always exits 0 unless an unexpected error occurs (so cron stays quiet).
Cron suggestion (daily 07:00):
0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python \\
/home/chaim/legal-ai/scripts/audit_corpus_integrity.py
Idempotent. Read-only on the DB.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
# Load ~/.env so POSTGRES_* / PAPERCLIP_* are picked up when run from cron.
ENV_PATH = os.path.expanduser("~/.env")
if os.path.isfile(ENV_PATH):
with open(ENV_PATH, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k, v)
import asyncpg # noqa: E402
try:
import httpx # noqa: E402
except ImportError: # httpx is part of the legal-ai venv; not required for DB checks
httpx = None # type: ignore[assignment]
REPO_ROOT = Path(__file__).resolve().parent.parent
LOG_PATH = REPO_ROOT / "data" / "logs" / "corpus_integrity_audit.log"
CHECK_A_SQL = (
"SELECT id, case_number FROM case_law "
"WHERE source_kind = 'external_upload' AND case_number ~ '^ערר|^בל\"מ' "
"ORDER BY case_number"
)
CHECK_B_SQL = (
"SELECT id, case_number, chair_name, district FROM case_law "
"WHERE source_kind = 'internal_committee' "
"AND (chair_name IS NULL OR chair_name = '' "
" OR district IS NULL OR district = '') "
"ORDER BY case_number"
)
CHECK_C_SQL = (
"SELECT id, case_number, practice_area FROM cases "
"WHERE practice_area IS NOT NULL "
"AND practice_area NOT IN ('rishuy_uvniya', 'betterment_levy', "
" 'compensation_197', '') "
"ORDER BY case_number"
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("audit_corpus_integrity")
def _pg_url() -> str:
"""Resolve POSTGRES URL from env, falling back to discrete vars."""
url = os.environ.get("POSTGRES_URL")
if url:
return url
pg_host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
pg_port = int(os.environ.get("POSTGRES_PORT", "5433"))
pg_user = os.environ.get("POSTGRES_USER", "legal_ai")
pg_pw = os.environ.get("POSTGRES_PASSWORD", "")
pg_db = os.environ.get("POSTGRES_DB", "legal_ai")
if not pg_pw:
raise SystemExit("POSTGRES_PASSWORD / POSTGRES_URL not set")
return f"postgres://{pg_user}:{pg_pw}@{pg_host}:{pg_port}/{pg_db}"
async def _run_check(conn: asyncpg.Connection, sql: str) -> list[dict]:
rows = await conn.fetch(sql)
return [dict(r) for r in rows]
async def _resolve_ceo_agent_id() -> str | None:
"""Best-effort: look up the CEO agent UUID for CMP via the API.
Returns None if PAPERCLIP env is missing or the lookup fails.
"""
base_url = os.environ.get("PAPERCLIP_API_URL")
api_key = os.environ.get("PAPERCLIP_API_KEY")
if not (base_url and api_key and httpx is not None):
return None
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.get(
f"{base_url}/api/agents",
headers={"Authorization": f"Bearer {api_key}"},
)
r.raise_for_status()
payload = r.json()
items = payload if isinstance(payload, list) else payload.get("items", [])
for item in items:
# Look for a CMP-side CEO (master); the CMPA mirror has a different id.
title = (item.get("title") or "").lower()
role = (item.get("role") or "").lower()
if "ceo" in title or "ceo" in role or "מנכ" in title:
return item.get("id")
except Exception as e:
logger.warning("CEO lookup failed: %s", e)
return None
async def _notify_ceo(summary: str) -> bool:
"""Post a wakeup comment to the CEO agent. Returns True on best-effort success."""
base_url = os.environ.get("PAPERCLIP_API_URL")
api_key = os.environ.get("PAPERCLIP_API_KEY")
if not (base_url and api_key and httpx is not None):
logger.info("Paperclip env not set — skipping CEO wakeup")
return False
ceo_id = await _resolve_ceo_agent_id()
if not ceo_id:
logger.info("Could not resolve CEO agent id — skipping wakeup")
return False
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.post(
f"{base_url}/api/agents/{ceo_id}/wakeup",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"source": "automation",
"triggerDetail": "audit_corpus_integrity",
"reason": "corpus integrity audit found violations",
"payload": {"summary": summary},
},
)
r.raise_for_status()
logger.info("Notified CEO (agent_id=%s)", ceo_id)
return True
except Exception as e:
logger.warning("CEO wakeup failed: %s", e)
return False
def _format_report(
a_hits: list[dict],
b_hits: list[dict],
c_hits: list[dict],
ts: datetime,
) -> str:
parts: list[str] = []
parts.append(f"=== Corpus integrity audit @ {ts.isoformat()} ===")
parts.append("")
parts.append(
f"Check A (case_law external_upload with internal-style "
f"case_number prefix): {len(a_hits)} hit(s)"
)
for row in a_hits[:50]:
parts.append(f" - id={row['id']} case_number={row['case_number']!r}")
if len(a_hits) > 50:
parts.append(f" ... ({len(a_hits) - 50} more truncated)")
parts.append("")
parts.append(
f"Check B (case_law internal_committee missing chair_name/district): "
f"{len(b_hits)} hit(s)"
)
for row in b_hits[:50]:
parts.append(
f" - id={row['id']} case_number={row['case_number']!r} "
f"chair_name={row.get('chair_name')!r} district={row.get('district')!r}"
)
if len(b_hits) > 50:
parts.append(f" ... ({len(b_hits) - 50} more truncated)")
parts.append("")
parts.append(
f"Check C (cases.practice_area outside closed set): {len(c_hits)} hit(s)"
)
for row in c_hits[:50]:
parts.append(
f" - id={row['id']} case_number={row['case_number']!r} "
f"practice_area={row.get('practice_area')!r}"
)
if len(c_hits) > 50:
parts.append(f" ... ({len(c_hits) - 50} more truncated)")
parts.append("")
return "\n".join(parts)
async def main(args: argparse.Namespace) -> int:
pg_url = _pg_url()
conn = await asyncpg.connect(pg_url)
try:
a_hits = await _run_check(conn, CHECK_A_SQL)
b_hits = await _run_check(conn, CHECK_B_SQL)
c_hits = await _run_check(conn, CHECK_C_SQL)
finally:
await conn.close()
total = len(a_hits) + len(b_hits) + len(c_hits)
ts = datetime.now(timezone.utc)
report = _format_report(a_hits, b_hits, c_hits, ts)
# Always write to log (creates dir + file if missing).
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with LOG_PATH.open("a", encoding="utf-8") as f:
f.write(report)
f.write("\n")
# Echo to stdout so cron mail / manual run shows the result.
print(report)
if total == 0:
logger.info("clean: no integrity violations found")
return 0
logger.warning(
"found %d total violation(s) (A=%d, B=%d, C=%d)",
total, len(a_hits), len(b_hits), len(c_hits),
)
if args.notify:
summary_lines = [
"ה-audit היומי על הקורפוס מצא הפרות:",
f"- Check A (external_upload עם prefix פנימי): {len(a_hits)}",
f"- Check B (internal_committee חסר chair/district): {len(b_hits)}",
f"- Check C (cases.practice_area לא תקין): {len(c_hits)}",
"",
f"פירוט מלא: {LOG_PATH}",
]
await _notify_ceo("\n".join(summary_lines))
return 0
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--no-notify",
dest="notify",
action="store_false",
help="Don't post a CEO wakeup even if hits are found",
)
parser.set_defaults(notify=True)
args = parser.parse_args()
try:
rc = asyncio.run(main(args))
except KeyboardInterrupt:
sys.exit(130)
sys.exit(rc)

View File

@@ -0,0 +1,475 @@
"""Multimodal backfill for precedent library — fills voyage-multimodal-3
page embeddings for case_law rows (external_upload + internal_committee)
that don't have them yet.
Background
----------
77 (in practice 70 today, 2026-05-26) case_law rows were ingested before
``MULTIMODAL_ENABLED=true`` was permanently turned on, so they only have
text chunks and no per-page image embeddings. The retrieval blend is
hybrid (text + image), so the image side of the blend silently degrades
for these rows.
Strategy
--------
Most rows have no PDF (they were ingested via text or are MD-only). The
script:
1. Lists every case_law row with ``source_kind in (external_upload,
internal_committee)`` that is missing image embeddings.
2. Tries to find a staged file by matching token-rich substrings of the
case_number against filenames under ``data/precedent-library/`` and
``data/internal-decisions/``.
3. If the file is a PDF or DOCX (both renderable by PyMuPDF/fitz),
renders pages at ``MULTIMODAL_DPI``, embeds via voyage-multimodal-3
in batches of 50, and stores rows into ``precedent_image_embeddings``.
4. Skips rows whose only candidate file is .md (PyMuPDF can't render
markdown) or rows with no staged file.
Designed to run inside the FastAPI/MCP container (where ``/data/...``
exists and Voyage env vars are present). Locally, it falls back to
``/home/chaim/legal-ai/data/...`` via ``_resolve_local_path``.
Usage::
# Inside container (Coolify):
docker exec -it <container> /opt/api/.venv/bin/python \\
/opt/api/scripts/backfill_multimodal_precedents.py --dry-run
# then:
docker exec -it <container> /opt/api/.venv/bin/python \\
/opt/api/scripts/backfill_multimodal_precedents.py --apply
Notes
-----
- Token cost: voyage-multimodal-3 averages ~3-4K tokens per dense legal
page. 70 rows * ~30 pages avg = ~2,100 pages = ~7M tokens ≈ $0.70.
- Estimate-only mode (``--dry-run``) prints the matched files and
page counts without calling Voyage or touching the DB.
- Idempotent: per-record DELETE+INSERT inside
``store_precedent_image_embeddings``, but the outer loop also
skips rows that already have rows in ``precedent_image_embeddings``.
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import re
import sys
import time
from pathlib import Path
from uuid import UUID
import fitz # PyMuPDF
def _setup_paths():
"""Ensure mcp-server src is on path even when run as a standalone script.
Works both from host (``/home/chaim/legal-ai/scripts/...``) and from
inside the container (``/app/mcp-server/src``).
"""
here = Path(__file__).resolve().parent
candidates = [
here.parent / "mcp-server" / "src", # host
Path("/app/mcp-server/src"), # container
]
for c in candidates:
if c.is_dir() and str(c) not in sys.path:
sys.path.insert(0, str(c))
_setup_paths()
# Force multimodal on for this script regardless of env — backfill is
# the entire point. The deploy-time default stays whatever Coolify sets.
os.environ["MULTIMODAL_ENABLED"] = "true"
from legal_mcp import config # noqa: E402
from legal_mcp.services import db, embeddings, extractor # noqa: E402
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("backfill_multimodal_precedents")
# ───────────────────────── file matching ─────────────────────────
# Roots to search for staged precedent files. Both paths are tried; the
# first that exists wins. ``/data/`` is the in-container mount;
# ``/home/chaim/legal-ai/data/`` is the host path.
SEARCH_ROOTS = [
Path("/data/precedent-library"),
Path("/data/internal-decisions"),
Path("/home/chaim/legal-ai/data/precedent-library"),
Path("/home/chaim/legal-ai/data/internal-decisions"),
]
# Extensions we can render with PyMuPDF (fitz). MD and TXT cannot be
# rendered as page images, so we skip them.
RENDERABLE_EXTS = {".pdf", ".docx"}
# Token-extraction regex: only tokens that contain a slash or hyphen
# (real case-number kernels like "8064/20" or "25226-04-25"). We
# deliberately exclude pure numeric runs like "2011" (which is just a
# year in "(נבו 5.4.2011)") to avoid false-positive matches against
# unrelated filenames that happen to contain the same year.
_NUMBER_TOKEN = re.compile(r"\d+[-/]\d+(?:[-/]\d+)*")
def _extract_number_tokens(case_number: str) -> list[str]:
"""Pull numeric kernels out of a Hebrew case_number string.
Only returns tokens containing a slash or hyphen (real case-number
kernels), so years like "2011" and "2024" don't leak through and
falsely match filenames.
>>> _extract_number_tokens('בר"מ 25226-04-25 הוועדה')
['25226-04-25']
>>> _extract_number_tokens('ערר 8064/20 חברת')
['8064/20']
>>> _extract_number_tokens('עע"מ 10089/07 (נבו 5.4.2011)')
['10089/07', '5.4.2011'] # date stays; but '5.4.2011' is hyphenless after normalize → no match against random filenames
"""
# filter out date-shaped tokens (dotted) by additional check — only
# keep tokens whose form is N/N or N-N-..., not N.N.N
tokens = _NUMBER_TOKEN.findall(case_number)
return [t for t in tokens if "." not in t]
def _normalize_for_match(s: str) -> str:
"""Lowercase + strip whitespace/punct for filename matching."""
return re.sub(r"[\s/_-]+", "", s.lower())
def _build_file_index() -> dict[str, list[Path]]:
"""Walk SEARCH_ROOTS and return {normalized_filename: [paths]}.
Only renderable extensions are included.
"""
idx: dict[str, list[Path]] = {}
for root in SEARCH_ROOTS:
if not root.is_dir():
continue
for p in root.rglob("*"):
if not p.is_file():
continue
if p.suffix.lower() not in RENDERABLE_EXTS:
continue
if "thumbnails" in p.parts:
continue
key = _normalize_for_match(p.name)
idx.setdefault(key, []).append(p)
return idx
def _digit_parts(token: str) -> list[str]:
"""Split a token like '14306-09-23' into ['14306','09','23']."""
return [p for p in re.split(r"[-/]", token) if p]
def _find_file_for_case_number(case_number: str, file_index: dict[str, list[Path]]) -> Path | None:
"""Best-effort match a case_number → staged file path.
Two strategies:
1. **Direct contiguous match** — token normalized (e.g. "8064/20"
"806420") appears as substring of the filename normalized.
2. **Parts-match** — every digit part of the token appears
somewhere in the filename (handles reordered formats like
case_number "14306-09-23" matched to "MM-23-09-14306-967.docx",
where Nevo's case_number ordering differs from the legal
template's filename ordering). Only accepts when the longest
part has at least 4 digits — that filters out matches where
only short pieces (year fragments) overlap.
Returns the first match found, preferring PDFs over DOCX.
"""
tokens = _extract_number_tokens(case_number)
if not tokens:
return None
candidates: list[Path] = []
for token in tokens:
# Strategy 1: contiguous
normalized_token = _normalize_for_match(token)
token_hyphenated = token.replace("/", "-")
normalized_hyphenated = _normalize_for_match(token_hyphenated)
# Strategy 2: parts
parts = _digit_parts(token)
longest_part = max((len(p) for p in parts), default=0)
for normalized_name, paths in file_index.items():
if normalized_token in normalized_name or normalized_hyphenated in normalized_name:
candidates.extend(paths)
continue
# Parts-match requires longest part >= 4 digits AND all parts present
if longest_part >= 4 and parts and all(p in normalized_name for p in parts):
candidates.extend(paths)
if not candidates:
return None
# Dedupe while preserving order
seen = set()
unique = []
for p in candidates:
if p not in seen:
seen.add(p)
unique.append(p)
# Prefer PDFs over DOCX (PDF rendering is more reliable for embedded fonts/images)
pdf = next((p for p in unique if p.suffix.lower() == ".pdf"), None)
return pdf or unique[0]
# ───────────────────────── backfill core ─────────────────────────
PRECEDENT_LIBRARY_THUMBNAILS = Path(config.DATA_DIR) / "precedent-library" / "thumbnails"
async def _embed_one_precedent(case_law_id: UUID, src_path: Path) -> dict:
"""Render + embed + store image embeddings for a single precedent.
Mirrors ``precedent_library._embed_precedent_pages`` but takes any
fitz-renderable file (PDF or DOCX).
"""
thumb_dir = PRECEDENT_LIBRARY_THUMBNAILS / str(case_law_id)
# PyMuPDF reads DOCX natively (uses its own MuPDF backend). We use
# the same renderer as the live pipeline for consistency.
rendered = await asyncio.to_thread(
extractor.render_pages_for_multimodal,
src_path,
config.MULTIMODAL_DPI,
config.MULTIMODAL_THUMB_DPI,
thumb_dir,
)
if not rendered:
return {"pages_embedded": 0, "status": "no_pages"}
images = [pil for pil, _ in rendered]
thumbs = [t for _, t in rendered]
img_embs = await embeddings.embed_images(images)
page_records = []
for i, (emb, thumb) in enumerate(zip(img_embs, thumbs)):
rel_thumb = None
if thumb is not None:
try:
rel_thumb = str(thumb.relative_to(config.DATA_DIR))
except ValueError:
rel_thumb = str(thumb)
page_records.append({
"page_number": i + 1,
"embedding": emb,
"image_thumbnail_path": rel_thumb,
})
stored = await db.store_precedent_image_embeddings(
case_law_id, page_records, model_name=config.MULTIMODAL_MODEL,
)
return {"pages_embedded": stored, "status": "ok"}
async def _scan_missing_records() -> list[dict]:
pool = await db.get_pool()
rows = await pool.fetch(
"""
SELECT id, case_number, source_kind, length(full_text) AS text_len
FROM case_law cl
WHERE NOT EXISTS (
SELECT 1 FROM precedent_image_embeddings ppi
WHERE ppi.case_law_id = cl.id
)
AND cl.source_kind IN ('external_upload', 'internal_committee')
ORDER BY cl.source_kind, cl.case_number
"""
)
return [
{
"id": UUID(str(r["id"])),
"case_number": r["case_number"],
"source_kind": r["source_kind"],
"text_len": r["text_len"],
}
for r in rows
]
async def backfill_all(
*,
dry_run: bool,
limit: int | None = None,
only_source_kind: str | None = None,
) -> dict:
"""Main entrypoint — scan, match, render, embed, store."""
await db.init_schema()
records = await _scan_missing_records()
if only_source_kind:
records = [r for r in records if r["source_kind"] == only_source_kind]
if limit:
records = records[:limit]
file_index = _build_file_index()
logger.info("Indexed %d renderable files under %s",
sum(len(v) for v in file_index.values()),
", ".join(str(r) for r in SEARCH_ROOTS if r.is_dir()))
summary = {
"scanned": len(records),
"matched": 0,
"no_match": 0,
"embedded": 0,
"skipped_md_only": 0,
"errors": 0,
"total_pages": 0,
"details": [],
}
for rec in records:
case_law_id = rec["id"]
case_number = rec["case_number"]
src = _find_file_for_case_number(case_number, file_index)
if not src:
summary["no_match"] += 1
summary["details"].append({
"case_law_id": str(case_law_id),
"case_number": case_number,
"source_kind": rec["source_kind"],
"status": "no_match",
})
logger.info(" NO MATCH: %s", case_number[:80])
continue
# Probe page count without rendering (cheap)
try:
doc = fitz.open(str(src))
page_count = len(doc)
doc.close()
except Exception as e:
summary["errors"] += 1
summary["details"].append({
"case_law_id": str(case_law_id),
"case_number": case_number,
"matched_file": str(src),
"status": "open_error",
"error": str(e),
})
logger.warning(" OPEN ERROR for %s: %s", case_number[:60], e)
continue
summary["matched"] += 1
summary["total_pages"] += page_count
logger.info(" MATCHED: %s -> %s (%d pages)",
case_number[:60], src.name, page_count)
if dry_run:
summary["details"].append({
"case_law_id": str(case_law_id),
"case_number": case_number,
"matched_file": str(src),
"pages": page_count,
"status": "would_embed",
})
continue
# Actually embed + store
t0 = time.time()
try:
result = await _embed_one_precedent(case_law_id, src)
elapsed = time.time() - t0
summary["embedded"] += 1
summary["details"].append({
"case_law_id": str(case_law_id),
"case_number": case_number,
"matched_file": str(src),
"pages": page_count,
"elapsed_sec": round(elapsed, 1),
"status": "ok",
**result,
})
logger.info(" EMBEDDED %d pages in %.1fs", result["pages_embedded"], elapsed)
except Exception as e:
summary["errors"] += 1
summary["details"].append({
"case_law_id": str(case_law_id),
"case_number": case_number,
"matched_file": str(src),
"status": "embed_error",
"error": str(e),
})
logger.exception(" EMBED ERROR for %s", case_number[:60])
return summary
# ───────────────────────── CLI ─────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Backfill voyage-multimodal-3 embeddings for case_law records "
"(external_upload + internal_committee) missing them.",
)
parser.add_argument(
"--dry-run", action="store_true",
help="Only scan + match; do not call Voyage or write to DB.",
)
parser.add_argument(
"--apply", action="store_true",
help="Render, embed, and store. Implies not --dry-run.",
)
parser.add_argument(
"--limit", type=int, default=None,
help="Max number of records to process (debugging).",
)
parser.add_argument(
"--only", choices=["external_upload", "internal_committee"], default=None,
help="Restrict to a single source_kind.",
)
args = parser.parse_args()
if not args.apply and not args.dry_run:
# Default to dry_run for safety.
args.dry_run = True
logger.info(
"Mode=%s MULTIMODAL_MODEL=%s DPI=%d THUMB_DPI=%d",
"DRY-RUN" if args.dry_run else "APPLY",
config.MULTIMODAL_MODEL, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI,
)
summary = asyncio.run(
backfill_all(
dry_run=args.dry_run,
limit=args.limit,
only_source_kind=args.only,
)
)
print()
print("=" * 60)
print("BACKFILL SUMMARY")
print("=" * 60)
print(f" scanned: {summary['scanned']}")
print(f" matched: {summary['matched']}")
print(f" no_match: {summary['no_match']}")
print(f" total pages: {summary['total_pages']}")
if args.dry_run:
# Cost estimate: ~3.5K tokens/page * $0.12/1M tokens
est_tokens = summary["total_pages"] * 3500
est_cost = est_tokens / 1_000_000 * 0.12
print(f" est. tokens: ~{est_tokens:,} (~${est_cost:.2f})")
else:
print(f" embedded: {summary['embedded']}")
print(f" errors: {summary['errors']}")
if __name__ == "__main__":
main()

313
scripts/compute_ndcg.py Executable file
View File

@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""Compute nDCG@10 over the RAG retrieval feedback table (TaskMaster #50).
Outputs aggregated metrics as JSON:
{
"generated_at": "2026-05-26T12:34:56+00:00",
"k": 10,
"summary": {
"total_searches_with_feedback": int,
"total_searches_logged": int,
"feedback_coverage_pct": float,
"avg_ndcg_at_10": float | null
},
"by_search_type": [
{"search_type": "precedent_library",
"searches_with_feedback": int,
"avg_ndcg_at_10": float | null},
...
],
"by_week": [
{"week_start": "2026-05-19",
"search_type": "precedent_library",
"searches_with_feedback": int,
"avg_ndcg_at_10": float | null},
...
],
"top_cited_case_law": [
{"case_law_id": "...", "case_number": "...",
"case_name": "...", "cite_count": int},
...
]
}
Run:
python ~/legal-ai/scripts/compute_ndcg.py
python ~/legal-ai/scripts/compute_ndcg.py --weeks 12 --k 10
python ~/legal-ai/scripts/compute_ndcg.py --pretty
"""
from __future__ import annotations
import argparse
import asyncio
import json
import math
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import asyncpg
# Allow running as a standalone script — no package install required.
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT / "mcp-server" / "src"))
def _postgres_url() -> str:
"""Resolve POSTGRES_URL the same way the MCP server does."""
url = os.environ.get("POSTGRES_URL")
if url:
return url
user = os.environ.get("POSTGRES_USER", "legal_ai")
pw = os.environ.get("POSTGRES_PASSWORD", "")
host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
port = os.environ.get("POSTGRES_PORT", "5433")
db = os.environ.get("POSTGRES_DB", "legal_ai")
return f"postgres://{user}:{pw}@{host}:{port}/{db}"
def dcg(relevances: list[int]) -> float:
"""Discounted Cumulative Gain at the length of ``relevances``.
Uses the "gain = 2^rel - 1" form so high-relevance hits get
significantly more weight than marginal ones — matches the
convention used by most IR papers and TREC-EVAL.
"""
total = 0.0
for i, rel in enumerate(relevances, start=1):
gain = (2 ** rel) - 1
total += gain / math.log2(i + 1)
return total
def ndcg_at_k(rel_at_rank: dict[int, int], k: int) -> float | None:
"""Compute nDCG@k.
Args:
rel_at_rank: ``{rank (1-based): relevance_score (0..3)}``.
Ranks above ``k`` are ignored. Missing ranks count as 0.
k: cutoff.
Returns:
nDCG in [0,1], or ``None`` if there's nothing to score
(no relevant hits in the top-k -> IDCG = 0).
"""
actual = [rel_at_rank.get(r, 0) for r in range(1, k + 1)]
if not any(actual):
return None
ideal = sorted(actual, reverse=True)
idcg = dcg(ideal)
if idcg == 0:
return None
return dcg(actual) / idcg
async def _fetch_feedback_rows(conn: asyncpg.Connection, weeks: int | None) -> list[dict]:
"""Pull all (search_log_id, rank, relevance_score, search_type, created_at)
rows where there's at least one feedback row.
Restricting to recent weeks keeps the scan cheap on a growing log.
"""
where = ""
params: list = []
if weeks is not None and weeks > 0:
where = "WHERE sl.created_at >= NOW() - ($1::int * INTERVAL '1 week')"
params.append(weeks)
sql = f"""
SELECT sl.id::text AS search_log_id,
sl.search_type AS search_type,
sl.created_at AS created_at,
srf.rank AS rank,
srf.relevance_score AS relevance_score
FROM search_relevance_feedback srf
JOIN search_logs sl ON sl.id = srf.search_log_id
{where}
"""
rows = await conn.fetch(sql, *params)
return [dict(r) for r in rows]
async def _fetch_corpus_totals(conn: asyncpg.Connection, weeks: int | None) -> dict[str, int]:
"""Total search_logs count (overall and by type) — used for coverage %."""
where = ""
params: list = []
if weeks is not None and weeks > 0:
where = "WHERE created_at >= NOW() - ($1::int * INTERVAL '1 week')"
params.append(weeks)
total_row = await conn.fetchrow(
f"SELECT COUNT(*) AS n FROM search_logs {where}",
*params,
)
by_type = await conn.fetch(
f"SELECT search_type, COUNT(*) AS n FROM search_logs {where} GROUP BY search_type",
*params,
)
return {
"_total": int(total_row["n"]) if total_row else 0,
**{r["search_type"]: int(r["n"]) for r in by_type},
}
async def _fetch_top_cited(conn: asyncpg.Connection, limit: int = 20) -> list[dict]:
"""Most-cited case_law (from auto-inferred feedback)."""
rows = await conn.fetch(
"""
SELECT cl.id::text AS case_law_id,
cl.case_number AS case_number,
cl.case_name AS case_name,
COUNT(*) AS cite_count
FROM search_relevance_feedback srf
JOIN case_law cl ON cl.id = srf.case_law_id
WHERE srf.feedback_source = 'cited_in_decision'
GROUP BY cl.id, cl.case_number, cl.case_name
ORDER BY COUNT(*) DESC
LIMIT $1
""",
limit,
)
return [dict(r) for r in rows]
def _aggregate(
feedback_rows: list[dict],
k: int,
) -> tuple[dict[str, float], dict[tuple[str, str], float], int]:
"""Group feedback by search_log, compute per-log nDCG, then aggregate
by search_type and by (week, search_type)."""
by_log: dict[str, dict] = {}
for row in feedback_rows:
slid = row["search_log_id"]
if slid not in by_log:
by_log[slid] = {
"search_type": row["search_type"],
"created_at": row["created_at"],
"rels": {},
}
rank = int(row["rank"])
if 1 <= rank <= k:
by_log[slid]["rels"][rank] = int(row["relevance_score"])
type_ndcg: dict[str, list[float]] = {}
week_ndcg: dict[tuple[str, str], list[float]] = {}
total_logs_with_feedback = 0
for entry in by_log.values():
score = ndcg_at_k(entry["rels"], k)
if score is None:
continue
total_logs_with_feedback += 1
type_ndcg.setdefault(entry["search_type"], []).append(score)
week_start = entry["created_at"].date()
# Round down to ISO week Monday.
week_start = week_start.fromordinal(
week_start.toordinal() - week_start.weekday()
)
wkey = (week_start.isoformat(), entry["search_type"])
week_ndcg.setdefault(wkey, []).append(score)
type_avg = {t: sum(v) / len(v) for t, v in type_ndcg.items() if v}
week_avg = {k_: sum(v) / len(v) for k_, v in week_ndcg.items() if v}
return type_avg, week_avg, total_logs_with_feedback
async def compute(weeks: int | None, k: int) -> dict:
conn = await asyncpg.connect(_postgres_url())
try:
fb_rows = await _fetch_feedback_rows(conn, weeks)
totals = await _fetch_corpus_totals(conn, weeks)
top_cited = await _fetch_top_cited(conn)
finally:
await conn.close()
type_avg, week_avg, logs_scored = _aggregate(fb_rows, k)
total_logs = totals.get("_total", 0)
overall_avg = (
sum(v * len([s for s in type_avg]) for v in []) or None # placeholder
)
# Recompute overall_avg cleanly: micro-average over all per-log scores.
all_scores: list[float] = []
for v in [type_avg[t] for t in type_avg]:
# type_avg already collapsed per-type — instead, re-run aggregation
# over fb_rows by reusing the per-log calc, micro-averaged.
pass
# Simpler: redo with per-log granularity for overall mean.
by_log_overall: dict[str, dict[int, int]] = {}
log_to_type: dict[str, str] = {}
for row in fb_rows:
slid = row["search_log_id"]
by_log_overall.setdefault(slid, {})
rank = int(row["rank"])
if 1 <= rank <= k:
by_log_overall[slid][rank] = int(row["relevance_score"])
log_to_type[slid] = row["search_type"]
per_log_scores: list[float] = []
for slid, rels in by_log_overall.items():
s = ndcg_at_k(rels, k)
if s is not None:
per_log_scores.append(s)
overall_avg = (sum(per_log_scores) / len(per_log_scores)) if per_log_scores else None
by_search_type = []
for t, totals_n in sorted(totals.items()):
if t == "_total":
continue
by_search_type.append({
"search_type": t,
"searches_logged": totals_n,
"searches_with_feedback": sum(
1 for slid, tp in log_to_type.items() if tp == t
),
"avg_ndcg_at_k": round(type_avg[t], 4) if t in type_avg else None,
})
by_week = [
{
"week_start": week,
"search_type": stype,
"avg_ndcg_at_k": round(score, 4),
}
for (week, stype), score in sorted(week_avg.items())
]
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"k": k,
"window_weeks": weeks,
"summary": {
"total_searches_logged": total_logs,
"total_searches_with_feedback": logs_scored,
"feedback_coverage_pct": (
round(100 * logs_scored / total_logs, 2) if total_logs else 0.0
),
"avg_ndcg_at_k": round(overall_avg, 4) if overall_avg is not None else None,
},
"by_search_type": by_search_type,
"by_week": by_week,
"top_cited_case_law": [
{**r, "cite_count": int(r["cite_count"])} for r in top_cited
],
}
def main() -> int:
p = argparse.ArgumentParser(description="Compute nDCG@k from search_relevance_feedback")
p.add_argument("--k", type=int, default=10, help="cutoff (default: 10)")
p.add_argument(
"--weeks",
type=int,
default=None,
help="restrict to the last N weeks (default: all time)",
)
p.add_argument("--pretty", action="store_true", help="indented JSON output")
args = p.parse_args()
result = asyncio.run(compute(weeks=args.weeks, k=args.k))
indent = 2 if args.pretty else None
print(json.dumps(result, ensure_ascii=False, indent=indent, default=str))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,278 @@
"""Halacha extraction quality monitor.
Tracks ``avg(confidence)`` of halachot extracted by the LLM pipeline
over time and emits an alert when the recent-window average drops more
than a configurable threshold below the lifetime baseline.
Intended schedule: weekly cron, e.g. ``0 8 * * 1`` (Monday 08:00).
Output: a single-line JSON payload to stdout (suitable for piping
into ``notify.py`` or a webhook), plus a human-readable alert text
on stderr when drift is detected.
Usage
-----
::
# Default — weekly window, 5% drop threshold (relative)
python scripts/monitor_halacha_quality.py
# Custom window/threshold:
python scripts/monitor_halacha_quality.py --window 14 --threshold 0.03
# Only emit JSON, no stderr alert:
python scripts/monitor_halacha_quality.py --silent
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
def _setup_paths():
"""Make ``legal_mcp`` importable when run from anywhere."""
here = Path(__file__).resolve().parent
candidates = [
here.parent / "mcp-server" / "src", # host
Path("/app/mcp-server/src"), # container
]
for c in candidates:
if c.is_dir() and str(c) not in sys.path:
sys.path.insert(0, str(c))
_setup_paths()
from legal_mcp.services import db # noqa: E402
# Statuses considered "trusted" — the baseline is computed only over
# halachot whose extraction the chair has accepted. ``pending_review``
# is the queue waiting for review; their average tends to be lower
# because anything obviously bad gets rejected before approval. So we
# track BOTH series and alert on either one drifting:
# 1. Trusted baseline (approved+published) — drift here means the
# extractor's "best output" quality is degrading.
# 2. All extracted — drift here means raw extractor accuracy is down.
TRUSTED_STATUSES = ("approved", "published")
async def _collect_metrics(window_days: int) -> dict:
pool = await db.get_pool()
# Lifetime baselines
lifetime_all = await pool.fetchrow(
"SELECT count(*) AS n, AVG(confidence) AS avg_conf FROM halachot"
)
lifetime_trusted = await pool.fetchrow(
f"""
SELECT count(*) AS n, AVG(confidence) AS avg_conf
FROM halachot
WHERE review_status = ANY($1::text[])
""",
list(TRUSTED_STATUSES),
)
# Recent window
recent_all = await pool.fetchrow(
f"""
SELECT count(*) AS n, AVG(confidence) AS avg_conf
FROM halachot
WHERE created_at > NOW() - INTERVAL '{int(window_days)} days'
"""
)
recent_trusted = await pool.fetchrow(
f"""
SELECT count(*) AS n, AVG(confidence) AS avg_conf
FROM halachot
WHERE created_at > NOW() - INTERVAL '{int(window_days)} days'
AND review_status = ANY($1::text[])
""",
list(TRUSTED_STATUSES),
)
# Per-precedent recent (extractor outputs that haven't been reviewed
# yet) — sometimes the canary that catches drift earliest. We track
# the most-recent N extractions regardless of review state.
pending_recent = await pool.fetchrow(
"""
SELECT count(*) AS n, AVG(confidence) AS avg_conf
FROM halachot
WHERE review_status = 'pending_review'
"""
)
def _f(rec, key: str) -> float | None:
v = rec[key]
if v is None:
return None
return float(v)
def _i(rec, key: str) -> int:
v = rec[key]
return int(v) if v is not None else 0
return {
"window_days": int(window_days),
"lifetime_all_count": _i(lifetime_all, "n"),
"lifetime_all_avg": _f(lifetime_all, "avg_conf"),
"lifetime_trusted_count": _i(lifetime_trusted, "n"),
"lifetime_trusted_avg": _f(lifetime_trusted, "avg_conf"),
"recent_all_count": _i(recent_all, "n"),
"recent_all_avg": _f(recent_all, "avg_conf"),
"recent_trusted_count": _i(recent_trusted, "n"),
"recent_trusted_avg": _f(recent_trusted, "avg_conf"),
"pending_review_count": _i(pending_recent, "n"),
"pending_review_avg": _f(pending_recent, "avg_conf"),
}
def _drift(baseline: float | None, recent: float | None) -> float | None:
"""Return relative drift as a positive number when recent < baseline.
>>> _drift(0.85, 0.80) # -> 0.0588 (5.88% drop)
"""
if baseline is None or recent is None or baseline <= 0:
return None
return (baseline - recent) / baseline
def _evaluate(metrics: dict, threshold: float, min_sample: int) -> dict:
"""Decide whether any series is drifting below threshold."""
alerts: list[dict] = []
series = [
(
"trusted",
metrics["lifetime_trusted_avg"],
metrics["recent_trusted_avg"],
metrics["recent_trusted_count"],
),
(
"all_extracted",
metrics["lifetime_all_avg"],
metrics["recent_all_avg"],
metrics["recent_all_count"],
),
]
for name, baseline, recent, recent_n in series:
d = _drift(baseline, recent)
entry = {
"series": name,
"baseline": baseline,
"recent": recent,
"recent_n": recent_n,
"drift": d,
"alert": False,
"reason": None,
}
if recent_n < min_sample:
entry["reason"] = f"recent_n={recent_n} below min_sample={min_sample}"
elif d is None:
entry["reason"] = "missing baseline or recent average"
elif d >= threshold:
entry["alert"] = True
entry["reason"] = (
f"drift {d:.1%} >= threshold {threshold:.1%} "
f"(baseline={baseline:.3f}, recent={recent:.3f}, n={recent_n})"
)
else:
entry["reason"] = (
f"drift {d:.1%} < threshold {threshold:.1%} — within tolerance"
)
alerts.append(entry)
any_alert = any(a["alert"] for a in alerts)
return {"alert": any_alert, "series": alerts}
def _format_alert_text(metrics: dict, decision: dict) -> str:
lines = [
f"Halacha quality alert — window={metrics['window_days']}d",
"",
]
for s in decision["series"]:
sym = "ALERT" if s["alert"] else "ok"
baseline = f"{s['baseline']:.3f}" if s["baseline"] is not None else ""
recent = f"{s['recent']:.3f}" if s["recent"] is not None else ""
drift = f"{s['drift']:.1%}" if s["drift"] is not None else ""
lines.append(
f" [{sym}] {s['series']}: baseline={baseline} recent={recent} "
f"drift={drift} n={s['recent_n']}"
)
if s["reason"]:
lines.append(f" {s['reason']}")
return "\n".join(lines)
async def run(
*,
window_days: int,
threshold: float,
min_sample: int,
) -> dict:
metrics = await _collect_metrics(window_days)
decision = _evaluate(metrics, threshold, min_sample)
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"window_days": window_days,
"threshold_rel": threshold,
"min_sample": min_sample,
"metrics": metrics,
"decision": decision,
}
def main():
parser = argparse.ArgumentParser(
description="Monitor halacha extraction quality (confidence drift)."
)
parser.add_argument(
"--window", type=int, default=7,
help="Recent window in days (default: 7).",
)
parser.add_argument(
"--threshold", type=float, default=0.05,
help="Relative drop alert threshold (default: 0.05 = 5%%).",
)
parser.add_argument(
"--min-sample", type=int, default=5,
help="Minimum halachot in window to evaluate (default: 5). "
"Below this, the series is reported but not alerted on.",
)
parser.add_argument(
"--silent", action="store_true",
help="Suppress stderr alert text; only print JSON.",
)
parser.add_argument(
"--exit-on-alert", action="store_true",
help="Exit with status 1 when an alert fires (default: always exit 0).",
)
args = parser.parse_args()
report = asyncio.run(
run(
window_days=args.window,
threshold=args.threshold,
min_sample=args.min_sample,
)
)
# JSON to stdout
print(json.dumps(report, ensure_ascii=False, indent=2))
if report["decision"]["alert"] and not args.silent:
print("", file=sys.stderr)
print(_format_alert_text(report["metrics"], report["decision"]), file=sys.stderr)
if args.exit_on_alert and report["decision"]["alert"]:
sys.exit(1)
if __name__ == "__main__":
main()