"""Corpus graph projection — read-only topology of the precedent corpus.

Powers the ``/graph`` page (the in-app, Obsidian-graph-view-like network of the
legal corpus). This module is a **pure projection** of the live corpus, not a
parallel store: every node and edge is assembled on the fly from the canonical
tables via the shared ``db.get_pool()`` connection. It writes nothing
(``SELECT`` only), so it cannot drift from the source of truth — preserving
**G2** (single source of truth, no parallel paths).

It is also **not a retrieval path** (03-retrieval): it returns graph topology
(nodes + edges + in-degree), never ranked search results, so it cannot become a
second, drifting way to "find" precedents.

Phase 1 node types:

- ``precedent``     — a row in ``case_law`` (external rulings + committee decisions)
- ``topic``         — a synthesized hub per ``subject_tag``
- ``practice_area`` — a synthesized hub per ``case_law.practice_area``

Phase 1 edge types:

- ``cites``      — ``precedent_internal_citations`` (source → cited)
- ``same_chain`` — ``case_law_relations`` (undirected, same-case chain)
- ``tagged``     — synthesized precedent → topic-hub membership
- ``in_area``    — synthesized precedent → practice-area-hub membership

Opt-in layers (only emitted when requested through ``node_types``):

- ``gap``    — a cited-but-missing precedent; linked from its citing precedents
               by ``cites`` edges whose target is a ``gap:`` id
- ``digest`` — a daily-digest entry; linked by a ``covers`` edge to the
               precedent (or gap node) it analyses

Node **size = importance = incoming-citation count**, computed in SQL via the
``idx_pic_target`` index (a single index-backed ``GROUP BY``, never N+1).

Halacha nodes + corroboration/equivalence edges are Phase 2 (gated behind the
``node_types`` param), so the frontend can already send/hide ``halacha``
without a contract change.
"""

from __future__ import annotations

from uuid import UUID

import asyncpg
from pydantic import BaseModel

from web import graph_metrics

# ── Node-type vocabulary ─────────────────────────────────────────────
VALID_NODE_TYPES = {"precedent", "halacha", "topic", "practice_area", "gap", "digest"}
DEFAULT_NODE_TYPES = ("precedent", "topic", "practice_area")

NODE_CAP_DEFAULT = 400
NODE_CAP_MAX = 1500

# Upper bound on digest rows pulled into one graph response.
_DIGEST_CAP = 400

# Hebrew labels for the closed practice-area enum (G5). Unknown values fall
# back to the raw token so a new area still renders rather than vanishing.
_PA_LABELS = {
    "rishuy_uvniya": "רישוי ובנייה",
    "betterment_levy": "היטל השבחה",
    "compensation_197": "פיצויים (ס׳ 197)",
    "appeals_committee": "ועדת ערר",
}


# ── Response models (UI2: explicit Pydantic → real generated types) ───
class GraphNode(BaseModel):
    """One vertex of the graph; which optional fields are set depends on ``type``."""

    # Prefixed id: "cl:" precedent | "tag:" topic | "pa:" practice area |
    # "gap:" missing precedent | "dig:" digest ("hal:" is reserved for Phase 2).
    id: str
    type: str  # precedent | halacha | topic | practice_area | gap | digest
    label: str
    # Incoming-citation count for precedents and gap nodes; hubs and digests
    # keep the default 0, and digest-synthesized gap nodes use 1.
    size: int = 0
    practice_area: str | None = None
    source_kind: str | None = None  # precedents only
    precedent_level: str | None = None  # precedents only
    court: str | None = None  # precedents + digests — for color-by / filter
    date: str | None = None  # precedents + digests — ISO date, for recency color/filter
    case_law_id: str | None = None  # canonical id for deep-link (precedents)
    # Graph metrics — populated only when ``metrics=true`` (precedents only).
    pagerank: float | None = None  # normalized 0–1 (global influence)
    betweenness: float | None = None  # normalized 0–1 (bridge-ness)
    community: int | None = None  # dense cluster id, 0 = largest
    # Gap nodes only — research-gap status from missing_precedents (best-effort).
    gap_status: str | None = None  # open | uploaded | closed | irrelevant
    missing_precedent_id: str | None = None
    # Digest nodes only — the holding line from the daily digest.
    note: str | None = None
    digest_id: str | None = None  # for deep-link to /digests


