"""Backfill the planning-schemes registry (טבלת plans) from our existing decisions. Scans the decision corpus — both our drafts (data/cases/*/drafts/decision.md) and Daphna's published finals (data/training/cmp/*.md) — for paragraphs that state a plan's validity ("פורסמה למתן תוקף …"), extracts the structured plan record via the local-LLM extractor, and upserts each into the registry as review_status='pending_review'. The chair then reviews the queue (plan_list / plan_review, or the future UI) and only the approved rows become the SSOT that block-tet cites. This is the "import from all our decisions" step — it seeds identity+validity once instead of re-deriving from appraisals per case (G2). Idempotent (G3): re-running upserts on the normalized plan_number, never duplicating. Run (dry-run, the default — prints what WOULD be ingested, writes nothing): mcp-server/.venv/bin/python scripts/backfill_plans_registry.py Apply (actually upsert as pending_review): mcp-server/.venv/bin/python scripts/backfill_plans_registry.py --apply Limit to a subset while testing: mcp-server/.venv/bin/python scripts/backfill_plans_registry.py --glob 'data/training/cmp/*.md' """ from __future__ import annotations import argparse import asyncio import os import re import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import db, plans_extractor # noqa: E402 _REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) _MARKER = "פורסמה למתן תוקף" _DEFAULT_GLOBS = ( "data/cases/*/drafts/decision.md", "data/training/cmp/*.md", ) def _candidate_paragraphs(text: str) -> list[str]: """Return paragraphs that assert a plan's validity (contain the marker).""" paras = re.split(r"\n\s*\n", text) return [p.strip() for p in paras if _MARKER in p] def _source_case_number(path: str) -> str: """Derive a provenance case number from the file path, best-effort. data/cases//drafts/decision.md → . Otherwise '' (training finals are keyed by Daphna's filename, not our case-number space).""" m = re.search(r"/data/cases/([^/]+)/", path) return m.group(1) if m else "" async def _process_file(path: str, *, apply: bool) -> dict: with open(path, encoding="utf-8") as fh: text = fh.read() paras = _candidate_paragraphs(text) if not paras: return {"path": path, "paragraphs": 0, "candidates": 0, "upserted": 0} block = "\n\n".join(paras) candidates = await plans_extractor.extract_plans_from_text(block) upserted = 0 if apply and candidates: plans = await plans_extractor.upsert_candidates( candidates, source_case_number=_source_case_number(path), model_used="backfill", ) upserted = len(plans) rel = os.path.relpath(path, _REPO_ROOT) print(f"\n• {rel} — {len(paras)} פסקאות-תוקף, {len(candidates)} מועמדים" + (f", {upserted} נכתבו" if apply else " (dry-run)")) for c in candidates: gd = c.get("gazette_date") or "—" yp = f' י"פ {c["yalkut_number"]}' if c.get("yalkut_number") else "" print(f" - {c.get('display_name') or c['plan_number']} | תוקף: {gd}{yp}") return { "path": path, "paragraphs": len(paras), "candidates": len(candidates), "upserted": upserted, } async def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--apply", action="store_true", help="actually upsert (default: dry-run, writes nothing)") parser.add_argument("--glob", action="append", dest="globs", help="override the corpus glob(s); repeatable") args = parser.parse_args() import glob as globmod globs = args.globs or list(_DEFAULT_GLOBS) files: list[str] = [] for g in globs: files.extend(sorted(globmod.glob(os.path.join(_REPO_ROOT, g)))) files = sorted(set(files)) mode = "APPLY" if args.apply else "DRY-RUN" print(f"[{mode}] backfill plans registry — {len(files)} קבצים, globs={globs}") totals = {"paragraphs": 0, "candidates": 0, "upserted": 0} for path in files: try: r = await _process_file(path, apply=args.apply) except Exception as e: # noqa: BLE001 — record, keep going print(f"\n!! שגיאה ב-{path}: {e}", file=sys.stderr) continue for k in totals: totals[k] += r[k] print(f"\n=== סיכום [{mode}]: {len(files)} קבצים | " f"{totals['paragraphs']} פסקאות | {totals['candidates']} מועמדים | " f"{totals['upserted']} נכתבו (pending_review) ===") if not args.apply: print("הרץ עם --apply כדי לכתוב למרשם, ואז אשר ב-plan_review / תור-האישור.") await db.close_pool() if __name__ == "__main__": asyncio.run(main())