Files
legal-ai/web/graph_api.py
Chaim c80e4ce8ff feat(graph): in-app corpus citation graph (/graph) — Phase 1
Native, Obsidian-graph-view-like network of the precedent corpus, rendered
in web-ui from a read-only projection of the live DB. Replaces the idea of
exporting to an external Obsidian vault (which would be a parallel, drifting
copy of the corpus — the exact root cause G2 forbids).

The graph edges already existed in the data model; this only surfaces them:
nodes = precedents (case_law) + synthesized topic/practice-area hubs;
edges = cites (precedent_internal_citations) + same_chain (case_law_relations)
+ tagged/in_area (subject_tags / practice_area membership). Node size =
incoming-citation count (index-backed GROUP BY on idx_pic_target). Click a
node → local-graph neighborhood focus; panel deep-links to /precedents/[id].

Backend (read-only, SELECT only — G2):
- web/graph_api.py — Pydantic models (CorpusGraph/GraphNode/GraphEdge, so
  OpenAPI emits real types — UI2) + SQL assembly over the shared db.get_pool().
- web/app.py — GET /api/graph/corpus, GET /api/graph/node/{id}/neighborhood,
  both with explicit response_model. practice_area validated against the
  closed enum (G5); both endpoints write nothing.

Frontend:
- react-force-graph-2d (canvas/d3-force), loaded via next/dynamic ssr:false.
- /graph page + nav entry; graph.ts TanStack hooks; filter panel (practice_area
  / source / min-citations / search / node-type toggles), node detail panel,
  hover+selection neighborhood highlight. Explicit error handling (UI4).

Not a retrieval path (03-retrieval): returns graph topology, never ranked
search results. Halacha nodes + corroboration/equivalence edges are Phase 2,
already gated behind the node_types param (no contract change needed).

SQL validated read-only against the live DB (142 precedents, 85 resolved
citations, JSONB tag expansion, ANY(uuid[]) edge + BFS queries). web-ui lint
+ build pass; /graph in the route table.

Invariants: keeps G2 (single source of truth — live projection, no parallel
store), G5 (corpus separation filtered server-side), UI2 (response models),
UI4 (no swallowed UI errors).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 18:50:56 +00:00