class GraphFacets(BaseModel):
    """Distinct filter values so the UI doesn't hardcode Hebrew enum strings."""

    courts: list[str]
    precedent_levels: list[str]
    chairs: list[str]
    districts: list[str]


class GraphEdge(BaseModel):
    """One edge between two ``GraphNode.id`` values."""

    source: str
    target: str
    type: str  # cites | same_chain | tagged | in_area | covers
    treatment: str | None = None  # ``cites`` edges between precedents only
    weight: float | None = None  # citation confidence, when recorded


class CorpusGraph(BaseModel):
    """Full response payload: nodes, edges and clipping information."""

    nodes: list[GraphNode]
    edges: list[GraphEdge]
    truncated: bool = False  # true when the node cap clipped the result
    total_available: int = 0  # precedents matching the filters before the cap


# ── Helpers ──────────────────────────────────────────────────────────
def normalize_node_types(node_types: str) -> set[str]:
    """Parse the ``node_types`` CSV param into a validated set.

    Empty / all-invalid input falls back to the Phase-1 default so a missing
    param never yields an empty graph.
    """
    toks = {t.strip() for t in (node_types or "").split(",") if t.strip()}
    valid = {t for t in toks if t in VALID_NODE_TYPES}
    return valid or set(DEFAULT_NODE_TYPES)


def _norm_sql(column: str) -> str:
    """Return the SQL expression that trims ``column`` and collapses whitespace runs.

    ``column`` must be a trusted identifier written in this module (it is
    interpolated into SQL text, never bound). Every citation-number comparison
    goes through this one expression so ``gap:`` ids agree across layers.
    """
    return f"regexp_replace(btrim({column}), '\\s+', ' ', 'g')"


# In-degree (incoming citation count) per cited precedent; prepended to the
# precedent SELECTs as a CTE so node size is computed in one grouped pass.
_PREC_INDEG_CTE = """
WITH prec_indeg AS (
    SELECT cited_case_law_id AS id, COUNT(*) AS n
    FROM precedent_internal_citations
    WHERE cited_case_law_id IS NOT NULL
    GROUP BY cited_case_law_id
)
"""


def _precedent_node(row: asyncpg.Record) -> GraphNode:
    """Map one ``case_law`` row (plus its ``size`` column) to a precedent node.

    The label prefers the case number, then the case name, then an em dash.
    Empty strings are normalized to ``None`` on the optional attributes.
    """
    label = (row["case_number"] or "").strip() or (row["case_name"] or "").strip() or "—"
    return GraphNode(
        id=f"cl:{row['id']}",
        type="precedent",
        label=label,
        size=int(row["size"] or 0),
        practice_area=(row["practice_area"] or None),
        source_kind=(row["source_kind"] or None),
        precedent_level=(row["precedent_level"] or None),
        court=(row["court"] or None),
        date=(row["date"].isoformat() if row["date"] else None),
        case_law_id=str(row["id"]),
    )


