async def store_panel_principles(
    case_law_id: UUID, principles: list[dict],
) -> dict:
    """Persist principles already selected by the tri-model panel regime (#152, Phase B).

    Unlike :func:`store_halachot_for_chunk`, the verdict is ALREADY decided by the
    panel — this does NOT recompute auto-approve from confidence. The cap and the
    dedup-frees-slot rule are applied by the CALLER (the extractor); this function
    is only the writer. All rows are written inside a single transaction.

    Args:
        case_law_id: Decision the principles belong to.
        principles: One dict per principle. ``rule_statement`` and
            ``supporting_quote`` are required. Optional keys read here:
            ``review_status`` (defaults to ``"pending_review"``), ``instance_type``
            (``"original"`` = new canonical, ``"citation"`` = link to an existing
            one; defaults to ``"original"``), ``canonical_id`` (required for a
            citation), ``voters``, ``votes``, ``score``, ``confidence``,
            ``embedding``, ``rule_type``, ``reasoning_summary``,
            ``page_reference``, ``practice_areas``, ``subject_tags``, ``cites``,
            ``quote_verified``, ``quality_flags``.

    Returns:
        ``{"created_new": int, "linked": int}`` — rows that got a fresh canonical
        vs. rows linked to an existing canonical.

    Raises:
        KeyError: a principle lacks ``rule_statement`` or ``supporting_quote``.

    Provenance (G9): ``reviewer`` is written as
    ``"panel:<voters> v<votes>/s<score>"``; ``reviewed_at`` is set to ``now()``
    only for rows whose ``review_status`` is ``"approved"``.
    """
    if not principles:
        # Nothing to write — skip the connection/transaction round-trip but keep
        # the same log line and result shape as the non-empty path.
        logger.info("store_panel_principles: case_law=%s — %d new, %d linked",
                    case_law_id, 0, 0)
        return {"created_new": 0, "linked": 0}

    pool = await get_pool()
    created_new = linked = 0
    async with pool.acquire() as conn:
        async with conn.transaction():
            # New rows continue after the highest halacha_index already stored
            # for this decision (0 when there are none).
            base = await conn.fetchval(
                "SELECT COALESCE(MAX(halacha_index), -1) + 1 FROM halachot "
                "WHERE case_law_id = $1", case_law_id,
            )
            for offset, p in enumerate(principles):
                idx = base + offset
                voters = ",".join(p.get("voters") or [])
                reviewer = f"panel:{voters} v{p.get('votes', 0)}/s{p.get('score', 0)}"
                approved = p.get("review_status") == "approved"
                emb = p.get("embedding")
                canonical_id = p.get("canonical_id")
                instance_type = p.get("instance_type", "original")

                # A "citation" without a canonical to point at cannot be a link:
                # it gets its own canonical below, so store it as what it is.
                is_link = instance_type == "citation" and canonical_id is not None
                if instance_type == "citation" and not is_link:
                    instance_type = "original"

                # An explicit None (key present, no value) must not reach float().
                confidence = p.get("confidence")
                if confidence is None:
                    confidence = p.get("score")
                if confidence is None:
                    confidence = 0.0

                await conn.execute(
                    """INSERT INTO halachot
                        (case_law_id, halacha_index, rule_statement, rule_type,
                         reasoning_summary, supporting_quote, page_reference,
                         practice_areas, subject_tags, cites, confidence,
                         quote_verified, quality_flags, embedding, review_status,
                         reviewer, reviewed_at, canonical_id, instance_type)
                       VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,
                               CASE WHEN $17 THEN now() ELSE NULL END,$18,$19)""",
                    case_law_id, idx, p["rule_statement"],
                    p.get("rule_type", "interpretive"), p.get("reasoning_summary", ""),
                    p["supporting_quote"], p.get("page_reference", ""),
                    # `or []` (not a .get default) so an explicit None is written
                    # as an empty list, same as the canonical insert below.
                    p.get("practice_areas") or [], p.get("subject_tags") or [],
                    p.get("cites") or [], float(confidence),
                    p.get("quote_verified", False), p.get("quality_flags") or [],
                    emb, p.get("review_status", "pending_review"), reviewer,
                    approved, canonical_id, instance_type,
                )
                if is_link:
                    # Known principle: bump the existing canonical's instance count.
                    await conn.execute(
                        "UPDATE canonical_halachot SET "
                        "instance_count = instance_count + 1, updated_at = now() "
                        "WHERE id = $1", canonical_id,
                    )
                    linked += 1
                else:
                    # New principle: create its canonical, then point the row
                    # just inserted (found by case + index) at it.
                    new_canon_id = await conn.fetchval(
                        "INSERT INTO canonical_halachot "
                        "(canonical_statement, rule_type, practice_areas, subject_tags, "
                        " embedding, first_established_in, review_status, instance_count) "
                        "VALUES ($1,$2,$3,$4,$5,$6,'pending_synthesis',1) RETURNING id",
                        p.get("rule_statement") or "", p.get("rule_type", "interpretive"),
                        p.get("practice_areas") or [], p.get("subject_tags") or [],
                        emb, case_law_id,
                    )
                    await conn.execute(
                        "UPDATE halachot SET canonical_id=$1 WHERE case_law_id=$2 "
                        "AND halacha_index=$3", new_canon_id, case_law_id, idx,
                    )
                    created_new += 1
    logger.info("store_panel_principles: case_law=%s — %d new, %d linked",
                case_law_id, created_new, linked)
    return {"created_new": created_new, "linked": linked}
