Native, Obsidian-graph-view-like network of the precedent corpus, rendered
in web-ui from a read-only projection of the live DB. Replaces the idea of
exporting to an external Obsidian vault (which would be a parallel, drifting
copy of the corpus — the exact root cause G2 forbids).
The graph edges already existed in the data model; this only surfaces them:
nodes = precedents (case_law) + synthesized topic/practice-area hubs;
edges = cites (precedent_internal_citations) + same_chain (case_law_relations)
+ tagged/in_area (subject_tags / practice_area membership). Node size =
incoming-citation count (index-backed GROUP BY on idx_pic_target). Click a
node → local-graph neighborhood focus; panel deep-links to /precedents/[id].
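As a rough illustration of the sizing rule above, here is a minimal in-memory sketch. The citation pairs and the one-letter ids are invented for the example, and `incoming_counts` is a hypothetical helper; the real count is computed in SQL over precedent_internal_citations, not in Python.

```python
from collections import Counter

# Hypothetical (source, cited) pairs standing in for citation rows.
# None marks a citation that was never resolved to a precedent.
citations = [
    ("a", "c"),
    ("b", "c"),
    ("c", "d"),
    ("a", None),
]


def incoming_counts(pairs):
    """Count resolved incoming citations per cited id (the node size)."""
    return Counter(cited for _, cited in pairs if cited is not None)


sizes = incoming_counts(citations)
# A precedent that nobody cites is simply absent, so it reads as size 0.
print(sizes["c"], sizes["d"], sizes["a"])  # 2 1 0
```

Unresolved citations are skipped, which mirrors the `IS NOT NULL` filter on the cited id in the SQL.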
Backend (read-only, SELECT only — G2):
- web/graph_api.py — Pydantic models (CorpusGraph/GraphNode/GraphEdge, so
OpenAPI emits real types — UI2) + SQL assembly over the shared db.get_pool().
- web/app.py — GET /api/graph/corpus, GET /api/graph/node/{id}/neighborhood,
both with explicit response_model. practice_area validated against the
closed enum (G5); both endpoints write nothing.
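To make the neighborhood endpoint's traversal concrete, here is a minimal in-memory sketch of a depth-bounded, undirected breadth-first expansion. The edge list and the `neighborhood` helper are hypothetical; the module itself runs the equivalent expansion as SQL over the citation and same-chain tables, and clamps depth to 1-2.

```python
def neighborhood(edges, seed, depth=1):
    """Return the ids reachable from `seed` within `depth` undirected hops."""
    depth = max(1, min(int(depth), 2))  # same 1-2 clamp as the endpoint
    adjacency = {}
    for s, t in edges:
        # Edges are followed in both directions for traversal.
        adjacency.setdefault(s, set()).add(t)
        adjacency.setdefault(t, set()).add(s)

    seen = {seed}
    frontier = {seed}
    for _ in range(depth):
        reached = set()
        for node in frontier:
            reached |= adjacency.get(node, set())
        frontier = reached - seen  # only nodes not visited before
        if not frontier:
            break
        seen |= frontier
    return seen


# Hypothetical citation pairs (source, cited).
edges = [("a", "b"), ("b", "c"), ("c", "d"), ("e", "a")]
print(sorted(neighborhood(edges, "a", depth=1)))  # ['a', 'b', 'e']
print(sorted(neighborhood(edges, "a", depth=2)))  # ['a', 'b', 'c', 'e']
```

Because the depth is clamped, a caller asking for a larger depth gets the depth-2 result, which keeps the response bounded.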
Frontend:
- react-force-graph-2d (canvas/d3-force), loaded via next/dynamic ssr:false.
- /graph page + nav entry; graph.ts TanStack hooks; filter panel (practice_area
/ source / min-citations / search / node-type toggles), node detail panel,
hover+selection neighborhood highlight. Explicit error handling (UI4).
Not a retrieval path (03-retrieval): returns graph topology, never ranked
search results. Halacha nodes + corroboration/equivalence edges are Phase 2,
already gated behind the node_types param (no contract change needed).
SQL validated read-only against the live DB (142 precedents, 85 resolved
citations, JSONB tag expansion, ANY(uuid[]) edge + BFS queries). web-ui lint
+ build pass; /graph in the route table.
Invariants: keeps G2 (single source of truth — live projection, no parallel
store), G5 (corpus separation filtered server-side), UI2 (response models),
UI4 (no swallowed UI errors).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
386 lines
14 KiB
Python
"""Corpus graph projection: read-only topology of the precedent corpus.

Powers the ``/graph`` page (the in-app, Obsidian-graph-view-like network of the
legal corpus). This module is a **pure projection** of the live corpus, not a
parallel store: every node and edge is assembled on the fly from the canonical
tables via the shared ``db.get_pool()`` connection. It writes nothing
(``SELECT`` only), so it cannot drift from the source of truth, preserving
**G2** (single source of truth, no parallel paths). It is also **not a retrieval
path** (03-retrieval): it returns graph topology (nodes + edges + in-degree),
never ranked search results, so it cannot become a second, drifting way to
"find" precedents.

Phase 1 node types:
  - ``precedent``: a row in ``case_law`` (external rulings + committee decisions)
  - ``topic``: a synthesized hub per ``subject_tag``
  - ``practice_area``: a synthesized hub per ``case_law.practice_area``

Phase 1 edge types:
  - ``cites``: ``precedent_internal_citations`` (source → cited)
  - ``same_chain``: ``case_law_relations`` (undirected, same-case chain)
  - ``tagged``: synthesized precedent → topic-hub membership
  - ``in_area``: synthesized precedent → practice-area-hub membership

Node **size = importance = incoming-citation count**, computed in SQL via the
``idx_pic_target`` index (a single index-backed ``GROUP BY``, never N+1).

Halacha nodes + corroboration/equivalence edges are Phase 2 (gated behind the
``node_types`` param), so the frontend can already send/hide ``halacha`` without
a contract change.
"""
from __future__ import annotations

from uuid import UUID

import asyncpg
from pydantic import BaseModel

# ── Node-type vocabulary ─────────────────────────────────────────────
VALID_NODE_TYPES = {"precedent", "halacha", "topic", "practice_area"}
DEFAULT_NODE_TYPES = ("precedent", "topic", "practice_area")
NODE_CAP_DEFAULT = 400
NODE_CAP_MAX = 1500

# Hebrew labels for the closed practice-area enum (G5). Unknown values fall
# back to the raw token so a new area still renders rather than vanishing.
_PA_LABELS = {
    "rishuy_uvniya": "רישוי ובנייה",  # licensing and construction
    "betterment_levy": "היטל השבחה",  # betterment levy
    "compensation_197": "פיצויים (ס׳ 197)",  # compensation (sec. 197)
    "appeals_committee": "ועדת ערר",  # appeals committee
}


# ── Response models (UI2: explicit Pydantic → real generated types) ───
class GraphNode(BaseModel):
    id: str  # "cl:<uuid>" | "hal:<uuid>" | "tag:<text>" | "pa:<token>"
    type: str  # precedent | halacha | topic | practice_area
    label: str
    size: int = 0  # incoming-citation count; 0 for hubs in Phase 1
    practice_area: str | None = None
    source_kind: str | None = None  # precedents only
    precedent_level: str | None = None  # precedents only
    case_law_id: str | None = None  # canonical id for deep-link (precedents)


class GraphEdge(BaseModel):
    source: str
    target: str
    type: str  # cites | same_chain | tagged | in_area
    treatment: str | None = None
    weight: float | None = None


class CorpusGraph(BaseModel):
    nodes: list[GraphNode]
    edges: list[GraphEdge]
    truncated: bool = False  # true when the node cap clipped the result
    total_available: int = 0  # precedents matching the filters before the cap


# ── Helpers ──────────────────────────────────────────────────────────
def normalize_node_types(node_types: str) -> set[str]:
    """Parse the ``node_types`` CSV param into a validated set.

    Empty / all-invalid input falls back to the Phase-1 default so a missing
    param never yields an empty graph.
    """
    toks = {t.strip() for t in (node_types or "").split(",") if t.strip()}
    valid = {t for t in toks if t in VALID_NODE_TYPES}
    return valid or set(DEFAULT_NODE_TYPES)


_PREC_INDEG_CTE = """
WITH prec_indeg AS (
    SELECT cited_case_law_id AS id, COUNT(*) AS n
    FROM precedent_internal_citations
    WHERE cited_case_law_id IS NOT NULL
    GROUP BY cited_case_law_id
)
"""


def _precedent_node(row: asyncpg.Record) -> GraphNode:
    label = (row["case_number"] or "").strip() or (row["case_name"] or "").strip() or "—"
    return GraphNode(
        id=f"cl:{row['id']}",
        type="precedent",
        label=label,
        size=int(row["size"] or 0),
        practice_area=(row["practice_area"] or None),
        source_kind=(row["source_kind"] or None),
        precedent_level=(row["precedent_level"] or None),
        case_law_id=str(row["id"]),
    )


async def _edges_and_hubs(
    conn: asyncpg.Connection,
    prec_rows: list[asyncpg.Record],
    types: set[str],
) -> tuple[list[GraphNode], list[GraphEdge]]:
    """Build intra-set edges + synthesized topic/practice-area hub nodes.

    Only edges whose BOTH endpoints are in ``prec_rows`` are emitted: an edge
    to a precedent that was clipped by the node cap is dropped so the client
    never receives a dangling reference.
    """
    hub_nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    prec_ids = [r["id"] for r in prec_rows]
    if not prec_ids:
        return hub_nodes, edges

    # cites: directional precedent → precedent
    cite_rows = await conn.fetch(
        """
        SELECT source_case_law_id AS s, cited_case_law_id AS t, treatment, confidence
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NOT NULL
          AND source_case_law_id = ANY($1::uuid[])
          AND cited_case_law_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    for r in cite_rows:
        edges.append(
            GraphEdge(
                source=f"cl:{r['s']}",
                target=f"cl:{r['t']}",
                type="cites",
                treatment=(r["treatment"] or None),
                weight=float(r["confidence"]) if r["confidence"] is not None else None,
            )
        )

    # same_chain: undirected; stored possibly in both directions → dedup
    rel_rows = await conn.fetch(
        """
        SELECT case_law_id AS s, related_id AS t
        FROM case_law_relations
        WHERE case_law_id = ANY($1::uuid[]) AND related_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    seen_chain: set[tuple[str, str]] = set()
    for r in rel_rows:
        key = tuple(sorted((str(r["s"]), str(r["t"]))))
        if key in seen_chain:
            continue
        seen_chain.add(key)
        edges.append(
            GraphEdge(source=f"cl:{r['s']}", target=f"cl:{r['t']}", type="same_chain")
        )

    # topic hubs: case_law.subject_tags is JSONB → expand in SQL
    if "topic" in types:
        tag_rows = await conn.fetch(
            """
            SELECT c.id, btrim(t.tag) AS tag
            FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
            WHERE c.id = ANY($1::uuid[]) AND btrim(t.tag) <> ''
            """,
            prec_ids,
        )
        tag_seen: set[str] = set()
        for r in tag_rows:
            tag = r["tag"]
            tid = f"tag:{tag}"
            if tag not in tag_seen:
                tag_seen.add(tag)
                hub_nodes.append(GraphNode(id=tid, type="topic", label=tag))
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=tid, type="tagged"))

    # practice-area hubs: scalar column on each precedent row
    if "practice_area" in types:
        pa_seen: set[str] = set()
        for r in prec_rows:
            pa = (r["practice_area"] or "").strip()
            if not pa:
                continue
            pid = f"pa:{pa}"
            if pa not in pa_seen:
                pa_seen.add(pa)
                hub_nodes.append(
                    GraphNode(
                        id=pid,
                        type="practice_area",
                        label=_PA_LABELS.get(pa, pa),
                        practice_area=pa,
                    )
                )
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=pid, type="in_area"))

    return hub_nodes, edges


# ── Endpoints' core logic ────────────────────────────────────────────
async def build_corpus_graph(
    pool: asyncpg.Pool,
    *,
    practice_area: str = "",
    source: str = "",
    node_types: str = "",
    min_citations: int = 0,
    limit: int = NODE_CAP_DEFAULT,
    q: str = "",
) -> CorpusGraph:
    """Assemble the full corpus graph under the given filters.

    The most-cited precedents always survive the cap (``ORDER BY size DESC``),
    so clipping never hides the structurally important nodes. ``truncated`` +
    ``total_available`` let the UI prompt the user to narrow filters.
    """
    types = normalize_node_types(node_types)
    cap = max(1, min(int(limit), NODE_CAP_MAX))
    min_cit = max(0, int(min_citations))

    async with pool.acquire() as conn:
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name,
                   c.practice_area, c.source_kind, c.precedent_level,
                   COALESCE(p.n, 0) AS size,
                   COUNT(*) OVER () AS total_available
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE ($1 = '' OR c.practice_area = $1)
              AND ($2 = '' OR c.source_kind = $2)
              AND COALESCE(p.n, 0) >= $3
              AND ($4 = '' OR c.case_number ILIKE '%' || $4 || '%'
                           OR c.case_name ILIKE '%' || $4 || '%')
            ORDER BY COALESCE(p.n, 0) DESC, c.case_number
            LIMIT $5
            """,
            practice_area,
            source,
            min_cit,
            q.strip(),
            cap,
        )

        total_available = int(prec_rows[0]["total_available"]) if prec_rows else 0
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, types)
        nodes.extend(hub_nodes)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=total_available > len(prec_rows),
        total_available=total_available,
    )


async def build_node_neighborhood(
    pool: asyncpg.Pool,
    node_id: str,
    *,
    depth: int = 1,
    node_types: str = "",
) -> CorpusGraph:
    """Local-graph focus: the seed node + its neighbors out to ``depth`` (1-2).

    Naturally bounded (one seed, BFS depth ≤ 2), so it is the recommended way to
    "see everything around a node" when the full graph is clipped. Seeds:
      - ``cl:<uuid>``: a precedent; BFS expands ``depth`` levels.
      - ``tag:<text>``: a topic hub; its members are level 1, BFS ``depth-1`` more.
      - ``pa:<token>``: a practice-area hub; same as topic.
    """
    types = normalize_node_types(node_types)
    depth = max(1, min(int(depth), 2))
    prefix, _, rest = node_id.partition(":")
    rest = rest.strip()
    if prefix not in {"cl", "tag", "pa"} or not rest:
        return CorpusGraph(nodes=[], edges=[])

    async with pool.acquire() as conn:
        # Seed the precedent id set + remaining BFS levels.
        if prefix == "cl":
            try:
                seed_uuid = UUID(rest)
            except ValueError:
                return CorpusGraph(nodes=[], edges=[])
            current: set[UUID] = {seed_uuid}
            levels_left = depth
            # The seed hub types are whatever the caller asked for.
            forced_types = types
        elif prefix == "tag":
            rows = await conn.fetch(
                """
                SELECT c.id
                FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
                WHERE btrim(t.tag) = $1
                LIMIT $2
                """,
                rest,
                NODE_CAP_MAX,
            )
            current = {r["id"] for r in rows}
            levels_left = depth - 1
            forced_types = types | {"topic"}  # ensure the focused hub renders
        else:  # pa
            rows = await conn.fetch(
                "SELECT id FROM case_law WHERE practice_area = $1 LIMIT $2",
                rest,
                NODE_CAP_MAX,
            )
            current = {r["id"] for r in rows}
            levels_left = depth - 1
            forced_types = types | {"practice_area"}

        if not current:
            return CorpusGraph(nodes=[], edges=[])

        # BFS over citation + same-chain edges (undirected for traversal).
        all_ids = set(current)
        frontier = set(current)
        truncated = False
        while levels_left > 0 and frontier:
            if len(all_ids) >= NODE_CAP_MAX:
                truncated = True
                break
            nb_rows = await conn.fetch(
                """
                SELECT cited_case_law_id AS nb FROM precedent_internal_citations
                WHERE cited_case_law_id IS NOT NULL AND source_case_law_id = ANY($1::uuid[])
                UNION
                SELECT source_case_law_id AS nb FROM precedent_internal_citations
                WHERE cited_case_law_id = ANY($1::uuid[])
                UNION
                SELECT related_id AS nb FROM case_law_relations WHERE case_law_id = ANY($1::uuid[])
                UNION
                SELECT case_law_id AS nb FROM case_law_relations WHERE related_id = ANY($1::uuid[])
                """,
                list(frontier),
            )
            nbs = {r["nb"] for r in nb_rows} - all_ids
            all_ids |= nbs
            frontier = nbs
            levels_left -= 1

        # The last BFS level can overshoot the cap; flag the clip so it is
        # reported to the client instead of happening silently.
        if len(all_ids) > NODE_CAP_MAX:
            truncated = True
        ids = list(all_ids)[:NODE_CAP_MAX]
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name,
                   c.practice_area, c.source_kind, c.precedent_level,
                   COALESCE(p.n, 0) AS size
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE c.id = ANY($1::uuid[])
            """,
            ids,
        )
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, forced_types)
        nodes.extend(hub_nodes)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=truncated,
        total_available=len(nodes),
    )