Files
legal-ai/scripts/.archive/batch_upload_training.py
Chaim 5c9a5d702a Clean up scripts/: archive 17, delete 5, add SCRIPTS.md registry
Active scripts (5): auto-sync-cases.sh, backup-db.sh, restore-db.sh,
notify.py, bidi_table.py

Archived (17): one-time migration/seeding scripts whose functionality
is now in MCP server or web API. Moved to scripts/.archive/

Deleted (5): zero-value scripts (duplicates, hardcoded single-case,
debug scripts)

Added scripts/SCRIPTS.md — registry of all scripts with purpose,
status, and what superseded them. CLAUDE.md updated with rule:
any script change requires SCRIPTS.md update.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:30:19 +00:00

350 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Batch upload proofread training corpus to style DB.

Two-phase workflow:
    --preview   Extract metadata from all .md files, print review table, don't upload
                (this is also what happens when --upload is not given)
    --upload    Actually upload all files (with optional --only FILE to run one)

Metadata extraction:
    * decision_number: from filename (ARAR-YY-NNNN / ערר NNNN-YY); when the filename
      carries no year, the year is taken from the decision date
    * decision_date: from "ניתנה ... <day> ב<Hebrew month> <YYYY>" near end of text
    * categories: keyword heuristics on body text
"""
from __future__ import annotations
import argparse
import asyncio
import os
import re
import sys
from pathlib import Path
# Directory holding the proofread Markdown decisions. main() globs it for "*.md"
# and upload_one() resolves each metadata "file" entry against it.
PROOFREAD_DIR = Path("/home/chaim/legal-ai/data/training/proofread")
# Manual metadata overrides for files where auto-extraction can't determine values.
# Keyed by the exact file name (including ".md"). extract_metadata() merges each
# value over the auto-extracted metadata dict, so the inner keys mirror that
# dict's keys (e.g. "decision_date" in ISO YYYY-MM-DD form).
METADATA_OVERRIDES: dict[str, dict] = {
    "ARAR-25-1067 - יחיעם יפה ואח׳.md": {
        "decision_date": "2025-11-27",  # no "ניתנה" ("given") signature in file; user-provided
    },
}
# Files to skip — already in style_corpus from legacy ingestion
# (verified by exact character-count match with existing DB rows).
# main() drops any globbed file whose name appears here before metadata
# extraction; the trailing comment on each entry records the decision numbers
# of the matching corpus row.
SKIP_FILES = {
    "תמא 38-בית הכרם-1126+1141-החלטה.md",  # → corpus: 1126/1141
    "היתר בניה-בית שמש-1180+1181-החלטה.md",  # → corpus: 1180/1181
    "היתר בניה-הראל-1043+1054-החלטה.md",  # → corpus: 1043/1054
    "היתר בניה-הראל-1071+1077-החלטה.md",  # → corpus: 1071/1077
}
# Load env vars needed by mcp-server
ENV_FILE = Path.home() / ".env"
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))
# Make mcp-server package importable
sys.path.insert(0, "/home/chaim/legal-ai/mcp-server/src")
# ── Decision number extraction ───────────────────────────────────
FILENAME_NUMBER_PATTERNS = [
# ARAR-YY-NNNN[-X] - title.md
re.compile(r"^ARAR-(\d{2})-(\d{3,4})"),
# ערר NNNN-YY title.md or ערר NNNN-YY title
re.compile(r"^ערר\s+(\d{3,4})-(\d{2})"),
# ערר NNNN - title (no year in filename — needs date lookup)
re.compile(r"^ערר\s+(\d{3,4})\s*-"),
]
LEGACY_MULTI_PATTERN = re.compile(r"(\d{3,4})\+(\d{3,4})")
def decision_number_from_filename(stem: str) -> tuple[str | None, str | None]:
"""Return (number, year_short) or (multi_number, None) or (None, None).
year_short is YY (last 2 digits) if extractable from filename.
For legacy files with 'NNNN+NNNN' or no year, returns partial info
that must be completed from decision date.
"""
# ARAR-YY-NNNN
m = FILENAME_NUMBER_PATTERNS[0].match(stem)
if m:
year, num = m.group(1), m.group(2)
return f"{num}/{year}", year
# ערר NNNN-YY
m = FILENAME_NUMBER_PATTERNS[1].match(stem)
if m:
num, year = m.group(1), m.group(2)
return f"{num}/{year}", year
# ערר NNNN - title (no year)
m = FILENAME_NUMBER_PATTERNS[2].match(stem)
if m:
num = m.group(1)
return f"{num}/??", None
# Legacy: "NNNN+NNNN" merged decisions
m = LEGACY_MULTI_PATTERN.search(stem)
if m:
return f"{m.group(1)}+{m.group(2)}/??", None
return None, None
# ── Decision date extraction ─────────────────────────────────────
HEBREW_MONTHS = {
"ינואר": 1, "בינואר": 1,
"פברואר": 2, "בפברואר": 2,
"מרץ": 3, "מרס": 3, "במרץ": 3, "במרס": 3,
"אפריל": 4, "באפריל": 4,
"מאי": 5, "במאי": 5,
"יוני": 6, "ביוני": 6,
"יולי": 7, "ביולי": 7,
"אוגוסט": 8, "באוגוסט": 8,
"ספטמבר": 9, "בספטמבר": 9,
"אוקטובר": 10, "באוקטובר": 10,
"נובמבר": 11, "בנובמבר": 11,
"דצמבר": 12, "בדצמבר": 12,
}
# Matches "<day> ב<month>, <year>" or "<day> <month>, <year>" (with optional commas)
DATE_RE = re.compile(
r"(\d{1,2})\s+(ב?(?:ינואר|פברואר|מרץ|מרס|אפריל|מאי|יוני|יולי|אוגוסט|ספטמבר|אוקטובר|נובמבר|דצמבר))\s*[,.]?\s*(\d{4})"
)
NITNA_RE = re.compile(r"ניתנ[הו]?\s+(?:פה\s+אחד|בדעת\s+רוב|היום)?")
def decision_date_from_text(text: str) -> str | None:
"""Extract decision date in YYYY-MM-DD format from 'ניתנה... DATE' section.
Searches the last ~2000 chars where the signing block lives.
"""
tail = text[-2500:] if len(text) > 2500 else text
# Prefer dates near "ניתנה" marker
nitna_match = NITNA_RE.search(tail)
search_text = tail[nitna_match.start():] if nitna_match else tail
m = DATE_RE.search(search_text)
if not m:
# Fall back: search whole tail
m = DATE_RE.search(tail)
if not m:
return None
day = int(m.group(1))
month = HEBREW_MONTHS.get(m.group(2))
year = int(m.group(3))
if not month:
return None
try:
from datetime import date
return date(year, month, day).isoformat()
except ValueError:
return None
# ── Subject category extraction ──────────────────────────────────
# Categories as defined in the tool signature.
# Full list of subject-category labels (per the comment above: as defined in the
# tool signature). Every label categorize() can emit appears here; the list
# itself is not referenced by the code visible in this file.
ALL_CATEGORIES = [
    "בנייה", "שימוש חורג", "תכנית", "היתר", "הקלה",
    "חלוקה", 'תמ"א 38', "היטל השבחה", "פיצויים 197",
]
def categorize(text: str) -> list[str]:
    """Guess the subject categories of a decision from its text.

    The heuristics aim at the real subject rather than incidental mentions:
    the subject is expected in the first 2000 characters, and repetition across
    the whole text is a secondary signal (a passing law citation rarely repeats).

    Returns the matched category labels in a fixed order. The building label is
    appended when a permit-type category matched, and is the sole result when
    nothing matched at all.
    """
    head = text[:2000]  # subject is stated up front

    def in_head(pattern: str) -> bool:
        return re.search(pattern, head) is not None

    def occurrences(pattern: str) -> int:
        return len(re.findall(pattern, text))

    # TAMA 38 — very specific marker, a single mention anywhere is enough
    tama38 = re.search(r'תמ[״"\']?א\s*38|תמא\s*38', text) is not None

    # Betterment levy — real engagement: 3+ mentions OR present in the opening
    levy_re = r"היטל(?:י)?\s+השבחה"
    levy = occurrences(levy_re) >= 3 or in_head(levy_re)

    # Section 197 compensation — 2+ mentions OR present in the opening
    sec197_re = r"פיצויים\s+לפי\s+(?:ס(?:עיף|')\s*)?197|סעיף\s*197|ס['\"]?\s*197"
    sec197 = occurrences(sec197_re) >= 2 or in_head(sec197_re)

    # Non-conforming use — 3+ mentions OR in the opening (avoids law-quote false positives)
    nonconforming = text.count("שימוש חורג") >= 3 or "שימוש חורג" in head

    # Relaxation — needs BOTH 3+ mentions and a mention in the opening
    relax_re = r"\bהקלה\b|\bהקלות\b"
    relaxation = occurrences(relax_re) >= 3 and in_head(relax_re)

    # Parcellation — specific phrases only, anywhere in the text
    parcellation = (
        re.search(r"איחוד\s+וחלוקה|חלוקה\s+חדשה|תכנית\s+לחלוקה", text) is not None
    )

    # Plan-level appeal as primary subject; optional prefix letter before the noun
    plan = in_head(
        r"הפקדת\s+ה?תכנית|"
        r"אישור\s+ה?תכנית|"
        r"המלצה\s+להפקיד|"
        r"להפקיד\s+את\s+ה?תכנית|"
        r"לדון\s+בתכנית|"
        r"דנה\s+בתכנית|"
        r"החלטה\s+לאשר\s+ה?תכנית"
    )

    # Permit — permit application / building permit named in the opening
    permit = in_head(r"בקשה\s+להיתר|היתר\s+בני(?:י)?ה")

    # Emission order is fixed by this table.
    verdicts = [
        ('תמ"א 38', tama38),
        ("היטל השבחה", levy),
        ("פיצויים 197", sec197),
        ("שימוש חורג", nonconforming),
        ("הקלה", relaxation),
        ("חלוקה", parcellation),
        ("תכנית", plan),
        ("היתר", permit),
    ]
    labels = [label for label, matched in verdicts if matched]

    # Permit-type subjects also count as building cases (plan-only cases do not).
    if permit or relaxation or tama38:
        labels.append("בנייה")

    # Nothing matched: fall back to the building label.
    return labels if labels else ["בנייה"]
# ── Year fallback from date ──────────────────────────────────────
def finalize_decision_number(number: str | None, date_iso: str | None) -> str:
"""If filename number is missing year, fill it from decision date."""
if not number:
if date_iso:
# Extract last 2 digits of Hebrew year via Gregorian year
return f"??/{date_iso[2:4]}"
return ""
if number.endswith("/??"):
if date_iso:
yy = date_iso[2:4]
return number.replace("/??", f"/{yy}")
return number.replace("/??", "")
return number
# ── Main metadata extraction ─────────────────────────────────────
def extract_metadata(path: Path) -> dict:
    """Build the upload metadata for one proofread decision file.

    Reads *path* as UTF-8 and returns a dict with keys ``file`` (file name),
    ``decision_number``, ``decision_date`` (ISO date, or ``"??"`` when
    unknown), ``categories`` (list of labels) and ``chars`` (text length).

    Entries in ``METADATA_OVERRIDES`` for this file name are merged last and
    win over every auto-extracted value. A manually supplied ``decision_date``
    is additionally used when filling in a decision number whose year is
    missing from the file name. Previously the override was applied only after
    the number had been finalized, leaving such numbers without a year.
    """
    text = path.read_text(encoding="utf-8")
    overrides = METADATA_OVERRIDES.get(path.name, {})

    num_from_name, _ = decision_number_from_filename(path.stem)
    # Prefer the user-provided date so it also feeds the year fallback below.
    date_iso = overrides.get("decision_date") or decision_date_from_text(text)

    meta = {
        "file": path.name,
        "decision_number": finalize_decision_number(num_from_name, date_iso),
        "decision_date": date_iso or "??",
        "categories": categorize(text),
        "chars": len(text),
    }
    # Apply manual overrides (an explicit decision_number override still wins).
    meta.update(overrides)
    return meta
def print_preview(results: list[dict]) -> None:
    """Print a review table of the extracted metadata, one row per file.

    File names longer than 53 characters are cut to 50 plus an ellipsis.
    After the table, files with incomplete metadata are listed: an unknown
    date (``"??"``), an empty decision number, or a number still containing
    the ``??`` placeholder.
    """
    def is_incomplete(row: dict) -> bool:
        number = row["decision_number"]
        return row["decision_date"] == "??" or not number or "??" in number

    print(f"\n{'#':<3} {'FILE':<55} {'NUMBER':<15} {'DATE':<12} {'CATEGORIES'}")
    print("-" * 130)
    for index, row in enumerate(results, 1):
        shown_name = row["file"]
        if len(shown_name) > 53:
            shown_name = shown_name[:50] + "..."
        joined = ", ".join(row["categories"])
        print(f"{index:<3} {shown_name:<55} {row['decision_number']:<15} {row['decision_date']:<12} {joined}")
    print()

    # Highlight issues
    flagged = [row for row in results if is_incomplete(row)]
    if not flagged:
        return
    print(f"⚠️ {len(flagged)} files with incomplete metadata:")
    for row in flagged:
        print(f" - {row['file']} → number={row['decision_number']!r} date={row['decision_date']!r}")
# ── Upload ───────────────────────────────────────────────────────
async def upload_one(meta: dict) -> dict:
    """Upload a single file described by *meta* to the training corpus.

    *meta* is a metadata dict as produced by ``extract_metadata``. The file is
    resolved under ``PROOFREAD_DIR``, its stem is sent as the title, and an
    unknown date (``"??"``) is sent as an empty string.

    Returns ``{"file": <file name>, "result": <value returned by the upload tool>}``.
    """
    # Imported lazily: the package only becomes importable after the
    # module-level sys.path tweak has run.
    from legal_mcp.tools.documents import document_upload_training

    file_name = meta["file"]
    source = PROOFREAD_DIR / file_name
    call_args = {
        "file_path": str(source),
        "decision_number": meta["decision_number"],
        "decision_date": "" if meta["decision_date"] == "??" else meta["decision_date"],
        "subject_categories": meta["categories"],
        "title": source.stem,
    }
    outcome = await document_upload_training(**call_args)
    return {"file": file_name, "result": outcome}
async def upload_all(results: list[dict]) -> None:
    """Upload every file in *results* in order, one at a time.

    Progress is printed per file: a check mark plus the first 200 characters
    of the upload result on success, or a cross plus the error on failure.
    A failing file never aborts the batch.
    """
    for position, meta in enumerate(results, start=1):
        progress = f"[{position}/{len(results)}]"
        try:
            outcome = await upload_one(meta)
            print(f"{progress} ✓ {meta['file']}")
            print(f" {outcome['result'][:200]}")
        except Exception as exc:
            # Deliberate best-effort: report the failure and move on to the next file.
            print(f"{progress} ✗ {meta['file']}: {exc}")
# ── CLI ──────────────────────────────────────────────────────────
def main() -> int:
    """Command-line entry point; returns the process exit code.

    Collects every ``*.md`` file under ``PROOFREAD_DIR`` that is not listed in
    ``SKIP_FILES`` (optionally narrowed to one file with ``--only``) and
    extracts its metadata. Then:

    * without ``--upload``: prints the preview table only;
    * with ``--upload``: uploads, printing the preview first if ``--preview``
      was also given.

    Returns 1 when ``--only`` names a file that is not among the candidates,
    0 otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--preview", action="store_true", help="Show metadata table without uploading")
    parser.add_argument("--upload", action="store_true", help="Upload all files to style corpus")
    parser.add_argument("--only", help="Only process this specific filename")
    opts = parser.parse_args()

    candidates = [
        md_file
        for md_file in sorted(PROOFREAD_DIR.glob("*.md"))
        if md_file.name not in SKIP_FILES
    ]
    if opts.only:
        candidates = [md_file for md_file in candidates if md_file.name == opts.only]
        if not candidates:
            print(f"File not found: {opts.only}")
            return 1

    results = [extract_metadata(md_file) for md_file in candidates]

    if not opts.upload:
        # Preview is the default mode when no upload was requested.
        print_preview(results)
        return 0

    if opts.preview:
        print_preview(results)
    print(f"\n>>> Uploading {len(results)} files to style corpus...\n")
    asyncio.run(upload_all(results))
    return 0


if __name__ == "__main__":
    sys.exit(main())