feat(plans): מרשם-תכניות קנוני (V38) + נוסח-ציטוט אחיד דטרמיניסטי לבלוק ט
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 4s
Lint — undefined names / undefined-names (pull_request) Successful in 10s

מוסיף ישות קנונית לתכניות בניין-עיר (תב"ע) שחוזרות בין תיקים — SSOT לזהות+תוקף
(פרסום למתן תוקף ברשומות + מס' ילקוט-הפרסומים) + משפט-ייעוד — במקום גזירה-מחדש
מהשומות בכל תיק. בלוק ט מצטט את התוקף בנוסח אחיד דטרמיניסטי (format_plan_citation),
כך שתאריך-פרסום/מס'-ילקוט לעולם לא מהוזים ע"י ה-LLM.

- DB: טבלת plans (V38) + CRUD + _normalize_plan_number (G1) + format_plan_citation;
  upsert idempotent (G3) עם כלל-מיזוג: תוקף מאושר לא נדרס — סתירה נרשמת ב-discrepancies
  (G10 / אין בליעה שקטה).
- services/plans_extractor.py: חילוץ עובדתי (claude CLI מקומי) → pending_review.
- block_writer.py: _build_plans_registry_context מזריק משפטי-ציטוט מאושרים בלבד לבלוק ט;
  תכניות חסרות/לא-מאושרות מסומנות במפורש (לא נבלעות).
- tools/plans.py + server.py: extract_plans / plan_get / plan_search / plan_list /
  plan_upsert / plan_review (שער-יו"ר G10), עם extract/get-symmetry (X9).
- scripts/backfill_plans_registry.py: ייבוא מקורפוס-ההחלטות (טיוטות + סופיי-דפנה).
- docs: block-schema (בלוק ט), SKILL, spec 02-data-model + 04.

Invariants: G1/INV-DM2/X1 (מזהה מנורמל בכתיבה) · G2/INV-DM6 (מקור-אמת יחיד, appraiser_facts
ללא שינוי) · G3 (upsert) · INV-DM4/G9 (provenance) · INV-DM5/G10 (review_status) ·
INV-AH (ציטוט דטרמיניסטי) · G5 (lookup לא קורפוס) · G11/block-schema (נוסח-הציטוט) · X9.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-14 13:46:26 +00:00
parent 83293ca619
commit 4be9cf8543
11 changed files with 929 additions and 2 deletions

View File

@@ -1511,6 +1511,54 @@ SCHEMA_V37_SQL = """
ALTER TABLE drain_controls ADD COLUMN IF NOT EXISTS burst_until TIMESTAMPTZ;
"""
# V38 migration DDL: creates the `plans` registry table plus its review-status
# and full-text lookup indexes. Every statement is guarded by IF NOT EXISTS, and
# the script is executed by _apply_schema_ddl via conn.execute(SCHEMA_V38_SQL).
SCHEMA_V38_SQL = """
-- plans: canonical registry of planning schemes (תכניות בניין-עיר) reused across
-- cases. SSOT for a plan's IDENTITY + VALIDITY (date published למתן תוקף ברשומות +
-- ילקוט-הפרסומים number) + one-line purpose. DISTINCT from appraiser_facts, which
-- stays the per-appraiser, per-case factual snapshot (G2 — no parallel path): the
-- same plan recurs across many cases, so its validity lives here ONCE rather than
-- being re-derived from the appraisals every time. LLM-extracted rows enter
-- 'pending_review' and are NOT used in writing until the chair approves
-- (INV-DM5/G10 — mirrors halachot.review_status). The approved validity feeds
-- block-tet's DETERMINISTIC citation sentence (format_plan_citation), so dates are
-- never hallucinated by the writer (INV-AH). Identity = the normalized plan_number
-- (G1/INV-DM2); display_name/citation_formatted are derived display fields, never
-- the key.
CREATE TABLE IF NOT EXISTS plans (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
plan_number TEXT NOT NULL UNIQUE, -- canonical, normalized at write (G1)
display_name TEXT NOT NULL DEFAULT '', -- surface form: "תכנית מי/820"
aliases TEXT[] NOT NULL DEFAULT '{}', -- other surface forms seen
plan_type TEXT NOT NULL DEFAULT '', -- ארצית|מחוזית|מקומית|מפורטת|כוללנית|''
gazette_date DATE, -- פורסמה למתן תוקף ברשומות ביום…
yalkut_number TEXT NOT NULL DEFAULT '', -- מס' ילקוט הפרסומים (י"פ)
purpose TEXT NOT NULL DEFAULT '', -- one-line ייעוד
citation_formatted TEXT NOT NULL DEFAULT '', -- rendered canonical sentence (derived)
review_status TEXT NOT NULL DEFAULT 'pending_review'
CHECK (review_status IN ('pending_review', 'approved', 'rejected')), -- G10 gate
-- provenance (INV-DM4/G9): where this record was learned from + by what model
source_case_number TEXT NOT NULL DEFAULT '',
source_document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
model_used TEXT NOT NULL DEFAULT '',
-- discrepancies: validity values that DIFFER from an already-approved row,
-- captured for chair adjudication instead of silently overwriting (חוקה §6 —
-- אין בליעה שקטה). Shape: [{field, old, new, source_case_number}].
discrepancies JSONB NOT NULL DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- lookup-only tsvector (G5 — this is a structured registry, NOT a 4th retrieval
-- corpus; no embeddings). GENERATED → auto-maintained on content change (INV-DM3).
meta_tsv tsvector GENERATED ALWAYS AS (
to_tsvector('simple',
coalesce(plan_number, '') || ' ' ||
coalesce(display_name, '') || ' ' ||
coalesce(purpose, ''))
) STORED
);
CREATE INDEX IF NOT EXISTS idx_plans_review ON plans(review_status);
CREATE INDEX IF NOT EXISTS idx_plans_meta_tsv ON plans USING gin(meta_tsv);
"""
# Stable, arbitrary key for the session-level advisory lock that serialises
# schema DDL across processes. Every short-lived process (cron drains, services)
@@ -1529,7 +1577,7 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await _apply_schema_ddl(conn)
finally:
await conn.execute("SELECT pg_advisory_unlock($1)", _MIGRATION_LOCK_KEY)
logger.info("Database schema initialized (v1-v37)")
logger.info("Database schema initialized (v1-v38)")
async def _apply_schema_ddl(conn: asyncpg.Connection) -> None:
@@ -1571,6 +1619,7 @@ async def _apply_schema_ddl(conn: asyncpg.Connection) -> None:
await conn.execute(SCHEMA_V35_SQL)
await conn.execute(SCHEMA_V36_SQL)
await conn.execute(SCHEMA_V37_SQL)
await conn.execute(SCHEMA_V38_SQL)
async def init_schema() -> None:
@@ -3375,6 +3424,280 @@ async def detect_appraiser_conflicts(case_id: UUID) -> list[dict]:
return conflicts
# ── Plans registry (V38) ──────────────────────────────────────────
# Canonical registry of planning schemes (תכניות). SSOT for a plan's identity +
# validity, reused across cases (G2). See SCHEMA_V38_SQL for the data contract.
def _normalize_plan_number(raw: str) -> str:
"""Canonical write-time form of a plan identifier (G1/INV-DM2; mirrors X1).
Deterministic and format-only — does NOT invent or drop data:
trim · unify Hebrew gershayim (״/‴ → ") · strip a leading bare scheme word
("תכנית"/"תוכנית") · collapse whitespace around '/' and runs of spaces.
NOTE: תמ"א / תב"ע are KEPT — the scheme acronym is part of the identifier; only
the generic lead word "תכנית"/"תוכנית" is stripped. The full surface form
(including "תכנית") lives in display_name, never in the key.
"""
s = (raw or "").strip()
s = s.replace("״", '"').replace("", '"') # gershayim → ASCII "
s = re.sub(r"^(?:תכנית|תוכנית)\s+", "", s)
s = re.sub(r"\s*/\s*", "/", s)
s = " ".join(s.split())
return s
def _coerce_plan_date(v) -> date | None:
"""Accept a date/datetime/ISO-string/DD.MM.YYYY and return a date (or None).
The extractors emit ISO (YYYY-MM-DD); DD.MM.YYYY / DD/MM/YYYY is tolerated for
safety. Unparseable → None (the validity clause is simply omitted — never guessed).
"""
if not v:
return None
if isinstance(v, datetime):
return v.date()
if isinstance(v, date):
return v
s = str(v).strip()
if not s:
return None
try:
return date.fromisoformat(s[:10])
except ValueError:
pass
m = re.match(r"^\s*(\d{1,2})[./](\d{1,2})[./](\d{4})\s*$", s)
if m:
d, mo, y = (int(g) for g in m.groups())
try:
return date(y, mo, d)
except ValueError:
return None
return None
def format_plan_citation(plan: dict) -> str:
    """Render the canonical block-tet citation sentence from a plan record.

    DETERMINISTIC -- the identity + validity clause is assembled from stored
    fields only, never by an LLM (INV-AH). Output pattern:

        {display} פורסמה למתן תוקף ברשומות ביום {D.M.YYYY}[, י"פ {yalkut}][ — {purpose}].

    The validity clause (and with it the yalkut number) appears only when
    gazette_date parses to a date. The purpose, when present, is appended after
    an " — " separator. A final period is added unless the text already ends
    with one.

    Args:
        plan: Mapping read via .get() for display_name, plan_number,
            gazette_date, yalkut_number and purpose; missing keys are allowed.

    Returns:
        The citation sentence, or '' when there is neither a display name nor
        a plan number.
    """
    name = (plan.get("display_name") or "").strip() or (plan.get("plan_number") or "").strip()
    if not name:
        return ""
    sentence = name
    stored_date = plan.get("gazette_date")
    # Nothing stored -> nothing to parse (the coercion returns None for falsy input anyway).
    gd = _coerce_plan_date(stored_date) if stored_date else None
    if gd:
        clause = f"פורסמה למתן תוקף ברשומות ביום {gd.day}.{gd.month}.{gd.year}"
        yalkut = (plan.get("yalkut_number") or "").strip()
        if yalkut:
            clause += f', י"פ {yalkut}'
        sentence = f"{name} {clause}"
    purpose = (plan.get("purpose") or "").strip()
    if purpose:
        # The separator keeps the purpose from fusing with the preceding text.
        sentence += f" — {purpose}"
    if not sentence.endswith("."):
        sentence += "."
    return sentence
def _plan_row_to_dict(row) -> dict | None:
if row is None:
return None
d = dict(row)
d["id"] = str(d["id"])
if d.get("source_document_id"):
d["source_document_id"] = str(d["source_document_id"])
if isinstance(d.get("discrepancies"), str):
d["discrepancies"] = json.loads(d["discrepancies"])
gd = d.get("gazette_date")
d["gazette_date"] = gd.isoformat() if gd else None
d["aliases"] = list(d.get("aliases") or [])
d.pop("meta_tsv", None)
return d
async def upsert_plan(
plan_number: str,
display_name: str = "",
aliases: list[str] | None = None,
plan_type: str = "",
gazette_date=None,
yalkut_number: str = "",
purpose: str = "",
review_status: str = "pending_review",
source_case_number: str = "",
source_document_id: UUID | None = None,
model_used: str = "",
) -> dict:
"""Idempotent upsert of a plan, keyed on the normalized plan_number.
G3 (idempotent) + G1 (normalize at write). Merge rule (G10 / no-silent-swallow):
• new plan → insert as given (LLM extractions arrive 'pending_review').
• existing APPROVED → NEVER overwrite validity; add unseen aliases and record any
DIFFERING validity value into `discrepancies` for the chair to adjudicate.
• existing PENDING/REJECTED → fill empty fields from the new data (merge-up).
Returns the resulting plan dict.
"""
if review_status not in ("pending_review", "approved", "rejected"):
raise ValueError(f"review_status לא חוקי: {review_status}")
num = _normalize_plan_number(plan_number)
if not num:
raise ValueError("plan_number ריק לאחר נרמול")
gd = _coerce_plan_date(gazette_date)
display_name = (display_name or "").strip() or num
incoming_aliases = [a.strip() for a in (aliases or []) if a and a.strip()]
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
ex = await conn.fetchrow(
"SELECT * FROM plans WHERE plan_number = $1 FOR UPDATE", num,
)
if ex is None:
merged_aliases = sorted({
a for a in incoming_aliases if a not in (num, display_name)
})
citation = format_plan_citation({
"display_name": display_name, "plan_number": num,
"gazette_date": gd, "yalkut_number": yalkut_number, "purpose": purpose,
})
row = await conn.fetchrow(
"""INSERT INTO plans
(plan_number, display_name, aliases, plan_type, gazette_date,
yalkut_number, purpose, citation_formatted, review_status,
source_case_number, source_document_id, model_used)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
RETURNING *""",
num, display_name, merged_aliases, plan_type or "", gd,
yalkut_number or "", purpose or "", citation, review_status,
source_case_number or "", source_document_id, model_used or "",
)
return _plan_row_to_dict(row)
ex = dict(ex)
# Always accumulate newly-seen surface forms.
seen = set(ex.get("aliases") or [])
for a in incoming_aliases + [display_name]:
if a and a not in (num, ex.get("display_name") or ""):
seen.add(a)
merged_aliases = sorted(seen)
if ex["review_status"] == "approved":
# Do not touch chair-approved validity. Record any conflicting value.
disc = ex.get("discrepancies") or []
if isinstance(disc, str):
disc = json.loads(disc)
checks = (
("gazette_date", gd.isoformat() if gd else "",
ex["gazette_date"].isoformat() if ex["gazette_date"] else ""),
("yalkut_number", (yalkut_number or "").strip(), ex["yalkut_number"] or ""),
("purpose", (purpose or "").strip(), ex["purpose"] or ""),
)
for field, newval, oldval in checks:
if newval and newval != oldval:
disc.append({
"field": field, "old": oldval, "new": newval,
"source_case_number": source_case_number or "",
})
row = await conn.fetchrow(
"""UPDATE plans SET aliases = $2, discrepancies = $3, updated_at = now()
WHERE id = $1 RETURNING *""",
ex["id"], merged_aliases,
json.dumps(disc, ensure_ascii=False),
)
return _plan_row_to_dict(row)
# pending / rejected → merge-up empty fields, keep existing review_status.
new_display = ex["display_name"] or display_name
new_type = ex["plan_type"] or plan_type or ""
new_gd = ex["gazette_date"] or gd
new_yalkut = ex["yalkut_number"] or yalkut_number or ""
new_purpose = ex["purpose"] or purpose or ""
citation = format_plan_citation({
"display_name": new_display, "plan_number": num,
"gazette_date": new_gd, "yalkut_number": new_yalkut, "purpose": new_purpose,
})
row = await conn.fetchrow(
"""UPDATE plans SET
display_name = $2, aliases = $3, plan_type = $4, gazette_date = $5,
yalkut_number = $6, purpose = $7, citation_formatted = $8,
source_case_number = COALESCE(NULLIF($9, ''), source_case_number),
source_document_id = COALESCE($10, source_document_id),
model_used = COALESCE(NULLIF($11, ''), model_used),
updated_at = now()
WHERE id = $1 RETURNING *""",
ex["id"], new_display, merged_aliases, new_type, new_gd,
new_yalkut, new_purpose, citation,
source_case_number or "", source_document_id, model_used or "",
)
return _plan_row_to_dict(row)
async def get_plan_by_number(plan_number: str) -> dict | None:
"""Look up a plan by normalized number; falls back to an alias match."""
num = _normalize_plan_number(plan_number)
if not num:
return None
pool = await get_pool()
row = await pool.fetchrow("SELECT * FROM plans WHERE plan_number = $1", num)
if row is None:
row = await pool.fetchrow("SELECT * FROM plans WHERE $1 = ANY(aliases)", num)
return _plan_row_to_dict(row)
async def get_plan_by_id(plan_id: UUID) -> dict | None:
    """Fetch a single plan by primary key.

    The fetched row is passed through _plan_row_to_dict, which maps a missing
    row (None) to None.
    """
    db = await get_pool()
    return _plan_row_to_dict(
        await db.fetchrow("SELECT * FROM plans WHERE id = $1", plan_id)
    )
async def list_plans(review_status: str = "", limit: int = 500) -> list[dict]:
    """Return up to `limit` plans as dicts (backlog view).

    With a non-empty `review_status` only rows in that status are returned,
    most recently updated first. Without it, all rows are returned grouped by
    review_status and, within each status, most recently updated first.
    """
    db = await get_pool()
    if not review_status:
        records = await db.fetch(
            "SELECT * FROM plans ORDER BY review_status, updated_at DESC LIMIT $1", limit,
        )
    else:
        records = await db.fetch(
            "SELECT * FROM plans WHERE review_status = $1 ORDER BY updated_at DESC LIMIT $2",
            review_status, limit,
        )
    return list(map(_plan_row_to_dict, records))
async def search_plans(query: str, limit: int = 20) -> list[dict]:
    """Fuzzy lookup by number / display name / purpose (ILIKE + tsvector).

    A row matches when the query occurs as a case-insensitive substring of
    plan_number, display_name or purpose, or when the `meta_tsv` column matches
    the query as a plain 'simple' text-search query. The query is matched
    LITERALLY in the substring test: LIKE metacharacters typed by the user are
    escaped rather than interpreted as wildcards.

    Args:
        query: Free text; None or blank returns [] without touching the database.
        limit: Maximum number of rows to return.

    Returns:
        Matching plans as dicts, ordered by review_status and then most
        recently updated first.
    """
    q = (query or "").strip()
    if not q:
        return []
    # Escape the backslash first so the escapes added for % and _ are not
    # themselves doubled. Backslash is PostgreSQL's default LIKE escape
    # character, so the SQL needs no ESCAPE clause.
    literal = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f"%{literal}%"
    pool = await get_pool()
    rows = await pool.fetch(
        """SELECT * FROM plans
        WHERE plan_number ILIKE $1 OR display_name ILIKE $1 OR purpose ILIKE $1
        OR meta_tsv @@ plainto_tsquery('simple', $2)
        ORDER BY review_status, updated_at DESC LIMIT $3""",
        pattern, q, limit,
    )
    return [_plan_row_to_dict(r) for r in rows]
async def set_plan_review_status(plan_id: UUID, status: str) -> dict | None:
"""Chair gate (G10): approve / reject / reset a plan record."""
if status not in ("pending_review", "approved", "rejected"):
raise ValueError(f"review_status לא חוקי: {status}")
pool = await get_pool()
row = await pool.fetchrow(
"UPDATE plans SET review_status = $2, updated_at = now() WHERE id = $1 RETURNING *",
plan_id, status,
)
return _plan_row_to_dict(row)
# ── V7: External precedent library + halachot ─────────────────────