"""Batch upload proofread training corpus to style DB. Two-phase workflow: --preview Extract metadata from all .md files, print review table, don't upload --upload Actually upload all files (with optional --only FILE to run one) Metadata extraction: * decision_number: from filename (ARAR-YY-NNNN / ערר NNNN-YY) or decision date year * decision_date: from "ניתנה ... ב " near end of text * categories: keyword heuristics on body text """ from __future__ import annotations import argparse import asyncio import os import re import sys from pathlib import Path PROOFREAD_DIR = Path("/home/chaim/legal-ai/data/training/proofread") # Manual metadata overrides for files where auto-extraction can't determine values. METADATA_OVERRIDES: dict[str, dict] = { "ARAR-25-1067 - יחיעם יפה ואח׳.md": { "decision_date": "2025-11-27", # no "ניתנה" signature in file; user-provided }, } # Files to skip — already in style_corpus from legacy ingestion # (verified by exact character-count match with existing DB rows). 
SKIP_FILES = {
    "תמא 38-בית הכרם-1126+1141-החלטה.md",  # → corpus: 1126/1141
    "היתר בניה-בית שמש-1180+1181-החלטה.md",  # → corpus: 1180/1181
    "היתר בניה-הראל-1043+1054-החלטה.md",  # → corpus: 1043/1054
    "היתר בניה-הראל-1071+1077-החלטה.md",  # → corpus: 1071/1077
}

# Load env vars needed by mcp-server.
# os.path.expanduser never raises (unlike Path.home()) when the home directory
# cannot be determined, so importing this module stays safe everywhere.
ENV_FILE = Path(os.path.expanduser("~")) / ".env"


def _load_env_file(path: Path) -> None:
    """Copy KEY=VALUE pairs from *path* into os.environ without overwriting.

    Blank lines, comment lines (also indented ones) and lines without "=" are
    ignored. A leading "export " is tolerated. One pair of surrounding quotes
    is stripped from the value. Variables already present in the environment
    win, because ``setdefault`` is used.
    """
    if not path.is_file():
        return
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        key, _, value = line.partition("=")
        key = key.strip()
        if not key:
            # "=value" has no usable name; setting it would raise.
            continue
        os.environ.setdefault(key, value.strip().strip('"').strip("'"))


_load_env_file(ENV_FILE)

# Make mcp-server package importable
sys.path.insert(0, "/home/chaim/legal-ai/mcp-server/src")


# ── Decision number extraction ───────────────────────────────────

FILENAME_NUMBER_PATTERNS = [
    # ARAR-YY-NNNN[-X] - title.md
    re.compile(r"^ARAR-(\d{2})-(\d{3,4})"),
    # ערר NNNN-YY title.md  or  ערר NNNN-YY title
    re.compile(r"^ערר\s+(\d{3,4})-(\d{2})"),
    # ערר NNNN - title (no year in filename — needs date lookup)
    re.compile(r"^ערר\s+(\d{3,4})\s*-"),
]
LEGACY_MULTI_PATTERN = re.compile(r"(\d{3,4})\+(\d{3,4})")

# Placeholder suffix meaning "year still unknown"; resolved later by
# finalize_decision_number().
_UNKNOWN_YEAR_SUFFIX = "/??"


def decision_number_from_filename(stem: str) -> tuple[str | None, str | None]:
    """Derive the decision number from a file stem.

    Returns ``(number, year_short)``:

    * ``("NNNN/YY", "YY")`` when the stem carries both number and year
      (``ARAR-YY-NNNN`` or ``ערר NNNN-YY``);
    * ``("NNNN/??", None)`` for ``ערר NNNN - title`` (year missing);
    * ``("NNNN+NNNN/??", None)`` for legacy merged decisions;
    * ``(None, None)`` when nothing matches.

    A ``/??`` suffix marks partial info that must be completed from the
    decision date.
    """
    arar_re, appeal_with_year_re, appeal_no_year_re = FILENAME_NUMBER_PATTERNS

    m = arar_re.match(stem)
    if m:
        year, num = m.groups()
        return f"{num}/{year}", year

    m = appeal_with_year_re.match(stem)
    if m:
        num, year = m.groups()
        return f"{num}/{year}", year

    m = appeal_no_year_re.match(stem)
    if m:
        return f"{m.group(1)}{_UNKNOWN_YEAR_SUFFIX}", None

    # Legacy: "NNNN+NNNN" merged decisions, anywhere in the stem.
    m = LEGACY_MULTI_PATTERN.search(stem)
    if m:
        return f"{m.group(1)}+{m.group(2)}{_UNKNOWN_YEAR_SUFFIX}", None

    return None, None