386 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Corpus graph projection — read-only topology of the precedent corpus.
Powers the ``/graph`` page (the in-app, Obsidian-graph-view-like network of the
legal corpus). This module is a **pure projection** of the live corpus, not a
parallel store: every node and edge is assembled on the fly from the canonical
tables via the shared ``db.get_pool()`` connection. It writes nothing
(``SELECT`` only), so it cannot drift from the source of truth — preserving
**G2** (single source of truth, no parallel paths). It is also **not a retrieval
path** (03-retrieval): it returns graph topology (nodes + edges + in-degree),
never ranked search results, so it cannot become a second, drifting way to
"find" precedents.
Phase 1 node types:
- ``precedent`` — a row in ``case_law`` (external rulings + committee decisions)
- ``topic`` — a synthesized hub per ``subject_tag``
- ``practice_area`` — a synthesized hub per ``case_law.practice_area``
Phase 1 edge types:
- ``cites`` — ``precedent_internal_citations`` (source → cited)
- ``same_chain`` — ``case_law_relations`` (undirected, same-case chain)
- ``tagged`` — synthesized precedent → topic-hub membership
- ``in_area`` — synthesized precedent → practice-area-hub membership
Node **size = importance = incoming-citation count**, computed in SQL via the
``idx_pic_target`` index (a single index-backed ``GROUP BY``, never N+1).
Halacha nodes + corroboration/equivalence edges are Phase 2 (gated behind the
``node_types`` param), so the frontend can already send/hide ``halacha`` without
a contract change.
"""
from __future__ import annotations
from uuid import UUID
import asyncpg
from pydantic import BaseModel
# ── Node-type vocabulary ─────────────────────────────────────────────
VALID_NODE_TYPES = {"precedent", "halacha", "topic", "practice_area"}
DEFAULT_NODE_TYPES = ("precedent", "topic", "practice_area")
NODE_CAP_DEFAULT = 400
NODE_CAP_MAX = 1500
# Hebrew labels for the closed practice-area enum (G5). Unknown values fall
# back to the raw token so a new area still renders rather than vanishing.
_PA_LABELS = {
"rishuy_uvniya": "רישוי ובנייה",
"betterment_levy": "היטל השבחה",
"compensation_197": "פיצויים (ס׳ 197)",
"appeals_committee": "ועדת ערר",
}
# ── Response models (UI2: explicit Pydantic → real generated types) ───
class GraphNode(BaseModel):
id: str # "cl:<uuid>" | "hal:<uuid>" | "tag:<text>" | "pa:<token>"
type: str # precedent | halacha | topic | practice_area
label: str
size: int = 0 # incoming-citation count; 0 for hubs in Phase 1
practice_area: str | None = None
source_kind: str | None = None # precedents only
precedent_level: str | None = None # precedents only
case_law_id: str | None = None # canonical id for deep-link (precedents)
class GraphEdge(BaseModel):
source: str
target: str
type: str # cites | same_chain | tagged | in_area
treatment: str | None = None
weight: float | None = None
class CorpusGraph(BaseModel):
nodes: list[GraphNode]
edges: list[GraphEdge]
truncated: bool = False # true when the node cap clipped the result
total_available: int = 0 # precedents matching the filters before the cap
# ── Helpers ──────────────────────────────────────────────────────────
def normalize_node_types(node_types: str) -> set[str]:
"""Parse the ``node_types`` CSV param into a validated set.
Empty / all-invalid input falls back to the Phase-1 default so a missing
param never yields an empty graph.
"""
toks = {t.strip() for t in (node_types or "").split(",") if t.strip()}
valid = {t for t in toks if t in VALID_NODE_TYPES}
return valid or set(DEFAULT_NODE_TYPES)
_PREC_INDEG_CTE = """
WITH prec_indeg AS (
SELECT cited_case_law_id AS id, COUNT(*) AS n
FROM precedent_internal_citations
WHERE cited_case_law_id IS NOT NULL
GROUP BY cited_case_law_id
)
"""
def _precedent_node(row: asyncpg.Record) -> GraphNode:
label = (row["case_number"] or "").strip() or (row["case_name"] or "").strip() or ""
return GraphNode(
id=f"cl:{row['id']}",
type="precedent",
label=label,
size=int(row["size"] or 0),
practice_area=(row["practice_area"] or None),
source_kind=(row["source_kind"] or None),
precedent_level=(row["precedent_level"] or None),
case_law_id=str(row["id"]),
)
async def _edges_and_hubs(
conn: asyncpg.Connection,
prec_rows: list[asyncpg.Record],
types: set[str],
) -> tuple[list[GraphNode], list[GraphEdge]]:
"""Build intra-set edges + synthesized topic/practice-area hub nodes.
Only edges whose BOTH endpoints are in ``prec_rows`` are emitted — an edge
to a precedent that was clipped by the node cap is dropped so the client
never receives a dangling reference.
"""
hub_nodes: list[GraphNode] = []
edges: list[GraphEdge] = []
prec_ids = [r["id"] for r in prec_rows]
if not prec_ids:
return hub_nodes, edges
# cites — directional precedent → precedent
cite_rows = await conn.fetch(
"""
SELECT source_case_law_id AS s, cited_case_law_id AS t, treatment, confidence
FROM precedent_internal_citations
WHERE cited_case_law_id IS NOT NULL
AND source_case_law_id = ANY($1::uuid[])
AND cited_case_law_id = ANY($1::uuid[])
""",
prec_ids,
)
for r in cite_rows:
edges.append(
GraphEdge(
source=f"cl:{r['s']}",
target=f"cl:{r['t']}",
type="cites",
treatment=(r["treatment"] or None),
weight=float(r["confidence"]) if r["confidence"] is not None else None,
)
)
# same_chain — undirected; stored possibly in both directions → dedup
rel_rows = await conn.fetch(
"""
SELECT case_law_id AS s, related_id AS t
FROM case_law_relations
WHERE case_law_id = ANY($1::uuid[]) AND related_id = ANY($1::uuid[])
""",
prec_ids,
)
seen_chain: set[tuple[str, str]] = set()
for r in rel_rows:
key = tuple(sorted((str(r["s"]), str(r["t"]))))
if key in seen_chain:
continue
seen_chain.add(key)
edges.append(
GraphEdge(source=f"cl:{r['s']}", target=f"cl:{r['t']}", type="same_chain")
)
# topic hubs — case_law.subject_tags is JSONB → expand in SQL
if "topic" in types:
tag_rows = await conn.fetch(
"""
SELECT c.id, btrim(t.tag) AS tag
FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
WHERE c.id = ANY($1::uuid[]) AND btrim(t.tag) <> ''
""",
prec_ids,
)
tag_seen: set[str] = set()
for r in tag_rows:
tag = r["tag"]
tid = f"tag:{tag}"
if tag not in tag_seen:
tag_seen.add(tag)
hub_nodes.append(GraphNode(id=tid, type="topic", label=tag))
edges.append(GraphEdge(source=f"cl:{r['id']}", target=tid, type="tagged"))
# practice-area hubs — scalar column on each precedent row
if "practice_area" in types:
pa_seen: set[str] = set()
for r in prec_rows:
pa = (r["practice_area"] or "").strip()
if not pa:
continue
pid = f"pa:{pa}"
if pa not in pa_seen:
pa_seen.add(pa)
hub_nodes.append(
GraphNode(
id=pid,
type="practice_area",
label=_PA_LABELS.get(pa, pa),
practice_area=pa,
)
)
edges.append(GraphEdge(source=f"cl:{r['id']}", target=pid, type="in_area"))
return hub_nodes, edges
# ── Endpoints' core logic ────────────────────────────────────────────
async def build_corpus_graph(
pool: asyncpg.Pool,
*,
practice_area: str = "",
source: str = "",
node_types: str = "",
min_citations: int = 0,
limit: int = NODE_CAP_DEFAULT,
q: str = "",
) -> CorpusGraph:
"""Assemble the full corpus graph under the given filters.
The most-cited precedents always survive the cap (``ORDER BY size DESC``),
so clipping never hides the structurally important nodes. ``truncated`` +
``total_available`` let the UI prompt the user to narrow filters.
"""
types = normalize_node_types(node_types)
cap = max(1, min(int(limit), NODE_CAP_MAX))
min_cit = max(0, int(min_citations))
async with pool.acquire() as conn:
prec_rows = await conn.fetch(
_PREC_INDEG_CTE
+ """
SELECT c.id, c.case_number, c.case_name,
c.practice_area, c.source_kind, c.precedent_level,
COALESCE(p.n, 0) AS size,
COUNT(*) OVER () AS total_available
FROM case_law c
LEFT JOIN prec_indeg p ON p.id = c.id
WHERE ($1 = '' OR c.practice_area = $1)
AND ($2 = '' OR c.source_kind = $2)
AND COALESCE(p.n, 0) >= $3
AND ($4 = '' OR c.case_number ILIKE '%' || $4 || '%'
OR c.case_name ILIKE '%' || $4 || '%')
ORDER BY COALESCE(p.n, 0) DESC, c.case_number
LIMIT $5
""",
practice_area,
source,
min_cit,
q.strip(),
cap,
)
total_available = int(prec_rows[0]["total_available"]) if prec_rows else 0
nodes = [_precedent_node(r) for r in prec_rows]
hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, types)
nodes.extend(hub_nodes)
return CorpusGraph(
nodes=nodes,
edges=edges,
truncated=total_available > len(prec_rows),
total_available=total_available,
)
async def build_node_neighborhood(
pool: asyncpg.Pool,
node_id: str,
*,
depth: int = 1,
node_types: str = "",
) -> CorpusGraph:
"""Local-graph focus: the seed node + its neighbors out to ``depth`` (1-2).
Naturally bounded (one seed, BFS depth ≤ 2), so it is the recommended way to
"see everything around a node" when the full graph is clipped. Seeds:
- ``cl:<uuid>`` — a precedent; BFS expands ``depth`` levels.
- ``tag:<text>`` — a topic hub; its members are level 1, BFS ``depth-1`` more.
- ``pa:<token>`` — a practice-area hub; same as topic.
"""
types = normalize_node_types(node_types)
depth = max(1, min(int(depth), 2))
prefix, _, rest = node_id.partition(":")
rest = rest.strip()
if prefix not in {"cl", "tag", "pa"} or not rest:
return CorpusGraph(nodes=[], edges=[])
async with pool.acquire() as conn:
# Seed the precedent id set + remaining BFS levels.
if prefix == "cl":
try:
seed_uuid = UUID(rest)
except ValueError:
return CorpusGraph(nodes=[], edges=[])
current: set = {seed_uuid}
levels_left = depth
# The seed hub types are whatever the caller asked for.
forced_types = types
elif prefix == "tag":
rows = await conn.fetch(
"""
SELECT c.id
FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
WHERE btrim(t.tag) = $1
LIMIT $2
""",
rest,
NODE_CAP_MAX,
)
current = {r["id"] for r in rows}
levels_left = depth - 1
forced_types = types | {"topic"} # ensure the focused hub renders
else: # pa
rows = await conn.fetch(
"SELECT id FROM case_law WHERE practice_area = $1 LIMIT $2",
rest,
NODE_CAP_MAX,
)
current = {r["id"] for r in rows}
levels_left = depth - 1
forced_types = types | {"practice_area"}
if not current:
return CorpusGraph(nodes=[], edges=[])
# BFS over citation + same-chain edges (undirected for traversal).
all_ids = set(current)
frontier = set(current)
truncated = False
while levels_left > 0 and frontier:
if len(all_ids) >= NODE_CAP_MAX:
truncated = True
break
nb_rows = await conn.fetch(
"""
SELECT cited_case_law_id AS nb FROM precedent_internal_citations
WHERE cited_case_law_id IS NOT NULL AND source_case_law_id = ANY($1::uuid[])
UNION
SELECT source_case_law_id AS nb FROM precedent_internal_citations
WHERE cited_case_law_id = ANY($1::uuid[])
UNION
SELECT related_id AS nb FROM case_law_relations WHERE case_law_id = ANY($1::uuid[])
UNION
SELECT case_law_id AS nb FROM case_law_relations WHERE related_id = ANY($1::uuid[])
""",
list(frontier),
)
nbs = {r["nb"] for r in nb_rows} - all_ids
all_ids |= nbs
frontier = nbs
levels_left -= 1
ids = list(all_ids)[:NODE_CAP_MAX]
prec_rows = await conn.fetch(
_PREC_INDEG_CTE
+ """
SELECT c.id, c.case_number, c.case_name,
c.practice_area, c.source_kind, c.precedent_level,
COALESCE(p.n, 0) AS size
FROM case_law c
LEFT JOIN prec_indeg p ON p.id = c.id
WHERE c.id = ANY($1::uuid[])
""",
ids,
)
nodes = [_precedent_node(r) for r in prec_rows]
hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, forced_types)
nodes.extend(hub_nodes)
return CorpusGraph(
nodes=nodes,
edges=edges,
truncated=truncated,
total_available=len(nodes),
)