# Cap the text sent to each judge — extractable chunks already exclude
# facts/intro/arguments, but a very long decision could still blow a context.
_PANEL_MAX_CHARS = 80_000

# Canonical states a new extraction may dedup-link against (frees a cap slot).
_PANEL_DEDUP_STATES = ("pending_synthesis", "pending_review", "approved", "published")


async def _extract_via_panel(
    case_law_id: UUID, force: bool = False, dry_run: bool = False,
) -> dict | None:
    """Decision-level tri-model panel extraction (#152, Phase B).

    Replaces the per-chunk single-model auto-approve with: 3 models propose →
    cross-model votes + mean-score → chair's approval rule → dedup vs corpus
    (link known → frees a slot) → cap of ``config.HALACHA_PANEL_MAX_NEW``
    genuinely-new principles per decision. Candidates are consumed in the order
    ``panel_extract`` returns them (strongest first), so the cap keeps the
    strongest new ones.

    Args:
        case_law_id: Decision to extract from.
        force: Re-run even if the decision already has halachot; the existing
            extraction is reset first via ``db.reset_halacha_extraction``.
        dry_run: Compute the full plan WITHOUT writing anything (no status
            changes, no reset, no inserts) — used for validation/chair preview.

    Returns:
        A result dict whose ``status`` is one of ``not_found``, ``no_chunks``,
        ``completed`` or ``dry_run``; or **None** when the panel produced no
        clusters AND fewer than 2 judges report as available, so the caller can
        fall back to the legacy path and the queue still drains.
    """
    record = await db.get_case_law(case_law_id)
    if not record:
        return {"status": "not_found", "extracted": 0, "stored": 0}

    # Idempotency: panel extraction is decision-level (no per-chunk checkpoints).
    # Without force, skip if this decision already has halachot (avoid dup re-run).
    # One query serves both the existence check and the count.
    if not force and not dry_run:
        existing = await db.list_halachot(case_law_id=case_law_id, limit=10_000)
        if existing:
            total = len(existing)
            await db.set_case_law_halacha_status(case_law_id, "completed")
            return {"status": "completed", "extracted": total, "stored": total,
                    "resumed": True, "panel": True}

    source_kind = record.get("source_kind") or "external_upload"
    is_binding = bool(record.get("is_binding"))
    full_text = record.get("full_text") or ""

    chunks, _ = await _select_extractable_chunks(case_law_id)
    if not chunks:
        if not dry_run:
            await db.set_case_law_halacha_status(case_law_id, "completed")
        return {"status": "no_chunks", "extracted": 0, "stored": 0, "panel": True}

    preserved = 0
    if force and not dry_run:
        reset = await db.reset_halacha_extraction(case_law_id)
        preserved = reset.get("preserved", 0)

    if not dry_run:
        await db.set_case_law_halacha_status(case_law_id, "processing")

    joined = "\n\n".join(c["content"] for c in chunks)
    if len(joined) > _PANEL_MAX_CHARS:
        # Every chunk is marked extracted afterwards, so leave a trace when the
        # judges only saw a prefix of the decision.
        logger.warning(
            "halacha panel: case_law=%s text truncated from %d to %d chars "
            "before the panel call", case_law_id, len(joined), _PANEL_MAX_CHARS,
        )
    text = joined[:_PANEL_MAX_CHARS]

    clusters = await panel_extraction.panel_extract(
        text, source_kind=source_kind, is_binding=is_binding,
    )
    if not clusters:
        # distinguish "judges down" (→ fallback) from "genuinely nothing found".
        if sum(panel_extraction.panel_judges.available().values()) < 2:
            return None
        if not dry_run:
            await db.mark_all_chunks_extracted(case_law_id)
            await db.set_case_law_halacha_status(case_law_id, "completed")
        return {"status": "completed", "extracted": 0, "stored": 0,
                "new": 0, "linked": 0, "panel": True, "preserved_approved": preserved}

    # Rejected clusters never reach storage; approved and pending_review do.
    kept = [c for c in clusters if c["verdict"] in ("approved", "pending_review")]
    max_new = config.HALACHA_PANEL_MAX_NEW
    new_count = linked = dropped_cap = 0
    to_store: list[dict] = []
    for c in kept:  # strongest first (panel_extract sorts by votes,score)
        emb = c.get("embedding")
        canonical_id, instance_type = None, "original"
        if emb is not None and config.HALACHA_CANONICAL_LOOKUP_ENABLED:
            match = await db.nearest_canonical_halacha(
                emb, threshold=config.HALACHA_CANONICAL_THRESHOLD,
                status_filter=_PANEL_DEDUP_STATES,
            )
            if match:
                # First element of the match is used as the canonical id.
                canonical_id, instance_type = match[0], "citation"
        if instance_type == "original":
            if new_count >= max_new:  # cap: linked don't count, only new
                dropped_cap += 1
                continue
            new_count += 1
        else:
            linked += 1
        to_store.append({
            "rule_statement": c["rule_statement"], "supporting_quote": c["supporting_quote"],
            "reasoning_summary": c["reasoning_summary"], "rule_type": c["rule_type"],
            "confidence": c["score"], "score": c["score"], "votes": c["votes"],
            "voters": c["voters"], "review_status": c["verdict"], "embedding": emb,
            "instance_type": instance_type, "canonical_id": canonical_id,
            "quote_verified": _verify_quote(c["supporting_quote"], full_text),
        })

    if dry_run:
        return {"status": "dry_run", "panel": True, "source_kind": source_kind,
                "candidates": clusters, "to_store": to_store,
                "new": new_count, "linked": linked, "dropped_over_cap": dropped_cap}

    res = await db.store_panel_principles(case_law_id, to_store)
    await db.mark_all_chunks_extracted(case_law_id)
    await db.set_case_law_halacha_status(case_law_id, "completed")
    # Re-count after the write so "extracted" reflects everything now stored for
    # the decision, not just the rows written by this run.
    total = len(await db.list_halachot(case_law_id=case_law_id, limit=10_000))
    logger.info(
        "halacha panel: case_law=%s (%s) — %d new + %d linked stored, "
        "%d dropped over cap-%d", case_law_id, source_kind,
        res["created_new"], res["linked"], dropped_cap, max_new,
    )
    return {"status": "completed", "extracted": total,
            "stored": res["created_new"] + res["linked"], "new": res["created_new"],
            "linked": res["linked"], "dropped_over_cap": dropped_cap,
            "panel": True, "preserved_approved": preserved}
