"""Corpus graph projection — read-only topology of the precedent corpus.

Powers the ``/graph`` page (the in-app, Obsidian-graph-view-like network of the
legal corpus). This module is a **pure projection** of the live corpus, not a
parallel store: every node and edge is assembled on the fly from the canonical
tables via the shared ``db.get_pool()`` connection. It writes nothing
(``SELECT`` only), so it cannot drift from the source of truth — preserving
**G2** (single source of truth, no parallel paths).

It is also **not a retrieval path** (03-retrieval): it returns graph topology
(nodes + edges + in-degree), never ranked search results, so it cannot become a
second, drifting way to "find" precedents.

Phase 1 node types:
  - ``precedent``     — a row in ``case_law`` (external rulings + committee decisions)
  - ``topic``         — a synthesized hub per ``subject_tag``
  - ``practice_area`` — a synthesized hub per ``case_law.practice_area``

Phase 1 edge types:
  - ``cites``      — ``precedent_internal_citations`` (source → cited)
  - ``same_chain`` — ``case_law_relations`` (undirected, same-case chain)
  - ``tagged``     — synthesized precedent → topic-hub membership
  - ``in_area``    — synthesized precedent → practice-area-hub membership

Node **size = importance = incoming-citation count**, computed in SQL via the
``idx_pic_target`` index (a single index-backed ``GROUP BY``, never N+1).

Halacha nodes + corroboration/equivalence edges are Phase 2 (gated behind the
``node_types`` param), so the frontend can already send/hide ``halacha`` without
a contract change.
"""

from __future__ import annotations

from uuid import UUID

import asyncpg
from pydantic import BaseModel

# ── Node-type vocabulary ─────────────────────────────────────────────
VALID_NODE_TYPES = {"precedent", "halacha", "topic", "practice_area"}
DEFAULT_NODE_TYPES = ("precedent", "topic", "practice_area")

NODE_CAP_DEFAULT = 400
NODE_CAP_MAX = 1500

# Hebrew labels for the closed practice-area enum (G5). Unknown values fall
# back to the raw token so a new area still renders rather than vanishing.
_PA_LABELS = {
    "rishuy_uvniya": "רישוי ובנייה",
    "betterment_levy": "היטל השבחה",
    "compensation_197": "פיצויים (ס׳ 197)",
    "appeals_committee": "ועדת ערר",
}


# ── Response models (UI2: explicit Pydantic → real generated types) ───
class GraphNode(BaseModel):
    """One vertex of the projected graph (a precedent or a synthesized hub)."""

    id: str  # "cl:" | "hal:" | "tag:" | "pa:" prefix followed by the node key
    type: str  # precedent | halacha | topic | practice_area
    label: str
    size: int = 0  # incoming-citation count; 0 for hubs in Phase 1
    practice_area: str | None = None
    source_kind: str | None = None  # precedents only
    precedent_level: str | None = None  # precedents only
    case_law_id: str | None = None  # canonical id for deep-link (precedents)


class GraphEdge(BaseModel):
    """One edge between two ``GraphNode.id`` values."""

    source: str
    target: str
    type: str  # cites | same_chain | tagged | in_area
    treatment: str | None = None
    weight: float | None = None


class CorpusGraph(BaseModel):
    """Response envelope: nodes, edges and cap/truncation metadata."""

    nodes: list[GraphNode]
    edges: list[GraphEdge]
    truncated: bool = False  # true when the node cap clipped the result
    total_available: int = 0  # precedents matching the filters before the cap


# ── Helpers ──────────────────────────────────────────────────────────
def normalize_node_types(node_types: str) -> set[str]:
    """Parse the ``node_types`` CSV param into a validated set.

    Empty / all-invalid input falls back to the Phase-1 default so a missing
    param never yields an empty graph.
    """
    toks = {t.strip() for t in (node_types or "").split(",") if t.strip()}
    valid = {t for t in toks if t in VALID_NODE_TYPES}
    return valid or set(DEFAULT_NODE_TYPES)


