#!/usr/bin/env python """A/B (NON-DESTRUCTIVE): re-extract halachot for ONE precedent with a chosen model+effort and compare against the existing stored halachot. Purpose: decide whether re-running halacha extraction on Opus 4.8 (@ xhigh/max effort) yields fewer / higher-quality halachot than the current production output — WITHOUT deleting or storing anything in the DB. Mirrors the production pipeline in `halacha_extractor.extract()` (same prompts, same chunk selection + fallback, same quote-verification), but swaps the LLM call for `claude -p --model --effort ` and skips embeddings + DB writes. Usage: DOTENV_PATH=/home/chaim/.env DATA_DIR=/home/chaim/legal-ai/data \ AB_MODEL=claude-opus-4-8 AB_EFFORT=xhigh \ .venv/bin/python scripts/ab_halacha_opus48.py Env knobs: AB_MODEL (default claude-opus-4-8), AB_EFFORT (default xhigh), AB_CONCURRENCY (default 2). """ from __future__ import annotations import asyncio import json import os import statistics import sys from collections import Counter from uuid import UUID from legal_mcp.config import parse_llm_json from legal_mcp.services import db from legal_mcp.services import halacha_extractor as hx MODEL = os.environ.get("AB_MODEL", "claude-opus-4-8") EFFORT = os.environ.get("AB_EFFORT", "xhigh") CONCURRENCY = int(os.environ.get("AB_CONCURRENCY", "2")) CHUNK_TIMEOUT = int(os.environ.get("AB_CHUNK_TIMEOUT", "1800")) async def run_claude(system: str, prompt: str, timeout: int = CHUNK_TIMEOUT): """One `claude -p` call with explicit --model/--effort. Returns parsed JSON.""" full = f"{system}\n\n{prompt}" cmd = [ "claude", "-p", "--output-format", "json", "--max-turns", "1", "--model", MODEL, "--effort", EFFORT, ] proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) out_b, err_b = await asyncio.wait_for( proc.communicate(input=full.encode("utf-8")), timeout=timeout, ) if proc.returncode != 0: raise RuntimeError( f"claude CLI exit {proc.returncode}: " f"{err_b.decode('utf-8', 'replace').strip()[:300]}" ) raw = out_b.decode("utf-8", "replace").strip() try: data = json.loads(raw) if isinstance(data, dict) and "result" in data: raw = data["result"] except json.JSONDecodeError: pass return parse_llm_json(raw) async def extract_chunk(chunk_text, section_type, idx, total, context, is_binding): base_prompt = ( hx.HALACHA_EXTRACTION_PROMPT_BINDING if is_binding else hx.HALACHA_EXTRACTION_PROMPT_PERSUASIVE ) chunk_label = f" (חלק {idx + 1}/{total})" if total > 1 else "" user_msg = ( f"## הקלט\n" f"סוג קטע: {section_type}\n" f"{context}{chunk_label}\n\n" f"--- תחילת הטקסט ---\n{chunk_text}\n--- סוף הטקסט ---" ) try: result = await run_claude(base_prompt, user_msg) except Exception as e: print(f" ! chunk {idx + 1}/{total} failed: {e}", file=sys.stderr) return [], False if isinstance(result, list): return result, True print(f" ! chunk {idx + 1}/{total} non-list: {type(result).__name__}", file=sys.stderr) return [], False def stats(halachot: list[dict], label: str) -> dict: n = len(halachot) def fconf(x): try: return float(x.get("confidence")) except (TypeError, ValueError): return None confs = [c for c in (fconf(h) for h in halachot) if c is not None] qv = Counter(bool(h.get("quote_verified")) for h in halachot) rt = Counter(h.get("rule_type") for h in halachot) return { "label": label, "n": n, "quote_verified_true": qv.get(True, 0), "quote_verified_false": qv.get(False, 0), "conf_min": min(confs) if confs else None, "conf_median": statistics.median(confs) if confs else None, "conf_max": max(confs) if confs else None, "conf_below_0_7": sum(1 for c in confs if c < 0.7), "rule_types": dict(rt), } def print_stats(s: dict): print(f"\n=== {s['label']} ===") print(f" count : {s['n']}") print(f" quote_verified : {s['quote_verified_true']} ✓ / {s['quote_verified_false']} ✗") if s["conf_median"] is not None: print(f" confidence min/med/max: {s['conf_min']:.2f} / {s['conf_median']:.2f} / {s['conf_max']:.2f}") print(f" confidence < 0.7 : {s['conf_below_0_7']} / {s['n']}") print(f" rule_type dist : {s['rule_types']}") async def main(): if len(sys.argv) < 2: print("usage: ab_halacha_opus48.py ", file=sys.stderr) sys.exit(2) case_law_id = UUID(sys.argv[1]) record = await db.get_case_law(case_law_id) if not record: print("case_law not found", file=sys.stderr) sys.exit(1) is_binding = bool(record.get("is_binding")) citation = record.get("case_number", "") court = record.get("court", "") date_str = str(record.get("date") or "") full_text = record.get("full_text") or "" print(f"Precedent: {citation} — {record.get('case_name')}") print(f" court={court} is_binding={is_binding} prompt={'BINDING' if is_binding else 'PERSUASIVE'}") print(f" model={MODEL} effort={EFFORT} concurrency={CONCURRENCY}") # ---- Side A: existing stored halachot (current production output) ---- existing = await db.list_halachot(case_law_id=case_law_id, limit=500) by_status = Counter(h.get("review_status") for h in existing) print(f"\n[A] existing halachot in DB: {len(existing)} status breakdown: {dict(by_status)}") approved = by_status.get("approved", 0) + by_status.get("published", 0) if approved: print(f" ⚠ {approved} already approved/published — a REAL re-run would DELETE these.") # ---- Side B: fresh extraction via chosen model/effort (no DB writes) ---- chunks = await db.list_precedent_chunks(case_law_id, section_types=hx.EXTRACTABLE_SECTIONS) if not chunks: chunks = await db.list_precedent_chunks(case_law_id) print(f"\n[B] extracting from {len(chunks)} chunks via {MODEL} @ {EFFORT} ...") context = f"מקור: {citation} — {court}, {date_str}" sem = asyncio.Semaphore(CONCURRENCY) async def bounded(i, c): async with sem: return await extract_chunk(c["content"], c["section_type"], i, len(chunks), context, is_binding) results = await asyncio.gather(*[bounded(i, c) for i, c in enumerate(chunks)]) raw_b, failed = [], 0 for items, ok in results: raw_b.extend(items) if not ok: failed += 1 cleaned_b = [] for raw in raw_b: coerced = hx._coerce_halacha(raw, is_binding=is_binding) if coerced is None: continue coerced["quote_verified"] = hx._verify_quote(coerced["supporting_quote"], full_text) cleaned_b.append(coerced) print(f" raw={len(raw_b)} valid={len(cleaned_b)} failed_chunks={failed}/{len(chunks)}") # ---- Comparison ---- a_stats = stats(existing, f"A · current production (n={len(existing)})") b_stats = stats(cleaned_b, f"B · {MODEL} @ {EFFORT}") print_stats(a_stats) print_stats(b_stats) # Dump B halachot for human quality judgement out_path = f"/home/chaim/legal-ai/data/ab_halacha_{citation.replace('/', '_').replace(chr(34), '').strip()}_{EFFORT}.json" with open(out_path, "w", encoding="utf-8") as f: json.dump( {"precedent": citation, "model": MODEL, "effort": EFFORT, "A_stats": a_stats, "B_stats": b_stats, "B_halachot": cleaned_b}, f, ensure_ascii=False, indent=2, ) print(f"\nB halachot written to: {out_path}") if __name__ == "__main__": asyncio.run(main())