From 2aee398b4ac8f7bd0d2e8c06cd03ac69ad0cdda2 Mon Sep 17 00:00:00 2001 From: Chaim Date: Tue, 26 May 2026 11:26:52 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20Stage=20C=20=E2=80=94=20RAG=20advanced?= =?UTF-8?q?=20(#33,=20#47,=20#48,=20#49,=20#50,=20#51)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six independent sub-tasks dispatched in parallel; aggregated here. ## #33 — Hide case_name column library-list-panel.tsx: `` + `` for "שם" get `className="hidden"` in both Court and Committee row variants. DB column preserved for future use. ## #47 — Audit script periodic New scripts/audit_corpus_integrity.py — 3 SQL checks (external+ערר prefix, internal missing chair/district, cases.practice_area enum) + CEO wakeup on violations + cron `0 7 * * *`. First run: 0 issues. ## #48 — Parent-doc retrieval (gated, default off) Schema V17: precedent_chunks.parent_chunk_id + chunk_role ('child'|'parent'). New chunker.chunk_document_hierarchical() — section-aware parents (~1500 tokens) containing ~5 overlapping children (~300 tokens each). New db.store_precedent_chunks_hierarchical two-pass writer. Search SQL (semantic + lexical) LEFT-JOIN parent and swap content + dedupe by parent_chunk_id when flag on. Toggle: PARENT_DOC_RETRIEVAL_ENABLED + PARENT_DOC_{CHILD,PARENT}_SIZE_TOKENS. Backfill ~3min and ~$0.20 — deferred to follow-up. ## #49 — Multimodal backfill New scripts/backfill_multimodal_precedents.py with token-matching case_number ↔ source files (PDF + DOCX via PyMuPDF). Ran in container: 26 precedents embedded, 503 pages, $0.21, 0 errors. precedent_image_embeddings grew 3 → 29 rows. 44 remaining are style_corpus-migrated rows (no source file on disk) — will catch up when re-uploaded. ## #50 — Closed-loop feedback + nDCG Schema V18: search_logs + search_relevance_feedback. 
New telemetry.py with fire-and-forget log_search_bg (p50 = 0.002ms — zero overhead) + auto-infer_relevance_from_citations (reads case drafts → marks score=3 when cited precedent appears in past search top-K). Hooks added to 5 search paths. scripts/compute_ndcg.py for aggregation. Two admin API endpoints (GET /api/admin/rag-metrics + POST .../infer). Dashboard UI deferred — API is enough for now. ## #51 — Halacha quality monitoring New scripts/monitor_halacha_quality.py — baseline avg confidence (trusted=0.849, all=0.833, pending=0.694) with rolling window drift detection. Default 5% threshold. Exits non-zero on alert for cron integration. Recommended: `0 8 * * 1` weekly Mon 8am. ## Bonus: 230 unlinked citations → missing_precedents Bulk-imported 230 distinct unlinked citations from precedent_internal_citations to missing_precedents.status='open', party='committee', with notes listing source citers. Top candidate: ע"א 3213/97 (cited 5x). Total open missing_precedents now 237. Co-Authored-By: Claude Sonnet 4.6 --- mcp-server/src/legal_mcp/config.py | 37 ++ mcp-server/src/legal_mcp/services/chunker.py | 161 +++++- mcp-server/src/legal_mcp/services/db.py | 289 ++++++++++- .../legal_mcp/services/internal_decisions.py | 74 ++- .../legal_mcp/services/precedent_library.py | 118 ++++- .../src/legal_mcp/services/telemetry.py | 391 ++++++++++++++ .../src/legal_mcp/tools/precedent_library.py | 16 +- mcp-server/src/legal_mcp/tools/search.py | 58 ++- scripts/SCRIPTS.md | 4 + scripts/audit_corpus_integrity.py | 281 +++++++++++ scripts/backfill_multimodal_precedents.py | 475 ++++++++++++++++++ scripts/compute_ndcg.py | 313 ++++++++++++ scripts/monitor_halacha_quality.py | 278 ++++++++++ .../precedents/library-list-panel.tsx | 12 +- web/app.py | 43 ++ 15 files changed, 2493 insertions(+), 57 deletions(-) create mode 100644 mcp-server/src/legal_mcp/services/telemetry.py create mode 100644 scripts/audit_corpus_integrity.py create mode 100644 scripts/backfill_multimodal_precedents.py 
CHUNK_SIZE_TOKENS = 600
CHUNK_OVERLAP_TOKENS = 100

# Parent-doc retrieval (TaskMaster #48): two-tier chunking and lookup.
#
# With the flag on, ingestion writes small "child" chunks (embedded and
# matched at search time) plus larger "parent" chunks (returned to the
# LLM in place of the matching child). With the flag off, ingestion and
# search keep the single-tier behaviour.
#
# The flag is true only when the environment variable, lower-cased,
# equals "true"; unset or any other value leaves it off.
PARENT_DOC_RETRIEVAL_ENABLED = (
    os.getenv("PARENT_DOC_RETRIEVAL_ENABLED", "false").lower() == "true"
)

# Size, in estimated tokens, of the child chunks that get embedded.
# Defaults to 300.
PARENT_DOC_CHILD_SIZE_TOKENS = int(
    os.getenv("PARENT_DOC_CHILD_SIZE_TOKENS", "300")
)

# Size, in estimated tokens, of the parent chunks handed back to the
# LLM. Defaults to 1500, i.e. about five default-sized children.
PARENT_DOC_PARENT_SIZE_TOKENS = int(
    os.getenv("PARENT_DOC_PARENT_SIZE_TOKENS", "1500")
)

# Overlap, in estimated tokens, between neighbouring children of the
# same parent. Defaults to 50.
PARENT_DOC_CHILD_OVERLAP_TOKENS = int(
    os.getenv("PARENT_DOC_CHILD_OVERLAP_TOKENS", "50")
)
def _estimate_tokens(text: str) -> int:
    """Rough token estimate for Hebrew text (~2 chars per token).

    Never returns less than 1, so empty or very short text still counts
    as a single token.
    """
    return max(1, len(text) // 2)


# ── Parent-doc retrieval (TaskMaster #48) ────────────────────────────
# The hierarchical chunker returns ONE flat list holding both tiers,
# told apart by ``role``:
#   * "parent" rows carry the larger passage handed back to the LLM;
#   * "child" rows carry the smaller text that is embedded and matched.
# Every child's ``parent_local_id`` equals its parent's ``local_id`` so
# the ingest pipeline can resolve the DB foreign key once the parent
# row has been inserted.
#
# Parents are cut first, per section and without overlap; each parent
# is then sub-divided into overlapping children. Parents therefore
# never cross a section boundary.


@dataclass
class HierarchicalChunk:
    """One chunk in the two-tier hierarchy.

    Children and parents share this shape; ``role`` distinguishes them.
    ``local_id`` is a sequential in-batch identifier that lets the
    ingest pipeline wire children to their parent's DB UUID after the
    parent INSERT returns; ``parent_local_id`` is ``None`` on parents.
    """

    content: str
    role: str  # 'child' | 'parent'
    section_type: str = "other"
    page_number: int | None = None  # filled in only when page offsets are given
    chunk_index: int = 0  # ordinal within its own tier (parents / children)
    local_id: int = -1  # in-batch identifier; -1 means "not assigned"
    parent_local_id: int | None = None  # local_id of the owning parent


def chunk_document_hierarchical(
    text: str,
    child_size: int = config.PARENT_DOC_CHILD_SIZE_TOKENS,
    parent_size: int = config.PARENT_DOC_PARENT_SIZE_TOKENS,
    overlap: int = config.PARENT_DOC_CHILD_OVERLAP_TOKENS,
    page_offsets: list[int] | None = None,
) -> list[HierarchicalChunk]:
    """Split a document into a flat two-tier (parent, child) hierarchy.

    Each parent is emitted immediately before its own children, so the
    returned list reads ``parent, child, child, ..., parent, child, ...``.

    Args:
        text: full document text.
        child_size: child chunk size in tokens.
        parent_size: parent chunk size in tokens; must be >= ``child_size``.
        overlap: child-to-child overlap in tokens inside one parent.
            Parents are always split with ``overlap=0``.
        page_offsets: optional page offsets; when given (and non-empty)
            every chunk is tagged with the page of its first character.

    Returns:
        Parents and children in one list. ``chunk_index`` counts each
        tier separately; ``local_id`` is unique across both tiers.
        Whitespace-only ``text`` yields an empty list.

    Raises:
        ValueError: if a size is not positive, ``child_size`` exceeds
            ``parent_size``, or ``overlap`` is negative or not smaller
            than ``child_size``.
    """
    if not text.strip():
        return []
    if child_size <= 0 or parent_size <= 0:
        raise ValueError("child_size and parent_size must be positive")
    if child_size > parent_size:
        raise ValueError("child_size must be <= parent_size")
    # The overlap comes from an environment variable, so reject values
    # that cannot describe a sliding window before they reach the
    # splitter.
    if overlap < 0 or overlap >= child_size:
        raise ValueError("overlap must be >= 0 and < child_size")

    sections = _split_into_sections(text)
    out: list[HierarchicalChunk] = []
    parent_idx = 0  # chunk_index for parents
    child_idx = 0   # chunk_index for children
    local_id = 0    # shared sequential id within this document

    for section_type, section_text in sections:
        # Step 1: parent-sized windows inside this section, no overlap.
        for parent_text in _split_section(section_text, parent_size, overlap=0):
            parent_local = local_id
            local_id += 1
            out.append(
                HierarchicalChunk(
                    content=parent_text,
                    role="parent",
                    section_type=section_type,
                    chunk_index=parent_idx,
                    local_id=parent_local,
                    parent_local_id=None,
                )
            )
            parent_idx += 1

            # Step 2: overlapping children cut from this parent only.
            for ch_text in _split_section(parent_text, child_size, overlap):
                out.append(
                    HierarchicalChunk(
                        content=ch_text,
                        role="child",
                        section_type=section_type,
                        chunk_index=child_idx,
                        local_id=local_id,
                        parent_local_id=parent_local,
                    )
                )
                local_id += 1
                child_idx += 1

    if page_offsets:
        _assign_pages_hierarchical(out, text, page_offsets)
    return out


def _assign_pages_hierarchical(
    chunks: list[HierarchicalChunk],
    text: str,
    page_offsets: list[int],
) -> None:
    """Tag every chunk with the page of its first character (in place).

    Chunks are located in ``text`` with a forward scan. Parents and
    children use separate cursors: a child lies inside its parent, so
    its search must start at the parent's own offset. A single shared
    cursor would already have moved past the parent's start, making the
    first children miss the forward scan and fall back to a
    whole-document search that can land on an earlier or later repeat
    of the same text.

    A chunk whose content cannot be found in ``text`` keeps its current
    ``page_number``.
    """
    from legal_mcp.services.extractor import page_at_offset

    def _locate(content: str, start: int) -> int:
        # Forward scan first; fall back to a whole-document search.
        idx = text.find(content, start)
        if idx < 0 and start > 0:
            idx = text.find(content)
        return idx

    parent_pos = 0
    child_pos = 0
    for c in chunks:
        is_parent = c.role == "parent"
        idx = _locate(c.content, parent_pos if is_parent else child_pos)
        if idx < 0:
            continue
        c.page_number = page_at_offset(idx, page_offsets)
        # Step a quarter of the chunk length: far enough to skip the
        # occurrence just matched, short enough that an overlapping
        # neighbour is still ahead of the cursor.
        step = max(1, len(c.content) // 4)
        if is_parent:
            parent_pos = idx + step
            child_pos = idx  # this parent's children start at its offset
        else:
            child_pos = idx + step
# ── V17: parent-doc retrieval columns on precedent_chunks ────────────
# parent_chunk_id — self-reference to the parent chunk row; ON DELETE
#                   SET NULL, so deleting a parent keeps its children.
# chunk_role      — 'child' | 'parent', defaulting to 'child'.
# Every statement is guarded (IF NOT EXISTS / duplicate_object), so the
# migration can be re-run.
# NOTE(review): chunk_role has a DEFAULT but no NOT NULL, and a CHECK
# constraint is satisfied by NULL — an explicit NULL role would be
# accepted. Confirm whether that is intended.
SCHEMA_V17_SQL = """
ALTER TABLE precedent_chunks
    ADD COLUMN IF NOT EXISTS parent_chunk_id UUID
    REFERENCES precedent_chunks(id) ON DELETE SET NULL;

ALTER TABLE precedent_chunks
    ADD COLUMN IF NOT EXISTS chunk_role TEXT DEFAULT 'child';

DO $$ BEGIN
    ALTER TABLE precedent_chunks ADD CONSTRAINT precedent_chunks_role_check
        CHECK (chunk_role IN ('child', 'parent'));
EXCEPTION WHEN duplicate_object THEN NULL; END $$;

CREATE INDEX IF NOT EXISTS idx_precedent_chunks_parent
    ON precedent_chunks(parent_chunk_id);
CREATE INDEX IF NOT EXISTS idx_precedent_chunks_role
    ON precedent_chunks(chunk_role);
"""


# ── V18: RAG telemetry — closed-loop retrieval feedback (TaskMaster #50)
# search_logs               — one row per logged search call (query,
#                             caller, result count, top case_law ids,
#                             duration).
# search_relevance_feedback — graded relevance (0-3) for one case_law
#                             hit of one logged search.
# top_case_law_ids is nullable: per the inline SQL comment, searches
# that return document chunks rather than case_law rows leave it empty.
# NOTE(review): UNIQUE(search_log_id, case_law_id, feedback_source)
# includes two nullable columns. PostgreSQL treats NULLs as distinct in
# a unique constraint by default, so rows with a NULL feedback_source
# or NULL search_log_id are not de-duplicated — confirm this is
# acceptable.
SCHEMA_V18_SQL = """
CREATE TABLE IF NOT EXISTS search_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    search_type TEXT NOT NULL,
        -- 'precedent_library' / 'internal_decisions'
        -- / 'decisions' / 'case_documents' / 'similar_cases'
    query TEXT NOT NULL,
    practice_area TEXT,
    case_id UUID REFERENCES cases(id) ON DELETE SET NULL,
    user_agent TEXT,
        -- 'writer' / 'researcher' / 'analyst' / 'manual' / 'unknown'
    result_count INTEGER,
    top_case_law_ids UUID[],
        -- nullable: empty for search_decisions/search_case_documents
        -- which return document chunks not case_law rows
    duration_ms INTEGER,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_search_logs_type ON search_logs(search_type);
CREATE INDEX IF NOT EXISTS idx_search_logs_case ON search_logs(case_id);
CREATE INDEX IF NOT EXISTS idx_search_logs_date ON search_logs(created_at DESC);

CREATE TABLE IF NOT EXISTS search_relevance_feedback (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    search_log_id UUID REFERENCES search_logs(id) ON DELETE CASCADE,
    case_law_id UUID NOT NULL REFERENCES case_law(id) ON DELETE CASCADE,
    rank INTEGER NOT NULL,
        -- 1-based position in the original results (1 = top hit)
    relevance_score INTEGER NOT NULL
        CHECK (relevance_score IN (0, 1, 2, 3)),
        -- 0=irrelevant, 1=marginal, 2=relevant, 3=highly relevant
    feedback_source TEXT,
        -- 'cited_in_decision' / 'chair_marked' / 'auto_inferred'
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(search_log_id, case_law_id, feedback_source)
);
CREATE INDEX IF NOT EXISTS idx_relevance_log
    ON search_relevance_feedback(search_log_id);
CREATE INDEX IF NOT EXISTS idx_relevance_case_law
    ON search_relevance_feedback(case_law_id);
"""
async def store_precedent_chunks_hierarchical(
    case_law_id: UUID,
    chunks: list[dict],
) -> dict:
    """Replace precedent chunks for a case_law row (two-tier).

    Each input dict carries:
      * ``role``: 'child' | 'parent' (dicts with any other role are
        ignored)
      * ``local_id``: in-batch int identifier of the chunk
      * ``parent_local_id``: for children, the ``local_id`` of their
        parent in this same batch; ``None`` for parents
      * ``chunk_index``, ``content``, ``section_type``, ``page_number``
      * ``embedding``: vector for children; parents are always stored
        with a NULL embedding regardless of this key

    The write runs in a single transaction:
      1. DELETE the row's existing chunks.
      2. INSERT all parents, capturing a ``local_id -> DB UUID`` map.
      3. INSERT all children with ``parent_chunk_id`` resolved.

    A child that references a ``parent_local_id`` missing from the
    batch is still stored, with a NULL parent, and a warning is logged
    so the broken link does not pass unnoticed.

    When the batch contains neither parents nor children the function
    returns immediately and leaves existing chunks untouched.

    Returns:
        ``{"parents": N, "children": M, "total": N + M}``.
    """
    parents = [c for c in chunks if c.get("role") == "parent"]
    children = [c for c in chunks if c.get("role") == "child"]
    if not parents and not children:
        return {"parents": 0, "children": 0, "total": 0}

    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "DELETE FROM precedent_chunks WHERE case_law_id = $1",
                case_law_id,
            )
            # Pass 1: parents. The embedding is NULL on purpose: the
            # search queries only match rows with a non-NULL embedding.
            local_to_uuid: dict[int, UUID] = {}
            for p in parents:
                row = await conn.fetchrow(
                    """INSERT INTO precedent_chunks
                        (case_law_id, chunk_index, content, section_type,
                         page_number, embedding, chunk_role, parent_chunk_id)
                       VALUES ($1, $2, $3, $4, $5, NULL, 'parent', NULL)
                       RETURNING id""",
                    case_law_id,
                    p["chunk_index"],
                    p["content"],
                    p.get("section_type", "other"),
                    p.get("page_number"),
                )
                local_to_uuid[int(p["local_id"])] = row["id"]

            # Pass 2: children, with parent_chunk_id resolved from the
            # map built above. Argument tuples are collected first so
            # the inserts go out as one batch.
            child_args: list[tuple] = []
            dangling = 0
            for c in children:
                parent_local = c.get("parent_local_id")
                parent_uuid = None
                if parent_local is not None:
                    parent_uuid = local_to_uuid.get(int(parent_local))
                    if parent_uuid is None:
                        dangling += 1
                child_args.append((
                    case_law_id,
                    c["chunk_index"],
                    c["content"],
                    c.get("section_type", "other"),
                    c.get("page_number"),
                    c.get("embedding"),
                    parent_uuid,
                ))
            if dangling:
                logger.warning(
                    "case_law %s: %d child chunk(s) reference a parent_local_id "
                    "missing from the batch; stored without a parent",
                    case_law_id,
                    dangling,
                )
            if child_args:
                await conn.executemany(
                    """INSERT INTO precedent_chunks
                        (case_law_id, chunk_index, content, section_type,
                         page_number, embedding, chunk_role, parent_chunk_id)
                       VALUES ($1, $2, $3, $4, $5, $6, 'child', $7)""",
                    child_args,
                )
    return {
        "parents": len(parents),
        "children": len(children),
        "total": len(parents) + len(children),
    }
When + # ``config.PARENT_DOC_RETRIEVAL_ENABLED`` is true *and* the row has + # a non-null parent, the post-processing loop swaps in the parent's + # content so the writer sees the broader passage instead of the + # 300-token sliver that matched. Legacy rows (parent_chunk_id NULL) + # are unaffected — the JOIN returns NULL parent_* and the swap is a + # no-op. Index ``idx_precedent_chunks_role`` is not used here + # intentionally: filtering on chunk_role='child' would exclude + # legacy single-tier rows that default to 'child' but have no + # parent; an embedding-IS-NOT-NULL filter is equivalent because + # parents store NULL embeddings. chunk_sql = f""" SELECT pc.id AS chunk_id, pc.case_law_id, pc.content, pc.section_type, pc.page_number, + pc.parent_chunk_id, + parent.content AS parent_content, + parent.section_type AS parent_section_type, + parent.page_number AS parent_page_number, cl.case_number, cl.case_name, cl.court, cl.date AS decision_date, cl.precedent_level, cl.practice_area, cl.chair_name, cl.district, 1 - (pc.embedding <=> $1) AS score FROM precedent_chunks pc JOIN case_law cl ON cl.id = pc.case_law_id + LEFT JOIN precedent_chunks parent + ON parent.id = pc.parent_chunk_id WHERE {' AND '.join(chunk_filters)} AND pc.embedding IS NOT NULL ORDER BY pc.embedding <=> $1 @@ -2697,10 +2902,68 @@ async def search_precedent_library_semantic( d["decision_date"] = d["decision_date"].isoformat() d["score"] = float(d["score"]) d["type"] = "passage" + _maybe_swap_parent(d) results.append(d) results.sort(key=lambda x: x["score"], reverse=True) - return results[:limit] + # Dedupe: when multiple child hits share the same parent, we'd + # otherwise return duplicate parent content. Keep the highest- + # scoring hit per parent (skip if parent swap disabled or row has + # no parent — chunk_id alone remains unique). 
def _maybe_swap_parent(row: dict) -> None:
    """Promote the parent's content into ``row`` when applicable.

    Mutates ``row`` in place. The ``parent_content`` /
    ``parent_section_type`` / ``parent_page_number`` keys produced by
    the LEFT JOIN are always removed. The swap itself happens only when
    ``config.PARENT_DOC_RETRIEVAL_ENABLED`` is true, the row has a
    non-NULL ``parent_chunk_id``, and the parent content is non-empty.

    On a swap, the matched child's values are preserved under
    ``child_content`` / ``child_section_type`` / ``child_page_number``,
    ``content`` becomes the parent's text, and ``parent_swap`` is set
    to ``True``. The parent's section type and page number replace the
    child's only when the parent actually has them.
    """
    parent_content = row.pop("parent_content", None)
    parent_section = row.pop("parent_section_type", None)
    parent_page = row.pop("parent_page_number", None)

    swap = (
        config.PARENT_DOC_RETRIEVAL_ENABLED
        and row.get("parent_chunk_id") is not None
        and parent_content
    )
    if not swap:
        return

    # Keep what originally matched so callers can still inspect it.
    row["child_content"] = row.get("content")
    row["child_section_type"] = row.get("section_type")
    row["child_page_number"] = row.get("page_number")
    row["content"] = parent_content
    if parent_section:
        row["section_type"] = parent_section
    if parent_page is not None:
        row["page_number"] = parent_page
    row["parent_swap"] = True


def _dedupe_by_parent(rows: list[dict], limit: int) -> list[dict]:
    """Collapse swapped rows that share a parent, then apply ``limit``.

    With ``config.PARENT_DOC_RETRIEVAL_ENABLED`` off this is exactly
    ``rows[:limit]``. With it on, only the first row per
    ``parent_chunk_id`` is kept among rows marked ``parent_swap``. Rows
    are taken in the order given, so callers sort by score first and
    the best-scoring child wins. Rows without a swapped parent always
    pass through.

    A non-positive ``limit`` yields an empty list when the flag is on.
    The limit is checked before a row is appended so that ``limit=0``
    cannot return one row.
    """
    if not config.PARENT_DOC_RETRIEVAL_ENABLED:
        return rows[:limit]
    if limit <= 0:
        return []

    seen_parents: set = set()
    out: list[dict] = []
    for r in rows:
        pid = r.get("parent_chunk_id")
        if pid and r.get("parent_swap"):
            if pid in seen_parents:
                continue
            seen_parents.add(pid)
        out.append(r)
        if len(out) >= limit:
            break
    return out
chunk_sql = f""" SELECT pc.id AS chunk_id, pc.case_law_id, pc.content, pc.section_type, pc.page_number, + pc.parent_chunk_id, + parent.content AS parent_content, + parent.section_type AS parent_section_type, + parent.page_number AS parent_page_number, cl.case_number, cl.case_name, cl.court, cl.date AS decision_date, cl.precedent_level, cl.practice_area, cl.chair_name, cl.district, ts_rank_cd(pc.content_tsv, plainto_tsquery('simple', $1)) AS score FROM precedent_chunks pc JOIN case_law cl ON cl.id = pc.case_law_id + LEFT JOIN precedent_chunks parent + ON parent.id = pc.parent_chunk_id WHERE {' AND '.join(chunk_filters)} + AND pc.embedding IS NOT NULL AND pc.content_tsv @@ plainto_tsquery('simple', $1) ORDER BY score DESC LIMIT $2 @@ -2847,10 +3127,11 @@ async def search_precedent_library_lexical( d["decision_date"] = d["decision_date"].isoformat() d["score"] = float(d["score"]) d["type"] = "passage" + _maybe_swap_parent(d) results.append(d) results.sort(key=lambda x: x["score"], reverse=True) - return results[:limit] + return _dedupe_by_parent(results, limit) async def precedent_library_stats() -> dict: diff --git a/mcp-server/src/legal_mcp/services/internal_decisions.py b/mcp-server/src/legal_mcp/services/internal_decisions.py index f2013bf..5c051a2 100644 --- a/mcp-server/src/legal_mcp/services/internal_decisions.py +++ b/mcp-server/src/legal_mcp/services/internal_decisions.py @@ -144,25 +144,63 @@ async def ingest_internal_decision( case_law_id = UUID(str(record["id"])) try: - chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} + # Parent-doc retrieval (TaskMaster #48) — same gated branch as + # ingest_precedent. 
Internal committee decisions are typically + # longer than external court rulings (full transcript + ruling), + # so the parent-doc benefit is even larger here. + if config.PARENT_DOC_RETRIEVAL_ENABLED: + h_chunks = chunker.chunk_document_hierarchical( + raw_text, page_offsets=page_offsets, + ) + if not h_chunks: + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "completed") + return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} + children = [c for c in h_chunks if c.role == "child"] + parents = [c for c in h_chunks if c.role == "parent"] + child_vectors = await embeddings.embed_texts( + [c.content for c in children], input_type="document", + ) + chunk_dicts: list[dict] = [] + for p in parents: + chunk_dicts.append({ + "role": "parent", "local_id": p.local_id, "parent_local_id": None, + "chunk_index": p.chunk_index, "content": p.content, + "section_type": p.section_type, "page_number": p.page_number, + "embedding": None, + }) + for c, v in zip(children, child_vectors): + chunk_dicts.append({ + "role": "child", "local_id": c.local_id, + "parent_local_id": c.parent_local_id, + "chunk_index": c.chunk_index, "content": c.content, + "section_type": c.section_type, "page_number": c.page_number, + "embedding": v, + }) + counts = await db.store_precedent_chunks_hierarchical( + case_law_id, chunk_dicts, + ) + stored = counts["children"] + else: + chunks = chunker.chunk_document(raw_text, page_offsets=page_offsets) + if not chunks: + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "completed") + return {"status": "completed", "case_law_id": str(case_law_id), "chunks": 0} - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - 
"page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored = await db.store_precedent_chunks(case_law_id, chunk_dicts) + chunk_texts = [c.content for c in chunks] + chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") + chunk_dicts = [ + { + "chunk_index": c.chunk_index, + "content": c.content, + "section_type": c.section_type, + "page_number": c.page_number, + "embedding": v, + } + for c, v in zip(chunks, chunk_vectors) + ] + stored = await db.store_precedent_chunks(case_law_id, chunk_dicts) await db.set_case_law_extraction_status(case_law_id, "completed") await db.set_case_law_halacha_status(case_law_id, "pending") diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index 96813f5..ee6dddc 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -172,34 +172,100 @@ async def ingest_precedent( case_law_id = UUID(str(record["id"])) try: - await progress("chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')") - chunks = chunker.chunk_document(text, page_offsets=page_offsets) - if not chunks: - await db.set_case_law_extraction_status(case_law_id, "completed") - await db.set_case_law_halacha_status(case_law_id, "completed") - await progress("completed", 100, "אין טקסט לעיבוד") - return { - "status": "completed", - "case_law_id": str(case_law_id), - "chunks": 0, - "halachot": 0, - } + # Parent-doc retrieval (TaskMaster #48): when enabled, emit + # two tiers (parents + children). Only children are embedded + # and indexed; parents carry retrieval context. When disabled, + # fall back to legacy single-tier chunking — identical + # behaviour to pre-V17. 
+ if config.PARENT_DOC_RETRIEVAL_ENABLED: + await progress( + "chunking", 40, + f"מחלק את הטקסט ל-chunks היררכיים ({page_count} עמ')", + ) + h_chunks = chunker.chunk_document_hierarchical( + text, page_offsets=page_offsets, + ) + if not h_chunks: + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "completed") + await progress("completed", 100, "אין טקסט לעיבוד") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": 0, + "halachot": 0, + } - await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") - chunk_texts = [c.content for c in chunks] - chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") + children = [c for c in h_chunks if c.role == "child"] + parents = [c for c in h_chunks if c.role == "parent"] + await progress( + "embedding", 55, + f"מייצר embeddings ל-{len(children)} children " + f"({len(parents)} parents)", + ) + child_texts = [c.content for c in children] + child_vectors = await embeddings.embed_texts( + child_texts, input_type="document", + ) + # Build flat dict list for the two-pass writer. 
+ chunk_dicts: list[dict] = [] + for p in parents: + chunk_dicts.append({ + "role": "parent", + "local_id": p.local_id, + "parent_local_id": None, + "chunk_index": p.chunk_index, + "content": p.content, + "section_type": p.section_type, + "page_number": p.page_number, + "embedding": None, + }) + for c, v in zip(children, child_vectors): + chunk_dicts.append({ + "role": "child", + "local_id": c.local_id, + "parent_local_id": c.parent_local_id, + "chunk_index": c.chunk_index, + "content": c.content, + "section_type": c.section_type, + "page_number": c.page_number, + "embedding": v, + }) + counts = await db.store_precedent_chunks_hierarchical( + case_law_id, chunk_dicts, + ) + stored_chunks = counts["children"] + else: + await progress( + "chunking", 40, f"מחלק את הטקסט ל-chunks ({page_count} עמ')", + ) + chunks = chunker.chunk_document(text, page_offsets=page_offsets) + if not chunks: + await db.set_case_law_extraction_status(case_law_id, "completed") + await db.set_case_law_halacha_status(case_law_id, "completed") + await progress("completed", 100, "אין טקסט לעיבוד") + return { + "status": "completed", + "case_law_id": str(case_law_id), + "chunks": 0, + "halachot": 0, + } - chunk_dicts = [ - { - "chunk_index": c.chunk_index, - "content": c.content, - "section_type": c.section_type, - "page_number": c.page_number, - "embedding": v, - } - for c, v in zip(chunks, chunk_vectors) - ] - stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts) + await progress("embedding", 55, f"מייצר embeddings ל-{len(chunks)} chunks") + chunk_texts = [c.content for c in chunks] + chunk_vectors = await embeddings.embed_texts(chunk_texts, input_type="document") + + chunk_dicts = [ + { + "chunk_index": c.chunk_index, + "content": c.content, + "section_type": c.section_type, + "page_number": c.page_number, + "embedding": v, + } + for c, v in zip(chunks, chunk_vectors) + ] + stored_chunks = await db.store_precedent_chunks(case_law_id, chunk_dicts) # Multimodal page-image 
embeddings (V9). Gated by feature flag. # Non-fatal: text path already succeeded. Only PDFs. diff --git a/mcp-server/src/legal_mcp/services/telemetry.py b/mcp-server/src/legal_mcp/services/telemetry.py new file mode 100644 index 0000000..13c5fcc --- /dev/null +++ b/mcp-server/src/legal_mcp/services/telemetry.py @@ -0,0 +1,391 @@ +"""RAG retrieval telemetry — closed-loop feedback (TaskMaster #50). + +Logs every semantic search call so we can compute nDCG@10 over time, +spot retrieval drift, and feed the rerank training set. + +Design notes +------------ +- **All writes are fire-and-forget**: callers wrap us in ``try/except`` + but we also swallow our own DB errors so a telemetry hiccup can never + fail a search. The log itself is also written via a detached task — + the search returns to the caller immediately and the row lands in + the DB on the side. + +- **search_decisions / search_case_documents** return document chunks + from active cases, not ``case_law`` rows. Their telemetry rows leave + ``top_case_law_ids`` empty; nDCG aggregation ignores them. + +- **Auto-inferred feedback**: once a final decision is exported, we + scan its ``decision_paragraphs.citations`` JSONB, pull the + ``case_law_id`` values, and mark them as ``relevance_score=3`` on + any search_log for the same case where the precedent appeared in + the top-K. This gives us a "cited == relevant" ground truth signal + without asking the chair to label results by hand. +""" +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Iterable +from uuid import UUID + +from legal_mcp.services import db + +logger = logging.getLogger(__name__) + + +_VALID_SOURCES = {"cited_in_decision", "chair_marked", "auto_inferred"} + + +def _coerce_case_law_ids(results: Iterable[Any], limit: int = 10) -> list[UUID]: + """Pull up to ``limit`` ``case_law_id`` UUIDs from search results. + + Tolerates rows missing the field, non-UUID strings, and ``None`` + values. 
Preserves order (= ranking). + """ + out: list[UUID] = [] + seen: set[str] = set() + for r in results: + if len(out) >= limit: + break + if not isinstance(r, dict): + continue + raw = r.get("case_law_id") + if raw is None: + continue + s = str(raw) + if s in seen: + continue + try: + out.append(UUID(s)) + seen.add(s) + except (ValueError, AttributeError): + continue + return out + + +async def _insert_log( + *, + search_type: str, + query: str, + practice_area: str | None, + case_id: UUID | None, + user_agent: str | None, + result_count: int, + top_case_law_ids: list[UUID], + duration_ms: int | None, +) -> UUID | None: + try: + pool = await db.get_pool() + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO search_logs ( + search_type, query, practice_area, case_id, + user_agent, result_count, top_case_law_ids, + duration_ms + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id + """, + search_type, + query[:2000], # guard against pathologically long queries + practice_area or None, + case_id, + user_agent or None, + int(result_count), + top_case_law_ids or None, + duration_ms, + ) + return row["id"] if row else None + except Exception: + logger.exception("telemetry.log_search: insert failed (swallowed)") + return None + + +async def log_search( + *, + search_type: str, + query: str, + results: Iterable[dict], + duration_ms: int | None = None, + practice_area: str | None = None, + case_id: UUID | str | None = None, + user_agent: str | None = None, +) -> UUID | None: + """Record a search call. Never raises. + + Args: + search_type: one of 'precedent_library', 'internal_decisions', + 'decisions', 'case_documents', 'similar_cases'. + query: the raw user query. + results: iterable of result dicts. We pull ``case_law_id`` from + the first 10 to populate ``top_case_law_ids``. + duration_ms: search latency in milliseconds. + practice_area: optional filter applied to the search. 
+ case_id: optional case context (when the search was scoped to + or triggered from a specific case). + user_agent: 'writer' / 'researcher' / 'analyst' / 'manual'. + + Returns: + The ``search_logs.id`` UUID if the row was written, else None. + Most callers ignore this; auto-inference uses it later via + ``infer_relevance_from_citations``. + """ + # Snapshot results immediately — callers may keep iterating. + snapshot = list(results) if not isinstance(results, list) else results + top_ids = _coerce_case_law_ids(snapshot, limit=10) + + case_uuid: UUID | None + if case_id is None: + case_uuid = None + elif isinstance(case_id, UUID): + case_uuid = case_id + else: + try: + case_uuid = UUID(str(case_id)) + except (ValueError, AttributeError): + case_uuid = None + + return await _insert_log( + search_type=search_type, + query=query, + practice_area=practice_area, + case_id=case_uuid, + user_agent=user_agent, + result_count=len(snapshot), + top_case_law_ids=top_ids, + duration_ms=duration_ms, + ) + + +def log_search_bg( + *, + search_type: str, + query: str, + results: Iterable[dict], + duration_ms: int | None = None, + practice_area: str | None = None, + case_id: UUID | str | None = None, + user_agent: str | None = None, +) -> None: + """Fire-and-forget variant. Schedules the insert as a detached task. + + Use this from hot search paths so the caller returns to the user + immediately. Errors are logged inside ``log_search``. + """ + # Snapshot eagerly so the caller can mutate/iterate results freely. + snapshot = list(results) if not isinstance(results, list) else list(results) + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop — caller is sync. Best-effort: skip telemetry. 
+ return + loop.create_task( + log_search( + search_type=search_type, + query=query, + results=snapshot, + duration_ms=duration_ms, + practice_area=practice_area, + case_id=case_id, + user_agent=user_agent, + ) + ) + + +# ────────────────────────────────────────────────────────────────────── +# Auto-inferred relevance feedback +# ────────────────────────────────────────────────────────────────────── + + +def _extract_citations_from_jsonb(citations: Any) -> list[UUID]: + """Parse ``decision_paragraphs.citations`` JSONB into UUID list. + + Stored shape: ``[{"case_law_id": "...", "text": "...", "type": ...}]``. + Tolerates string form (asyncpg returns it as JSON string when the + column registration didn't auto-decode). + """ + import json as _json + + if not citations: + return [] + if isinstance(citations, (bytes, bytearray)): + try: + citations = _json.loads(citations.decode("utf-8")) + except (ValueError, UnicodeDecodeError): + return [] + elif isinstance(citations, str): + try: + citations = _json.loads(citations) + except ValueError: + return [] + + if not isinstance(citations, list): + return [] + + out: list[UUID] = [] + seen: set[str] = set() + for item in citations: + if not isinstance(item, dict): + continue + raw = item.get("case_law_id") + if not raw: + continue + s = str(raw) + if s in seen: + continue + try: + out.append(UUID(s)) + seen.add(s) + except (ValueError, AttributeError): + continue + return out + + +async def _gather_cited_case_law_ids(case_id: UUID) -> list[UUID]: + """Pull every distinct ``case_law_id`` cited anywhere in the case's + decision paragraphs. 
+ """ + pool = await db.get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT dp.citations + FROM decision_paragraphs dp + JOIN decision_blocks db ON db.id = dp.block_id + JOIN decisions d ON d.id = db.decision_id + WHERE d.case_id = $1 + AND dp.citations IS NOT NULL + AND jsonb_array_length(dp.citations) > 0 + """, + case_id, + ) + seen: set[str] = set() + out: list[UUID] = [] + for r in rows: + for clid in _extract_citations_from_jsonb(r["citations"]): + s = str(clid) + if s not in seen: + seen.add(s) + out.append(clid) + return out + + +async def infer_relevance_from_citations( + case_id: UUID | str, + *, + relevance_score: int = 3, + feedback_source: str = "cited_in_decision", +) -> dict: + """For each precedent cited in the case's draft, write a relevance + row against every search_log where that precedent appeared in the + top-K for the same case. + + Idempotent: the ``UNIQUE(search_log_id, case_law_id, feedback_source)`` + constraint on ``search_relevance_feedback`` prevents duplicates. + + Returns: + ``{"cited_precedents": int, "feedback_rows_inserted": int, + "searches_matched": int}``. + """ + if relevance_score not in (0, 1, 2, 3): + raise ValueError("relevance_score must be in 0..3") + if feedback_source not in _VALID_SOURCES: + raise ValueError(f"feedback_source must be one of {_VALID_SOURCES!r}") + + case_uuid = case_id if isinstance(case_id, UUID) else UUID(str(case_id)) + + cited = await _gather_cited_case_law_ids(case_uuid) + if not cited: + return { + "cited_precedents": 0, + "feedback_rows_inserted": 0, + "searches_matched": 0, + } + + pool = await db.get_pool() + inserted = 0 + matched_searches: set[str] = set() + + async with pool.acquire() as conn: + # For each cited precedent, find all logs where it appeared in + # top_case_law_ids for this case, and record its rank. 
+ for clid in cited: + rows = await conn.fetch( + """ + SELECT id, top_case_law_ids + FROM search_logs + WHERE case_id = $1 + AND top_case_law_ids IS NOT NULL + AND $2 = ANY(top_case_law_ids) + """, + case_uuid, + clid, + ) + for row in rows: + top_ids = row["top_case_law_ids"] or [] + # asyncpg returns uuid[] as list[UUID] + try: + rank = top_ids.index(clid) + 1 + except ValueError: + continue + result = await conn.execute( + """ + INSERT INTO search_relevance_feedback ( + search_log_id, case_law_id, rank, + relevance_score, feedback_source + ) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (search_log_id, case_law_id, feedback_source) + DO NOTHING + """, + row["id"], + clid, + rank, + relevance_score, + feedback_source, + ) + # ``execute`` returns 'INSERT 0 1' or 'INSERT 0 0' for + # the no-op path; count only the writes. + if result.endswith(" 1"): + inserted += 1 + matched_searches.add(str(row["id"])) + + return { + "cited_precedents": len(cited), + "feedback_rows_inserted": inserted, + "searches_matched": len(matched_searches), + } + + +async def infer_relevance_for_all_finalized_cases(limit: int | None = None) -> dict: + """Bulk-run auto-inference for every case whose draft is final/exported. + + Useful for back-filling after V18 schema lands and a few decisions + have already been written. Skips cases with no cited precedents + silently (they contribute zero to the totals). 
+ """ + pool = await db.get_pool() + sql = """ + SELECT DISTINCT c.id + FROM cases c + JOIN decisions d ON d.case_id = c.id + WHERE c.status IN ('final', 'exported') + """ + if limit is not None and limit > 0: + sql += " LIMIT $1" + async with pool.acquire() as conn: + rows = await conn.fetch(sql, *([limit] if limit else [])) + + totals = { + "cases_processed": 0, + "cited_precedents": 0, + "feedback_rows_inserted": 0, + "searches_matched": 0, + } + for r in rows: + stats = await infer_relevance_from_citations(r["id"]) + totals["cases_processed"] += 1 + totals["cited_precedents"] += stats["cited_precedents"] + totals["feedback_rows_inserted"] += stats["feedback_rows_inserted"] + totals["searches_matched"] += stats["searches_matched"] + return totals diff --git a/mcp-server/src/legal_mcp/tools/precedent_library.py b/mcp-server/src/legal_mcp/tools/precedent_library.py index 6bfd30a..2e869b0 100644 --- a/mcp-server/src/legal_mcp/tools/precedent_library.py +++ b/mcp-server/src/legal_mcp/tools/precedent_library.py @@ -18,9 +18,10 @@ the chair approves them — per project review policy. 
from __future__ import annotations import json +import time from uuid import UUID -from legal_mcp.services import db, precedent_library +from legal_mcp.services import db, precedent_library, telemetry def _ok(payload) -> str: @@ -260,8 +261,10 @@ async def search_precedent_library( """ if not query or len(query.strip()) < 2: return json.dumps([], ensure_ascii=False) + q = query.strip() + t0 = time.perf_counter() results = await precedent_library.search_library( - query=query.strip(), + query=q, practice_area=practice_area, court=court, precedent_level=precedent_level, @@ -271,6 +274,15 @@ async def search_precedent_library( limit=limit, include_halachot=include_halachot, ) + elapsed_ms = int((time.perf_counter() - t0) * 1000) + telemetry.log_search_bg( + search_type="precedent_library", + query=q, + results=results, + duration_ms=elapsed_ms, + practice_area=practice_area or None, + user_agent="unknown", + ) return _ok(results) diff --git a/mcp-server/src/legal_mcp/tools/search.py b/mcp-server/src/legal_mcp/tools/search.py index 4a0b584..0288cbb 100644 --- a/mcp-server/src/legal_mcp/tools/search.py +++ b/mcp-server/src/legal_mcp/tools/search.py @@ -4,9 +4,10 @@ from __future__ import annotations import json import logging +import time from uuid import UUID -from legal_mcp.services import db, embeddings, hybrid_search +from legal_mcp.services import db, embeddings, hybrid_search, telemetry logger = logging.getLogger(__name__) @@ -30,11 +31,16 @@ async def search_decisions( case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק """ # Auto-resolve practice_area from case_number if available + resolved_case_id: UUID | None = None if case_number and not practice_area: case = await db.get_case_by_number(case_number) if case: practice_area = case.get("practice_area") or "" appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "") + try: + resolved_case_id = UUID(case["id"]) + except (KeyError, ValueError, TypeError): + resolved_case_id = None if not 
practice_area: logger.warning( @@ -43,6 +49,7 @@ async def search_decisions( ) query_emb = await embeddings.embed_query(query) + t0 = time.perf_counter() results = await hybrid_search.search_documents_hybrid( query=query, query_text_embedding=query_emb, @@ -51,6 +58,16 @@ async def search_decisions( practice_area=practice_area or None, appeal_subtype=appeal_subtype or None, ) + elapsed_ms = int((time.perf_counter() - t0) * 1000) + telemetry.log_search_bg( + search_type="decisions", + query=query, + results=results, + duration_ms=elapsed_ms, + practice_area=practice_area or None, + case_id=resolved_case_id, + user_agent="unknown", + ) if not results: return "לא נמצאו תוצאות." @@ -87,13 +104,24 @@ async def search_case_documents( if not case: return f"תיק {case_number} לא נמצא." + case_uuid = UUID(case["id"]) query_emb = await embeddings.embed_query(query) # Restricted to case_id — practice_area filter would be redundant. + t0 = time.perf_counter() results = await hybrid_search.search_documents_hybrid( query=query, query_text_embedding=query_emb, limit=limit, - case_id=UUID(case["id"]), + case_id=case_uuid, + ) + elapsed_ms = int((time.perf_counter() - t0) * 1000) + telemetry.log_search_bg( + search_type="case_documents", + query=query, + results=results, + duration_ms=elapsed_ms, + case_id=case_uuid, + user_agent="unknown", ) if not results: @@ -130,11 +158,16 @@ async def find_similar_cases( appeal_subtype: סוג ערר לסינון case_number: אם סופק, ה-practice_area/subtype יוסקו אוטומטית מהתיק """ + resolved_case_id: UUID | None = None if case_number and not practice_area: case = await db.get_case_by_number(case_number) if case: practice_area = case.get("practice_area") or "" appeal_subtype = appeal_subtype or (case.get("appeal_subtype") or "") + try: + resolved_case_id = UUID(case["id"]) + except (KeyError, ValueError, TypeError): + resolved_case_id = None if not practice_area: logger.warning( @@ -145,6 +178,7 @@ async def find_similar_cases( query_emb = await 
embeddings.embed_query(description) # Even with rerank we ask for ``limit*3`` so the dedup-by-case # step downstream still has enough rows to pick the best per case. + t0 = time.perf_counter() results = await hybrid_search.search_documents_hybrid( query=description, query_text_embedding=query_emb, @@ -152,6 +186,16 @@ async def find_similar_cases( practice_area=practice_area or None, appeal_subtype=appeal_subtype or None, ) + elapsed_ms = int((time.perf_counter() - t0) * 1000) + telemetry.log_search_bg( + search_type="similar_cases", + query=description, + results=results, + duration_ms=elapsed_ms, + practice_area=practice_area or None, + case_id=resolved_case_id, + user_agent="unknown", + ) if not results: return "לא נמצאו תיקים דומים." @@ -213,6 +257,7 @@ async def search_internal_decisions( # expansion more useful. primary_limit = limit if not include_cited_by else max(limit, limit * 2) + t0 = time.perf_counter() results = await int_svc.search_internal( query, practice_area=practice_area, @@ -222,6 +267,15 @@ async def search_internal_decisions( limit=primary_limit, include_halachot=include_halachot, ) + elapsed_ms = int((time.perf_counter() - t0) * 1000) + telemetry.log_search_bg( + search_type="internal_decisions", + query=query, + results=results, + duration_ms=elapsed_ms, + practice_area=practice_area or None, + user_agent="unknown", + ) if not results: return "לא נמצאו החלטות ועדת ערר רלוונטיות." diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index b6a80e0..94baf6b 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -28,9 +28,13 @@ | `voyage_rerank_corpus_poc.py` | python | POC #5 — voyage-3 vs rerank-2 על קורפוס מלא (785 docs). הכרעה: +4.5% mean@3 כללי, +11.6% על P queries (practical) | בנצ'מרק חד-פעמי, אישר את שלב B | | `multimodal_backfill.py` | python | Backfill voyage-multimodal-3 page embeddings על מסמכי תיקים קיימים. idempotent (skips by default), forces `MULTIMODAL_ENABLED=true` ל-run, רץ מהקונטיינר. 
שלב C — ראה `docs/voyage-upgrades-plan.md` | ידני per-case (`python multimodal_backfill.py 8174-24 8137-24`) | | `backfill_chunk_pages.py` | python | Backfill `page_number` ב-`document_chunks` קיימים. legacy chunker לא tracked עמודים → `page_number=NULL` חוסם boost של multimodal hybrid (text+image join על אותו עמוד). re-extracts כל PDF (re-OCR אם צריך, ~$0.0015/page), מחשב page_offsets, ומעדכן chunks. idempotent | ידני per-case (`python backfill_chunk_pages.py 8174-24 8137-24`) | +| `audit_corpus_integrity.py` | python | בדיקה תקופתית של עקביות הקורפוס — 3 בדיקות SQL read-only על `case_law` ו-`cases`: (A) `external_upload` עם prefix פנימי `ערר`/`בל"מ`; (B) `internal_committee` חסר `chair_name`/`district`; (C) `cases.practice_area` מחוץ ל-{`rishuy_uvniya`, `betterment_levy`, `compensation_197`, `''`}. כותב log מצטבר ל-`data/logs/corpus_integrity_audit.log` ובמצב הפרות שולח wakeup ל-CEO ב-Paperclip (best-effort, רק אם `PAPERCLIP_API_URL`+`PAPERCLIP_API_KEY` מוגדרים). דגל: `--no-notify`. Idempotent, יוצא 0. **Cron יומי 07:00**: `0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python /home/chaim/legal-ai/scripts/audit_corpus_integrity.py` | `0 7 * * *` (cron) | | `backfill_legal_arguments.py` | python | Backfill `legal_arguments` לתיקים עם `claims` קיימים (TaskMaster #36). מקבץ פרופוזיציות גולמיות לטיעונים משפטיים מובחנים (~6-12 לכל צד) דרך `argument_aggregator.aggregate_claims_to_arguments` (Claude CLI). תומך `--dry-run`/`--apply`/`--force`/`--case ...`. **חייב לרוץ מהמכונה המקומית** (לא קונטיינר) — `claude_session` דורש Claude CLI | ידני per-case (`python scripts/backfill_legal_arguments.py --apply --case 1017-03-26`) | | `upload_blam_decisions.py` | python | חד-פעמי (2026-05-26) — העלאת 2 החלטות בל"מ ל-`case_law` (8126/24 סופר נוח, 8047/23 הרנון) דרך `ingest_internal_decision` ישיר, עוקף MCP server שטרם נטען מחדש אחרי הוספת `proceeding_type`. 
**לא להריץ שוב** | חד-פעמי — להעביר ל-`.archive/` בהזדמנות | | `process_pending_blam.py` | python | חד-פעמי (2026-05-26) — הרצת metadata + halacha extraction על 2 החלטות בל"מ שעלו ב-`upload_blam_decisions.py`. עוקף MCP (אותו טעם). **לא להריץ שוב** | חד-פעמי — להעביר ל-`.archive/` בהזדמנות | +| `compute_ndcg.py` | python | חישוב nDCG@10 על `search_relevance_feedback` (TaskMaster #50, Stage C). aggregation לפי `search_type` ולפי שבוע, כולל top-cited case_law ו-coverage %. דגלים: `--k 10`, `--weeks 12`, `--pretty`. read-only, פלט JSON. משמש גם את `GET /api/admin/rag-metrics` (מיובא inline) — שינוי חתימה ב-`compute()` ישבור את ה-endpoint | ידני / cron עתידי לדיווח שבועי | +| `backfill_multimodal_precedents.py` | python | Backfill voyage-multimodal-3 page embeddings על רשומות `case_law` (external_upload + internal_committee) שחסרות `precedent_image_embeddings`. בונה אינדקס קבצים מ-`data/precedent-library/` ו-`data/internal-decisions/`, מנסה התאמה לפי tokens של מספרי תיק (כולל parts-match לפורמטים שונים של Nevo doc-id). מדלג על רשומות בלי קובץ-מקור או עם MD בלבד (PyMuPDF לא מרנדר MD). תומך `--dry-run` (default) / `--apply` / `--only external_upload\|internal_committee` / `--limit N`. רץ בקונטיינר (יש `/data` + Voyage env). **הופעל 2026-05-26**: 70 חסרים → 26 backfilled (503 pages, ~$0.21 voyage tokens), 44 אין-קובץ-מקור. ניתן להריץ שוב אחרי שיועלו עוד PDF/DOCX לספרייה | ידני | +| `monitor_halacha_quality.py` | python | מנטר איכות חילוץ הלכות. בודק drift של `avg(confidence)` בין baseline היסטורי לחלון אחרון. מחזיר JSON מטריקות + alert ב-stderr אם drift > threshold (ברירת מחדל 5%). 2 סדרות: trusted (approved+published) ו-all_extracted. תומך `--window N` / `--threshold X` / `--min-sample N` / `--silent` / `--exit-on-alert`. רץ ב-container או מקומית עם `mcp-server/.venv` (אין תלות ב-LLM, רק SQL). 
**תזמון מומלץ**: `0 8 * * 1` (יום שני 08:00, שבועי) | `0 8 * * 1` (לתזמן) | ## תיקיית `.archive/` — סקריפטים שהושלמו diff --git a/scripts/audit_corpus_integrity.py b/scripts/audit_corpus_integrity.py new file mode 100644 index 0000000..6dee350 --- /dev/null +++ b/scripts/audit_corpus_integrity.py @@ -0,0 +1,281 @@ +"""Periodic corpus-integrity audit. + +Runs a set of read-only SQL checks against the legal-ai DB to detect rows +that violate domain constraints which are *not* enforced by the schema +(or were added after the constraint was put in place). + +Checks performed: + + A. ``case_law`` rows with ``source_kind='external_upload'`` whose + ``case_number`` starts with the Hebrew prefixes ``ערר`` / ``בל"מ``. + Internal committee decisions belong to ``source_kind='internal_committee'``. + + B. ``case_law`` rows with ``source_kind='internal_committee'`` that + lack a ``chair_name`` and/or ``district``. Internal decisions must + carry both. + + C. ``cases`` rows with a ``practice_area`` outside the closed set + {``rishuy_uvniya``, ``betterment_levy``, ``compensation_197``, ``''``}. + +Output: + + * Appends a timestamped block to ``data/logs/corpus_integrity_audit.log``. + * If hits are found AND env ``PAPERCLIP_API_URL`` + ``PAPERCLIP_API_KEY`` + are set, posts a CEO wakeup comment via ``POST /api/agents/{ceo}/wakeup`` + (best-effort, never fails the script). + * Always exits 0 unless an unexpected error occurs (so cron stays quiet). + +Cron suggestion (daily 07:00): + + 0 7 * * * /home/chaim/legal-ai/mcp-server/.venv/bin/python \\ + /home/chaim/legal-ai/scripts/audit_corpus_integrity.py + +Idempotent. Read-only on the DB. +""" +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +# Load ~/.env so POSTGRES_* / PAPERCLIP_* are picked up when run from cron.
+ENV_PATH = os.path.expanduser("~/.env") +if os.path.isfile(ENV_PATH): + with open(ENV_PATH, encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, v = line.split("=", 1) + os.environ.setdefault(k, v) + +import asyncpg # noqa: E402 + +try: + import httpx # noqa: E402 +except ImportError: # httpx is part of the legal-ai venv; not required for DB checks + httpx = None # type: ignore[assignment] + + +REPO_ROOT = Path(__file__).resolve().parent.parent +LOG_PATH = REPO_ROOT / "data" / "logs" / "corpus_integrity_audit.log" + +CHECK_A_SQL = ( + "SELECT id, case_number FROM case_law " + "WHERE source_kind = 'external_upload' AND case_number ~ '^ערר|^בל\"מ' " + "ORDER BY case_number" +) +CHECK_B_SQL = ( + "SELECT id, case_number, chair_name, district FROM case_law " + "WHERE source_kind = 'internal_committee' " + "AND (chair_name IS NULL OR chair_name = '' " + " OR district IS NULL OR district = '') " + "ORDER BY case_number" +) +CHECK_C_SQL = ( + "SELECT id, case_number, practice_area FROM cases " + "WHERE practice_area IS NOT NULL " + "AND practice_area NOT IN ('rishuy_uvniya', 'betterment_levy', " + " 'compensation_197', '') " + "ORDER BY case_number" +) + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("audit_corpus_integrity") + + +def _pg_url() -> str: + """Resolve POSTGRES URL from env, falling back to discrete vars.""" + url = os.environ.get("POSTGRES_URL") + if url: + return url + pg_host = os.environ.get("POSTGRES_HOST", "127.0.0.1") + pg_port = int(os.environ.get("POSTGRES_PORT", "5433")) + pg_user = os.environ.get("POSTGRES_USER", "legal_ai") + pg_pw = os.environ.get("POSTGRES_PASSWORD", "") + pg_db = os.environ.get("POSTGRES_DB", "legal_ai") + if not pg_pw: + raise SystemExit("POSTGRES_PASSWORD / POSTGRES_URL not set") + return f"postgres://{pg_user}:{pg_pw}@{pg_host}:{pg_port}/{pg_db}" + + +async def 
_run_check(conn: asyncpg.Connection, sql: str) -> list[dict]: + rows = await conn.fetch(sql) + return [dict(r) for r in rows] + + +async def _resolve_ceo_agent_id() -> str | None: + """Best-effort: look up the CEO agent UUID for CMP via the API. + + Returns None if PAPERCLIP env is missing or the lookup fails. + """ + base_url = os.environ.get("PAPERCLIP_API_URL") + api_key = os.environ.get("PAPERCLIP_API_KEY") + if not (base_url and api_key and httpx is not None): + return None + try: + async with httpx.AsyncClient(timeout=5.0) as client: + r = await client.get( + f"{base_url}/api/agents", + headers={"Authorization": f"Bearer {api_key}"}, + ) + r.raise_for_status() + payload = r.json() + items = payload if isinstance(payload, list) else payload.get("items", []) + for item in items: + # Look for a CMP-side CEO (master); the CMPA mirror has a different id. + title = (item.get("title") or "").lower() + role = (item.get("role") or "").lower() + if "ceo" in title or "ceo" in role or "מנכ" in title: + return item.get("id") + except Exception as e: + logger.warning("CEO lookup failed: %s", e) + return None + + +async def _notify_ceo(summary: str) -> bool: + """Post a wakeup comment to the CEO agent. 
Returns True on best-effort success.""" + base_url = os.environ.get("PAPERCLIP_API_URL") + api_key = os.environ.get("PAPERCLIP_API_KEY") + if not (base_url and api_key and httpx is not None): + logger.info("Paperclip env not set — skipping CEO wakeup") + return False + ceo_id = await _resolve_ceo_agent_id() + if not ceo_id: + logger.info("Could not resolve CEO agent id — skipping wakeup") + return False + try: + async with httpx.AsyncClient(timeout=5.0) as client: + r = await client.post( + f"{base_url}/api/agents/{ceo_id}/wakeup", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json={ + "source": "automation", + "triggerDetail": "audit_corpus_integrity", + "reason": "corpus integrity audit found violations", + "payload": {"summary": summary}, + }, + ) + r.raise_for_status() + logger.info("Notified CEO (agent_id=%s)", ceo_id) + return True + except Exception as e: + logger.warning("CEO wakeup failed: %s", e) + return False + + +def _format_report( + a_hits: list[dict], + b_hits: list[dict], + c_hits: list[dict], + ts: datetime, +) -> str: + parts: list[str] = [] + parts.append(f"=== Corpus integrity audit @ {ts.isoformat()} ===") + parts.append("") + parts.append( + f"Check A (case_law external_upload with internal-style " + f"case_number prefix): {len(a_hits)} hit(s)" + ) + for row in a_hits[:50]: + parts.append(f" - id={row['id']} case_number={row['case_number']!r}") + if len(a_hits) > 50: + parts.append(f" ... ({len(a_hits) - 50} more truncated)") + parts.append("") + parts.append( + f"Check B (case_law internal_committee missing chair_name/district): " + f"{len(b_hits)} hit(s)" + ) + for row in b_hits[:50]: + parts.append( + f" - id={row['id']} case_number={row['case_number']!r} " + f"chair_name={row.get('chair_name')!r} district={row.get('district')!r}" + ) + if len(b_hits) > 50: + parts.append(f" ... 
({len(b_hits) - 50} more truncated)") + parts.append("") + parts.append( + f"Check C (cases.practice_area outside closed set): {len(c_hits)} hit(s)" + ) + for row in c_hits[:50]: + parts.append( + f" - id={row['id']} case_number={row['case_number']!r} " + f"practice_area={row.get('practice_area')!r}" + ) + if len(c_hits) > 50: + parts.append(f" ... ({len(c_hits) - 50} more truncated)") + parts.append("") + return "\n".join(parts) + + +async def main(args: argparse.Namespace) -> int: + pg_url = _pg_url() + conn = await asyncpg.connect(pg_url) + try: + a_hits = await _run_check(conn, CHECK_A_SQL) + b_hits = await _run_check(conn, CHECK_B_SQL) + c_hits = await _run_check(conn, CHECK_C_SQL) + finally: + await conn.close() + + total = len(a_hits) + len(b_hits) + len(c_hits) + ts = datetime.now(timezone.utc) + report = _format_report(a_hits, b_hits, c_hits, ts) + + # Always write to log (creates dir + file if missing). + LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + with LOG_PATH.open("a", encoding="utf-8") as f: + f.write(report) + f.write("\n") + + # Echo to stdout so cron mail / manual run shows the result. 
+ print(report) + + if total == 0: + logger.info("clean: no integrity violations found") + return 0 + + logger.warning( + "found %d total violation(s) (A=%d, B=%d, C=%d)", + total, len(a_hits), len(b_hits), len(c_hits), + ) + + if args.notify: + summary_lines = [ + "ה-audit היומי על הקורפוס מצא הפרות:", + f"- Check A (external_upload עם prefix פנימי): {len(a_hits)}", + f"- Check B (internal_committee חסר chair/district): {len(b_hits)}", + f"- Check C (cases.practice_area לא תקין): {len(c_hits)}", + "", + f"פירוט מלא: {LOG_PATH}", + ] + await _notify_ceo("\n".join(summary_lines)) + + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--no-notify", + dest="notify", + action="store_false", + help="Don't post a CEO wakeup even if hits are found", + ) + parser.set_defaults(notify=True) + args = parser.parse_args() + try: + rc = asyncio.run(main(args)) + except KeyboardInterrupt: + sys.exit(130) + sys.exit(rc) diff --git a/scripts/backfill_multimodal_precedents.py b/scripts/backfill_multimodal_precedents.py new file mode 100644 index 0000000..a25d6f1 --- /dev/null +++ b/scripts/backfill_multimodal_precedents.py @@ -0,0 +1,475 @@ +"""Multimodal backfill for precedent library — fills voyage-multimodal-3 +page embeddings for case_law rows (external_upload + internal_committee) +that don't have them yet. + +Background +---------- +77 (in practice 70 today, 2026-05-26) case_law rows were ingested before +``MULTIMODAL_ENABLED=true`` was permanently turned on, so they only have +text chunks and no per-page image embeddings. The retrieval blend is +hybrid (text + image), so the image side of the blend silently degrades +for these rows. + +Strategy +-------- +Most rows have no PDF (they were ingested via text or are MD-only). The +script: + +1. Lists every case_law row with ``source_kind in (external_upload, + internal_committee)`` that is missing image embeddings. +2. 
Tries to find a staged file by matching token-rich substrings of the + case_number against filenames under ``data/precedent-library/`` and + ``data/internal-decisions/``. +3. If the file is a PDF or DOCX (both renderable by PyMuPDF/fitz), + renders pages at ``MULTIMODAL_DPI``, embeds via voyage-multimodal-3 + in batches of 50, and stores rows into ``precedent_image_embeddings``. +4. Skips rows whose only candidate file is .md (PyMuPDF can't render + markdown) or rows with no staged file. + +Designed to run inside the FastAPI/MCP container (where ``/data/...`` +exists and Voyage env vars are present). Locally, it falls back to +``/home/chaim/legal-ai/data/...`` via ``_resolve_local_path``. + +Usage:: + + # Inside container (Coolify): + docker exec -it /opt/api/.venv/bin/python \\ + /opt/api/scripts/backfill_multimodal_precedents.py --dry-run + # then: + docker exec -it /opt/api/.venv/bin/python \\ + /opt/api/scripts/backfill_multimodal_precedents.py --apply + +Notes +----- +- Token cost: voyage-multimodal-3 averages ~3-4K tokens per dense legal + page. 70 rows * ~30 pages avg = ~2,100 pages = ~7M tokens ≈ $0.70. +- Estimate-only mode (``--dry-run``) prints the matched files and + page counts without calling Voyage or touching the DB. +- Idempotent: per-record DELETE+INSERT inside + ``store_precedent_image_embeddings``, but the outer loop also + skips rows that already have rows in ``precedent_image_embeddings``. +""" +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import re +import sys +import time +from pathlib import Path +from uuid import UUID + +import fitz # PyMuPDF + + +def _setup_paths(): + """Ensure mcp-server src is on path even when run as a standalone script. + + Works both from host (``/home/chaim/legal-ai/scripts/...``) and from + inside the container (``/app/mcp-server/src``). 
+ """ + here = Path(__file__).resolve().parent + candidates = [ + here.parent / "mcp-server" / "src", # host + Path("/app/mcp-server/src"), # container + ] + for c in candidates: + if c.is_dir() and str(c) not in sys.path: + sys.path.insert(0, str(c)) + + +_setup_paths() +# Force multimodal on for this script regardless of env — backfill is +# the entire point. The deploy-time default stays whatever Coolify sets. +os.environ["MULTIMODAL_ENABLED"] = "true" + +from legal_mcp import config # noqa: E402 +from legal_mcp.services import db, embeddings, extractor # noqa: E402 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("backfill_multimodal_precedents") + + +# ───────────────────────── file matching ───────────────────────── + +# Roots to search for staged precedent files. Both paths are tried; the +# first that exists wins. ``/data/`` is the in-container mount; +# ``/home/chaim/legal-ai/data/`` is the host path. +SEARCH_ROOTS = [ + Path("/data/precedent-library"), + Path("/data/internal-decisions"), + Path("/home/chaim/legal-ai/data/precedent-library"), + Path("/home/chaim/legal-ai/data/internal-decisions"), +] + +# Extensions we can render with PyMuPDF (fitz). MD and TXT cannot be +# rendered as page images, so we skip them. +RENDERABLE_EXTS = {".pdf", ".docx"} + + +# Token-extraction regex: only tokens that contain a slash or hyphen +# (real case-number kernels like "8064/20" or "25226-04-25"). We +# deliberately exclude pure numeric runs like "2011" (which is just a +# year in "(נבו 5.4.2011)") to avoid false-positive matches against +# unrelated filenames that happen to contain the same year. +_NUMBER_TOKEN = re.compile(r"\d+[-/]\d+(?:[-/]\d+)*") + + +def _extract_number_tokens(case_number: str) -> list[str]: + """Pull numeric kernels out of a Hebrew case_number string. 
+ + Only returns tokens containing a slash or hyphen (real case-number + kernels), so years like "2011" and "2024" don't leak through and + falsely match filenames. + + >>> _extract_number_tokens('בר"מ 25226-04-25 הוועדה') + ['25226-04-25'] + >>> _extract_number_tokens('ערר 8064/20 חברת') + ['8064/20'] + >>> _extract_number_tokens('עע"מ 10089/07 (נבו 5.4.2011)') + ['10089/07', '5.4.2011'] # date stays; but '5.4.2011' is hyphenless after normalize → no match against random filenames + """ + # filter out date-shaped tokens (dotted) by additional check — only + # keep tokens whose form is N/N or N-N-..., not N.N.N + tokens = _NUMBER_TOKEN.findall(case_number) + return [t for t in tokens if "." not in t] + + +def _normalize_for_match(s: str) -> str: + """Lowercase + strip whitespace/punct for filename matching.""" + return re.sub(r"[\s/_-]+", "", s.lower()) + + +def _build_file_index() -> dict[str, list[Path]]: + """Walk SEARCH_ROOTS and return {normalized_filename: [paths]}. + + Only renderable extensions are included. + """ + idx: dict[str, list[Path]] = {} + for root in SEARCH_ROOTS: + if not root.is_dir(): + continue + for p in root.rglob("*"): + if not p.is_file(): + continue + if p.suffix.lower() not in RENDERABLE_EXTS: + continue + if "thumbnails" in p.parts: + continue + key = _normalize_for_match(p.name) + idx.setdefault(key, []).append(p) + return idx + + +def _digit_parts(token: str) -> list[str]: + """Split a token like '14306-09-23' into ['14306','09','23'].""" + return [p for p in re.split(r"[-/]", token) if p] + + +def _find_file_for_case_number(case_number: str, file_index: dict[str, list[Path]]) -> Path | None: + """Best-effort match a case_number → staged file path. + + Two strategies: + + 1. **Direct contiguous match** — token normalized (e.g. "8064/20" + → "806420") appears as substring of the filename normalized. + 2. 
**Parts-match** — every digit part of the token appears + somewhere in the filename (handles reordered formats like + case_number "14306-09-23" matched to "MM-23-09-14306-967.docx", + where Nevo's case_number ordering differs from the legal + template's filename ordering). Only accepts when the longest + part has at least 4 digits — that filters out matches where + only short pieces (year fragments) overlap. + + Returns the first match found, preferring PDFs over DOCX. + """ + tokens = _extract_number_tokens(case_number) + if not tokens: + return None + + candidates: list[Path] = [] + for token in tokens: + # Strategy 1: contiguous + normalized_token = _normalize_for_match(token) + token_hyphenated = token.replace("/", "-") + normalized_hyphenated = _normalize_for_match(token_hyphenated) + # Strategy 2: parts + parts = _digit_parts(token) + longest_part = max((len(p) for p in parts), default=0) + + for normalized_name, paths in file_index.items(): + if normalized_token in normalized_name or normalized_hyphenated in normalized_name: + candidates.extend(paths) + continue + # Parts-match requires longest part >= 4 digits AND all parts present + if longest_part >= 4 and parts and all(p in normalized_name for p in parts): + candidates.extend(paths) + + if not candidates: + return None + + # Dedupe while preserving order + seen = set() + unique = [] + for p in candidates: + if p not in seen: + seen.add(p) + unique.append(p) + + # Prefer PDFs over DOCX (PDF rendering is more reliable for embedded fonts/images) + pdf = next((p for p in unique if p.suffix.lower() == ".pdf"), None) + return pdf or unique[0] + + +# ───────────────────────── backfill core ───────────────────────── + + +PRECEDENT_LIBRARY_THUMBNAILS = Path(config.DATA_DIR) / "precedent-library" / "thumbnails" + + +async def _embed_one_precedent(case_law_id: UUID, src_path: Path) -> dict: + """Render + embed + store image embeddings for a single precedent. 
async def backfill_all(
    *,
    dry_run: bool,
    limit: int | None = None,
    only_source_kind: str | None = None,
) -> dict:
    """Main entrypoint — scan, match, render, embed, store.

    Args:
        dry_run: when true, matched files are only probed for their page
            count; nothing is embedded or stored.
        limit: maximum number of records to process. ``None`` means no
            limit; ``0`` (or a negative value) processes nothing.
        only_source_kind: when set, keep only records with this
            ``source_kind``.

    Returns:
        A summary dict with the counters ``scanned``, ``matched``,
        ``no_match``, ``embedded``, ``skipped_md_only``, ``errors``,
        ``total_pages`` and a per-record ``details`` list.
        ``skipped_md_only`` is initialised here but never incremented by
        this function.
    """
    # NOTE(review): init_schema() runs in dry-run mode too — confirm it is
    # idempotent / safe for a mode documented as not touching the DB.
    await db.init_schema()
    records = await _scan_missing_records()
    if only_source_kind:
        records = [r for r in records if r["source_kind"] == only_source_kind]
    if limit is not None:
        # ``if limit:`` would treat an explicit 0 as "unlimited"; clamp so
        # a negative value cannot turn into a from-the-end slice.
        records = records[:max(limit, 0)]

    file_index = _build_file_index()
    logger.info("Indexed %d renderable files under %s",
                sum(len(v) for v in file_index.values()),
                ", ".join(str(r) for r in SEARCH_ROOTS if r.is_dir()))

    summary = {
        "scanned": len(records),
        "matched": 0,
        "no_match": 0,
        "embedded": 0,
        "skipped_md_only": 0,
        "errors": 0,
        "total_pages": 0,
        "details": [],
    }

    for rec in records:
        case_law_id = rec["id"]
        case_number = rec["case_number"]
        src = _find_file_for_case_number(case_number, file_index)

        if not src:
            summary["no_match"] += 1
            summary["details"].append({
                "case_law_id": str(case_law_id),
                "case_number": case_number,
                "source_kind": rec["source_kind"],
                "status": "no_match",
            })
            logger.info(" NO MATCH: %s", case_number[:80])
            continue

        # Probe page count without rendering (cheap). The handle is closed
        # in ``finally`` so a failure while counting cannot leak it.
        try:
            doc = fitz.open(str(src))
            try:
                page_count = len(doc)
            finally:
                doc.close()
        except Exception as e:
            summary["errors"] += 1
            summary["details"].append({
                "case_law_id": str(case_law_id),
                "case_number": case_number,
                "matched_file": str(src),
                "status": "open_error",
                "error": str(e),
            })
            logger.warning(" OPEN ERROR for %s: %s", case_number[:60], e)
            continue

        summary["matched"] += 1
        summary["total_pages"] += page_count
        logger.info(" MATCHED: %s -> %s (%d pages)",
                    case_number[:60], src.name, page_count)

        if dry_run:
            summary["details"].append({
                "case_law_id": str(case_law_id),
                "case_number": case_number,
                "matched_file": str(src),
                "pages": page_count,
                "status": "would_embed",
            })
            continue

        # Actually embed + store. One failing record is recorded and
        # logged, then the loop moves on to the next record.
        t0 = time.time()
        try:
            result = await _embed_one_precedent(case_law_id, src)
            elapsed = time.time() - t0
            summary["embedded"] += 1
            summary["details"].append({
                "case_law_id": str(case_law_id),
                "case_number": case_number,
                "matched_file": str(src),
                "pages": page_count,
                "elapsed_sec": round(elapsed, 1),
                "status": "ok",
                **result,
            })
            logger.info(" EMBEDDED %d pages in %.1fs", result["pages_embedded"], elapsed)
        except Exception as e:
            summary["errors"] += 1
            summary["details"].append({
                "case_law_id": str(case_law_id),
                "case_number": case_number,
                "matched_file": str(src),
                "status": "embed_error",
                "error": str(e),
            })
            logger.exception(" EMBED ERROR for %s", case_number[:60])

    return summary
+ args.dry_run = True + + logger.info( + "Mode=%s MULTIMODAL_MODEL=%s DPI=%d THUMB_DPI=%d", + "DRY-RUN" if args.dry_run else "APPLY", + config.MULTIMODAL_MODEL, config.MULTIMODAL_DPI, config.MULTIMODAL_THUMB_DPI, + ) + + summary = asyncio.run( + backfill_all( + dry_run=args.dry_run, + limit=args.limit, + only_source_kind=args.only, + ) + ) + + print() + print("=" * 60) + print("BACKFILL SUMMARY") + print("=" * 60) + print(f" scanned: {summary['scanned']}") + print(f" matched: {summary['matched']}") + print(f" no_match: {summary['no_match']}") + print(f" total pages: {summary['total_pages']}") + if args.dry_run: + # Cost estimate: ~3.5K tokens/page * $0.12/1M tokens + est_tokens = summary["total_pages"] * 3500 + est_cost = est_tokens / 1_000_000 * 0.12 + print(f" est. tokens: ~{est_tokens:,} (~${est_cost:.2f})") + else: + print(f" embedded: {summary['embedded']}") + print(f" errors: {summary['errors']}") + + +if __name__ == "__main__": + main() diff --git a/scripts/compute_ndcg.py b/scripts/compute_ndcg.py new file mode 100755 index 0000000..f0c686f --- /dev/null +++ b/scripts/compute_ndcg.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +"""Compute nDCG@10 over the RAG retrieval feedback table (TaskMaster #50). + +Outputs aggregated metrics as JSON: + + { + "generated_at": "2026-05-26T12:34:56+00:00", + "k": 10, + "summary": { + "total_searches_with_feedback": int, + "total_searches_logged": int, + "feedback_coverage_pct": float, + "avg_ndcg_at_10": float | null + }, + "by_search_type": [ + {"search_type": "precedent_library", + "searches_with_feedback": int, + "avg_ndcg_at_10": float | null}, + ... + ], + "by_week": [ + {"week_start": "2026-05-19", + "search_type": "precedent_library", + "searches_with_feedback": int, + "avg_ndcg_at_10": float | null}, + ... + ], + "top_cited_case_law": [ + {"case_law_id": "...", "case_number": "...", + "case_name": "...", "cite_count": int}, + ... 
def ndcg_at_k(rel_at_rank: dict[int, int], k: int) -> float | None:
    """Normalised DCG over the first ``k`` ranks of one search.

    Args:
        rel_at_rank: mapping of 1-based rank to relevance score. Ranks
            beyond ``k`` are not read; ranks absent from the mapping are
            scored 0.
        k: rank cutoff.

    Returns:
        DCG of the observed ordering divided by the DCG of the same
        scores sorted best-first, or ``None`` when the top ``k`` holds no
        non-zero relevance (the ideal DCG would be 0).
    """
    observed = [rel_at_rank.get(position, 0) for position in range(1, k + 1)]
    if not any(observed):
        return None

    def _discounted_gain(ordering):
        # "2^rel - 1" gain with a log2(rank + 1) discount, accumulated
        # left to right.
        acc = 0.0
        for position, rel in enumerate(ordering, start=1):
            acc += ((2 ** rel) - 1) / math.log2(position + 1)
        return acc

    best_possible = _discounted_gain(sorted(observed, reverse=True))
    if best_possible == 0:
        return None
    return _discounted_gain(observed) / best_possible
cl.case_name AS case_name, + COUNT(*) AS cite_count + FROM search_relevance_feedback srf + JOIN case_law cl ON cl.id = srf.case_law_id + WHERE srf.feedback_source = 'cited_in_decision' + GROUP BY cl.id, cl.case_number, cl.case_name + ORDER BY COUNT(*) DESC + LIMIT $1 + """, + limit, + ) + return [dict(r) for r in rows] + + +def _aggregate( + feedback_rows: list[dict], + k: int, +) -> tuple[dict[str, float], dict[tuple[str, str], float], int]: + """Group feedback by search_log, compute per-log nDCG, then aggregate + by search_type and by (week, search_type).""" + by_log: dict[str, dict] = {} + for row in feedback_rows: + slid = row["search_log_id"] + if slid not in by_log: + by_log[slid] = { + "search_type": row["search_type"], + "created_at": row["created_at"], + "rels": {}, + } + rank = int(row["rank"]) + if 1 <= rank <= k: + by_log[slid]["rels"][rank] = int(row["relevance_score"]) + + type_ndcg: dict[str, list[float]] = {} + week_ndcg: dict[tuple[str, str], list[float]] = {} + total_logs_with_feedback = 0 + for entry in by_log.values(): + score = ndcg_at_k(entry["rels"], k) + if score is None: + continue + total_logs_with_feedback += 1 + type_ndcg.setdefault(entry["search_type"], []).append(score) + week_start = entry["created_at"].date() + # Round down to ISO week Monday. 
async def compute(weeks: int | None, k: int) -> dict:
    """Build the nDCG@k report from the search feedback tables.

    Opens one database connection, pulls the feedback rows, the
    search-log totals and the most-cited precedents, then aggregates
    them in memory.

    Args:
        weeks: passed to the fetch helpers to restrict the window;
            ``None`` means all time.
        k: nDCG rank cutoff.

    Returns:
        A dict with ``generated_at``, ``k``, ``window_weeks``,
        ``summary``, ``by_search_type``, ``by_week`` and
        ``top_cited_case_law``.
    """
    conn = await asyncpg.connect(_postgres_url())
    try:
        fb_rows = await _fetch_feedback_rows(conn, weeks)
        totals = await _fetch_corpus_totals(conn, weeks)
        top_cited = await _fetch_top_cited(conn)
    finally:
        await conn.close()

    type_avg, week_avg, logs_scored = _aggregate(fb_rows, k)
    total_logs = totals.get("_total", 0)

    # The overall figure is a micro-average over per-search scores.
    # _aggregate() only hands back per-type / per-week means, which cannot
    # be recombined without their sample sizes, so regroup the rows here.
    rels_by_log: dict[str, dict[int, int]] = {}
    log_to_type: dict[str, str] = {}
    for row in fb_rows:
        slid = row["search_log_id"]
        rels = rels_by_log.setdefault(slid, {})
        rank = int(row["rank"])
        if 1 <= rank <= k:
            rels[rank] = int(row["relevance_score"])
        log_to_type[slid] = row["search_type"]

    per_log_scores = [
        score
        for score in (ndcg_at_k(rels, k) for rels in rels_by_log.values())
        if score is not None
    ]
    overall_avg = (sum(per_log_scores) / len(per_log_scores)) if per_log_scores else None

    # Tally searches that have any feedback row, per type, in one pass.
    # NOTE(review): this counts every log with a feedback row, whereas
    # summary.total_searches_with_feedback counts only logs that yielded
    # a score — the two can differ; confirm which is intended.
    feedback_logs_by_type: dict[str, int] = {}
    for stype in log_to_type.values():
        feedback_logs_by_type[stype] = feedback_logs_by_type.get(stype, 0) + 1

    by_search_type = []
    for t, totals_n in sorted(totals.items()):
        if t == "_total":
            continue
        by_search_type.append({
            "search_type": t,
            "searches_logged": totals_n,
            "searches_with_feedback": feedback_logs_by_type.get(t, 0),
            "avg_ndcg_at_k": round(type_avg[t], 4) if t in type_avg else None,
        })

    by_week = [
        {
            "week_start": week,
            "search_type": stype,
            "avg_ndcg_at_k": round(score, 4),
        }
        for (week, stype), score in sorted(week_avg.items())
    ]

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "k": k,
        "window_weeks": weeks,
        "summary": {
            "total_searches_logged": total_logs,
            "total_searches_with_feedback": logs_scored,
            "feedback_coverage_pct": (
                round(100 * logs_scored / total_logs, 2) if total_logs else 0.0
            ),
            "avg_ndcg_at_k": round(overall_avg, 4) if overall_avg is not None else None,
        },
        "by_search_type": by_search_type,
        "by_week": by_week,
        "top_cited_case_law": [
            {**r, "cite_count": int(r["cite_count"])} for r in top_cited
        ],
    }
output") + args = p.parse_args() + + result = asyncio.run(compute(weeks=args.weeks, k=args.k)) + indent = 2 if args.pretty else None + print(json.dumps(result, ensure_ascii=False, indent=indent, default=str)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/monitor_halacha_quality.py b/scripts/monitor_halacha_quality.py new file mode 100644 index 0000000..e6ac971 --- /dev/null +++ b/scripts/monitor_halacha_quality.py @@ -0,0 +1,278 @@ +"""Halacha extraction quality monitor. + +Tracks ``avg(confidence)`` of halachot extracted by the LLM pipeline +over time and emits an alert when the recent-window average drops more +than a configurable threshold below the lifetime baseline. + +Intended schedule: weekly cron, e.g. ``0 8 * * 1`` (Monday 08:00). + +Output: a single-line JSON payload to stdout (suitable for piping +into ``notify.py`` or a webhook), plus a human-readable alert text +on stderr when drift is detected. + +Usage +----- + +:: + + # Default — weekly window, 5% drop threshold (relative) + python scripts/monitor_halacha_quality.py + + # Custom window/threshold: + python scripts/monitor_halacha_quality.py --window 14 --threshold 0.03 + + # Only emit JSON, no stderr alert: + python scripts/monitor_halacha_quality.py --silent +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + + +def _setup_paths(): + """Make ``legal_mcp`` importable when run from anywhere.""" + here = Path(__file__).resolve().parent + candidates = [ + here.parent / "mcp-server" / "src", # host + Path("/app/mcp-server/src"), # container + ] + for c in candidates: + if c.is_dir() and str(c) not in sys.path: + sys.path.insert(0, str(c)) + + +_setup_paths() + +from legal_mcp.services import db # noqa: E402 + + +# Statuses considered "trusted" — the baseline is computed only over +# halachot whose extraction the chair has accepted. 
``pending_review`` +# is the queue waiting for review; their average tends to be lower +# because anything obviously bad gets rejected before approval. So we +# track BOTH series and alert on either one drifting: +# 1. Trusted baseline (approved+published) — drift here means the +# extractor's "best output" quality is degrading. +# 2. All extracted — drift here means raw extractor accuracy is down. +TRUSTED_STATUSES = ("approved", "published") + + +async def _collect_metrics(window_days: int) -> dict: + pool = await db.get_pool() + + # Lifetime baselines + lifetime_all = await pool.fetchrow( + "SELECT count(*) AS n, AVG(confidence) AS avg_conf FROM halachot" + ) + lifetime_trusted = await pool.fetchrow( + f""" + SELECT count(*) AS n, AVG(confidence) AS avg_conf + FROM halachot + WHERE review_status = ANY($1::text[]) + """, + list(TRUSTED_STATUSES), + ) + + # Recent window + recent_all = await pool.fetchrow( + f""" + SELECT count(*) AS n, AVG(confidence) AS avg_conf + FROM halachot + WHERE created_at > NOW() - INTERVAL '{int(window_days)} days' + """ + ) + recent_trusted = await pool.fetchrow( + f""" + SELECT count(*) AS n, AVG(confidence) AS avg_conf + FROM halachot + WHERE created_at > NOW() - INTERVAL '{int(window_days)} days' + AND review_status = ANY($1::text[]) + """, + list(TRUSTED_STATUSES), + ) + + # Per-precedent recent (extractor outputs that haven't been reviewed + # yet) — sometimes the canary that catches drift earliest. We track + # the most-recent N extractions regardless of review state. 
+ pending_recent = await pool.fetchrow( + """ + SELECT count(*) AS n, AVG(confidence) AS avg_conf + FROM halachot + WHERE review_status = 'pending_review' + """ + ) + + def _f(rec, key: str) -> float | None: + v = rec[key] + if v is None: + return None + return float(v) + + def _i(rec, key: str) -> int: + v = rec[key] + return int(v) if v is not None else 0 + + return { + "window_days": int(window_days), + "lifetime_all_count": _i(lifetime_all, "n"), + "lifetime_all_avg": _f(lifetime_all, "avg_conf"), + "lifetime_trusted_count": _i(lifetime_trusted, "n"), + "lifetime_trusted_avg": _f(lifetime_trusted, "avg_conf"), + "recent_all_count": _i(recent_all, "n"), + "recent_all_avg": _f(recent_all, "avg_conf"), + "recent_trusted_count": _i(recent_trusted, "n"), + "recent_trusted_avg": _f(recent_trusted, "avg_conf"), + "pending_review_count": _i(pending_recent, "n"), + "pending_review_avg": _f(pending_recent, "avg_conf"), + } + + +def _drift(baseline: float | None, recent: float | None) -> float | None: + """Return relative drift as a positive number when recent < baseline. 
+ + >>> _drift(0.85, 0.80) # -> 0.0588 (5.88% drop) + """ + if baseline is None or recent is None or baseline <= 0: + return None + return (baseline - recent) / baseline + + +def _evaluate(metrics: dict, threshold: float, min_sample: int) -> dict: + """Decide whether any series is drifting below threshold.""" + alerts: list[dict] = [] + series = [ + ( + "trusted", + metrics["lifetime_trusted_avg"], + metrics["recent_trusted_avg"], + metrics["recent_trusted_count"], + ), + ( + "all_extracted", + metrics["lifetime_all_avg"], + metrics["recent_all_avg"], + metrics["recent_all_count"], + ), + ] + for name, baseline, recent, recent_n in series: + d = _drift(baseline, recent) + entry = { + "series": name, + "baseline": baseline, + "recent": recent, + "recent_n": recent_n, + "drift": d, + "alert": False, + "reason": None, + } + if recent_n < min_sample: + entry["reason"] = f"recent_n={recent_n} below min_sample={min_sample}" + elif d is None: + entry["reason"] = "missing baseline or recent average" + elif d >= threshold: + entry["alert"] = True + entry["reason"] = ( + f"drift {d:.1%} >= threshold {threshold:.1%} " + f"(baseline={baseline:.3f}, recent={recent:.3f}, n={recent_n})" + ) + else: + entry["reason"] = ( + f"drift {d:.1%} < threshold {threshold:.1%} — within tolerance" + ) + alerts.append(entry) + + any_alert = any(a["alert"] for a in alerts) + return {"alert": any_alert, "series": alerts} + + +def _format_alert_text(metrics: dict, decision: dict) -> str: + lines = [ + f"Halacha quality alert — window={metrics['window_days']}d", + "", + ] + for s in decision["series"]: + sym = "ALERT" if s["alert"] else "ok" + baseline = f"{s['baseline']:.3f}" if s["baseline"] is not None else "—" + recent = f"{s['recent']:.3f}" if s["recent"] is not None else "—" + drift = f"{s['drift']:.1%}" if s["drift"] is not None else "—" + lines.append( + f" [{sym}] {s['series']}: baseline={baseline} recent={recent} " + f"drift={drift} n={s['recent_n']}" + ) + if s["reason"]: + 
lines.append(f" {s['reason']}") + return "\n".join(lines) + + +async def run( + *, + window_days: int, + threshold: float, + min_sample: int, +) -> dict: + metrics = await _collect_metrics(window_days) + decision = _evaluate(metrics, threshold, min_sample) + return { + "generated_at": datetime.now(timezone.utc).isoformat(), + "window_days": window_days, + "threshold_rel": threshold, + "min_sample": min_sample, + "metrics": metrics, + "decision": decision, + } + + +def main(): + parser = argparse.ArgumentParser( + description="Monitor halacha extraction quality (confidence drift)." + ) + parser.add_argument( + "--window", type=int, default=7, + help="Recent window in days (default: 7).", + ) + parser.add_argument( + "--threshold", type=float, default=0.05, + help="Relative drop alert threshold (default: 0.05 = 5%%).", + ) + parser.add_argument( + "--min-sample", type=int, default=5, + help="Minimum halachot in window to evaluate (default: 5). " + "Below this, the series is reported but not alerted on.", + ) + parser.add_argument( + "--silent", action="store_true", + help="Suppress stderr alert text; only print JSON.", + ) + parser.add_argument( + "--exit-on-alert", action="store_true", + help="Exit with status 1 when an alert fires (default: always exit 0).", + ) + args = parser.parse_args() + + report = asyncio.run( + run( + window_days=args.window, + threshold=args.threshold, + min_sample=args.min_sample, + ) + ) + + # JSON to stdout + print(json.dumps(report, ensure_ascii=False, indent=2)) + + if report["decision"]["alert"] and not args.silent: + print("", file=sys.stderr) + print(_format_alert_text(report["metrics"], report["decision"]), file=sys.stderr) + + if args.exit_on_alert and report["decision"]["alert"]: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/web-ui/src/components/precedents/library-list-panel.tsx b/web-ui/src/components/precedents/library-list-panel.tsx index bda5ba7..b749ec9 100644 --- 
a/web-ui/src/components/precedents/library-list-panel.tsx +++ b/web-ui/src/components/precedents/library-list-panel.tsx @@ -156,7 +156,8 @@ function CourtRow({ p, onEdit }: { p: Precedent; onEdit: (id: string) => void }) {cleanCitation(p.case_number)} - + {/* Column "שם / ערכאה" hidden by request (case_name often equals case_number prefix). Keep field in DB; restore by un-hiding. */} +
{cleanCitation(p.case_name)}
{p.court ?
{p.court}
: null}
@@ -240,7 +241,8 @@ function CommitteeRow({ p, onEdit }: { p: Precedent; onEdit: (id: string) => voi {cleanCitation(p.case_number)}
- + {/* Column "שם" hidden by request (case_name often equals case_number prefix). Keep field in DB; restore by un-hiding. */} +
{cleanCitation(p.case_name)}
@@ -367,7 +369,8 @@ export function LibraryListPanel() { מס׳ / מראה מקום - שם / ערכאה + {/* "שם / ערכאה" hidden by request — see CourtRow */} + שם / ערכאה תאריך תחום רמה @@ -415,7 +418,8 @@ export function LibraryListPanel() { מספר ערר - שם + {/* "שם" hidden by request — see CommitteeRow */} + שם מחוז יו״ר תאריך diff --git a/web/app.py b/web/app.py index 96f8e5e..54eeb2f 100644 --- a/web/app.py +++ b/web/app.py @@ -5250,3 +5250,46 @@ async def missing_precedent_upload( "case_law_id": case_law_id, "route": "internal_committee" if is_committee else "external_upload", } + + +# ── RAG telemetry / nDCG dashboard ──────────────────────────────────── +# Backs the /admin/rag-metrics page. The heavy aggregation lives in +# ``scripts/compute_ndcg.py`` — we re-use its functions here so the API +# response stays in lock-step with the CLI tool. + + +@app.get("/api/admin/rag-metrics") +async def api_rag_metrics(weeks: int = 12, k: int = 10): + """Return nDCG@k aggregates for the RAG retrieval feedback loop. + + Args: + weeks: window for "recent" metrics (default 12). + k: nDCG cutoff (default 10). + """ + # Late import — keeps the path-extension to scripts/ local to this route. + scripts_dir = Path(__file__).resolve().parent.parent / "scripts" + if str(scripts_dir) not in sys.path: + sys.path.insert(0, str(scripts_dir)) + import compute_ndcg # type: ignore + + try: + metrics = await compute_ndcg.compute(weeks=weeks, k=k) + except Exception as e: + logger.exception("rag-metrics compute failed") + raise HTTPException(500, f"חישוב מטריקות נכשל: {e}") from e + return metrics + + +@app.post("/api/admin/rag-metrics/infer") +async def api_rag_metrics_infer(limit: int | None = None): + """Run auto-inference: for every finalized case, mark its cited + precedents as ``relevance_score=3`` against any search_log where + they appeared in the top-K. Idempotent. 
+ """ + from legal_mcp.services import telemetry as telem_svc + try: + result = await telem_svc.infer_relevance_for_all_finalized_cases(limit=limit) + except Exception as e: + logger.exception("rag-metrics auto-inference failed") + raise HTTPException(500, f"auto-inference נכשל: {e}") from e + return result