#!/usr/bin/env python """Audit the style_corpus table — list each decision with what's populated and what's missing. Produces a JSON report at data/audit/corpus-YYYY-MM-DD.json so we can see at a glance which corpus entries lack summary/outcome/key_principles/appeal_subtype/chunks/embeddings. Run with the mcp-server venv (has asyncpg): POSTGRES_URL=postgres://... ./mcp-server/.venv/bin/python scripts/audit_training_corpus.py Without POSTGRES_URL, falls back to the per-field env vars used by web/mcp-server config. """ from __future__ import annotations import asyncio import json import os import re import sys from datetime import UTC, date, datetime from pathlib import Path import asyncpg def _build_dsn() -> str: if url := os.environ.get("POSTGRES_URL"): return url return ( f"postgres://{os.environ.get('POSTGRES_USER', 'legal_ai')}:" f"{os.environ.get('POSTGRES_PASSWORD', '')}@" f"{os.environ.get('POSTGRES_HOST', '127.0.0.1')}:" f"{os.environ.get('POSTGRES_PORT', '5433')}/" f"{os.environ.get('POSTGRES_DB', 'legal_ai')}" ) async def audit() -> dict: dsn = _build_dsn() conn = await asyncpg.connect(dsn) try: rows = await conn.fetch( """ SELECT id, decision_number, decision_date, subject_categories, length(full_text) AS chars, summary, outcome, key_principles, practice_area, appeal_subtype, document_id, created_at FROM style_corpus ORDER BY decision_date NULLS LAST, decision_number """ ) # Chunk + embedding counts for each related document — by direct FK first, # then by title-match for legacy rows where style_corpus.document_id is NULL. chunk_counts = await conn.fetch( """ SELECT d.id AS doc_id, d.title, count(c.id) AS chunks, count(c.embedding) FILTER (WHERE c.embedding IS NOT NULL) AS chunks_with_emb FROM documents d LEFT JOIN document_chunks c ON c.document_id = d.id WHERE d.title LIKE '[קורפוס]%' OR d.id IN (SELECT document_id FROM style_corpus WHERE document_id IS NOT NULL) GROUP BY d.id, d.title """ ) finally: await conn.close() by_doc_id = {r["doc_id"]: r for r in chunk_counts} # Index corpus documents by every digit cluster in their title so we can # match against style_corpus.decision_number regardless of formatting # (e.g. style_corpus has "1109-25" but title may say "ARAR-25-1109" or # "ערר 1009-25"). Each digit run >=3 chars becomes a key. by_digit: dict[str, dict] = {} for r in chunk_counts: title = r["title"] or "" for tok in re.findall(r"\d{3,}", title): by_digit.setdefault(tok, r) decisions = [] gaps_total = { "summary": 0, "outcome": 0, "key_principles": 0, "appeal_subtype": 0, "subject_categories": 0, "chunks": 0, "embeddings": 0, "document_id": 0, } for row in rows: cats = row["subject_categories"] if isinstance(cats, str): try: cats = json.loads(cats) except json.JSONDecodeError: cats = [] cats = cats or [] kp = row["key_principles"] if isinstance(kp, str): try: kp = json.loads(kp) except json.JSONDecodeError: kp = [] kp = kp or [] # Resolve chunks: prefer FK, fall back to digit-cluster match on decision_number. chunks = 0 chunks_with_emb = 0 if row["document_id"] and row["document_id"] in by_doc_id: r = by_doc_id[row["document_id"]] chunks = r["chunks"] chunks_with_emb = r["chunks_with_emb"] elif row["decision_number"]: for tok in re.findall(r"\d{3,}", row["decision_number"]): if tok in by_digit: r = by_digit[tok] chunks = r["chunks"] chunks_with_emb = r["chunks_with_emb"] break missing = [] if not row["summary"]: missing.append("summary") gaps_total["summary"] += 1 if not row["outcome"]: missing.append("outcome") gaps_total["outcome"] += 1 if not kp: missing.append("key_principles") gaps_total["key_principles"] += 1 if not row["appeal_subtype"]: missing.append("appeal_subtype") gaps_total["appeal_subtype"] += 1 if not cats: missing.append("subject_categories") gaps_total["subject_categories"] += 1 if chunks == 0: missing.append("chunks") gaps_total["chunks"] += 1 elif chunks_with_emb < chunks: missing.append(f"embeddings({chunks_with_emb}/{chunks})") gaps_total["embeddings"] += 1 if row["document_id"] is None: missing.append("document_id") gaps_total["document_id"] += 1 decisions.append({ "id": str(row["id"]), "decision_number": row["decision_number"] or "", "decision_date": row["decision_date"].isoformat() if row["decision_date"] else None, "chars": row["chars"], "subject_categories": cats, "practice_area": row["practice_area"] or "", "appeal_subtype": row["appeal_subtype"] or "", "summary_len": len(row["summary"] or ""), "outcome_len": len(row["outcome"] or ""), "key_principles_count": len(kp), "chunks": chunks, "chunks_with_embeddings": chunks_with_emb, "document_id": str(row["document_id"]) if row["document_id"] else None, "missing": missing, "created_at": row["created_at"].isoformat() if row["created_at"] else None, }) return { "generated_at": datetime.now(UTC).isoformat(), "total_decisions": len(decisions), "gaps_total": gaps_total, "decisions": decisions, } async def main() -> int: report = await audit() out_dir = Path(__file__).resolve().parents[1] / "data" / "audit" out_dir.mkdir(parents=True, exist_ok=True) today = date.today().isoformat() out_file = out_dir / f"corpus-{today}.json" out_file.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") # Console summary print(f"Total decisions: {report['total_decisions']}") print("Gaps by field (count of decisions missing it):") for field, n in report["gaps_total"].items(): bar = "█" * min(n, 60) print(f" {field:25s} {n:3d} {bar}") print(f"\nReport written to {out_file}") return 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))