#!/usr/bin/env python3
"""Backfill — LLM synthesis of canonical_halachot.canonical_statement (V41 Phase 4).

WHAT THIS DOES
--------------
Walks canonicals in ``review_status='pending_synthesis'`` and, for each, asks a
local ``claude_session`` model (Opus by default) to rewrite the statement
carried over from the representative halacha into ONE clean, case-independent
legal principle — grounded in the instances' supporting quotes (INV-AH).
Accepted rewrites are committed with a fresh embedding; abstained /
drift-rejected / new-citation outcomes keep the original statement. Either way
``review_status`` advances to ``pending_review`` for the chair gate
(G10 / INV-LRN1).

All logic lives in services/canonical_synthesis.py (G2) — this script is the
batch driver: ordering, throttling, dry-run reporting and a CSV audit trail.

IDEMPOTENCY / RESUME
--------------------
Operates on ``pending_synthesis`` only; a committed canonical leaves the queue,
so re-running continues where it stopped. Safe to interrupt: the audit CSV is
flushed after every item, so it records everything processed before the stop.

THROTTLING
----------
Each item is one Opus call against chaim's claude.ai subscription. Before every
item the shared usage_limits ceilings are checked; once a window is over its
soft ceiling the run STOPS gracefully (resumable) instead of hammering 429.
Disable with --no-throttle (e.g. small samples).

USAGE
-----
    cd ~/legal-ai/mcp-server
    .venv/bin/python ../scripts/backfill_canonical_synthesis.py --sample 20           # dry-run, 20 random
    .venv/bin/python ../scripts/backfill_canonical_synthesis.py --dry-run --limit 50  # dry-run, first 50 (multi-instance first)
    .venv/bin/python ../scripts/backfill_canonical_synthesis.py --apply               # full throttled run
    .venv/bin/python ../scripts/backfill_canonical_synthesis.py --apply --limit 200

``--apply`` and ``--dry-run`` are mutually exclusive. ``--limit`` and
``--sample`` must be non-negative; ``0`` selects nothing.
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from uuid import UUID

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))

from legal_mcp.services import canonical_synthesis, db  # noqa: E402

try:  # stdlib-only module, importable from system python too
    from legal_mcp.services import usage_limits
except Exception:  # pragma: no cover
    usage_limits = None

AUDIT_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "audit")

# Column order of the audit CSV; each data row is written in this order in _run.
_AUDIT_HEADER = ("canonical_id", "instance_count", "status", "drift_cosine",
                 "reason", "before", "after")

# One-character progress marker per synthesis status; unknown statuses print "?".
_STATUS_MARKS = {
    "accepted": "✓",
    "abstained": "·",
    "drift_rejected": "✗",
    "new_citation": "✗",
    "llm_error": "!",
    "no_instances": "·",
    "not_found": "!",
}


async def _pending(limit: int | None, sample: int | None) -> list[dict]:
    """Return pending-synthesis canonicals, multi-instance first (highest value).

    Args:
        limit: keep at most this many items, applied after sampling. ``None``
            means no cap; ``0`` yields an empty list.
        sample: when given and smaller than the queue, draw this many items at
            random (in random order) instead of the ordered queue. ``None``
            disables sampling.

    Returns:
        One dict per row with keys ``id`` (text), ``instance_count`` and
        ``canonical_statement``.

    Raises:
        ValueError: if ``limit`` or ``sample`` is negative (a negative slice
            would silently drop items; ``random.sample`` would reject it).
    """
    if (limit is not None and limit < 0) or (sample is not None and sample < 0):
        raise ValueError(
            f"limit/sample must be non-negative (limit={limit}, sample={sample})")
    pool = await db.get_pool()
    rows = await pool.fetch(
        "SELECT id::text AS id, instance_count, canonical_statement "
        "FROM canonical_halachot WHERE review_status='pending_synthesis' "
        "ORDER BY instance_count DESC, created_at",
    )
    items = [dict(r) for r in rows]
    # Compare against None rather than truthiness: an explicit 0 must mean
    # "nothing", not "everything".
    if sample is not None and sample < len(items):
        items = random.sample(items, sample)
    if limit is not None:
        items = items[:limit]
    return items


def _throttled() -> tuple[bool, str]:
    """Check the shared usage ceilings before the next LLM call.

    Returns ``(over, detail)``. ``over`` is True only when
    ``usage_limits.ceiling_status`` reports it. If the module is unavailable or
    the usage read returns None, the check fails open (``False``) with a detail
    string explaining why.
    """
    if usage_limits is None:
        return False, "usage_limits unavailable"
    usage = usage_limits.subscription_usage()
    if usage is None:
        return False, "usage read failed (proceeding)"
    over, _reset, detail = usage_limits.ceiling_status(usage)
    return over, detail


def _short(s: str, n: int = 90) -> str:
    """Flatten newlines and truncate to ``n`` chars with an ellipsis; None -> ""."""
    s = (s or "").replace("\n", " ")
    return s if len(s) <= n else s[: n - 1] + "…"


def _print_item(n: int, total: int, it: dict, res: dict, verbose: bool) -> None:
    """Print one progress line; with ``verbose``, also before/after/reason details."""
    status = res["status"]
    mark = _STATUS_MARKS.get(status, "?")
    print(f"[{n}/{total}] {mark} {status:<14} "
          f"inst={it['instance_count']} {it['id'][:8]}")
    # Details are shown for accepted items and for any item whose proposal
    # differs from the original statement.
    if verbose and (status == "accepted" or res.get("proposed") != res.get("original")):
        print(f" before: {_short(res.get('original', ''))}")
        print(f" after : {_short(res.get('proposed', ''))} "
              f"(drift={res.get('drift_cosine')})")
        if res.get("reason"):
            print(f" reason: {_short(res['reason'], 110)}")


def _print_summary(mode: str, counts: Counter[str], total: int, stopped: bool,
                   audit_path: str, apply: bool) -> None:
    """Print per-status totals, the audit CSV path and, in dry-run, a reminder."""
    processed = sum(counts.values())
    print(f"\n── summary ({mode}) — {processed}/{total} processed"
          f"{' (stopped early)' if stopped else ''} ──")
    for status, c in counts.most_common():
        print(f" {status:<16} {c}")
    print(f"\naudit CSV: {audit_path}")
    if not apply:
        print("dry-run — nothing written to the DB. Re-run with --apply to commit.")


async def _run(apply: bool, limit: int | None, sample: int | None,
               throttle: bool, verbose: bool) -> int:
    """Synthesize the selected canonicals, audit each to CSV, print a summary.

    In apply mode each item goes through ``synthesize_and_apply``; otherwise
    through ``synthesize_canonical`` (dry-run). With ``throttle`` on, usage
    ceilings are checked before every item and the loop stops early once one
    is exceeded. Always returns 0: an early stop is resumable, not a failure.
    """
    items = await _pending(limit, sample)
    total = len(items)
    mode = "APPLY" if apply else "DRY-RUN"
    print(f"[{mode}] {total} canonicals pending_synthesis to process "
          f"(throttle={'on' if throttle else 'off'})\n")
    if not total:
        print("nothing to do.")
        return 0

    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    os.makedirs(AUDIT_DIR, exist_ok=True)
    audit_path = os.path.join(
        AUDIT_DIR, f"canonical-synthesis-{'apply' if apply else 'dryrun'}-{stamp}.csv")
    synthesize = (canonical_synthesis.synthesize_and_apply if apply
                  else canonical_synthesis.synthesize_canonical)

    counts: Counter[str] = Counter()
    stopped = False
    with open(audit_path, "w", newline="", encoding="utf-8") as fh:
        w = csv.writer(fh)
        w.writerow(_AUDIT_HEADER)
        for n, it in enumerate(items, 1):
            if throttle:
                over, detail = _throttled()
                if over:
                    print(f"\n⏸ usage ceiling reached ({detail}) — stopping at "
                          f"{n - 1}/{total}. Re-run to resume.")
                    stopped = True
                    break
            res = await synthesize(UUID(it["id"]))
            counts[res["status"]] += 1
            w.writerow([it["id"], it["instance_count"], res["status"],
                        res.get("drift_cosine"), res.get("reason", ""),
                        res.get("original", ""), res.get("proposed", "")])
            # Flush every row: in apply mode synthesize_and_apply has already
            # written to the DB for this item, so its audit row must survive a
            # hard kill (default SIGTERM / SIGKILL / OOM) that skips buffer flush.
            fh.flush()
            _print_item(n, total, it, res, verbose)

    _print_summary(mode, counts, total, stopped, audit_path, apply)
    return 0


def _non_negative_int(text: str) -> int:
    """argparse ``type=`` for item counts: reject negatives (e.g. ``--limit -5``)."""
    value = int(text)
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def main() -> int:
    """Parse CLI flags and run the batch; returns the process exit code."""
    p = argparse.ArgumentParser(description="LLM synthesis of canonical_statement (V41 Phase 4)")
    # A DB-writing command must not silently pick a side when given both flags.
    run_mode = p.add_mutually_exclusive_group()
    run_mode.add_argument("--apply", action="store_true", help="commit to the DB (default: dry-run)")
    run_mode.add_argument("--dry-run", action="store_true", help="explicit dry-run (default)")
    p.add_argument("--limit", type=_non_negative_int, default=None, help="cap items processed")
    p.add_argument("--sample", type=_non_negative_int, default=None,
                   help="random sample of N (dry-run inspection)")
    p.add_argument("--no-throttle", action="store_true", help="skip usage-ceiling checks")
    p.add_argument("--verbose", action="store_true", help="print before/after for changed items")
    args = p.parse_args()
    return asyncio.run(_run(
        apply=args.apply, limit=args.limit, sample=args.sample,
        throttle=not args.no_throttle, verbose=args.verbose,
    ))


if __name__ == "__main__":
    raise SystemExit(main())