+""" +from __future__ import annotations + +import asyncio +from uuid import uuid4 + +import pytest + +from legal_mcp import config +from legal_mcp.services import halacha_extractor as he + +CID = uuid4() + + +def _cluster(rule, verdict, votes=3, score=0.9): + return {"rule_statement": rule, "supporting_quote": f"q:{rule}", + "reasoning_summary": "", "rule_type": "interpretive", + "votes": votes, "score": score, "voters": ["claude", "deepseek", "gemini"][:votes], + "verdict": verdict, "embedding": [1.0, 0.0]} + + +def _patch_common(monkeypatch, clusters): + async def fake_case(_id): + return {"id": CID, "source_kind": "external_upload", "is_binding": True, + "full_text": " ".join(f"q:{c['rule_statement']}" for c in clusters)} + + async def fake_chunks(_id): + return ([{"content": "reasoning text"}], False) + + async def fake_panel(text, **kw): + return clusters + + async def none_match(emb, threshold=0.85, status_filter=()): + return None # default: nothing known → all new (tests override per-case) + + monkeypatch.setattr(he.db, "get_case_law", fake_case) + monkeypatch.setattr(he, "_select_extractable_chunks", fake_chunks) + monkeypatch.setattr(he.panel_extraction, "panel_extract", fake_panel) + monkeypatch.setattr(he.db, "nearest_canonical_halacha", none_match) + + +def _run(monkeypatch, clusters, nearest_fn=None): + if nearest_fn: + monkeypatch.setattr(he.db, "nearest_canonical_halacha", nearest_fn) + return asyncio.run(he._extract_via_panel(CID, dry_run=True)) + + +def test_drops_rejected_keeps_approved_and_pending(monkeypatch): + clusters = [ + _cluster("A", "approved"), _cluster("B", "pending_review", votes=2, score=0.7), + _cluster("C", "rejected", votes=1, score=0.9), + ] + _patch_common(monkeypatch, clusters) + res = _run(monkeypatch, clusters) + rules = [p["rule_statement"] for p in res["to_store"]] + assert "A" in rules and "B" in rules and "C" not in rules + + +def test_cap_limits_new_to_max(monkeypatch): + monkeypatch.setattr(config, 
"HALACHA_PANEL_MAX_NEW", 3) + clusters = [_cluster(f"R{i}", "approved") for i in range(6)] + _patch_common(monkeypatch, clusters) + + async def none_match(emb, threshold=0.85, status_filter=()): + return None # all new + res = _run(monkeypatch, clusters, none_match) + assert res["new"] == 3 and res["dropped_over_cap"] == 3 + assert len(res["to_store"]) == 3 + + +def test_linked_existing_does_not_consume_cap(monkeypatch): + monkeypatch.setattr(config, "HALACHA_PANEL_MAX_NEW", 2) + # 5 candidates; the first 3 are "known" (link), last 2 are new + clusters = [_cluster(f"K{i}", "approved") for i in range(3)] + \ + [_cluster(f"N{i}", "approved") for i in range(2)] + _patch_common(monkeypatch, clusters) + known = {"K0", "K1", "K2"} + + async def nearest(emb, threshold=0.85, status_filter=()): + # called per candidate in order; pop from a queue mirroring clusters + rule = nearest._order.pop(0) + return ("canon", 0.99) if rule in known else None + nearest._order = [c["rule_statement"] for c in clusters] + res = _run(monkeypatch, clusters, nearest) + # 3 linked (free) + 2 new (within cap) → all 5 stored, nothing dropped + assert res["linked"] == 3 and res["new"] == 2 and res["dropped_over_cap"] == 0 + assert len(res["to_store"]) == 5