legal-ai/scripts/audit_corpus_integrity.py
Chaim 2aee398b4a
feat: Stage C — RAG advanced (#33, #47, #48, #49, #50, #51)
Six independent sub-tasks dispatched in parallel; aggregated here.

## #33 — Hide case_name column
library-list-panel.tsx: `<TableHead>` + `<TableCell>` for "שם" ("name")
get `className="hidden"` in both Court and Committee row variants.
DB column preserved for future use.

## #47 — Periodic audit script
New scripts/audit_corpus_integrity.py: three read-only SQL checks
(external_upload rows with an internal ערר ("appeal") prefix,
internal_committee rows missing chair/district, cases.practice_area
outside the closed set), plus a CEO wakeup on violations and a
suggested daily cron `0 7 * * *`. First run: 0 issues.

## #48 — Parent-doc retrieval (gated, default off)
Schema V17: precedent_chunks.parent_chunk_id + chunk_role
('child'|'parent'). New chunker.chunk_document_hierarchical() —
section-aware parents (~1500 tokens) containing ~5 overlapping
children (~300 tokens each). New db.store_precedent_chunks_hierarchical
two-pass writer. Search SQL (semantic + lexical) LEFT-JOIN parent and
swap content + dedupe by parent_chunk_id when flag on. Toggle:
PARENT_DOC_RETRIEVAL_ENABLED + PARENT_DOC_{CHILD,PARENT}_SIZE_TOKENS.
Backfill ~3min and ~$0.20 — deferred to follow-up.
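
The parent/child layout described above can be sketched as follows. This is a minimal illustration, not the code in chunker.chunk_document_hierarchical(): it is not section-aware, it counts whitespace-separated words instead of model tokens, and the 50-token child overlap plus the names `Chunk`, `sliding_windows` and `chunk_hierarchical` are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Chunk:
    role: str                           # 'parent' or 'child', mirroring chunk_role
    text: str
    parent_index: Optional[int] = None  # position of the parent in the returned list


def sliding_windows(tokens: List[str], size: int, overlap: int) -> List[List[str]]:
    """Split tokens into windows of at most `size`, each sharing `overlap` tokens with the previous one."""
    if size <= overlap:
        raise ValueError("size must be larger than overlap")
    if not tokens:
        return []
    step = size - overlap
    windows = []
    start = 0
    while True:
        windows.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break
        start += step
    return windows


def chunk_hierarchical(
    text: str,
    parent_size: int = 1500,
    child_size: int = 300,
    child_overlap: int = 50,  # assumed value; the commit only says "overlapping"
) -> List[Chunk]:
    """Return parents followed by their children; each child points at its parent."""
    tokens = text.split()  # whitespace words stand in for real tokens
    chunks: List[Chunk] = []
    for parent_tokens in sliding_windows(tokens, parent_size, 0):
        parent_index = len(chunks)
        chunks.append(Chunk("parent", " ".join(parent_tokens)))
        for child_tokens in sliding_windows(parent_tokens, child_size, child_overlap):
            chunks.append(Chunk("child", " ".join(child_tokens), parent_index))
    return chunks
```

At search time the child is what gets matched, and `parent_index` plays the role of parent_chunk_id: the parent text is returned in its place and results are deduplicated per parent.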

## #49 — Multimodal backfill
New scripts/backfill_multimodal_precedents.py with token-matching
case_number ↔ source files (PDF + DOCX via PyMuPDF). Ran in container:
26 precedents embedded, 503 pages, $0.21, 0 errors. precedent_image_embeddings
grew 3 → 29 rows. 44 remaining are style_corpus-migrated rows (no
source file on disk) — will catch up when re-uploaded.
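
One way the case_number ↔ source-file token matching could work is sketched below. The rule used here (every digit run of the case number must appear in the filename, and ambiguous matches are rejected) and the names `digit_tokens` and `match_source_file` are assumptions for illustration; the actual logic in backfill_multimodal_precedents.py is not shown in this commit message.

```python
import re
from typing import List, Optional, Set


def digit_tokens(value: str) -> Set[str]:
    """Extract the runs of digits from a case number or filename."""
    return set(re.findall(r"\d+", value))


def match_source_file(case_number: str, filenames: List[str]) -> Optional[str]:
    """Return the single filename containing all digit tokens of case_number, else None."""
    wanted = digit_tokens(case_number)
    if not wanted:
        return None
    candidates = [name for name in filenames if wanted <= digit_tokens(name)]
    # Refuse to guess when zero or several files match.
    return candidates[0] if len(candidates) == 1 else None
```

Returning None on ambiguity keeps a wrong file from being embedded under the wrong precedent; such rows would simply stay in the "remaining" bucket.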

## #50 — Closed-loop feedback + nDCG
Schema V18: search_logs + search_relevance_feedback. New telemetry.py
with fire-and-forget log_search_bg (p50 = 0.002 ms, negligible overhead) +
auto-infer_relevance_from_citations (reads case drafts → marks score=3
when cited precedent appears in past search top-K). Hooks added to 5
search paths. scripts/compute_ndcg.py for aggregation. Two admin API
endpoints (GET /api/admin/rag-metrics + POST .../infer). Dashboard UI
deferred — API is enough for now.
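
For reference, nDCG over graded feedback can be computed as in the sketch below. It assumes the common exponential-gain form, (2^rel - 1) / log2(rank + 1) with 1-based ranks, and an ideal ranking built from the judged documents only; scripts/compute_ndcg.py may use a different variant, and the function names here are hypothetical.

```python
import math
from typing import Dict, List, Sequence


def dcg(gains: Sequence[int]) -> float:
    """Discounted cumulative gain with exponential gain and a log2 rank discount."""
    # enumerate() is 0-based, so rank + 2 equals (1-based rank) + 1.
    return sum((2 ** g - 1) / math.log2(rank + 2) for rank, g in enumerate(gains))


def ndcg_at_k(ranked_ids: List[str], relevance: Dict[str, int], k: int = 10) -> float:
    """nDCG@k for one logged search; unjudged results count as relevance 0."""
    gains = [relevance.get(doc_id, 0) for doc_id in ranked_ids[:k]]
    ideal = sorted(relevance.values(), reverse=True)[:k]
    ideal_dcg = dcg(ideal)
    return dcg(gains) / ideal_dcg if ideal_dcg > 0 else 0.0
```

With the inferred feedback described above, a cited precedent carries score 3, so a search that ranked it first scores 1.0 and one that ranked it second scores 1/log2(3), roughly 0.63.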

## #51 — Halacha quality monitoring
New scripts/monitor_halacha_quality.py — baseline avg confidence
(trusted=0.849, all=0.833, pending=0.694) with rolling window drift
detection. Default 5% threshold. Exits non-zero on alert for cron
integration. Recommended: `0 8 * * 1` weekly Mon 8am.
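
The drift check can be pictured with the sketch below. It assumes the 5% threshold is relative to the baseline and that movement in either direction counts as drift; monitor_halacha_quality.py may define both differently, and `detect_drift` and `exit_code` are hypothetical names.

```python
from statistics import fmean
from typing import Sequence, Tuple


def detect_drift(
    baseline: float,
    recent: Sequence[float],
    threshold: float = 0.05,
) -> Tuple[bool, float]:
    """Compare the rolling-window average with the baseline.

    Returns (drifted, relative_change), where relative_change is
    (window average - baseline) / baseline.
    """
    if not recent:
        return False, 0.0
    window_avg = fmean(recent)
    relative_change = (window_avg - baseline) / baseline
    return abs(relative_change) > threshold, relative_change


def exit_code(drifted: bool) -> int:
    """Non-zero on alert so a cron wrapper can react to the failure."""
    return 1 if drifted else 0
```

Using the trusted baseline of 0.849 and an illustrative window of 0.80, 0.79, 0.81, the average is 0.80, a relative drop of about 5.8%, which would trip the default threshold.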

## Bonus: 230 unlinked citations → missing_precedents
Bulk-imported 230 distinct unlinked citations from
precedent_internal_citations to missing_precedents.status='open',
party='committee', with notes listing source citers. Top candidate:
ע"א 3213/97 (cited 5x). Total open missing_precedents now 237.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:26:52 +00:00

282 lines
9.6 KiB
Python

"""Periodic corpus-integrity audit.

Runs a set of read-only SQL checks against the legal-ai DB to detect rows
that violate domain constraints which are *not* enforced by the schema
(or were added after the constraint was put in place).

Checks performed:
  A. ``case_law`` rows with ``source_kind='external_upload'`` whose
     ``case_number`` starts with the Hebrew prefixes ``ערר`` / ``בל"מ``.
     Internal committee decisions belong to ``source_kind='internal_committee'``.
  B. ``case_law`` rows with ``source_kind='internal_committee'`` that
     lack a ``chair_name`` and/or ``district``. Internal decisions must
     carry both.
  C. ``cases`` rows with a ``practice_area`` outside the closed set
     {``rishuy_uvniya``, ``betterment_levy``, ``compensation_197``, ``''``}.

Output:
  * Appends a timestamped block to ``data/logs/corpus_integrity_audit.log``.
  * If hits are found AND env ``PAPERCLIP_API_URL`` + ``PAPERCLIP_API_KEY``
    are set, posts a CEO wakeup comment via ``POST /api/agents/{ceo}/wakeup``
    (best-effort, never fails the script).
  * Always exits 0 unless an unexpected error occurs (so cron stays quiet).

Cron suggestion (daily 07:00):
  0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python \\
      /home/chaim/legal-ai/scripts/audit_corpus_integrity.py

Idempotent. Read-only on the DB.
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

# Load ~/.env so POSTGRES_* / PAPERCLIP_* are picked up when run from cron.
ENV_PATH = os.path.expanduser("~/.env")
if os.path.isfile(ENV_PATH):
    with open(ENV_PATH, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                k, v = line.split("=", 1)
                os.environ.setdefault(k, v)

import asyncpg  # noqa: E402

try:
    import httpx  # noqa: E402
except ImportError:  # httpx is part of the legal-ai venv; not required for DB checks
    httpx = None  # type: ignore[assignment]

REPO_ROOT = Path(__file__).resolve().parent.parent
LOG_PATH = REPO_ROOT / "data" / "logs" / "corpus_integrity_audit.log"

CHECK_A_SQL = (
    "SELECT id, case_number FROM case_law "
    "WHERE source_kind = 'external_upload' AND case_number ~ '^ערר|^בל\"מ' "
    "ORDER BY case_number"
)

CHECK_B_SQL = (
    "SELECT id, case_number, chair_name, district FROM case_law "
    "WHERE source_kind = 'internal_committee' "
    "AND (chair_name IS NULL OR chair_name = '' "
    " OR district IS NULL OR district = '') "
    "ORDER BY case_number"
)

CHECK_C_SQL = (
    "SELECT id, case_number, practice_area FROM cases "
    "WHERE practice_area IS NOT NULL "
    "AND practice_area NOT IN ('rishuy_uvniya', 'betterment_levy', "
    " 'compensation_197', '') "
    "ORDER BY case_number"
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("audit_corpus_integrity")


def _pg_url() -> str:
    """Resolve POSTGRES URL from env, falling back to discrete vars."""
    from urllib.parse import quote  # local import keeps this helper self-contained

    url = os.environ.get("POSTGRES_URL")
    if url:
        return url
    pg_host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
    pg_port = int(os.environ.get("POSTGRES_PORT", "5433"))
    pg_user = os.environ.get("POSTGRES_USER", "legal_ai")
    pg_pw = os.environ.get("POSTGRES_PASSWORD", "")
    pg_db = os.environ.get("POSTGRES_DB", "legal_ai")
    if not pg_pw:
        raise SystemExit("POSTGRES_PASSWORD / POSTGRES_URL not set")
    # Percent-encode credentials so characters such as "@" or "/" cannot break the URL.
    return (
        f"postgres://{quote(pg_user, safe='')}:{quote(pg_pw, safe='')}"
        f"@{pg_host}:{pg_port}/{pg_db}"
    )


async def _run_check(conn: asyncpg.Connection, sql: str) -> list[dict]:
    """Run one read-only check and return its rows as plain dicts."""
    rows = await conn.fetch(sql)
    return [dict(r) for r in rows]


async def _resolve_ceo_agent_id() -> str | None:
    """Best-effort: look up the CEO agent UUID for CMP via the API.

    Returns None if PAPERCLIP env is missing or the lookup fails.
    """
    base_url = os.environ.get("PAPERCLIP_API_URL")
    api_key = os.environ.get("PAPERCLIP_API_KEY")
    if not (base_url and api_key and httpx is not None):
        return None
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get(
                f"{base_url}/api/agents",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            r.raise_for_status()
            payload = r.json()
            items = payload if isinstance(payload, list) else payload.get("items", [])
            for item in items:
                # Look for a CMP-side CEO (master); the CMPA mirror has a different id.
                title = (item.get("title") or "").lower()
                role = (item.get("role") or "").lower()
                # "מנכ" is the start of the Hebrew abbreviation for "CEO".
                if "ceo" in title or "ceo" in role or "מנכ" in title:
                    return item.get("id")
    except Exception as e:
        logger.warning("CEO lookup failed: %s", e)
    return None


async def _notify_ceo(summary: str) -> bool:
    """Post a wakeup comment to the CEO agent. Returns True on best-effort success."""
    base_url = os.environ.get("PAPERCLIP_API_URL")
    api_key = os.environ.get("PAPERCLIP_API_KEY")
    if not (base_url and api_key and httpx is not None):
        logger.info("Paperclip env not set — skipping CEO wakeup")
        return False
    ceo_id = await _resolve_ceo_agent_id()
    if not ceo_id:
        logger.info("Could not resolve CEO agent id — skipping wakeup")
        return False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.post(
                f"{base_url}/api/agents/{ceo_id}/wakeup",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "source": "automation",
                    "triggerDetail": "audit_corpus_integrity",
                    "reason": "corpus integrity audit found violations",
                    "payload": {"summary": summary},
                },
            )
            r.raise_for_status()
            logger.info("Notified CEO (agent_id=%s)", ceo_id)
            return True
    except Exception as e:
        logger.warning("CEO wakeup failed: %s", e)
        return False


def _format_report(
    a_hits: list[dict],
    b_hits: list[dict],
    c_hits: list[dict],
    ts: datetime,
) -> str:
    parts: list[str] = []
    parts.append(f"=== Corpus integrity audit @ {ts.isoformat()} ===")
    parts.append("")
    parts.append(
        f"Check A (case_law external_upload with internal-style "
        f"case_number prefix): {len(a_hits)} hit(s)"
    )
    for row in a_hits[:50]:
        parts.append(f" - id={row['id']} case_number={row['case_number']!r}")
    if len(a_hits) > 50:
        parts.append(f" ... ({len(a_hits) - 50} more truncated)")
    parts.append("")
    parts.append(
        f"Check B (case_law internal_committee missing chair_name/district): "
        f"{len(b_hits)} hit(s)"
    )
    for row in b_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"chair_name={row.get('chair_name')!r} district={row.get('district')!r}"
        )
    if len(b_hits) > 50:
        parts.append(f" ... ({len(b_hits) - 50} more truncated)")
    parts.append("")
    parts.append(
        f"Check C (cases.practice_area outside closed set): {len(c_hits)} hit(s)"
    )
    for row in c_hits[:50]:
        parts.append(
            f" - id={row['id']} case_number={row['case_number']!r} "
            f"practice_area={row.get('practice_area')!r}"
        )
    if len(c_hits) > 50:
        parts.append(f" ... ({len(c_hits) - 50} more truncated)")
    parts.append("")
    return "\n".join(parts)


async def main(args: argparse.Namespace) -> int:
    pg_url = _pg_url()
    conn = await asyncpg.connect(pg_url)
    try:
        a_hits = await _run_check(conn, CHECK_A_SQL)
        b_hits = await _run_check(conn, CHECK_B_SQL)
        c_hits = await _run_check(conn, CHECK_C_SQL)
    finally:
        await conn.close()

    total = len(a_hits) + len(b_hits) + len(c_hits)
    ts = datetime.now(timezone.utc)
    report = _format_report(a_hits, b_hits, c_hits, ts)

    # Always write to log (creates dir + file if missing).
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with LOG_PATH.open("a", encoding="utf-8") as f:
        f.write(report)
        f.write("\n")

    # Echo to stdout so cron mail / manual run shows the result.
    print(report)

    if total == 0:
        logger.info("clean: no integrity violations found")
        return 0

    logger.warning(
        "found %d total violation(s) (A=%d, B=%d, C=%d)",
        total, len(a_hits), len(b_hits), len(c_hits),
    )

    if args.notify:
        # The wakeup summary is sent in Hebrew. In English it reads:
        #   "The daily corpus audit found violations:"
        #   "- Check A (external_upload with internal prefix): N"
        #   "- Check B (internal_committee missing chair/district): N"
        #   "- Check C (invalid cases.practice_area): N"
        #   "Full details: <log path>"
        summary_lines = [
            "ה-audit היומי על הקורפוס מצא הפרות:",
            f"- Check A (external_upload עם prefix פנימי): {len(a_hits)}",
            f"- Check B (internal_committee חסר chair/district): {len(b_hits)}",
            f"- Check C (cases.practice_area לא תקין): {len(c_hits)}",
            "",
            f"פירוט מלא: {LOG_PATH}",
        ]
        await _notify_ceo("\n".join(summary_lines))

    # Violations are reported via the log and the wakeup, not the exit code.
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--no-notify",
        dest="notify",
        action="store_false",
        help="Don't post a CEO wakeup even if hits are found",
    )
    parser.set_defaults(notify=True)
    args = parser.parse_args()
    try:
        rc = asyncio.run(main(args))
    except KeyboardInterrupt:
        sys.exit(130)
    sys.exit(rc)