# ── Decision date extraction ─────────────────────────────────────

HEBREW_MONTHS = {
    "ינואר": 1, "בינואר": 1,
    "פברואר": 2, "בפברואר": 2,
    "מרץ": 3, "מרס": 3, "במרץ": 3, "במרס": 3,
    "אפריל": 4, "באפריל": 4,
    "מאי": 5, "במאי": 5,
    "יוני": 6, "ביוני": 6,
    "יולי": 7, "ביולי": 7,
    "אוגוסט": 8, "באוגוסט": 8,
    "ספטמבר": 9, "בספטמבר": 9,
    "אוקטובר": 10, "באוקטובר": 10,
    "נובמבר": 11, "בנובמבר": 11,
    "דצמבר": 12, "בדצמבר": 12,
}

# Matches "<day> ב<month>, <year>" or "<day> <month> <year>": a 1-2 digit day,
# a Hebrew month name with optional "ב" prefix, an optional comma/period, and
# a 4-digit year.
DATE_RE = re.compile(
    r"(\d{1,2})\s+(ב?(?:ינואר|פברואר|מרץ|מרס|אפריל|מאי|יוני|יולי|אוגוסט|ספטמבר|אוקטובר|נובמבר|דצמבר))\s*[,.]?\s*(\d{4})"
)
NITNA_RE = re.compile(r"ניתנ[הו]?\s+(?:פה\s+אחד|בדעת\s+רוב|היום)?")

# How many trailing characters are scanned for the signing block.
_SIGNATURE_TAIL_CHARS = 2500


def decision_date_from_text(text: str) -> str | None:
    """Extract the decision date as ``YYYY-MM-DD`` from the signing block.

    Only the last 2500 characters of *text* are searched. The first date
    located at or after the first "ניתנה" marker in that tail is preferred;
    if there is no marker, or no date follows it, the first date anywhere in
    the tail is used.

    Returns ``None`` when no date is found or when the matched day/month/year
    do not form a valid calendar date.
    """
    from datetime import date  # local import keeps this block self-contained

    tail = text[-_SIGNATURE_TAIL_CHARS:]

    marker = NITNA_RE.search(tail)
    m = DATE_RE.search(tail, marker.start()) if marker else None
    if m is None:
        # Fall back: search whole tail
        m = DATE_RE.search(tail)
    if m is None:
        return None

    month = HEBREW_MONTHS.get(m.group(2))
    if not month:
        return None
    try:
        return date(int(m.group(3)), month, int(m.group(1))).isoformat()
    except ValueError:
        # e.g. "31 בפברואר" — matched the pattern but is not a real date.
        return None


# ── Subject category extraction ──────────────────────────────────

# Categories as defined in the tool signature.
ALL_CATEGORIES = [
    "בנייה",
    "שימוש חורג",
    "תכנית",
    "היתר",
    "הקלה",
    "חלוקה",
    'תמ"א 38',
    "היטל השבחה",
    "פיצויים 197",
]

# The subject is expected to be stated within this many leading characters.
_OPENING_CHARS = 2000

# Compiled once at import time; categorize() runs for every file.
_TAMA38_RE = re.compile(r'תמ[״"\']?א\s*38')
_BETTERMENT_LEVY_RE = re.compile(r"היטל(?:י)?\s+השבחה")
_SECTION_197_RE = re.compile(
    r"פיצויים\s+לפי\s+(?:ס(?:עיף|')\s*)?197|סעיף\s*197|ס['\"]?\s*197"
)
_NONCONFORMING_USE_RE = re.compile("שימוש חורג")
_RELAXATION_RE = re.compile(r"\bהקלה\b|\bהקלות\b")
_PARCELLATION_RE = re.compile(r"איחוד\s+וחלוקה|חלוקה\s+חדשה|תכנית\s+לחלוקה")
# Plan-level appeal phrases. Allows ה/ב/ל prefixes on תכנית.
_PLAN_RE = re.compile(
    r"הפקדת\s+ה?תכנית|"
    r"אישור\s+ה?תכנית|"
    r"המלצה\s+להפקיד|"
    r"להפקיד\s+את\s+ה?תכנית|"
    r"לדון\s+בתכנית|"
    r"דנה\s+בתכנית|"
    r"החלטה\s+לאשר\s+ה?תכנית"
)
_PERMIT_RE = re.compile(r"בקשה\s+להיתר|היתר\s+בני(?:י)?ה")