async def _edges_and_hubs(
    conn: asyncpg.Connection,
    prec_rows: list[asyncpg.Record],
    types: set[str],
) -> tuple[list[GraphNode], list[GraphEdge]]:
    """Build intra-set edges + synthesized topic/practice-area hub nodes.

    Only edges whose BOTH endpoints are in ``prec_rows`` are emitted — an edge
    to a precedent that was clipped by the node cap is dropped so the client
    never receives a dangling reference.

    Args:
        conn: open connection used for the read-only queries.
        prec_rows: the precedent rows already selected for display.
        types: requested node types; gates the ``topic`` / ``practice_area`` hubs.

    Returns:
        ``(hub_nodes, edges)``; both empty when ``prec_rows`` is empty.
    """
    hub_nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    prec_ids = [r["id"] for r in prec_rows]
    if not prec_ids:
        return hub_nodes, edges

    # cites — directional precedent → precedent
    cite_rows = await conn.fetch(
        """
        SELECT source_case_law_id AS s, cited_case_law_id AS t,
               treatment, confidence
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NOT NULL
          AND source_case_law_id = ANY($1::uuid[])
          AND cited_case_law_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    for r in cite_rows:
        edges.append(
            GraphEdge(
                source=f"cl:{r['s']}",
                target=f"cl:{r['t']}",
                type="cites",
                treatment=(r["treatment"] or None),
                weight=float(r["confidence"]) if r["confidence"] is not None else None,
            )
        )

    # same_chain — undirected; stored possibly in both directions → dedup
    rel_rows = await conn.fetch(
        """
        SELECT case_law_id AS s, related_id AS t
        FROM case_law_relations
        WHERE case_law_id = ANY($1::uuid[])
          AND related_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    seen_chain: set[tuple[str, ...]] = set()
    for r in rel_rows:
        # Order-independent key so (a, b) and (b, a) collapse into one edge.
        key = tuple(sorted((str(r["s"]), str(r["t"]))))
        if key in seen_chain:
            continue
        seen_chain.add(key)
        edges.append(
            GraphEdge(source=f"cl:{r['s']}", target=f"cl:{r['t']}", type="same_chain")
        )

    # topic hubs — case_law.subject_tags is JSONB → expand in SQL
    if "topic" in types:
        tag_rows = await conn.fetch(
            """
            SELECT c.id, btrim(t.tag) AS tag
            FROM case_law c,
                 jsonb_array_elements_text(c.subject_tags) AS t(tag)
            WHERE c.id = ANY($1::uuid[])
              AND btrim(t.tag) <> ''
            """,
            prec_ids,
        )
        tag_seen: set[str] = set()
        for r in tag_rows:
            tag = r["tag"]
            tid = f"tag:{tag}"
            if tag not in tag_seen:
                tag_seen.add(tag)
                hub_nodes.append(GraphNode(id=tid, type="topic", label=tag))
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=tid, type="tagged"))

    # practice-area hubs — scalar column on each precedent row
    if "practice_area" in types:
        pa_seen: set[str] = set()
        for r in prec_rows:
            pa = (r["practice_area"] or "").strip()
            if not pa:
                continue
            pid = f"pa:{pa}"
            if pa not in pa_seen:
                pa_seen.add(pa)
                hub_nodes.append(
                    GraphNode(
                        id=pid,
                        type="practice_area",
                        label=_PA_LABELS.get(pa, pa),
                        practice_area=pa,
                    )
                )
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=pid, type="in_area"))

    return hub_nodes, edges


# Normalized forms of the three citation-number columns that must compare equal.
_NORM_NUM = _norm_sql("cited_case_number")
_NORM_MISSING = _norm_sql("citation")
_NORM_UNDERLYING = _norm_sql("underlying_citation")