def _escape_like(term: str) -> str:
    """Escape LIKE/ILIKE metacharacters so ``term`` is matched literally.

    Backslash is PostgreSQL's default LIKE escape character, so it is escaped
    first; ``%`` and ``_`` are then prefixed with it.
    """
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _dedupe_seed_ids(rows: list[asyncpg.Record], cap: int) -> tuple[list, bool]:
    """Return the distinct ``id`` values of ``rows`` (first-seen order), clipped.

    Returns ``(ids, clipped)`` where ``clipped`` is true when more than ``cap``
    distinct ids were present and the tail was dropped.
    """
    ids = list(dict.fromkeys(r["id"] for r in rows))
    if len(ids) > cap:
        return ids[:cap], True
    return ids, False


# Shared CTE: incoming-citation count per cited precedent (the node "size").
_PREC_INDEG_CTE = """
WITH prec_indeg AS (
    SELECT cited_case_law_id AS id, COUNT(*) AS n
    FROM precedent_internal_citations
    WHERE cited_case_law_id IS NOT NULL
    GROUP BY cited_case_law_id
)
"""


def _precedent_node(row: asyncpg.Record) -> GraphNode:
    """Map one ``case_law`` row (plus its ``size`` column) to a precedent node.

    The label prefers the case number, then the case name, then an em dash
    placeholder when both are empty.
    """
    label = (row["case_number"] or "").strip() or (row["case_name"] or "").strip() or "—"
    return GraphNode(
        id=f"cl:{row['id']}",
        type="precedent",
        label=label,
        size=int(row["size"] or 0),
        practice_area=(row["practice_area"] or None),
        source_kind=(row["source_kind"] or None),
        precedent_level=(row["precedent_level"] or None),
        case_law_id=str(row["id"]),
    )


async def _edges_and_hubs(
    conn: asyncpg.Connection,
    prec_rows: list[asyncpg.Record],
    types: set[str],
) -> tuple[list[GraphNode], list[GraphEdge]]:
    """Build intra-set edges + synthesized topic/practice-area hub nodes.

    Only edges whose BOTH endpoints are in ``prec_rows`` are emitted — an edge
    to a precedent that was clipped by the node cap is dropped so the client
    never receives a dangling reference.

    Args:
        conn: Open connection used for the read-only edge/tag queries.
        prec_rows: Precedent rows already selected for the graph; each must
            expose ``id`` and ``practice_area``.
        types: Validated node types; ``topic`` / ``practice_area`` enable the
            corresponding hubs and membership edges.

    Returns:
        ``(hub_nodes, edges)``; both empty when ``prec_rows`` is empty.
    """
    hub_nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    prec_ids = [r["id"] for r in prec_rows]
    if not prec_ids:
        return hub_nodes, edges

    # cites — directional precedent → precedent
    cite_rows = await conn.fetch(
        """
        SELECT source_case_law_id AS s, cited_case_law_id AS t,
               treatment, confidence
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NOT NULL
          AND source_case_law_id = ANY($1::uuid[])
          AND cited_case_law_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    for r in cite_rows:
        edges.append(
            GraphEdge(
                source=f"cl:{r['s']}",
                target=f"cl:{r['t']}",
                type="cites",
                treatment=(r["treatment"] or None),
                weight=float(r["confidence"]) if r["confidence"] is not None else None,
            )
        )

    # same_chain — undirected; stored possibly in both directions → dedup
    rel_rows = await conn.fetch(
        """
        SELECT case_law_id AS s, related_id AS t
        FROM case_law_relations
        WHERE case_law_id = ANY($1::uuid[])
          AND related_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    seen_chain: set[tuple[str, str]] = set()
    for r in rel_rows:
        # Order-independent key so (a, b) and (b, a) collapse to one edge.
        key = tuple(sorted((str(r["s"]), str(r["t"]))))
        if key in seen_chain:
            continue
        seen_chain.add(key)
        edges.append(
            GraphEdge(source=f"cl:{r['s']}", target=f"cl:{r['t']}", type="same_chain")
        )

    # topic hubs — case_law.subject_tags is JSONB → expand in SQL
    if "topic" in types:
        tag_rows = await conn.fetch(
            """
            SELECT c.id, btrim(t.tag) AS tag
            FROM case_law c,
                 jsonb_array_elements_text(c.subject_tags) AS t(tag)
            WHERE c.id = ANY($1::uuid[])
              AND btrim(t.tag) <> ''
            """,
            prec_ids,
        )
        tag_seen: set[str] = set()
        for r in tag_rows:
            tag = r["tag"]
            tid = f"tag:{tag}"
            if tag not in tag_seen:
                # First sighting of this tag → synthesize its hub node once.
                tag_seen.add(tag)
                hub_nodes.append(GraphNode(id=tid, type="topic", label=tag))
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=tid, type="tagged"))

    # practice-area hubs — scalar column on each precedent row
    if "practice_area" in types:
        pa_seen: set[str] = set()
        for r in prec_rows:
            pa = (r["practice_area"] or "").strip()
            if not pa:
                continue
            pid = f"pa:{pa}"
            if pa not in pa_seen:
                pa_seen.add(pa)
                hub_nodes.append(
                    GraphNode(
                        id=pid,
                        type="practice_area",
                        label=_PA_LABELS.get(pa, pa),
                        practice_area=pa,
                    )
                )
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=pid, type="in_area"))

    return hub_nodes, edges