def _mentioned_in_opening_or_often(
    pattern: re.Pattern, text: str, opening: str, min_count: int
) -> bool:
    """True if *pattern* occurs in *opening* or at least *min_count* times in *text*."""
    if pattern.search(opening):
        return True
    return len(pattern.findall(text)) >= min_count


def categorize(text: str) -> list[str]:
    """Heuristic category detection based on subject matter, not incidental mentions.

    Strategy: the real subject is established in the opening 2000 chars
    (first decision-opening paragraph). Secondary signal is repetition
    count — casual mentions in law citations don't repeat.

    Returns the matched categories in a fixed detection order; falls back to
    ``["בנייה"]`` when nothing matches.
    """
    opening = text[:_OPENING_CHARS]  # subject is stated up front
    cats: list[str] = []

    # תמ"א 38 — very specific marker, single mention anywhere is fine
    if _TAMA38_RE.search(text):
        cats.append('תמ"א 38')

    # היטל השבחה — require real engagement: in opening OR 3+ times
    if _mentioned_in_opening_or_often(_BETTERMENT_LEVY_RE, text, opening, 3):
        cats.append("היטל השבחה")

    # פיצויים 197 — in opening OR 2+ mentions
    if _mentioned_in_opening_or_often(_SECTION_197_RE, text, opening, 2):
        cats.append("פיצויים 197")

    # שימוש חורג — in opening OR 3+ times (avoids law-quote false positives)
    if _mentioned_in_opening_or_often(_NONCONFORMING_USE_RE, text, opening, 3):
        cats.append("שימוש חורג")

    # הקלה — stricter: 3+ mentions AND appears in opening
    if _RELAXATION_RE.search(opening) and len(_RELAXATION_RE.findall(text)) >= 3:
        cats.append("הקלה")

    # חלוקה — "איחוד וחלוקה" or "חלוקה חדשה" (specific phrases)
    if _PARCELLATION_RE.search(text):
        cats.append("חלוקה")

    # תכנית — plan-level appeal (primary subject), opening only
    if _PLAN_RE.search(opening):
        cats.append("תכנית")

    # היתר — "בקשה להיתר" or "היתר בניה" as subject in opening
    if _PERMIT_RE.search(opening):
        cats.append("היתר")

    # בנייה — added for building-permit cases
    # (not for plan-level תכנית-only cases)
    if any(c in cats for c in ("היתר", "הקלה", 'תמ"א 38')):
        cats.append("בנייה")

    # If nothing matched, default to בנייה
    return cats or ["בנייה"]


# ── Year fallback from date ──────────────────────────────────────

def finalize_decision_number(number: str | None, date_iso: str | None) -> str:
    """Complete a decision number whose year is missing, using the decision date.

    * No number: ``"??/YY"`` if a date is known, else ``""``.
    * Number ending in ``/??``: the placeholder becomes ``/YY`` if a date is
      known, otherwise it is dropped.
    * Anything else is returned unchanged.

    ``YY`` is the last two digits of the Gregorian year in *date_iso*
    (``YYYY-MM-DD``).
    """
    yy = date_iso[2:4] if date_iso else None

    if not number:
        return f"??/{yy}" if yy else ""

    if number.endswith(_UNKNOWN_YEAR_SUFFIX):
        base = number[: -len(_UNKNOWN_YEAR_SUFFIX)]
        return f"{base}/{yy}" if yy else base

    return number


# ── Main metadata extraction ─────────────────────────────────────

