feat(principles): canonical_statement synthesis service + throttled backfill (Phase E groundwork, #152)
Grounded (INV-AH) multi-instance synthesis with drift guard + chair gate (pending_review, G10). Single path used by backfill, MCP tool, nightly drain. HELD from production run pending the principles-redesign (rename+cull, #152). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
220
mcp-server/src/legal_mcp/services/canonical_synthesis.py
Normal file
220
mcp-server/src/legal_mcp/services/canonical_synthesis.py
Normal file
@@ -0,0 +1,220 @@
|
||||
"""Canonical-halacha synthesis (V41 Phase 4).
|
||||
|
||||
The backfill carried each canonical's ``canonical_statement`` over verbatim from
|
||||
its representative halacha. This pass asks a local ``claude_session`` model to
|
||||
rewrite that statement into ONE clean, case-independent legal principle — for the
|
||||
~6 multi-instance canonicals a genuine merge of the N phrasings, for the singleton
|
||||
majority a faithful generalising polish — then advances ``review_status``
|
||||
pending_synthesis → pending_review for the chair gate (G10 / INV-LRN1).
|
||||
|
||||
Invariants this module upholds:
|
||||
• INV-AH — the synthesis is GROUNDED in the instances' ``supporting_quote``s.
|
||||
The model abstains (``grounded=false``) rather than invent law, no
|
||||
new case citations may appear, and a re-embedding **drift guard**
|
||||
rejects any rewrite that drifts too far from the source statement.
|
||||
• G10/INV-LRN1 — never auto-approves; lands at ``pending_review`` for the chair.
|
||||
• G9 — every outcome (accepted / kept-original / abstained) is logged + returned.
|
||||
• G2 — single synthesis path; the backfill script, the on-demand MCP tool and
|
||||
the nightly drain all call :func:`synthesize_canonical` here.
|
||||
|
||||
LLM calls go through ``claude_session`` (local ``claude -p`` CLI) only — never the
|
||||
Anthropic SDK, never from the FastAPI container (see claude_session docstring).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import math
|
||||
import re
|
||||
from uuid import UUID
|
||||
|
||||
from legal_mcp import config
|
||||
from legal_mcp.services import claude_session, db, embeddings
|
||||
|
||||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Case-citation shapes (docket numbers) that must NOT be invented by the rewrite:
# "1234/05", "85074-09-24", "8125-09-24". Statute section refs ("סעיף 197") do not
# match and are legitimately part of a principle.
#
# Pattern anatomy: 3-5 digits, a "-" or "/" separator, two digits, then an optional
# second separator followed by 2-4 digits. The pattern carries no word/digit
# boundaries, so it can also match inside a longer run of digits.
# NOTE(review): date-like tokens such as "2024-09-24" fit this shape as well —
# confirm that is acceptable for the citation guard.
_CITATION_RE = re.compile(r"\d{3,5}[-/]\d{2}(?:[-/]\d{2,4})?")

# Hebrew role preamble that opens every synthesis prompt. It casts the model as a
# senior lawyer drafting canonical principles for the legal knowledge base of a
# planning-and-building appeals committee, and instructs it to distil one general,
# precise formulation of a legal principle — not to summarise a case and not to
# invent law.
_SYSTEM = (
    "אתה עורך-דין בכיר המנסח עקרונות-הלכה קנוניים לבסיס-ידע משפטי של ועדת ערר "
    "לתכנון ובנייה. תפקידך לזקק ניסוח אחד, כללי ומדויק, של עיקרון משפטי — לא לסכם "
    "תיק ולא להמציא דין."
)
|
||||
|
||||
|
||||
def _build_prompt(data: dict) -> str:
    """Render the Hebrew synthesis prompt for one canonical.

    Reads two keys from ``data`` (both via ``.get``):

    * ``canonical_statement`` — the current wording; a placeholder is shown when it
      is missing or empty.
    * ``instances`` — a sequence of dicts. Each contributes a numbered evidence
      block built from ``case_number`` and ``instance_type`` (a dash when absent)
      plus, when truthy, ``rule_statement``, ``supporting_quote`` and
      ``reasoning_summary``.

    The task sentence asks for a merge when there is more than one instance and
    for a generalising rewrite otherwise. The prompt ends with the binding rules
    and the JSON output contract (``canonical_statement``, ``grounded``,
    ``changed``, ``reason``).
    """
    rows = data.get("instances") or []

    rendered: list[str] = []
    for idx, row in enumerate(rows, 1):
        case_no = row.get("case_number") or "—"
        kind = row.get("instance_type") or "—"
        lines = [f"### מופע {idx} (תיק {case_no}, סוג: {kind})"]

        # Optional fields are emitted only when present and non-empty.
        rule = row.get("rule_statement")
        if rule:
            lines.append(f"ניסוח-העיקרון: {rule}")
        quote = row.get("supporting_quote")
        if quote:
            lines.append(f'ציטוט-תומך (מקור-העיגון): "{quote}"')
        why = row.get("reasoning_summary")
        if why:
            lines.append(f"נימוק: {why}")

        rendered.append("\n".join(lines))

    # Every rendered block has at least its header line, so the joined text is
    # empty only when there were no instances at all.
    evidence = "\n\n".join(rendered) or "(אין מופעים)"

    if len(rows) > 1:
        task = "מזג את כל ניסוחי-המופעים לעיקרון קנוני אחד המשותף לכולם."
    else:
        task = "נסח מחדש את העיקרון לניסוח קנוני נקי וכללי."

    current = data.get("canonical_statement") or "(ריק)"

    return f"""{_SYSTEM}

הניסוח הקנוני הנוכחי (שיש לשפר):
{current}

מקורות-העיגון (מופעי העיקרון בפסיקה):
{evidence}

## המשימה
{task}

## כללים מחייבים (INV-AH — עיגון, ללא הזיה)
1. **עיגון-מקור בלבד.** הניסוח חייב לנבוע מהציטוטים-התומכים שלמעלה. אסור להוסיף דין, חריג, סייג או תנאי שאינו עולה מהמקורות.
2. **ללא ציטוטי-תיקים חדשים.** אל תוסיף מספרי-תיק/פסקי-דין שאינם מופיעים במקורות. הפניה לסעיף-חוק כללי (למשל "סעיף 197 לחוק התכנון והבניה") מותרת אם היא חלק מהעיקרון.
3. **כללי ובלתי-תלוי-תיק.** הסר שמות-צדדים, עובדות-תיק ספציפיות ומספרים קונקרטיים. נסח עיקרון רב-תחולה, לא סיכום של מקרה.
4. **רגיסטר משפטי נקי** בעברית, משפט אחד עד שניים, ללא מילות-פתיחה ("נקבע כי", "בית-המשפט קבע") — רק העיקרון עצמו.
5. **הימנעות עדיפה על המצאה.** אם אינך יכול לזקק עיקרון מעוגן מהמקורות — החזר grounded=false והשאר את הניסוח הקיים.

## פלט — JSON בלבד, ללא markdown וללא הסבר:
{{
"canonical_statement": "<הניסוח הקנוני המזוקק, או הניסוח הקיים אם grounded=false>",
"grounded": true,
"changed": true,
"reason": "<משפט קצר: מה שונה, או מדוע נמנעת>"
}}"""
|
||||
|
||||
|
||||
def _cosine(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Yields ``0.0`` when either vector has zero magnitude (which includes the
    empty vector), so the caller never divides by zero. The dot product pairs
    elements with ``zip``, so when the lengths differ only the common prefix
    contributes to it, while each norm is still taken over the full vector.
    """
    norm_a = math.sqrt(sum(v * v for v in a))
    norm_b = math.sqrt(sum(v * v for v in b))
    if not norm_a or not norm_b:
        return 0.0
    inner = sum(p * q for p, q in zip(a, b))
    return inner / (norm_a * norm_b)
|
||||
|
||||
|
||||
def _new_citations(text: str, source_text: str) -> list[str]:
    """List docket-number tokens found in ``text`` but not in ``source_text``.

    Tokens are whatever ``_CITATION_RE`` matches. The result follows the order of
    appearance in ``text`` and is not de-duplicated: a token that occurs twice in
    ``text`` and never in ``source_text`` is returned twice. Tokens are compared
    as exact strings.
    """
    known = frozenset(_CITATION_RE.findall(source_text))
    fresh: list[str] = []
    for match in _CITATION_RE.finditer(text):
        token = match.group(0)
        if token not in known:
            fresh.append(token)
    return fresh
|
||||
|
||||
|
||||
async def synthesize_canonical(
    canonical_id: UUID,
    *,
    model: str | None = None,
    effort: str | None = None,
    drift_floor: float | None = None,
) -> dict:
    """Synthesize one canonical's statement. PURE — does not write to the DB.

    Args:
        canonical_id: Canonical to synthesize; passed to
            ``db.fetch_canonical_synthesis_input``.
        model: Model override; falls back to
            ``config.HALACHA_CANONICAL_SYNTH_MODEL`` when falsy.
        effort: Effort override; falls back to
            ``config.HALACHA_CANONICAL_SYNTH_EFFORT`` when falsy.
        drift_floor: Minimum cosine similarity between the rewrite and the source
            statement; ``None`` means ``config.HALACHA_CANONICAL_SYNTH_DRIFT_FLOOR``
            (an explicit ``0`` is honoured).

    Returns:
        A proposal dict the caller applies (or not, for dry-run):
            {status, canonical_id, accepted, original, proposed, embedding,
             drift_cosine, reason}

        status ∈ {accepted, abstained, drift_rejected, new_citation, no_instances,
        llm_error, not_found}. ``accepted`` carries ``proposed`` + ``embedding``
        (the rewrite's vector, to commit alongside the statement). Every other
        status keeps the original statement. ``new_citation`` and
        ``drift_rejected`` still expose the rejected rewrite in ``proposed``.
        ``not_found`` returns only ``status`` and ``canonical_id``.

    Raises:
        Errors from ``db.fetch_canonical_synthesis_input`` and
        ``embeddings.embed_texts`` are not caught here and propagate. Only
        failures while building the prompt or querying the LLM are converted to
        ``llm_error``.
    """
    model = model or config.HALACHA_CANONICAL_SYNTH_MODEL
    effort = effort or config.HALACHA_CANONICAL_SYNTH_EFFORT
    drift_floor = config.HALACHA_CANONICAL_SYNTH_DRIFT_FLOOR if drift_floor is None else drift_floor

    data = await db.fetch_canonical_synthesis_input(canonical_id)
    if data is None:
        logger.info("synthesize_canonical %s: not_found", canonical_id)
        return {"status": "not_found", "canonical_id": str(canonical_id)}

    original = data.get("canonical_statement") or ""
    instances = data.get("instances") or []
    base = {"status": "", "canonical_id": str(canonical_id), "accepted": False,
            "original": original, "proposed": original, "embedding": None,
            "drift_cosine": None, "reason": ""}

    def _outcome(status: str, reason: str, **fields) -> dict:
        # G9: every outcome is logged as well as returned.
        level = logging.WARNING if status == "llm_error" else logging.INFO
        logger.log(level, "synthesize_canonical %s: %s (%s)", canonical_id, status, reason)
        return {**base, "status": status, "reason": reason, **fields}

    if not instances:
        return _outcome("no_instances", "no linked instances")

    try:
        result = await claude_session.query_json(
            _build_prompt(data), model=model, effort=effort, tools="",
        )
    except Exception as e:
        # Already logged here with the exception detail, so bypass _outcome.
        logger.warning("synthesize_canonical %s: LLM error: %s", canonical_id, e)
        return {**base, "status": "llm_error", "reason": str(e)}

    if not isinstance(result, dict) or not result.get("canonical_statement"):
        return _outcome("llm_error", "malformed LLM output")

    # INV-AH: honour an abstention even when the model quoted the boolean
    # ("false"), which would otherwise be truthy and slip through.
    grounded = result.get("grounded", True)
    if isinstance(grounded, str):
        grounded = grounded.strip().lower() not in {"false", "no", "0", ""}
    if not grounded:
        return _outcome(
            "abstained", result.get("reason") or "model abstained (not grounded)",
        )

    proposed = str(result["canonical_statement"]).strip()
    # Compare against the stripped original: a whitespace-only difference is not a
    # rewrite and must not be embedded, accepted and committed as one.
    if not proposed or proposed == original.strip():
        return _outcome("abstained", "no change proposed")

    # AH-2: no invented docket citations. Source = current statement + each
    # instance's rule_statement and supporting_quote (null values count as empty).
    # NOTE(review): case_number and reasoning_summary are shown to the model in
    # the prompt but are not part of this source text — confirm that is intended.
    evidence = " ".join(
        f"{inst.get('rule_statement') or ''} {inst.get('supporting_quote') or ''}"
        for inst in instances
    )
    source_text = f"{original} {evidence}"
    invented = _new_citations(proposed, source_text)
    if invented:
        return _outcome(
            "new_citation",
            f"introduced citations absent from source: {invented}",
            proposed=proposed,
        )

    # Drift guard: re-embed the rewrite, compare to the source statement's vector.
    new_emb = (await embeddings.embed_texts([proposed]))[0]
    src_emb = data.get("embedding")
    # NOTE(review): the truthiness test assumes a plain list (or None); an array
    # type with ambiguous truth value would raise here — confirm what db returns.
    if not src_emb:
        src_emb = (await embeddings.embed_texts([original]))[0]
    drift = _cosine(new_emb, src_emb)
    if drift < drift_floor:
        return _outcome(
            "drift_rejected",
            f"drift {drift:.3f} < floor {drift_floor}",
            proposed=proposed,
            drift_cosine=round(drift, 4),
        )

    return _outcome(
        "accepted",
        result.get("reason") or "synthesized",
        accepted=True,
        proposed=proposed,
        embedding=new_emb,
        drift_cosine=round(drift, 4),
    )
|
||||
|
||||
|
||||
async def synthesize_and_apply(
    canonical_id: UUID,
    *,
    model: str | None = None,
    effort: str | None = None,
    drift_floor: float | None = None,
) -> dict:
    """Run :func:`synthesize_canonical` for one canonical and persist the result.

    The keyword arguments are forwarded unchanged to :func:`synthesize_canonical`,
    and the proposal dict it produced is always returned as-is.

    * ``not_found`` / ``no_instances`` / ``llm_error`` — nothing is written, so
      the row stays queued and is attempted again on a later pass.
    * ``accepted`` — ``db.apply_canonical_synthesis`` receives the new statement
      together with its embedding.
    * any other outcome (abstained / drift_rejected / new_citation) —
      ``db.apply_canonical_synthesis`` is called with the ORIGINAL statement and
      no embedding argument. A synthesis was attempted, so the row is meant to
      leave the queue and reach the chair as-is (the ``review_status`` move to
      ``pending_review`` presumably happens inside ``db.apply_canonical_synthesis``,
      which is not visible here).
    """
    outcome = await synthesize_canonical(
        canonical_id, model=model, effort=effort, drift_floor=drift_floor,
    )

    # Transient or empty results are left uncommitted so they get retried.
    if outcome["status"] in {"not_found", "no_instances", "llm_error"}:
        return outcome

    if not outcome["accepted"]:
        # keep original statement + embedding, just advance the gate
        await db.apply_canonical_synthesis(canonical_id, outcome["original"])
        return outcome

    await db.apply_canonical_synthesis(
        canonical_id, outcome["proposed"], embedding=outcome["embedding"],
    )
    return outcome
|
||||
Reference in New Issue
Block a user