async def _gap_nodes_and_edges(
    conn: asyncpg.Connection,
    prec_ids: list,
) -> tuple[list[GraphNode], list[GraphEdge]]:
    """Research-gap ("ghost") nodes: precedents that are CITED but not in the
    corpus (``precedent_internal_citations.cited_case_law_id IS NULL``).

    One ``gap:`` node per distinct cited number, sized by how many corpus
    precedents cite it (global — the "most-wanted missing precedent"). Edges
    only from citing precedents present in ``prec_ids`` so no edge dangles.
    Best-effort enriched with ``missing_precedents`` status via an exact
    normalized-citation match (an unmatched gap still renders).

    Returns:
        ``(gap_nodes, cites_edges)``; both empty when nothing is displayed or
        the displayed precedents cite nothing that is missing.
    """
    nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    if not prec_ids:
        return nodes, edges

    # Edges from the displayed precedents to the numbers they cite.
    edge_rows = await conn.fetch(
        f"""
        SELECT {_NORM_NUM} AS num, source_case_law_id AS s
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NULL
          AND btrim(cited_case_number) <> ''
          AND source_case_law_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    if not edge_rows:
        return nodes, edges

    nums = sorted({r["num"] for r in edge_rows})

    # Global in-degree per number (importance), independent of the cap. Only
    # the numbers that will be rendered are counted; the counts themselves are
    # still taken over every citing row in the corpus.
    indeg_rows = await conn.fetch(
        f"""
        SELECT {_NORM_NUM} AS num, COUNT(*) AS n
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NULL
          AND btrim(cited_case_number) <> ''
          AND {_NORM_NUM} = ANY($1::text[])
        GROUP BY 1
        """,
        nums,
    )
    indeg = {r["num"]: int(r["n"]) for r in indeg_rows}

    # Best-effort enrichment from missing_precedents (exact normalized match),
    # again limited to the numbers being rendered.
    mp_rows = await conn.fetch(
        f"""
        SELECT id, {_NORM_MISSING} AS num, status
        FROM missing_precedents
        WHERE {_NORM_MISSING} = ANY($1::text[])
        """,
        nums,
    )
    mp = {r["num"]: (str(r["id"]), r["status"]) for r in mp_rows if r["num"]}

    for num in nums:
        match = mp.get(num)
        nodes.append(
            GraphNode(
                id=f"gap:{num}",
                type="gap",
                label=num,
                size=indeg.get(num, 1),
                gap_status=(match[1] if match else None),
                missing_precedent_id=(match[0] if match else None),
            )
        )
    for r in edge_rows:
        edges.append(
            GraphEdge(source=f"cl:{r['s']}", target=f"gap:{r['num']}", type="cites")
        )
    return nodes, edges


async def _digest_nodes_and_edges(
    conn: asyncpg.Connection,
    prec_ids: list,
) -> tuple[list[GraphNode], list[GraphEdge], list[GraphNode]]:
    """Daily-digest discovery layer.

    Each digest ``covers`` the ruling it analyses: a corpus precedent
    (``linked_case_law_id``) when we have it, or a ``gap`` node synthesized
    from ``underlying_citation`` when we don't — so the digest doubles as a
    research signal ("the feed flagged this ruling").

    At most ``_DIGEST_CAP`` completed digests are read, newest first.

    Returns:
        ``(digest_nodes, covers_edges, gap_target_nodes)``. The caller dedups
        gap nodes against the gap layer (real in-degree there wins over size=1).
    """
    digest_nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    gap_nodes: list[GraphNode] = []
    if not prec_ids:
        return digest_nodes, edges, gap_nodes

    prec_set = {str(x) for x in prec_ids}
    rows = await conn.fetch(
        f"""
        SELECT id, yomon_number, concept_tag, headline_holding,
               underlying_citation, underlying_court, underlying_date,
               digest_date, practice_area, linked_case_law_id,
               {_NORM_UNDERLYING} AS u_num
        FROM digests
        WHERE extraction_status = 'completed'
          AND (linked_case_law_id = ANY($1::uuid[])
               OR (linked_case_law_id IS NULL AND btrim(underlying_citation) <> ''))
        ORDER BY digest_date DESC NULLS LAST
        LIMIT $2
        """,
        prec_ids,
        _DIGEST_CAP,
    )

    seen_gap: set[str] = set()
    for r in rows:
        did = f"dig:{r['id']}"
        linked = r["linked_case_law_id"]
        if linked is not None and str(linked) in prec_set:
            target = f"cl:{linked}"
        elif r["u_num"]:
            # Unlinked digest: point at a gap node keyed by the normalized
            # citation, creating that node once per distinct number.
            target = f"gap:{r['u_num']}"
            if r["u_num"] not in seen_gap:
                seen_gap.add(r["u_num"])
                gap_nodes.append(
                    GraphNode(
                        id=target,
                        type="gap",
                        label=(r["underlying_citation"] or "").strip() or r["u_num"],
                        size=1,
                    )
                )
        else:
            # Nothing to attach the digest to — skip it rather than emit an orphan.
            continue

        label = (r["concept_tag"] or "").strip() or (
            f"יומון {r['yomon_number']}" if r["yomon_number"] else "יומון"
        )
        d = r["underlying_date"] or r["digest_date"]
        digest_nodes.append(
            GraphNode(
                id=did,
                type="digest",
                label=label[:48],
                note=((r["headline_holding"] or "").strip()[:160] or None),
                court=(r["underlying_court"] or None),
                date=(d.isoformat() if d else None),
                practice_area=(r["practice_area"] or None),
                digest_id=str(r["id"]),
            )
        )
        edges.append(GraphEdge(source=did, target=target, type="covers"))

    return digest_nodes, edges, gap_nodes


async def _add_digests(
    conn: asyncpg.Connection,
    prec_ids: list,
    nodes: list[GraphNode],
    edges: list[GraphEdge],
) -> None:
    """Append the digest layer in place, adding digest-target gap nodes only if
    they aren't already present (the gap layer's real in-degree wins).

    Mutates ``nodes`` and ``edges``; returns nothing.
    """
    dig_nodes, dig_edges, gap_targets = await _digest_nodes_and_edges(conn, prec_ids)
    existing = {n.id for n in nodes}
    for g in gap_targets:
        if g.id not in existing:
            nodes.append(g)
            existing.add(g.id)
    nodes.extend(dig_nodes)
    edges.extend(dig_edges)


# ── Endpoints' core logic ────────────────────────────────────────────
async def build_corpus_graph(
    pool: asyncpg.Pool,
    *,
    practice_area: str = "",
    source: str = "",
    node_types: str = "",
    min_citations: int = 0,
    limit: int = NODE_CAP_DEFAULT,
    q: str = "",
    court: str = "",
    precedent_level: str = "",
    chair: str = "",
    district: str = "",
    year_from: int = 0,
    year_to: int = 0,
    metrics: bool = False,
) -> CorpusGraph:
    """Assemble the full corpus graph under the given filters.

    The most-cited precedents always survive the cap (``ORDER BY size DESC``),
    so clipping never hides the structurally important nodes. ``truncated`` +
    ``total_available`` let the UI prompt the user to narrow filters.

    All filters are applied server-side in the WHERE clause (G5). An empty
    string (or ``0`` for the year bounds) disables the corresponding filter.
    ``limit`` is clamped to ``1..NODE_CAP_MAX`` and ``min_citations`` to ``>= 0``.

    When ``metrics`` is true, PageRank / betweenness / community are computed
    in-memory over the precedent citation subgraph (``graph_metrics``) and
    stamped onto precedent nodes — no extra DB work (G2).
    """
    types = normalize_node_types(node_types)
    cap = max(1, min(int(limit), NODE_CAP_MAX))
    min_cit = max(0, int(min_citations))

    async with pool.acquire() as conn:
        # COUNT(*) OVER () yields the pre-LIMIT match count on every row, so
        # truncation can be reported without a second query.
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name, c.practice_area,
                   c.source_kind, c.precedent_level, c.court, c.date,
                   COALESCE(p.n, 0) AS size,
                   COUNT(*) OVER () AS total_available
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE ($1 = '' OR c.practice_area = $1)
              AND ($2 = '' OR c.source_kind = $2)
              AND COALESCE(p.n, 0) >= $3
              AND ($4 = '' OR c.case_number ILIKE '%' || $4 || '%'
                           OR c.case_name ILIKE '%' || $4 || '%')
              AND ($6 = '' OR c.court = $6)
              AND ($7 = '' OR c.precedent_level = $7)
              AND ($8 = '' OR c.chair_name = $8)
              AND ($9 = '' OR c.district = $9)
              AND ($10 = 0 OR (c.date IS NOT NULL AND EXTRACT(YEAR FROM c.date) >= $10))
              AND ($11 = 0 OR (c.date IS NOT NULL AND EXTRACT(YEAR FROM c.date) <= $11))
            ORDER BY COALESCE(p.n, 0) DESC, c.case_number
            LIMIT $5
            """,
            practice_area,
            source,
            min_cit,
            q.strip(),
            cap,
            court,
            precedent_level,
            chair,
            district,
            max(0, int(year_from)),
            max(0, int(year_to)),
        )
        total_available = int(prec_rows[0]["total_available"]) if prec_rows else 0
        nodes = [_precedent_node(r) for r in prec_rows]
        prec_id_list = [r["id"] for r in prec_rows]

        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, types)
        nodes.extend(hub_nodes)

        if "gap" in types:
            gap_nodes, gap_edges = await _gap_nodes_and_edges(conn, prec_id_list)
            nodes.extend(gap_nodes)
            edges.extend(gap_edges)

        if "digest" in types:
            await _add_digests(conn, prec_id_list, nodes, edges)

    if metrics:
        _stamp_metrics(nodes, edges)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=total_available > len(prec_rows),
        total_available=total_available,
    )