# ── Endpoints' core logic ────────────────────────────────────────────
async def build_corpus_graph(
    pool: asyncpg.Pool,
    *,
    practice_area: str = "",
    source: str = "",
    node_types: str = "",
    min_citations: int = 0,
    limit: int = NODE_CAP_DEFAULT,
    q: str = "",
) -> CorpusGraph:
    """Assemble the full corpus graph under the given filters.

    The most-cited precedents always survive the cap (``ORDER BY size DESC``),
    so clipping never hides the structurally important nodes. ``truncated`` +
    ``total_available`` let the UI prompt the user to narrow filters.

    Args:
        pool: asyncpg pool to borrow a connection from.
        practice_area: Exact ``case_law.practice_area`` filter; ``""`` = any.
        source: Exact ``case_law.source_kind`` filter; ``""`` = any.
        node_types: CSV of node types (see ``normalize_node_types``).
        min_citations: Minimum incoming-citation count; negatives clamp to 0.
        limit: Precedent cap, clamped to ``1..NODE_CAP_MAX``.
        q: Case-insensitive substring matched against case number / case name.
            LIKE metacharacters in ``q`` are matched literally.

    Returns:
        The graph with precedent nodes first, followed by hub nodes.
    """
    types = normalize_node_types(node_types)
    cap = max(1, min(int(limit), NODE_CAP_MAX))
    min_cit = max(0, int(min_citations))
    # Escape %, _ and \ so user input cannot act as an ILIKE wildcard.
    needle = _escape_like(q.strip())

    async with pool.acquire() as conn:
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name, c.practice_area,
                   c.source_kind, c.precedent_level,
                   COALESCE(p.n, 0) AS size,
                   COUNT(*) OVER () AS total_available
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE ($1 = '' OR c.practice_area = $1)
              AND ($2 = '' OR c.source_kind = $2)
              AND COALESCE(p.n, 0) >= $3
              AND ($4 = '' OR c.case_number ILIKE '%' || $4 || '%'
                           OR c.case_name ILIKE '%' || $4 || '%')
            ORDER BY COALESCE(p.n, 0) DESC, c.case_number
            LIMIT $5
            """,
            practice_area,
            source,
            min_cit,
            needle,
            cap,
        )
        # The window count is identical on every row; read it from the first.
        total_available = int(prec_rows[0]["total_available"]) if prec_rows else 0
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, types)
        nodes.extend(hub_nodes)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=total_available > len(prec_rows),
        total_available=total_available,
    )


async def build_node_neighborhood(
    pool: asyncpg.Pool,
    node_id: str,
    *,
    depth: int = 1,
    node_types: str = "",
) -> CorpusGraph:
    """Local-graph focus: the seed node + its neighbors out to ``depth`` (1-2).

    Naturally bounded (one seed, BFS depth ≤ 2), so it is the recommended way
    to "see everything around a node" when the full graph is clipped.

    Seeds:
      - ``cl:<uuid>`` — a precedent; BFS expands ``depth`` levels.
      - ``tag:<tag>`` — a topic hub; its members are level 1, BFS ``depth-1`` more.
      - ``pa:<area>`` — a practice-area hub; same as topic.

    Args:
        pool: asyncpg pool to borrow a connection from.
        node_id: Prefixed seed id (see above). Unknown prefixes, empty keys and
            malformed precedent UUIDs yield an empty graph.
        depth: BFS depth, clamped to ``1..2``.
        node_types: CSV of node types; the seed hub's own type is always added
            for ``tag:`` / ``pa:`` seeds so the focused hub renders.

    Returns:
        The neighborhood graph. At most ``NODE_CAP_MAX`` precedents are
        returned; when the cap clips anything, ``truncated`` is true and the
        ids closest to the seed (seeds first, then BFS level by level) are the
        ones kept.
    """
    types = normalize_node_types(node_types)
    depth = max(1, min(int(depth), 2))

    prefix, _, rest = node_id.partition(":")
    rest = rest.strip()
    if prefix not in {"cl", "tag", "pa"} or not rest:
        return CorpusGraph(nodes=[], edges=[])

    # Validate a precedent seed before borrowing a pool connection.
    seed_uuid: UUID | None = None
    if prefix == "cl":
        try:
            seed_uuid = UUID(rest)
        except ValueError:
            return CorpusGraph(nodes=[], edges=[])

    truncated = False
    async with pool.acquire() as conn:
        # Seed the precedent id list + remaining BFS levels.
        if seed_uuid is not None:
            ordered_ids: list = [seed_uuid]
            levels_left = depth
            # The seed hub types are whatever the caller asked for.
            forced_types = types
        elif prefix == "tag":
            # Fetch one row past the cap so an over-full hub is detectable.
            rows = await conn.fetch(
                """
                SELECT c.id
                FROM case_law c,
                     jsonb_array_elements_text(c.subject_tags) AS t(tag)
                WHERE btrim(t.tag) = $1
                LIMIT $2
                """,
                rest,
                NODE_CAP_MAX + 1,
            )
            ordered_ids, truncated = _dedupe_seed_ids(rows, NODE_CAP_MAX)
            levels_left = depth - 1
            forced_types = types | {"topic"}  # ensure the focused hub renders
        else:  # pa
            rows = await conn.fetch(
                "SELECT id FROM case_law WHERE practice_area = $1 LIMIT $2",
                rest,
                NODE_CAP_MAX + 1,
            )
            ordered_ids, truncated = _dedupe_seed_ids(rows, NODE_CAP_MAX)
            levels_left = depth - 1
            forced_types = types | {"practice_area"}

        if not ordered_ids:
            return CorpusGraph(nodes=[], edges=[])

        # BFS over citation + same-chain edges (undirected for traversal).
        # ``ordered_ids`` keeps discovery order so clipping drops the farthest
        # nodes, never the seed; ``all_ids`` is the O(1) visited set.
        all_ids = set(ordered_ids)
        frontier = set(ordered_ids)
        while levels_left > 0 and frontier:
            if len(all_ids) >= NODE_CAP_MAX:
                truncated = True
                break
            nb_rows = await conn.fetch(
                """
                SELECT cited_case_law_id AS nb
                FROM precedent_internal_citations
                WHERE cited_case_law_id IS NOT NULL
                  AND source_case_law_id = ANY($1::uuid[])
                UNION
                SELECT source_case_law_id AS nb
                FROM precedent_internal_citations
                WHERE cited_case_law_id = ANY($1::uuid[])
                UNION
                SELECT related_id AS nb
                FROM case_law_relations
                WHERE case_law_id = ANY($1::uuid[])
                UNION
                SELECT case_law_id AS nb
                FROM case_law_relations
                WHERE related_id = ANY($1::uuid[])
                """,
                list(frontier),
            )
            nbs = {r["nb"] for r in nb_rows} - all_ids
            all_ids |= nbs
            ordered_ids.extend(nbs)
            frontier = nbs
            levels_left -= 1

        # A single BFS level can overshoot the cap; clip and report it.
        if len(ordered_ids) > NODE_CAP_MAX:
            truncated = True
            ordered_ids = ordered_ids[:NODE_CAP_MAX]

        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name, c.practice_area,
                   c.source_kind, c.precedent_level,
                   COALESCE(p.n, 0) AS size
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE c.id = ANY($1::uuid[])
            """,
            ordered_ids,
        )
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, forced_types)
        nodes.extend(hub_nodes)

    # NOTE(review): total_available counts hub nodes as well as precedents
    # here (len(nodes) after the hubs are appended) — confirm the UI expects
    # that rather than a precedent-only count.
    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=truncated,
        total_available=len(nodes),
    )