feat: Stage A finalizers + #35/#36/#37 — critical-gap closure
Some checks failed
Build & Deploy / build-and-deploy (push) Has been cancelled

Four parallel sub-agents closed the remaining critical gaps from the
26/05 Stage A/B sprint. Each block independently tested; aggregated here.

## #30/#31 finalizers (sub-agent A)
* Auto-derive practice_area in case_create from case_number prefix
  (1xxx→rishuy_uvniya, 8xxx→betterment_levy, 9xxx→compensation_197);
  default for CaseCreateRequest is now "" (the DB constraint catches
  any stray "appeals_committee").
* practice_area.py: derive_subtype now handles axis-B domain values
  (rishuy_uvniya/betterment_levy/compensation_197) without parsing the
  case number; new helper derive_domain_practice_area().
* Halacha re-extraction verified unnecessary — all 6 reclassified
  records already had is_binding=false and approved halachot.
* Regression tests: 6 cases in tests/test_corpus_constraints.py
  covering practice_area enum, internal-committee chair/district,
  external-upload arar prefix, MCP guard.
* UI: district input → Select dropdown (7 districts) in
  precedent-edit-sheet.tsx, preserving legacy free-text values.

## #37 בל"מ subtypes (sub-agent B)
* 3 new appeal_subtypes: extension_request_{building_permit,
  betterment_levy,compensation}. APPEALS_COMMITTEE_SUBTYPES extended,
  SUBTYPES_BY_AREA mappings added.
* New helpers: is_blam_subject(), is_blam_subtype(),
  derive_subtype_with_blam(case_number, subject, practice_area).
  case_create now uses it to auto-detect "בקשה להארכת מועד" subjects.
* 3 methodology templates under docs/methodology/extension-request-*.md.
* paperclip_client.py mapping updated for the 3 new subtypes
  (extension_request_building_permit→CMP, the other two→CMPA).
* Frontend: bilingual "בל"מ" badge + filter dropdown on cases list +
  detail header; appeal-type-bars collapseBlam() merges בל"מ into its
  parent domain for aggregate bars.
* Wizard auto-detects בל"מ from subject during case creation.
* 3 Berlinger cases (1017/1018/1019-03-26) migrated to
  appeal_subtype=extension_request_building_permit via psql.

## #35 missing_precedents feature (sub-agent C)
* Schema V13: missing_precedents table (citation, case_id, party,
  legal_topic, status, linked_case_law_id, claim_quote, ...) +
  FK constraints + 3 indexes. Applied via psql + idempotent migration.
* 6 db.py service functions, 3 MCP tools, 6 FastAPI endpoints
  (POST/GET/PATCH/DELETE/upload — upload routes by citation prefix
  to ingest_internal_decision or ingest_precedent).
* Next.js page /missing-precedents with 5 status tabs + filters +
  sidebar badge counter + detail drawer with metadata edit + smart
  upload form that switches fields per committee/court.
* Bootstrap: 7 rows imported from the JSON file
  (3 citations × cases, all status=closed with linked_case_law_id).
* legal-researcher.md: new §2ב.5 with missing_precedent_create
  usage + dedup semantics + tool grant.

## #36 legal_arguments aggregation (sub-agent D)
* Schema V14: legal_arguments + legal_argument_propositions M:M.
  Applied via psql.
* New service argument_aggregator.py with two functions —
  aggregate_claims_to_arguments() (Claude CLI / claude_session) and
  get_legal_arguments(). Graceful llm_unavailable handling when CLI
  is missing (containers).
* 2 MCP tools + 2 API endpoints (POST .../aggregate-arguments as
  BackgroundTask, GET .../legal-arguments).
* Frontend: shadcn Accordion + new legal-arguments-panel.tsx with
  hierarchical (party → priority badge → arguments) display, "טיעונים"
  tab on the case page, "חשב/חשב מחדש" buttons.
* scripts/backfill_legal_arguments.py + SCRIPTS.md entry — dry-run
  found 8 candidate cases including 1017/1018/1019.

## Open follow-ups (intentionally deferred)
* npm run api:types in web-ui (CLAUDE.md flow) — recommended before
  the next UI commit; not required for backend deployment.
* Run backfill_legal_arguments.py --apply once the container picks up
  the new aggregator service.
* webhook on missing-precedents upload-close to Paperclip (optional).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-26 08:34:40 +00:00
parent af651d0135
commit f3cc9ca9d4
33 changed files with 4588 additions and 37 deletions

View File