def extract_metadata(path: Path, overrides: dict[str, dict] | None = None) -> dict:
    """Build the metadata record for one proofread decision file.

    Args:
        path: UTF-8 markdown file to inspect.
        overrides: Per-file-name manual overrides; defaults to the module's
            ``METADATA_OVERRIDES`` table.

    Returns:
        Dict with keys ``file``, ``decision_number``, ``decision_date``
        (``"??"`` when unknown), ``categories`` and ``chars``, with the
        override for this file merged on top.

    An overridden ``decision_date`` is applied *before* the decision number
    is finalized, so a number whose year is missing from the file name is
    completed from the manual date as well.
    """
    table = METADATA_OVERRIDES if overrides is None else overrides
    override = table.get(path.name, {})

    text = path.read_text(encoding="utf-8")
    num_from_name, _ = decision_number_from_filename(path.stem)
    date_iso = override.get("decision_date") or decision_date_from_text(text)

    meta = {
        "file": path.name,
        "decision_number": finalize_decision_number(num_from_name, date_iso),
        "decision_date": date_iso or "??",
        "categories": categorize(text),
        "chars": len(text),
    }
    # Explicit overrides (including a manual decision_number) always win.
    meta.update(override)
    return meta


def print_preview(results: list[dict]) -> None:
    """Print review table of metadata for all files, then list incomplete ones."""
    print(f"\n{'#':<3} {'FILE':<55} {'NUMBER':<15} {'DATE':<12} {'CATEGORIES'}")
    print("-" * 130)
    for i, r in enumerate(results, 1):
        # Truncate long names so the table columns stay aligned.
        file_short = r["file"] if len(r["file"]) <= 53 else r["file"][:50] + "..."
        cats = ", ".join(r["categories"])
        print(f"{i:<3} {file_short:<55} {r['decision_number']:<15} {r['decision_date']:<12} {cats}")
    print()

    # Highlight issues
    issues = [
        r for r in results
        if r["decision_date"] == "??"
        or not r["decision_number"]
        or "??" in r["decision_number"]
    ]
    if issues:
        print(f"⚠️ {len(issues)} files with incomplete metadata:")
        for r in issues:
            print(f" - {r['file']} → number={r['decision_number']!r} date={r['decision_date']!r}")


# ── Upload ───────────────────────────────────────────────────────

async def upload_one(meta: dict) -> dict:
    """Upload a single file described by *meta* and return ``{"file", "result"}``.

    The project import is deferred so that previewing does not require the
    mcp-server package. An unknown date (``"??"``) is sent as an empty string.
    """
    from legal_mcp.tools.documents import document_upload_training

    path = PROOFREAD_DIR / meta["file"]
    result = await document_upload_training(
        file_path=str(path),
        decision_number=meta["decision_number"],
        decision_date=meta["decision_date"] if meta["decision_date"] != "??" else "",
        subject_categories=meta["categories"],
        title=path.stem,
    )
    return {"file": meta["file"], "result": result}


async def upload_all(results: list[dict]) -> int:
    """Upload every entry sequentially, best-effort, and return the failure count.

    A failing upload is reported and does not stop the batch. Only the upload
    call itself is guarded, so a problem while printing the result can no
    longer be misreported as a failed upload.
    """
    total = len(results)
    failures = 0
    for i, meta in enumerate(results, 1):
        try:
            r = await upload_one(meta)
        except Exception as e:  # deliberate: keep going with the remaining files
            failures += 1
            print(f"[{i}/{total}] ✗ {meta['file']}: {e}")
        else:
            print(f"[{i}/{total}] ✓ {meta['file']}")
            # str() guards against a non-string result from the upload tool.
            print(f" {str(r['result'])[:200]}")
    return failures


# ── CLI ──────────────────────────────────────────────────────────

def main() -> int:
    """CLI entry point; returns the process exit status.

    Returns 1 when ``--only`` names a file that is not among the candidates
    or when at least one upload failed, otherwise 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--preview", action="store_true", help="Show metadata table without uploading")
    ap.add_argument("--upload", action="store_true", help="Upload all files to style corpus")
    ap.add_argument("--only", help="Only process this specific filename")
    args = ap.parse_args()

    files = [f for f in sorted(PROOFREAD_DIR.glob("*.md")) if f.name not in SKIP_FILES]
    if args.only:
        files = [f for f in files if f.name == args.only]
        if not files:
            print(f"File not found: {args.only}")
            return 1

    results = [extract_metadata(f) for f in files]

    # Preview is the default mode and may also be combined with --upload.
    if args.preview or not args.upload:
        print_preview(results)
    if not args.upload:
        return 0

    print(f"\n>>> Uploading {len(results)} files to style corpus...\n")
    failed = asyncio.run(upload_all(results))
    if failed:
        print(f"{failed} of {len(results)} uploads failed", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())