"""Internal citation graph extractor (TaskMaster #34). When Daphna (or any other internal_committee chair) cites another committee decision inside the body of a ruling, she uses fairly stable phrases: "ונפנה לערר 1110/20 ירושלים שקופה …" "כפי שקבעתי בערר 1041/24 …" "בדומה לעמדתי בהחלטה ערר 8048/24 …" "כפי שנקבע במחוז ת\"א בערר 1234/20 …" "ראה החלטתי בערר 1015-01-24 …" This module scans the ``full_text`` of internal-committee ``case_law`` rows, extracts those citations via regex, tries to link each cited case_number to a row already in ``case_law`` (any source_kind), and stores the result in ``precedent_internal_citations``. Unresolved citations are kept with ``cited_case_law_id = NULL`` so the chair can see what's missing from the corpus (and ``search_internal_decisions`` can surface "cited but absent" gaps). The result is a *citation graph* that downstream tools (search, researcher agent) can join on to surface "decisions cited by this one" alongside keyword/semantic hits — without re-running an LLM on every query. Patterns are *intentionally* permissive: we accept stray Hebrew quote marks (both straight ``"`` and curly ``״``), optional district parens, and several trigger phrases. False positives are de-duplicated downstream by the ``UNIQUE (source_case_law_id, cited_case_number)`` constraint and by case- number normalization (see ``_normalize_case_number``). """ from __future__ import annotations import logging import re from typing import Iterator from uuid import UUID from legal_mcp.services import db logger = logging.getLogger(__name__) # ── Patterns ───────────────────────────────────────────────────────── # # Two pattern families: # 1. Appeals-committee citations ("ערר" / "בל\"מ") — primary target. # These are the ones we resolve against ``case_law``. # 2. Court rulings ("עע\"מ", "בר\"מ", "עמ\"נ", "ע\"א", "בג\"ץ", "רע\"א"). # Stored as unlinked rows by default, so the researcher knows the # decision quotes a higher court. # # Trigger words ("ונפנה", "כפי שקבעתי", "בדומה ל…", "ראה החלטתי", # "כפי שנקבע") are *optional* — many citations appear without one (Daphna # often introduces a quote with just "כפי שצוין בערר…"). We therefore # match the citation core (prefix + number) and capture the surrounding # sentence as context. # # Regex notes: # * Hebrew gershayim/quotation: both straight (") and curly (״) are # accepted via the character class [\"״]. # * Case numbers can be NNNN/YY, NNNN-YY, or NNNN-MM-YY (the third form # is the Nevo "filed" format: 1015-01-24 means file #1015 of Jan 2024). # * Optional district paren: ערר (ועדות ערר - תכנון ובנייה ירושלים) # 1110/20 — we allow up to 60 chars of parenthetical content. # * \b doesn't behave well with Hebrew, so we anchor by whitespace or # punctuation lookarounds. _TRIGGER = ( r"(?:ונפנה\s+ל|" r"כפי\s+ש(?:קבעתי|נקבע|פסקתי)\s+ב|" r"בדומה\s+ל(?:עמדתי\s+ב)?|" r"ראה\s+(?:את\s+)?(?:החלטתי\s+ב|פסיקת\s+ה?ועדה\s+ב)?|" r"בעניין\s+|" r"בהחלטת(?:י|ה|נו)?\s+ב?)?" ) # Optional district / committee parenthetical between the prefix and the # case number. Matches things like "(ועדות ערר - תכנון ובנייה ירושלים)" # or "(ירושלים)" or "(מרכז)". Up to 80 chars to be safe. Required actual # parentheses (the `\(` and `\)` are NOT optional) — otherwise the regex # greedily absorbs the next sentence's content and skips intermediate # citations like "ראה גם ערר 1041/24 …\nכפי שקבעתי בערר (…) 1110/20". _DISTRICT_PAREN = r"(?:\s*\([^)\n]{0,80}\)\s*)?" # Case-number core: 3-5 digits, optional separator and 2-4 digits (and # optional third group for the NNNN-MM-YY format). _NUM_RX = r"(\d{3,5}(?:[-/]\d{2,4}(?:[-/]\d{2,4})?)?)" _PATTERNS = [ # 1. Appeals-committee — ערר / בל"מ ( "appeals_committee", re.compile( _TRIGGER + r"(ערר|בל[\"״]מ)" + _DISTRICT_PAREN + r"\s*" + _NUM_RX, re.UNICODE, ), ), # 2. Higher courts — עע"מ, בר"מ, עמ"נ, ע"א, בג"ץ, רע"א, דנ"א, בש"א ( "court_ruling", re.compile( _TRIGGER + r"(עע[\"״]מ|בר[\"״]מ|עמ[\"״]נ|ע[\"״]א|בג[\"״]ץ|רע[\"״]א|דנ[\"״]א|בש[\"״]א)" + r"\s*" + _NUM_RX, re.UNICODE, ), ), ] # Context window for storing the match (characters before/after). _CTX_BEFORE = 120 _CTX_AFTER = 240 def _normalize_case_number(raw: str) -> str: """Normalize a case-number for matching. The same case can appear in the corpus as "1110/20", "1110-20", "ערר 1110/20", "1110-01-20" — different rules for the third form, which is the Nevo file format. We canonicalize by: * stripping non-digit/separator chars * unifying "/" → "-" * lowercasing The result is used only for matching, never for display. """ cleaned = re.sub(r"[^\d/\-]", "", raw or "") return cleaned.replace("/", "-").strip("-") def extract_citations_from_text(text: str) -> Iterator[dict]: """Yield citation dicts extracted from ``text``. Each dict has: prefix: matched prefix (ערר / בל\"מ / עע\"מ / …) case_number: raw number as captured case_number_norm: normalized (slashes → dashes, digits only) raw: the full matched span context: ±300 chars surrounding the match (whitespace normalized) pattern_kind: 'appeals_committee' or 'court_ruling' """ if not text: return seen: set[tuple[str, str]] = set() for kind, pattern in _PATTERNS: for m in pattern.finditer(text): # The `_TRIGGER` is wrapped in (?:...) so it does not add a # capture group; group(1) is the prefix, group(2) is the number. prefix = (m.group(1) or "").strip() number = (m.group(2) or "").strip() if not prefix or not number: continue norm = _normalize_case_number(number) if not norm: continue key = (kind, norm) if key in seen: continue seen.add(key) start = max(0, m.start() - _CTX_BEFORE) end = min(len(text), m.end() + _CTX_AFTER) context = text[start:end].replace("\n", " ").strip() context = re.sub(r"\s+", " ", context) yield { "prefix": prefix, "case_number": number, "case_number_norm": norm, "raw": m.group(0).strip(), "context": context[:1000], "pattern_kind": kind, } async def _resolve_case_law_id(case_number_norm: str) -> UUID | None: """Try to resolve a normalized citation to an existing case_law row. Strategy: 1. Exact match on normalized case_number column (after rewriting existing case_numbers the same way). 2. Substring match — the corpus often stores the full Nevo header ("ערר ‏(‏ועדות ערר - תכנון ובנייה ירושלים‏)‏ 1110/20 …"), so we search by ``case_number ILIKE '%1110/20%' OR '%1110-20%'``. Returns None if no row matches. """ if not case_number_norm: return None pool = await db.get_pool() # Build the two raw forms (with slash and with dash) for substring match. parts = case_number_norm.split("-") if len(parts) >= 2: slash_form = "/".join(parts[:2]) if len(parts) == 2 else parts[0] + "/" + parts[-1] else: slash_form = case_number_norm dash_form = case_number_norm async with pool.acquire() as conn: # Substring match on either form (covers full Nevo headers and short forms). row = await conn.fetchrow( """ SELECT id FROM case_law WHERE case_number ILIKE $1 OR case_number ILIKE $2 ORDER BY (source_kind = 'internal_committee') DESC, LENGTH(case_number) ASC LIMIT 1 """, f"%{slash_form}%", f"%{dash_form}%", ) return UUID(str(row["id"])) if row else None async def extract_and_store(case_law_id: UUID) -> dict: """Extract citations from a single ``case_law`` row's ``full_text``, resolve them against the corpus, and INSERT into ``precedent_internal_citations`` (ON CONFLICT DO NOTHING). Returns: {extracted: N, linked: M, new: K, skipped: S} extracted — total distinct citations found in the text linked — how many resolved to an existing case_law row new — rows actually inserted (not pre-existing) skipped — citations skipped (self-citation, already stored) """ pool = await db.get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT id, case_number, full_text FROM case_law WHERE id = $1", case_law_id, ) if not row: return {"extracted": 0, "linked": 0, "new": 0, "skipped": 0, "error": "not_found"} text = row["full_text"] or "" own_norm = _normalize_case_number(row["case_number"] or "") extracted = 0 linked = 0 new_count = 0 skipped = 0 for cit in extract_citations_from_text(text): extracted += 1 if cit["case_number_norm"] == own_norm: # Self-citation (e.g. document headers repeating the case number). skipped += 1 continue cited_id = await _resolve_case_law_id(cit["case_number_norm"]) if cited_id is not None and cited_id == case_law_id: skipped += 1 continue if cited_id is not None: linked += 1 async with pool.acquire() as conn: result = await conn.execute( """ INSERT INTO precedent_internal_citations ( source_case_law_id, cited_case_number, cited_case_law_id, match_context, match_pattern, confidence ) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (source_case_law_id, cited_case_number) DO NOTHING """, case_law_id, f"{cit['prefix']} {cit['case_number']}", cited_id, cit["context"], cit["pattern_kind"], 0.90 if cited_id is not None else 0.75, ) # asyncpg execute returns 'INSERT 0 N' — N is rows inserted. try: n_inserted = int(result.split()[-1]) except (ValueError, IndexError): n_inserted = 0 if n_inserted == 1: new_count += 1 else: skipped += 1 return { "extracted": extracted, "linked": linked, "new": new_count, "skipped": skipped, } async def extract_all_internal_committee( chair_name_filter: str = "", limit: int = 0, ) -> dict: """Run extraction over every internal-committee row in ``case_law``. Args: chair_name_filter: if non-empty, restrict to rows where chair_name matches (exact match). Useful for running on Daphna only. limit: hard cap on number of rows processed (0 = no cap). Returns: summary dict with per-row counts and aggregate totals. """ pool = await db.get_pool() conditions = ["source_kind = 'internal_committee'", "full_text <> ''"] params: list = [] if chair_name_filter: conditions.append("chair_name = $1") params.append(chair_name_filter) where = " WHERE " + " AND ".join(conditions) limit_clause = f" LIMIT {int(limit)}" if limit and limit > 0 else "" sql = f"SELECT id, case_number FROM case_law{where} ORDER BY created_at{limit_clause}" async with pool.acquire() as conn: rows = await conn.fetch(sql, *params) totals = { "processed": 0, "extracted": 0, "linked": 0, "new": 0, "skipped": 0, "failed": 0, "chair_name_filter": chair_name_filter, "row_count": len(rows), } for r in rows: try: stats = await extract_and_store(UUID(str(r["id"]))) totals["processed"] += 1 totals["extracted"] += stats.get("extracted", 0) totals["linked"] += stats.get("linked", 0) totals["new"] += stats.get("new", 0) totals["skipped"] += stats.get("skipped", 0) except Exception as e: logger.exception("citation extraction failed for %s: %s", r["case_number"], e) totals["failed"] += 1 return totals async def list_citations_for_case_law( case_law_id: UUID, linked_only: bool = False, ) -> list[dict]: """Return all citations *from* the given case_law row (outgoing edges).""" pool = await db.get_pool() where = "pic.source_case_law_id = $1" if linked_only: where += " AND pic.cited_case_law_id IS NOT NULL" sql = f""" SELECT pic.id::text AS id, pic.cited_case_number, pic.cited_case_law_id::text AS cited_case_law_id, pic.match_context, pic.match_pattern, pic.confidence::float AS confidence, pic.created_at, cl.case_number AS target_case_number, cl.case_name AS target_case_name, cl.chair_name AS target_chair_name, cl.district AS target_district FROM precedent_internal_citations pic LEFT JOIN case_law cl ON cl.id = pic.cited_case_law_id WHERE {where} ORDER BY pic.created_at """ async with pool.acquire() as conn: rows = await conn.fetch(sql, case_law_id) return [dict(r) for r in rows] async def list_citations_to_case_law(case_law_id: UUID) -> list[dict]: """Return all citations *to* the given case_law row (incoming edges). Useful for "which Daphna decisions cite this ruling?" queries. """ pool = await db.get_pool() sql = """ SELECT pic.id::text AS id, pic.source_case_law_id::text AS source_case_law_id, pic.cited_case_number, pic.match_context, pic.match_pattern, pic.confidence::float AS confidence, pic.created_at, cl.case_number AS source_case_number, cl.case_name AS source_case_name, cl.chair_name AS source_chair_name, cl.district AS source_district FROM precedent_internal_citations pic JOIN case_law cl ON cl.id = pic.source_case_law_id WHERE pic.cited_case_law_id = $1 ORDER BY pic.created_at DESC """ async with pool.acquire() as conn: rows = await conn.fetch(sql, case_law_id) return [dict(r) for r in rows] async def get_cited_case_law_ids(source_case_law_ids: list[UUID]) -> dict[str, list[str]]: """Bulk-fetch outgoing citation case_law_ids for the given source rows. Returns: {source_case_law_id (str): [cited_case_law_id (str), ...]} — only including linked (resolved) citations. Used by search.search_internal_decisions(include_cited_by=True) to expand result sets with the precedents the hits themselves cite, without running a separate roundtrip per row. """ if not source_case_law_ids: return {} pool = await db.get_pool() async with pool.acquire() as conn: rows = await conn.fetch( """ SELECT source_case_law_id::text AS source_id, cited_case_law_id::text AS cited_id FROM precedent_internal_citations WHERE source_case_law_id = ANY($1::uuid[]) AND cited_case_law_id IS NOT NULL """, list(source_case_law_ids), ) out: dict[str, list[str]] = {} for r in rows: out.setdefault(r["source_id"], []).append(r["cited_id"]) return out