@@ -0,0 +1,358 @@
"""כינוס פרופוזיציות לטיעונים משפטיים מובחנים — argument de-duplication.
Workflow:
1. ``claims_extractor`` extracts ~20-30 raw propositions per litigation
brief into the ``claims`` table.
2. This module groups those raw propositions, per party, into 6-12
distinct legal arguments via Claude headless (`claude_session`).
3. The result is stored in ``legal_arguments`` plus ``legal_argument_
propositions`` (M:M join) so we keep traceability back to the source
claims.
Manually de-duping 184 propositions in 3 cases yielded 82 arguments
(~24/case) — see ``data/cases/{1017,1018,1019}-03-26/documents/research/
legal-arguments.md`` for the gold standard.
**Architectural constraint**: ``claude_session`` only works from the local
MCP server (Claude CLI is not installed in the FastAPI container). Calls
from ``web/`` must go through MCP tools; calls from MCP tools land here
directly.
"""
from __future__ import annotations
import json
import logging
from uuid import UUID
from legal_mcp.services import claude_session, db
logger = logging.getLogger(__name__)
# Allowed enum values mirror the DB CHECK constraints.
ALLOWED_PARTIES = {"appellant", "respondent", "committee", "permit_applicant", "unknown"}
ALLOWED_PRIORITIES = {"threshold", "substantive", "procedural", "relief"}
# Hebrew labels for the prompt (Claude needs context in the same
# language as the source material).
PARTY_LABELS_HE = {
"appellant": "עוררים",
"respondent": "משיבים",
"committee": "ועדה מקומית",
"permit_applicant": "מבקשי היתר",
"unknown": "צד לא מזוהה",
}
AGGREGATE_PROMPT_TEMPLATE = """אתה מנתח כתבי טענות בתחום תכנון ובנייה (ועדת ערר).
לפניך {n} פרופוזיציות גולמיות שחולצו ממסמכי {party_he} בתיק ערר.
מטרתך: לקבץ אותן ל-{target_min}-{target_max} **טיעונים משפטיים מובחנים**
(ארגומנטים אמיתיים, לא חזרה מילולית של הפרופוזיציות).
## כללי איגוד:
1. **טיעון אמיתי = רעיון משפטי אחד** — לא רשימה של פרופוזיציות, אלא טענה משפטית עצמאית.
2. **מקבצים פרופוזיציות שתומכות באותו רעיון משפטי** — גם אם הניסוח שלהן שונה.
3. **מפרידים בין סוגי טענות**:
- **threshold** = טענות סף (זכות עמידה, סמכות, מועדים, שיהוי)
- **substantive** = טענות מהותיות (תחולת חוק, פרשנות, חישוב)
- **procedural** = פגמי הליך (פרסום, פרוטוקול, ניגוד עניינים)
- **relief** = סעדים מבוקשים / סיכומים
4. **כותרת קצרה ובהירה** — תיאורית, לא משפטית מפורטת. 5-15 מילים.
5. **גוף הטיעון בפסקה אחת** — 3-7 שורות עברית, נאמן למקור.
6. **שמירת ה-claim_ids המקוריים** — לכל טיעון, רשום אילו פרופוזיציות תומכות בו.
## פלט:
החזר JSON בלבד (ללא markdown, ללא הסברים), array של אובייקטים:
```
[
{{
"title": "כותרת קצרה של הטיעון",
"body": "גוף הטיעון בפסקה אחת",
"topic": "סוגיה משפטית קצרה (לדוגמה: 'זכות עמידה', 'תחולת תמ\\"א 38')",
"priority": "threshold|substantive|procedural|relief",
"claim_ids": ["uuid-1", "uuid-2"]
}}
]
```
## הפרופוזיציות:
{propositions_json}
"""
def _build_prompt(party: str, propositions: list[dict]) -> str:
"""Compose the per-party aggregation prompt."""
n = len(propositions)
# Conservative target: ~1 argument per 2-3 propositions, clamped 4-12.
target_min = max(4, n // 4)
target_max = max(target_min + 1, min(12, n // 2 + 1))
party_he = PARTY_LABELS_HE.get(party, party)
# Strip noise from propositions for the prompt — Claude only needs
# the id and the text to do the grouping.
compact = [
{"id": str(p["id"]), "text": p["claim_text"]}
for p in propositions
]
propositions_json = json.dumps(compact, ensure_ascii=False, indent=2)
return AGGREGATE_PROMPT_TEMPLATE.format(
n=n,
party_he=party_he,
target_min=target_min,
target_max=target_max,
propositions_json=propositions_json,
)
def _normalize_argument(raw: dict, fallback_topic: str = "") -> dict | None:
"""Validate & normalize a single argument dict from Claude.
Returns None if the row is unusable (missing required fields).
"""
if not isinstance(raw, dict):
return None
title = (raw.get("title") or "").strip()
body = (raw.get("body") or "").strip()
if not title or not body:
return None
priority = raw.get("priority", "substantive")
if priority not in ALLOWED_PRIORITIES:
priority = "substantive"
topic = (raw.get("topic") or fallback_topic or "").strip() or None
claim_ids_raw = raw.get("claim_ids") or []
claim_ids: list[UUID] = []
if isinstance(claim_ids_raw, list):
for cid in claim_ids_raw:
try:
claim_ids.append(UUID(str(cid)))
except (ValueError, TypeError):
continue
return {
"title": title,
"body": body,
"topic": topic,
"priority": priority,
"claim_ids": claim_ids,
}
async def _aggregate_party(
party: str, propositions: list[dict],
) -> list[dict]:
"""Ask Claude to group one party's propositions; return normalized rows."""
if not propositions:
return []
prompt = _build_prompt(party, propositions)
try:
raw_result = await claude_session.query_json(prompt)
except RuntimeError as e:
# Surface CLI-unavailable specifically so the caller can report
# cleanly instead of crashing the whole job.
raise RuntimeError(
f"argument_aggregator: claude_session.query_json failed for party "
f"'{party}': {e}"
) from e
if not isinstance(raw_result, list):
logger.warning(
"argument_aggregator: Claude returned non-list (%s) for party '%s'",
type(raw_result).__name__, party,
)
return []
out: list[dict] = []
for entry in raw_result:
norm = _normalize_argument(entry)
if norm:
out.append(norm)
return out
async def aggregate_claims_to_arguments(
case_id: UUID, force: bool = False,
) -> dict:
"""For a given case, group existing claims into distinct legal arguments.
Args:
case_id: The case UUID.
force: If True, delete existing ``legal_arguments`` for the case
before aggregating. Otherwise short-circuit if any rows exist.
Returns:
A summary dict:
``{"status": "completed"|"skipped"|"no_claims"|"llm_unavailable",
"by_party": {party: count}, "total": int, "message": ...}``
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
existing = await conn.fetchval(
"SELECT COUNT(*) FROM legal_arguments WHERE case_id = $1",
case_id,
)
if existing and not force:
return {
"status": "skipped",
"message": f"Found {existing} existing arguments. Use force=True to re-run.",
"total": existing,
}
if force and existing:
await conn.execute(
"DELETE FROM legal_arguments WHERE case_id = $1", case_id,
)
# Pull all claims for this case, grouped by party.
rows = await conn.fetch(
"""SELECT id, party_role, claim_text, claim_index, source_document
FROM claims
WHERE case_id = $1
ORDER BY party_role, claim_index""",
case_id,
)
if not rows:
return {
"status": "no_claims",
"message": "No claims found for this case. Run extract_claims first.",
"total": 0,
}
# Group propositions by party.
by_party: dict[str, list[dict]] = {}
for r in rows:
party = r["party_role"]
# Map deprecated 'appraiser' or unknown labels to 'unknown'.
if party not in ALLOWED_PARTIES:
party = "unknown"
by_party.setdefault(party, []).append(dict(r))
party_counts: dict[str, int] = {}
inserted = 0
errors: list[str] = []
for party, props in by_party.items():
try:
arguments = await _aggregate_party(party, props)
except RuntimeError as e:
# Most likely cause: Claude CLI not installed (running from
# the container). Don't crash — record the gap and continue.
msg = str(e)
if "Claude CLI not found" in msg:
return {
"status": "llm_unavailable",
"message": (
"Claude CLI not available. This service must run from "
"the local MCP server (not the FastAPI container)."
),
"total": 0,
}
errors.append(f"{party}: {msg}")
continue
if not arguments:
party_counts[party] = 0
continue
async with pool.acquire() as conn:
async with conn.transaction():
for idx, arg in enumerate(arguments):
arg_id = await conn.fetchval(
"""INSERT INTO legal_arguments
(case_id, party, argument_index, argument_title,
argument_body, legal_topic, priority)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id""",
case_id,
party,
idx + 1,
arg["title"],
arg["body"],
arg["topic"],
arg["priority"],
)
for cid in arg["claim_ids"]:
try:
await conn.execute(
"""INSERT INTO legal_argument_propositions
(argument_id, claim_id)
VALUES ($1, $2)
ON CONFLICT DO NOTHING""",
arg_id, cid,
)
except Exception as e: # noqa: BLE001
# Likely FK violation if the LLM hallucinated
# a claim_id. Log and continue.
logger.warning(
"argument_aggregator: skipped bad claim_id %s for arg %s: %s",
cid, arg_id, e,
)
inserted += 1
party_counts[party] = len(arguments)
result: dict = {
"status": "completed",
"total": inserted,
"by_party": party_counts,
"propositions_processed": len(rows),
}
if errors:
result["errors"] = errors
result["status"] = "completed_with_errors"
return result
async def get_legal_arguments(
case_id: UUID, party: str = "",
) -> list[dict]:
"""Return aggregated legal arguments for a case, optionally filtered by party.
Each row includes ``supporting_claims`` (list of source claim_ids).
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
if party and party in ALLOWED_PARTIES:
rows = await conn.fetch(
"""SELECT id, case_id, party, argument_index, argument_title,
argument_body, legal_topic, priority, cited_precedents,
created_at, updated_at
FROM legal_arguments
WHERE case_id = $1 AND party = $2
ORDER BY priority, argument_index""",
case_id, party,
)
else:
rows = await conn.fetch(
"""SELECT id, case_id, party, argument_index, argument_title,
argument_body, legal_topic, priority, cited_precedents,
created_at, updated_at
FROM legal_arguments
WHERE case_id = $1
ORDER BY party, priority, argument_index""",
case_id,
)
# Pull supporting claim ids for each argument in one round-trip.
arg_ids = [r["id"] for r in rows]
supporting: dict[UUID, list[str]] = {}
if arg_ids:
joins = await conn.fetch(
"""SELECT argument_id, claim_id
FROM legal_argument_propositions
WHERE argument_id = ANY($1::uuid[])""",
arg_ids,
)
for j in joins:
supporting.setdefault(j["argument_id"], []).append(str(j["claim_id"]))
out: list[dict] = []
for r in rows:
d = dict(r)
d["id"] = str(d["id"])
d["case_id"] = str(d["case_id"])
d["supporting_claims"] = supporting.get(r["id"], [])
out.append(d)
return out

View File

@@ -745,6 +745,84 @@ CREATE INDEX IF NOT EXISTS idx_halachot_tsv
"""
# ── V13: Missing precedents log ───────────────────────────────────
# Track citations that the parties brought up but which are NOT yet in
# the precedent_library. Created by the researcher (auto or chair)
# whenever a citation can't be found in the corpus; closed by uploading
# the actual decision via internal_decision_upload or
# precedent_library_upload, at which point linked_case_law_id points to
# the new case_law row and status flips to 'closed'.
SCHEMA_V13_SQL = """
CREATE TABLE IF NOT EXISTS missing_precedents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
citation TEXT NOT NULL,
case_name TEXT,
cited_in_case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
cited_in_document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
cited_by_party TEXT CHECK (cited_by_party IN (
'appellant', 'respondent', 'committee', 'permit_applicant', 'unknown'
)),
cited_by_party_name TEXT,
legal_topic TEXT,
legal_issue TEXT,
claim_quote TEXT,
status TEXT DEFAULT 'open' CHECK (status IN (
'open', 'uploaded', 'closed', 'irrelevant'
)),
linked_case_law_id UUID REFERENCES case_law(id) ON DELETE SET NULL,
closed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
notes TEXT
);
CREATE INDEX IF NOT EXISTS idx_missing_precedents_case
ON missing_precedents(cited_in_case_id);
CREATE INDEX IF NOT EXISTS idx_missing_precedents_status
ON missing_precedents(status);
CREATE INDEX IF NOT EXISTS idx_missing_precedents_citation
ON missing_precedents(citation);
"""
# ── V14: Legal arguments (aggregated propositions) ────────────────
# After ``claims_extractor`` extracts raw propositions (rows in ``claims``)
# the LLM-driven aggregator groups them into ~6-12 distinct legal arguments
# per party. ``legal_arguments`` holds the consolidated argument; the M:M
# join table ``legal_argument_propositions`` links back to the source
# propositions for traceability ("which raw claims feed this argument?").
SCHEMA_V14_SQL = """
CREATE TABLE IF NOT EXISTS legal_arguments (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
case_id UUID NOT NULL REFERENCES cases(id) ON DELETE CASCADE,
party TEXT NOT NULL CHECK (party IN (
'appellant', 'respondent', 'committee', 'permit_applicant', 'unknown'
)),
argument_index INTEGER NOT NULL,
argument_title TEXT NOT NULL,
argument_body TEXT NOT NULL,
legal_topic TEXT,
priority TEXT DEFAULT 'substantive' CHECK (priority IN (
'threshold', 'substantive', 'procedural', 'relief'
)),
cited_precedents TEXT[],
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_legal_arguments_case
ON legal_arguments(case_id);
CREATE INDEX IF NOT EXISTS idx_legal_arguments_party
ON legal_arguments(case_id, party);
-- M:M back to ``claims`` (raw propositions).
CREATE TABLE IF NOT EXISTS legal_argument_propositions (
argument_id UUID NOT NULL REFERENCES legal_arguments(id) ON DELETE CASCADE,
claim_id UUID NOT NULL REFERENCES claims(id) ON DELETE CASCADE,
PRIMARY KEY (argument_id, claim_id)
);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
@@ -760,7 +838,9 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V10_SQL)
await conn.execute(SCHEMA_V11_SQL)
await conn.execute(SCHEMA_V12_SQL)
logger.info("Database schema initialized (v1-v12)")
await conn.execute(SCHEMA_V13_SQL)
await conn.execute(SCHEMA_V14_SQL)
logger.info("Database schema initialized (v1-v14)")
async def init_schema() -> None:
@@ -782,7 +862,10 @@ async def create_case(
hearing_date: date | None = None,
notes: str = "",
expected_outcome: str = "",
practice_area: str = "appeals_committee",
# Default "" — DB CHECK constraint accepts empty, the upstream tool
# (cases.case_create) is responsible for deriving the domain value
# from the case_number prefix before calling here.
practice_area: str = "",
appeal_subtype: str = "",
) -> dict:
pool = await get_pool()
@@ -3106,3 +3189,228 @@ async def search_precedent_library_hybrid(
merged.append(d)
merged.sort(key=lambda x: -x["score"])
return merged[:limit]
# ── Missing precedents (V13) ───────────────────────────────────────
# Track citations from party briefs that aren't yet in the corpus.
# Lifecycle: 'open' → researcher logs gap → chair uploads decision
# → status='uploaded' (file ingested) → status='closed' (linked to
# case_law row). 'irrelevant' = chair decided the citation isn't worth
# adding to the library.
ALLOWED_MP_PARTIES = {
"appellant", "respondent", "committee", "permit_applicant", "unknown",
}
ALLOWED_MP_STATUS = {"open", "uploaded", "closed", "irrelevant"}
def _row_to_missing_precedent(row: asyncpg.Record) -> dict:
d = dict(row)
d["id"] = str(d["id"])
if d.get("cited_in_case_id") is not None:
d["cited_in_case_id"] = str(d["cited_in_case_id"])
if d.get("cited_in_document_id") is not None:
d["cited_in_document_id"] = str(d["cited_in_document_id"])
if d.get("linked_case_law_id") is not None:
d["linked_case_law_id"] = str(d["linked_case_law_id"])
return d
async def create_missing_precedent(
citation: str,
case_name: str | None = None,
cited_in_case_id: UUID | None = None,
cited_in_document_id: UUID | None = None,
cited_by_party: str | None = None,
cited_by_party_name: str | None = None,
legal_topic: str | None = None,
legal_issue: str | None = None,
claim_quote: str | None = None,
notes: str | None = None,
) -> dict:
"""Create a new missing-precedent row (status='open' by default)."""
if not citation.strip():
raise ValueError("citation is required")
if cited_by_party and cited_by_party not in ALLOWED_MP_PARTIES:
raise ValueError(
f"cited_by_party must be one of {sorted(ALLOWED_MP_PARTIES)}"
)
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""INSERT INTO missing_precedents (
citation, case_name, cited_in_case_id, cited_in_document_id,
cited_by_party, cited_by_party_name, legal_topic, legal_issue,
claim_quote, notes
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING *""",
citation.strip(), case_name, cited_in_case_id, cited_in_document_id,
cited_by_party, cited_by_party_name, legal_topic, legal_issue,
claim_quote, notes,
)
return _row_to_missing_precedent(row)
async def list_missing_precedents(
status: str | None = None,
case_id: UUID | None = None,
legal_topic: str | None = None,
limit: int = 200,
offset: int = 0,
) -> list[dict]:
"""List missing precedents, joining the cited-in case_number for display."""
pool = await get_pool()
conditions: list[str] = []
params: list = []
idx = 1
if status:
conditions.append(f"mp.status = ${idx}")
params.append(status)
idx += 1
if case_id:
conditions.append(f"mp.cited_in_case_id = ${idx}")
params.append(case_id)
idx += 1
if legal_topic:
conditions.append(f"mp.legal_topic ILIKE ${idx}")
params.append(f"%{legal_topic}%")
idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.append(limit)
params.append(offset)
sql = f"""
SELECT mp.*,
c.case_number AS cited_in_case_number,
cl.case_number AS linked_case_law_number,
cl.case_name AS linked_case_law_name
FROM missing_precedents mp
LEFT JOIN cases c ON c.id = mp.cited_in_case_id
LEFT JOIN case_law cl ON cl.id = mp.linked_case_law_id
{where}
ORDER BY
CASE mp.status
WHEN 'open' THEN 0
WHEN 'uploaded' THEN 1
WHEN 'closed' THEN 2
WHEN 'irrelevant' THEN 3
END,
mp.created_at DESC
LIMIT ${idx} OFFSET ${idx + 1}
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [_row_to_missing_precedent(r) for r in rows]
async def get_missing_precedent(mp_id: UUID) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT mp.*,
c.case_number AS cited_in_case_number,
cl.case_number AS linked_case_law_number,
cl.case_name AS linked_case_law_name
FROM missing_precedents mp
LEFT JOIN cases c ON c.id = mp.cited_in_case_id
LEFT JOIN case_law cl ON cl.id = mp.linked_case_law_id
WHERE mp.id = $1
""",
mp_id,
)
return _row_to_missing_precedent(row) if row else None
async def update_missing_precedent(mp_id: UUID, **fields) -> dict | None:
"""Patch a missing-precedent row. Allowed fields: legal_topic,
legal_issue, notes, cited_by_party, cited_by_party_name, case_name,
status, linked_case_law_id, closed_at."""
if not fields:
return await get_missing_precedent(mp_id)
allowed = {
"legal_topic", "legal_issue", "notes", "cited_by_party",
"cited_by_party_name", "case_name", "status", "linked_case_law_id",
"closed_at", "claim_quote", "citation",
}
clean = {k: v for k, v in fields.items() if k in allowed}
if not clean:
return await get_missing_precedent(mp_id)
if "status" in clean and clean["status"] not in ALLOWED_MP_STATUS:
raise ValueError(
f"status must be one of {sorted(ALLOWED_MP_STATUS)}"
)
if "cited_by_party" in clean and clean["cited_by_party"] and \
clean["cited_by_party"] not in ALLOWED_MP_PARTIES:
raise ValueError(
f"cited_by_party must be one of {sorted(ALLOWED_MP_PARTIES)}"
)
set_clauses = []
values = []
for i, (key, val) in enumerate(clean.items(), start=2):
set_clauses.append(f"{key} = ${i}")
values.append(val)
set_clauses.append("updated_at = now()")
sql = (
f"UPDATE missing_precedents SET {', '.join(set_clauses)} "
f"WHERE id = $1 RETURNING *"
)
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(sql, mp_id, *values)
return _row_to_missing_precedent(row) if row else None
async def close_missing_precedent(
mp_id: UUID,
linked_case_law_id: UUID | None = None,
notes: str | None = None,
status: str = "closed",
) -> dict | None:
"""Mark a missing-precedent row as closed (or 'uploaded'/'irrelevant')
and link it to a case_law row if provided."""
if status not in ALLOWED_MP_STATUS:
raise ValueError(
f"status must be one of {sorted(ALLOWED_MP_STATUS)}"
)
pool = await get_pool()
async with pool.acquire() as conn:
sets = ["status = $2", "closed_at = now()", "updated_at = now()"]
params: list = [mp_id, status]
idx = 3
if linked_case_law_id is not None:
sets.append(f"linked_case_law_id = ${idx}")
params.append(linked_case_law_id)
idx += 1
if notes is not None:
sets.append(f"notes = ${idx}")
params.append(notes)
idx += 1
sql = (
f"UPDATE missing_precedents SET {', '.join(sets)} "
f"WHERE id = $1 RETURNING *"
)
row = await conn.fetchrow(sql, *params)
return _row_to_missing_precedent(row) if row else None
async def find_missing_precedent_by_citation(
citation: str,
case_id: UUID | None = None,
) -> dict | None:
"""Look up an existing row by citation string (exact match) and optionally
cited-in case_id. Used to deduplicate auto-creation by the researcher."""
pool = await get_pool()
async with pool.acquire() as conn:
if case_id is not None:
row = await conn.fetchrow(
"SELECT * FROM missing_precedents "
"WHERE citation = $1 AND cited_in_case_id = $2 LIMIT 1",
citation.strip(), case_id,
)
else:
row = await conn.fetchrow(
"SELECT * FROM missing_precedents WHERE citation = $1 LIMIT 1",
citation.strip(),
)
return _row_to_missing_precedent(row) if row else None

View File

@@ -52,16 +52,44 @@ DOMAIN_PRACTICE_AREAS: set[str] = {
"compensation_197",
}
# Union — what ``validate()`` accepts for backward-compat
PRACTICE_AREAS: set[str] = MULTI_TENANT_PRACTICE_AREAS | DOMAIN_PRACTICE_AREAS
# Union — what ``validate()`` accepts for backward-compat.
# Empty string is permitted because the DB CHECK constraint allows it as
# a "not yet classified" sentinel (e.g. when auto-derivation fails on an
# unrecognized case_number format).
PRACTICE_AREAS: set[str] = MULTI_TENANT_PRACTICE_AREAS | DOMAIN_PRACTICE_AREAS | {""}
APPEALS_COMMITTEE_SUBTYPES: set[str] = {
"building_permit",
"betterment_levy",
"compensation_197",
# בל"מ — בקשה להארכת מועד להגשת ערר. מסלולים נפרדים לפי domain:
"extension_request_building_permit", # 1xxx — סעיף 152, 30 ימים
"extension_request_betterment_levy", # 8xxx — סעיף 14 לתוספת ג', 45 ימים
"extension_request_compensation", # 9xxx — סעיף 198(ד), 30 ימים
"unknown",
}
# בל"מ subtypes — קל לזהות ע"י prefix
BLAM_SUBTYPES: set[str] = {
"extension_request_building_permit",
"extension_request_betterment_levy",
"extension_request_compensation",
}
# מיפוי domain → בל"מ subtype
_DOMAIN_TO_BLAM_SUBTYPE: dict[str, str] = {
"rishuy_uvniya": "extension_request_building_permit",
"betterment_levy": "extension_request_betterment_levy",
"compensation_197": "extension_request_compensation",
}
# מיפוי first-digit → בל"מ subtype (אותו מבנה כמו _APPEALS_COMMITTEE_DIGIT_TO_SUBTYPE)
_APPEALS_COMMITTEE_DIGIT_TO_BLAM = {
"1": "extension_request_building_permit",
"8": "extension_request_betterment_levy",
"9": "extension_request_compensation",
}
DEFAULT_PRACTICE_AREA = "appeals_committee"
# Subtypes per practice_area (extend when adding domains)
@@ -70,9 +98,11 @@ SUBTYPES_BY_AREA: dict[str, set[str]] = {
"national_insurance": {"unknown"},
"labor_law": {"unknown"},
# Domain values — subtype is implicit in the value itself
"rishuy_uvniya": {"building_permit", "unknown"},
"betterment_levy": {"betterment_levy", "unknown"},
"compensation_197": {"compensation_197", "unknown"},
"rishuy_uvniya": {"building_permit", "extension_request_building_permit", "unknown"},
"betterment_levy": {"betterment_levy", "extension_request_betterment_levy", "unknown"},
"compensation_197": {"compensation_197", "extension_request_compensation", "unknown"},
# Empty (unclassified) — allow any of the appeals_committee subtypes
"": APPEALS_COMMITTEE_SUBTYPES,
}
# Mapping: (multi_tenant_pa, appeal_subtype) → domain_pa
@@ -80,9 +110,39 @@ _SUBTYPE_TO_DOMAIN: dict[str, str] = {
"building_permit": "rishuy_uvniya",
"betterment_levy": "betterment_levy",
"compensation_197": "compensation_197",
"extension_request_building_permit": "rishuy_uvniya",
"extension_request_betterment_levy": "betterment_levy",
"extension_request_compensation": "compensation_197",
}
# Regex לזיהוי "בקשה להארכת מועד" בנושא הערר (subject) —
# וריאציות נפוצות. case-insensitive, מתחשב במרכאות חכמות/רגילות.
_BLAM_SUBJECT_PATTERNS = (
re.compile(r"בקשה\s+להארכת\s+מועד", re.IGNORECASE),
re.compile(r"בל[\"״״]מ", re.IGNORECASE), # בל"מ עם quote variants
re.compile(r"הארכת\s+מועד\s+להגשת", re.IGNORECASE),
)
def is_blam_subject(subject: str) -> bool:
"""True iff subject indicates a בל"מ (extension-of-time request).
מזהה: "בקשה להארכת מועד", "בל\"מ", "הארכת מועד להגשת..."
Examples:
>>> is_blam_subject("בל\"מ אלחנן ברלינגר נ' לינדאב")
True
>>> is_blam_subject("בקשה להארכת מועד להגשת ערר")
True
>>> is_blam_subject("היתר בנייה ברחוב X")
False
"""
if not subject:
return False
return any(p.search(subject) for p in _BLAM_SUBJECT_PATTERNS)
def to_db_practice_area(practice_area: str, appeal_subtype: str = "") -> str:
"""Convert a multi-tenant practice_area + appeal_subtype to the
domain value stored in DB columns (case_law/cases).
@@ -120,14 +180,28 @@ _CASE_NUM = re.compile(r"(?:ARAR[-\s]*\d{2}[-\s]*(?:\d{2}[-\s]*)?)(\d{4})", re.I
_PLAIN_NUM = re.compile(r"(\d{4})")
_DOMAIN_TO_SUBTYPE: dict[str, str] = {
"rishuy_uvniya": "building_permit",
"betterment_levy": "betterment_levy",
"compensation_197": "compensation_197",
}
def derive_subtype(case_number: str, practice_area: str = DEFAULT_PRACTICE_AREA) -> str:
"""Infer the appeal_subtype from case_number.
For appeals_committee, the convention is:
For appeals_committee (axis A), the convention is:
1xxx → building_permit, 8xxx → betterment_levy, 9xxx → compensation_197.
For domain values (axis B — rishuy_uvniya/betterment_levy/compensation_197),
the subtype is implicit in the practice_area itself — we map directly
without parsing the case number.
Handles multiple formats: ARAR-25-8126, 8126/25, 1170, ערר 1024-25.
"""
# Axis B: practice_area is already a domain value — map directly.
if practice_area in DOMAIN_PRACTICE_AREAS:
return _DOMAIN_TO_SUBTYPE.get(practice_area, "unknown")
if practice_area != "appeals_committee":
return "unknown"
cn = case_number or ""
@@ -142,6 +216,82 @@ def derive_subtype(case_number: str, practice_area: str = DEFAULT_PRACTICE_AREA)
return _APPEALS_COMMITTEE_DIGIT_TO_SUBTYPE.get(first_digit, "unknown")
def derive_subtype_with_blam(
case_number: str,
subject: str = "",
practice_area: str = DEFAULT_PRACTICE_AREA,
) -> str:
"""Like ``derive_subtype()`` but also detects בל"מ from the subject.
If ``subject`` indicates a בקשה להארכת מועד, the returned subtype is
one of the ``extension_request_*`` values (chosen per case_number /
practice_area). Otherwise behaviour matches ``derive_subtype()``.
Examples:
>>> derive_subtype_with_blam("1017-03-26", "בל\"מ ברלינגר נ' לינדאב")
'extension_request_building_permit'
>>> derive_subtype_with_blam("8500-25", "בקשה להארכת מועד")
'extension_request_betterment_levy'
>>> derive_subtype_with_blam("1033-25", "ערר על החלטת ועדה")
'building_permit'
"""
base = derive_subtype(case_number, practice_area)
if not is_blam_subject(subject):
return base
# subject says it's בל"מ — return the matching extension_request_* variant.
# For domain practice_area (axis B), use the direct mapping.
if practice_area in DOMAIN_PRACTICE_AREAS:
return _DOMAIN_TO_BLAM_SUBTYPE.get(practice_area, base)
# For appeals_committee (axis A), derive from case_number digit.
if practice_area == "appeals_committee":
cn = case_number or ""
m = _CASE_NUM.search(cn) or _PLAIN_NUM.search(cn)
if m:
first_digit = m.group(1)[0]
blam = _APPEALS_COMMITTEE_DIGIT_TO_BLAM.get(first_digit)
if blam:
return blam
return base
def is_blam_subtype(appeal_subtype: str) -> bool:
"""True iff appeal_subtype is one of the extension_request_* variants.
Useful for UI badges and routing logic that need to detect בל"מ cases
regardless of which domain they belong to.
"""
return appeal_subtype in BLAM_SUBTYPES
def derive_domain_practice_area(case_number: str) -> str:
"""Map a case_number prefix to a domain practice_area (axis B).
Returns:
``"rishuy_uvniya"`` for 1xxx, ``"betterment_levy"`` for 8xxx,
``"compensation_197"`` for 9xxx, or ``""`` when the prefix is
unrecognized (caller decides the fallback).
Examples:
>>> derive_domain_practice_area("8126/25")
'betterment_levy'
>>> derive_domain_practice_area("1170")
'rishuy_uvniya'
>>> derive_domain_practice_area("ARAR-24-01-9007")
'compensation_197'
>>> derive_domain_practice_area("foo")
''
"""
cn = case_number or ""
m = _CASE_NUM.search(cn) or _PLAIN_NUM.search(cn)
if not m:
return ""
first_digit = m.group(1)[0]
subtype = _APPEALS_COMMITTEE_DIGIT_TO_SUBTYPE.get(first_digit)
if not subtype:
return ""
return _SUBTYPE_TO_DOMAIN.get(subtype, "")
# ── Validation ─────────────────────────────────────────────────────
@@ -164,6 +314,20 @@ def validate(practice_area: str, appeal_subtype: str | None) -> None:
def is_override(case_number: str, practice_area: str, appeal_subtype: str) -> bool:
"""True iff the user-supplied subtype disagrees with what derive_subtype
would have produced (and the derived value is not 'unknown')."""
would have produced (and the derived value is not 'unknown').
Note: בל"מ variants (extension_request_*) are NOT considered overrides
of their parent domain — extension_request_building_permit on a 1xxx
case is consistent with the case-number convention.
"""
derived = derive_subtype(case_number, practice_area)
return derived != "unknown" and derived != appeal_subtype
if derived == "unknown":
return False
if derived == appeal_subtype:
return False
# בל"מ variants of the same domain are not overrides.
if appeal_subtype in BLAM_SUBTYPES:
# extension_request_building_permit ↔ building_permit (1xxx) — same domain
if _SUBTYPE_TO_DOMAIN.get(appeal_subtype) == _SUBTYPE_TO_DOMAIN.get(derived):
return False
return True