def _stamp_metrics(nodes: list[GraphNode], edges: list[GraphEdge]) -> None:
    """Compute PageRank/betweenness/community over the precedent subgraph and
    stamp them onto precedent nodes in place (hubs stay ``None``).

    Only edges with BOTH endpoints among the precedent nodes are handed to
    ``graph_metrics.compute``: the gap layer also emits ``cites`` edges, but
    their targets are ``gap:`` ids that are not part of the precedent subgraph.
    """
    prec_ids = [n.id for n in nodes if n.type == "precedent"]
    if not prec_ids:
        return
    prec_set = set(prec_ids)

    def _pairs(edge_type: str) -> list[tuple[str, str]]:
        # Precedent-to-precedent edges of one type, as (source, target) pairs.
        return [
            (e.source, e.target)
            for e in edges
            if e.type == edge_type and e.source in prec_set and e.target in prec_set
        ]

    m = graph_metrics.compute(prec_ids, _pairs("cites"), _pairs("same_chain"))
    for n in nodes:
        mv = m.get(n.id)
        if mv:
            n.pagerank = mv["pagerank"]
            n.betweenness = mv["betweenness"]
            n.community = mv["community"]


async def build_node_neighborhood(
    pool: asyncpg.Pool,
    node_id: str,
    *,
    depth: int = 1,
    node_types: str = "",
) -> CorpusGraph:
    """Local-graph focus: the seed node + its neighbors out to ``depth`` (1-3).

    Naturally bounded (one seed, shallow BFS, at most ``NODE_CAP_MAX``
    precedents), so it is the recommended way to "see everything around a
    node" when the full graph is clipped.

    Seeds:

    - ``cl:``  — a precedent; BFS expands ``depth`` levels.
    - ``tag:`` — a topic hub; its members are level 1, BFS ``depth-1`` more.
    - ``pa:``  — a practice-area hub; same as topic.

    Any other prefix, an empty id body, or a malformed precedent UUID yields an
    empty graph. When the cap clips the neighborhood, seeds and nearer BFS
    levels are kept in preference to farther ones and ``truncated`` is set.
    """
    types = normalize_node_types(node_types)
    depth = max(1, min(int(depth), 3))  # BFS is still bounded by NODE_CAP_MAX

    prefix, _, rest = node_id.partition(":")
    rest = rest.strip()
    if prefix not in {"cl", "tag", "pa"} or not rest:
        return CorpusGraph(nodes=[], edges=[])

    # Validate a precedent seed before taking a pooled connection.
    seed_uuid = None
    if prefix == "cl":
        try:
            seed_uuid = UUID(rest)
        except ValueError:
            return CorpusGraph(nodes=[], edges=[])

    truncated = False
    async with pool.acquire() as conn:
        # ``ordered`` keeps ids in discovery order (seeds first) so that the
        # final cap never drops the seed in favour of a distant neighbor.
        if prefix == "cl":
            ordered: list = [seed_uuid]
            levels_left = depth
            # The seed hub types are whatever the caller asked for.
            forced_types = types
        else:
            # Fetch one row beyond the cap purely to detect clipping.
            if prefix == "tag":
                rows = await conn.fetch(
                    """
                    SELECT DISTINCT c.id
                    FROM case_law c,
                         jsonb_array_elements_text(c.subject_tags) AS t(tag)
                    WHERE btrim(t.tag) = $1
                    LIMIT $2
                    """,
                    rest,
                    NODE_CAP_MAX + 1,
                )
                forced_types = types | {"topic"}  # ensure the focused hub renders
            else:  # pa
                rows = await conn.fetch(
                    "SELECT id FROM case_law WHERE practice_area = $1 LIMIT $2",
                    rest,
                    NODE_CAP_MAX + 1,
                )
                forced_types = types | {"practice_area"}
            ordered = list(dict.fromkeys(r["id"] for r in rows))
            if len(ordered) > NODE_CAP_MAX:
                truncated = True
                ordered = ordered[:NODE_CAP_MAX]
            levels_left = depth - 1

        if not ordered:
            return CorpusGraph(nodes=[], edges=[])

        # BFS over citation + same-chain edges (undirected for traversal).
        all_ids = set(ordered)
        frontier = set(ordered)
        while levels_left > 0 and frontier:
            if len(all_ids) >= NODE_CAP_MAX:
                truncated = True
                break
            nb_rows = await conn.fetch(
                """
                SELECT cited_case_law_id AS nb
                FROM precedent_internal_citations
                WHERE cited_case_law_id IS NOT NULL
                  AND source_case_law_id = ANY($1::uuid[])
                UNION
                SELECT source_case_law_id AS nb
                FROM precedent_internal_citations
                WHERE cited_case_law_id = ANY($1::uuid[])
                UNION
                SELECT related_id AS nb
                FROM case_law_relations
                WHERE case_law_id = ANY($1::uuid[])
                UNION
                SELECT case_law_id AS nb
                FROM case_law_relations
                WHERE related_id = ANY($1::uuid[])
                """,
                list(frontier),
            )
            fresh = {r["nb"] for r in nb_rows} - all_ids
            fresh.discard(None)  # a NULL endpoint is not a node
            # Sorted only to make the clipped selection reproducible.
            ordered.extend(sorted(fresh, key=str))
            all_ids |= fresh
            frontier = fresh
            levels_left -= 1

        # The last expansion may overshoot the cap; clip and report it.
        if len(ordered) > NODE_CAP_MAX:
            truncated = True
            ordered = ordered[:NODE_CAP_MAX]

        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name, c.practice_area,
                   c.source_kind, c.precedent_level, c.court, c.date,
                   COALESCE(p.n, 0) AS size
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE c.id = ANY($1::uuid[])
            """,
            ordered,
        )
        nodes = [_precedent_node(r) for r in prec_rows]
        prec_id_list = [r["id"] for r in prec_rows]

        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, forced_types)
        nodes.extend(hub_nodes)

        if "gap" in forced_types:
            gap_nodes, gap_edges = await _gap_nodes_and_edges(conn, prec_id_list)
            nodes.extend(gap_nodes)
            edges.extend(gap_edges)

        if "digest" in forced_types:
            await _add_digests(conn, prec_id_list, nodes, edges)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=truncated,
        total_available=len(nodes),
    )


async def build_facets(pool: asyncpg.Pool) -> GraphFacets:
    """Distinct, non-empty filter values from ``case_law`` for the UI dropdowns.

    Keeps the closed-vs-open-enum problem server-side so the frontend never
    hardcodes Hebrew court / chair strings (a UI1 source-of-truth concern).
    One UNION query returns ``(kind, value)`` pairs, already de-duplicated and
    sorted, which are then split into the four facet lists.
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT 'court' AS kind, court AS v FROM case_law WHERE court <> ''
            UNION
            SELECT 'level', precedent_level FROM case_law WHERE precedent_level <> ''
            UNION
            SELECT 'chair', chair_name FROM case_law WHERE chair_name <> ''
            UNION
            SELECT 'district', district FROM case_law WHERE district <> ''
            ORDER BY 1, 2
            """
        )

    buckets: dict[str, list[str]] = {"court": [], "level": [], "chair": [], "district": []}
    for r in rows:
        buckets[r["kind"]].append(r["v"])
    return GraphFacets(
        courts=buckets["court"],
        precedent_levels=buckets["level"],
        chairs=buckets["chair"],
        districts=buckets["district"],
    )