"""Halacha extraction quality monitor. Tracks ``avg(confidence)`` of halachot extracted by the LLM pipeline over time and emits an alert when the recent-window average drops more than a configurable threshold below the lifetime baseline. Intended schedule: weekly cron, e.g. ``0 8 * * 1`` (Monday 08:00). Output: a single-line JSON payload to stdout (suitable for piping into ``notify.py`` or a webhook), plus a human-readable alert text on stderr when drift is detected. Usage ----- :: # Default — weekly window, 5% drop threshold (relative) python scripts/monitor_halacha_quality.py # Custom window/threshold: python scripts/monitor_halacha_quality.py --window 14 --threshold 0.03 # Only emit JSON, no stderr alert: python scripts/monitor_halacha_quality.py --silent """ from __future__ import annotations import argparse import asyncio import json import os import sys from datetime import datetime, timezone from pathlib import Path def _setup_paths(): """Make ``legal_mcp`` importable when run from anywhere.""" here = Path(__file__).resolve().parent candidates = [ here.parent / "mcp-server" / "src", # host Path("/app/mcp-server/src"), # container ] for c in candidates: if c.is_dir() and str(c) not in sys.path: sys.path.insert(0, str(c)) _setup_paths() from legal_mcp.services import db # noqa: E402 # Statuses considered "trusted" — the baseline is computed only over # halachot whose extraction the chair has accepted. ``pending_review`` # is the queue waiting for review; their average tends to be lower # because anything obviously bad gets rejected before approval. So we # track BOTH series and alert on either one drifting: # 1. Trusted baseline (approved+published) — drift here means the # extractor's "best output" quality is degrading. # 2. All extracted — drift here means raw extractor accuracy is down. 
TRUSTED_STATUSES = ("approved", "published")


def _as_float(rec, key: str) -> float | None:
    """Return ``rec[key]`` converted to ``float``, or ``None`` when it is NULL."""
    value = rec[key]
    return None if value is None else float(value)


def _as_int(rec, key: str) -> int:
    """Return ``rec[key]`` converted to ``int``, treating NULL as ``0``."""
    value = rec[key]
    return 0 if value is None else int(value)


async def _collect_metrics(window_days: int) -> dict:
    """Query row counts and mean confidence for every monitored series.

    Five aggregates are read from the ``halachot`` table:

    * lifetime, all rows
    * lifetime, rows whose ``review_status`` is in ``TRUSTED_STATUSES``
    * last ``window_days`` days (by ``created_at``), all rows
    * last ``window_days`` days, trusted rows only
    * every row currently in ``pending_review`` (no time filter)

    Args:
        window_days: Size of the recent window, in days.

    Returns:
        A flat dict with ``*_count`` (int) and ``*_avg`` (float or ``None``
        when the aggregate is NULL) entries, plus ``window_days``.
    """
    pool = await db.get_pool()
    window = int(window_days)
    trusted = list(TRUSTED_STATUSES)

    # Lifetime baselines
    lifetime_all = await pool.fetchrow(
        "SELECT count(*) AS n, AVG(confidence) AS avg_conf FROM halachot"
    )
    lifetime_trusted = await pool.fetchrow(
        """
        SELECT count(*) AS n, AVG(confidence) AS avg_conf
        FROM halachot
        WHERE review_status = ANY($1::text[])
        """,
        trusted,
    )

    # Recent window. The day count is sent as a bind parameter rather than
    # being spliced into the SQL text.
    recent_all = await pool.fetchrow(
        """
        SELECT count(*) AS n, AVG(confidence) AS avg_conf
        FROM halachot
        WHERE created_at > NOW() - ($1::int * INTERVAL '1 day')
        """,
        window,
    )
    recent_trusted = await pool.fetchrow(
        """
        SELECT count(*) AS n, AVG(confidence) AS avg_conf
        FROM halachot
        WHERE created_at > NOW() - ($1::int * INTERVAL '1 day')
          AND review_status = ANY($2::text[])
        """,
        window,
        trusted,
    )

    # Review backlog: everything still waiting for review, over all time.
    # Reported for context only; ``_evaluate`` does not alert on it.
    pending_review = await pool.fetchrow(
        """
        SELECT count(*) AS n, AVG(confidence) AS avg_conf
        FROM halachot
        WHERE review_status = 'pending_review'
        """
    )

    return {
        "window_days": window,
        "lifetime_all_count": _as_int(lifetime_all, "n"),
        "lifetime_all_avg": _as_float(lifetime_all, "avg_conf"),
        "lifetime_trusted_count": _as_int(lifetime_trusted, "n"),
        "lifetime_trusted_avg": _as_float(lifetime_trusted, "avg_conf"),
        "recent_all_count": _as_int(recent_all, "n"),
        "recent_all_avg": _as_float(recent_all, "avg_conf"),
        "recent_trusted_count": _as_int(recent_trusted, "n"),
        "recent_trusted_avg": _as_float(recent_trusted, "avg_conf"),
        "pending_review_count": _as_int(pending_review, "n"),
        "pending_review_avg": _as_float(pending_review, "avg_conf"),
    }


def _drift(baseline: float | None, recent: float | None) -> float | None:
    """Return the relative drop of ``recent`` below ``baseline``.

    The result is positive when ``recent < baseline`` and negative when the
    recent average is higher. ``None`` is returned when either value is
    missing or the baseline is not strictly positive.

    >>> round(_drift(0.85, 0.80), 4)  # 5.88% drop
    0.0588
    >>> _drift(None, 0.80) is None
    True
    """
    if baseline is None or recent is None or baseline <= 0:
        return None
    return (baseline - recent) / baseline


def _evaluate(metrics: dict, threshold: float, min_sample: int) -> dict:
    """Decide whether any series is drifting below threshold.

    Args:
        metrics: Dict shaped like the return value of ``_collect_metrics``.
        threshold: Relative drop at or above which a series alerts.
        min_sample: Minimum number of recent rows required before a series
            may alert; smaller samples are reported but never alerted on.

    Returns:
        ``{"alert": bool, "series": [...]}`` with one entry per series
        carrying its baseline, recent average, sample size, drift, alert
        flag and a human-readable reason.
    """
    series = [
        (
            "trusted",
            metrics["lifetime_trusted_avg"],
            metrics["recent_trusted_avg"],
            metrics["recent_trusted_count"],
        ),
        (
            "all_extracted",
            metrics["lifetime_all_avg"],
            metrics["recent_all_avg"],
            metrics["recent_all_count"],
        ),
    ]

    alerts: list[dict] = []
    for name, baseline, recent, recent_n in series:
        d = _drift(baseline, recent)
        entry = {
            "series": name,
            "baseline": baseline,
            "recent": recent,
            "recent_n": recent_n,
            "drift": d,
            "alert": False,
            "reason": None,
        }
        # The sample-size check comes first so a tiny window never alerts.
        if recent_n < min_sample:
            entry["reason"] = f"recent_n={recent_n} below min_sample={min_sample}"
        elif d is None:
            entry["reason"] = "missing baseline or recent average"
        elif d >= threshold:
            entry["alert"] = True
            entry["reason"] = (
                f"drift {d:.1%} >= threshold {threshold:.1%} "
                f"(baseline={baseline:.3f}, recent={recent:.3f}, n={recent_n})"
            )
        else:
            entry["reason"] = (
                f"drift {d:.1%} < threshold {threshold:.1%} — within tolerance"
            )
        alerts.append(entry)

    return {"alert": any(a["alert"] for a in alerts), "series": alerts}


def _format_alert_text(metrics: dict, decision: dict) -> str:
    """Render the decision as a multi-line, human-readable alert message.

    Missing numeric values are shown as an em dash.
    """
    lines = [
        f"Halacha quality alert — window={metrics['window_days']}d",
        "",
    ]
    for s in decision["series"]:
        sym = "ALERT" if s["alert"] else "ok"
        baseline = f"{s['baseline']:.3f}" if s["baseline"] is not None else "—"
        recent = f"{s['recent']:.3f}" if s["recent"] is not None else "—"
        drift = f"{s['drift']:.1%}" if s["drift"] is not None else "—"
        lines.append(
            f" [{sym}] {s['series']}: baseline={baseline} recent={recent} "
            f"drift={drift} n={s['recent_n']}"
        )
        if s["reason"]:
            lines.append(f" {s['reason']}")
    return "\n".join(lines)


async def run(
    *,
    window_days: int,
    threshold: float,
    min_sample: int,
) -> dict:
    """Collect metrics, evaluate drift and return the full report dict.

    Args:
        window_days: Size of the recent window, in days.
        threshold: Relative drop at or above which a series alerts.
        min_sample: Minimum recent rows needed for a series to alert.

    Returns:
        A dict holding the UTC generation timestamp, the input settings,
        the raw metrics and the alert decision.
    """
    metrics = await _collect_metrics(window_days)
    decision = _evaluate(metrics, threshold, min_sample)
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "window_days": window_days,
        "threshold_rel": threshold,
        "min_sample": min_sample,
        "metrics": metrics,
        "decision": decision,
    }


def _positive_int(text: str) -> int:
    """argparse ``type=`` callable accepting only whole numbers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"expected a whole number, got {text!r}"
        ) from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
    return value


def main() -> None:
    """CLI entry point: print the JSON report and, on drift, the alert text.

    The report goes to stdout. When an alert fires, the readable alert text
    goes to stderr unless ``--silent`` is given, and the process exits with
    status 1 if ``--exit-on-alert`` is given.
    """
    parser = argparse.ArgumentParser(
        description="Monitor halacha extraction quality (confidence drift)."
    )
    parser.add_argument(
        "--window",
        # A zero or negative window selects no rows and can never alert,
        # so it is rejected up front instead of silently reporting "ok".
        type=_positive_int,
        default=7,
        help="Recent window in days (default: 7).",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=0.05,
        help="Relative drop alert threshold (default: 0.05 = 5%%).",
    )
    parser.add_argument(
        "--min-sample",
        type=int,
        default=5,
        help="Minimum halachot in window to evaluate (default: 5). "
        "Below this, the series is reported but not alerted on.",
    )
    parser.add_argument(
        "--silent",
        action="store_true",
        help="Suppress stderr alert text; only print JSON.",
    )
    parser.add_argument(
        "--exit-on-alert",
        action="store_true",
        help="Exit with status 1 when an alert fires (default: always exit 0).",
    )
    args = parser.parse_args()

    report = asyncio.run(
        run(
            window_days=args.window,
            threshold=args.threshold,
            min_sample=args.min_sample,
        )
    )

    # JSON to stdout
    print(json.dumps(report, ensure_ascii=False, indent=2))

    alerting = report["decision"]["alert"]
    if alerting and not args.silent:
        print("", file=sys.stderr)
        print(_format_alert_text(report["metrics"], report["decision"]), file=sys.stderr)

    if args.exit_on_alert and alerting:
        sys.exit(1)


if __name__ == "__main__":
    main()