legal-ai/web/graph_api.py
Chaim 8258f09228 feat(graph): metadata filters + facets (corpus graph PR A)
Adds legal-metadata filtering and the payload to color by it (foundation for
the color-by selector in the analytics PR).

Backend (web/graph_api.py, web/app.py) — read-only, G2:
- GraphNode += court, date (ISO) — precedents carry them for filter/color-by.
- build_corpus_graph += server-side WHERE filters (G5): court, precedent_level,
  chair, district, year_from, year_to (EXTRACT(YEAR FROM date)). Neighborhood
  query also selects court/date.
- New GET /api/graph/facets (response_model GraphFacets, UI2) → distinct
  courts/levels/chairs/districts so the UI doesn't hardcode Hebrew strings.

Frontend:
- graph.ts: GraphNode += court/date; GraphFilters += the six params;
  buildParams; useGraphFacets() hook.
- graph-filter-panel: an "advanced" Accordion with court/precedent_level/chair/
  district Selects (from facets) + year-from/year-to Selects.
- graph-view: new controls wired into filters; facets fetched and passed down.

Verified read-only against the live DB (precedent_level=עליון&year_from=2015
filters correctly; facets populated: 36 courts / 3 levels / 19 chairs / 4
districts). web-ui build + lint pass.

Invariants: G2 (SELECT-only via db.get_pool), G5 (filters server-side),
UI2 (explicit response_models). api:types to be regenerated post-deploy.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 20:52:13 +00:00
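
The six server-side filters listed in the commit message map onto plain query parameters. Below is a minimal sketch of how a client might assemble them. It assumes the graph endpoint lives at `/api/graph` (only `/api/graph/facets` is named above, so that path is a guess), and the helper name `build_graph_query` is hypothetical. Empty strings and zeros are dropped because the backend treats them as "no filter".

```python
from __future__ import annotations

from urllib.parse import parse_qs, urlencode

# Parameter names mirror build_corpus_graph's keyword arguments.
FILTER_KEYS = ("court", "precedent_level", "chair", "district", "year_from", "year_to")


def build_graph_query(**filters: str | int) -> str:
    """Hypothetical helper: URL-encode only the filters that are actually set."""
    unknown = set(filters) - set(FILTER_KEYS)
    if unknown:
        raise ValueError(f"unknown filter(s): {sorted(unknown)}")
    # '' and 0 are the backend's "no filter" sentinels, so omit them.
    active = {k: filters[k] for k in FILTER_KEYS if filters.get(k) not in ("", 0, None)}
    return urlencode(active)


# The combination verified in the commit message, plus an empty filter that is dropped.
query = build_graph_query(precedent_level="עליון", year_from=2015, court="")
print("/api/graph?" + query)  # "/api/graph" is an assumed path
```

`urlencode` percent-encodes the Hebrew value as UTF-8, so the client never has to escape enum strings by hand.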

"""Corpus graph projection — read-only topology of the precedent corpus.

Powers the ``/graph`` page (the in-app, Obsidian-graph-view-like network of the
legal corpus). This module is a **pure projection** of the live corpus, not a
parallel store: every node and edge is assembled on the fly from the canonical
tables via the shared ``db.get_pool()`` connection. It writes nothing
(``SELECT`` only), so it cannot drift from the source of truth — preserving
**G2** (single source of truth, no parallel paths). It is also **not a retrieval
path** (03-retrieval): it returns graph topology (nodes + edges + in-degree),
never ranked search results, so it cannot become a second, drifting way to
"find" precedents.

Phase 1 node types:
  - ``precedent`` — a row in ``case_law`` (external rulings + committee decisions)
  - ``topic`` — a synthesized hub per ``subject_tag``
  - ``practice_area`` — a synthesized hub per ``case_law.practice_area``

Phase 1 edge types:
  - ``cites`` — ``precedent_internal_citations`` (source → cited)
  - ``same_chain`` — ``case_law_relations`` (undirected, same-case chain)
  - ``tagged`` — synthesized precedent → topic-hub membership
  - ``in_area`` — synthesized precedent → practice-area-hub membership

Node **size = importance = incoming-citation count**, computed in SQL via the
``idx_pic_target`` index (a single index-backed ``GROUP BY``, never N+1).

Halacha nodes + corroboration/equivalence edges are Phase 2 (gated behind the
``node_types`` param), so the frontend can already send/hide ``halacha`` without
a contract change.
"""

from __future__ import annotations

from uuid import UUID

import asyncpg
from pydantic import BaseModel

# ── Node-type vocabulary ─────────────────────────────────────────────
VALID_NODE_TYPES = {"precedent", "halacha", "topic", "practice_area"}
DEFAULT_NODE_TYPES = ("precedent", "topic", "practice_area")
NODE_CAP_DEFAULT = 400
NODE_CAP_MAX = 1500

# Hebrew labels for the closed practice-area enum (G5). Unknown values fall
# back to the raw token so a new area still renders rather than vanishing.
_PA_LABELS = {
    "rishuy_uvniya": "רישוי ובנייה",
    "betterment_levy": "היטל השבחה",
    "compensation_197": "פיצויים (ס׳ 197)",
    "appeals_committee": "ועדת ערר",
}


# ── Response models (UI2: explicit Pydantic → real generated types) ───
class GraphNode(BaseModel):
    id: str  # "cl:<uuid>" | "hal:<uuid>" | "tag:<text>" | "pa:<token>"
    type: str  # precedent | halacha | topic | practice_area
    label: str
    size: int = 0  # incoming-citation count; 0 for hubs in Phase 1
    practice_area: str | None = None
    source_kind: str | None = None  # precedents only
    precedent_level: str | None = None  # precedents only
    court: str | None = None  # precedents only — for color-by / filter
    date: str | None = None  # precedents only — ISO date, for recency color/filter
    case_law_id: str | None = None  # canonical id for deep-link (precedents)


class GraphFacets(BaseModel):
    """Distinct filter values so the UI doesn't hardcode Hebrew enum strings."""

    courts: list[str]
    precedent_levels: list[str]
    chairs: list[str]
    districts: list[str]


class GraphEdge(BaseModel):
    source: str
    target: str
    type: str  # cites | same_chain | tagged | in_area
    treatment: str | None = None
    weight: float | None = None


class CorpusGraph(BaseModel):
    nodes: list[GraphNode]
    edges: list[GraphEdge]
    truncated: bool = False  # true when the node cap clipped the result
    total_available: int = 0  # precedents matching the filters before the cap


# ── Helpers ──────────────────────────────────────────────────────────
def normalize_node_types(node_types: str) -> set[str]:
    """Parse the ``node_types`` CSV param into a validated set.

    Empty / all-invalid input falls back to the Phase-1 default so a missing
    param never yields an empty graph.
    """
    toks = {t.strip() for t in (node_types or "").split(",") if t.strip()}
    valid = {t for t in toks if t in VALID_NODE_TYPES}
    return valid or set(DEFAULT_NODE_TYPES)


_PREC_INDEG_CTE = """
WITH prec_indeg AS (
    SELECT cited_case_law_id AS id, COUNT(*) AS n
    FROM precedent_internal_citations
    WHERE cited_case_law_id IS NOT NULL
    GROUP BY cited_case_law_id
)
"""


def _precedent_node(row: asyncpg.Record) -> GraphNode:
    label = (row["case_number"] or "").strip() or (row["case_name"] or "").strip() or ""
    return GraphNode(
        id=f"cl:{row['id']}",
        type="precedent",
        label=label,
        size=int(row["size"] or 0),
        practice_area=(row["practice_area"] or None),
        source_kind=(row["source_kind"] or None),
        precedent_level=(row["precedent_level"] or None),
        court=(row["court"] or None),
        date=(row["date"].isoformat() if row["date"] else None),
        case_law_id=str(row["id"]),
    )


async def _edges_and_hubs(
    conn: asyncpg.Connection,
    prec_rows: list[asyncpg.Record],
    types: set[str],
) -> tuple[list[GraphNode], list[GraphEdge]]:
    """Build intra-set edges + synthesized topic/practice-area hub nodes.

    Only edges whose BOTH endpoints are in ``prec_rows`` are emitted — an edge
    to a precedent that was clipped by the node cap is dropped so the client
    never receives a dangling reference.
    """
    hub_nodes: list[GraphNode] = []
    edges: list[GraphEdge] = []
    prec_ids = [r["id"] for r in prec_rows]
    if not prec_ids:
        return hub_nodes, edges

    # cites — directional precedent → precedent
    cite_rows = await conn.fetch(
        """
        SELECT source_case_law_id AS s, cited_case_law_id AS t, treatment, confidence
        FROM precedent_internal_citations
        WHERE cited_case_law_id IS NOT NULL
          AND source_case_law_id = ANY($1::uuid[])
          AND cited_case_law_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    for r in cite_rows:
        edges.append(
            GraphEdge(
                source=f"cl:{r['s']}",
                target=f"cl:{r['t']}",
                type="cites",
                treatment=(r["treatment"] or None),
                weight=float(r["confidence"]) if r["confidence"] is not None else None,
            )
        )

    # same_chain — undirected; stored possibly in both directions → dedup
    rel_rows = await conn.fetch(
        """
        SELECT case_law_id AS s, related_id AS t
        FROM case_law_relations
        WHERE case_law_id = ANY($1::uuid[]) AND related_id = ANY($1::uuid[])
        """,
        prec_ids,
    )
    seen_chain: set[tuple[str, str]] = set()
    for r in rel_rows:
        key = tuple(sorted((str(r["s"]), str(r["t"]))))
        if key in seen_chain:
            continue
        seen_chain.add(key)
        edges.append(
            GraphEdge(source=f"cl:{r['s']}", target=f"cl:{r['t']}", type="same_chain")
        )

    # topic hubs — case_law.subject_tags is JSONB → expand in SQL
    if "topic" in types:
        tag_rows = await conn.fetch(
            """
            SELECT c.id, btrim(t.tag) AS tag
            FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
            WHERE c.id = ANY($1::uuid[]) AND btrim(t.tag) <> ''
            """,
            prec_ids,
        )
        tag_seen: set[str] = set()
        for r in tag_rows:
            tag = r["tag"]
            tid = f"tag:{tag}"
            if tag not in tag_seen:
                tag_seen.add(tag)
                hub_nodes.append(GraphNode(id=tid, type="topic", label=tag))
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=tid, type="tagged"))

    # practice-area hubs — scalar column on each precedent row
    if "practice_area" in types:
        pa_seen: set[str] = set()
        for r in prec_rows:
            pa = (r["practice_area"] or "").strip()
            if not pa:
                continue
            pid = f"pa:{pa}"
            if pa not in pa_seen:
                pa_seen.add(pa)
                hub_nodes.append(
                    GraphNode(
                        id=pid,
                        type="practice_area",
                        label=_PA_LABELS.get(pa, pa),
                        practice_area=pa,
                    )
                )
            edges.append(GraphEdge(source=f"cl:{r['id']}", target=pid, type="in_area"))

    return hub_nodes, edges


# ── Endpoints' core logic ────────────────────────────────────────────
async def build_corpus_graph(
    pool: asyncpg.Pool,
    *,
    practice_area: str = "",
    source: str = "",
    node_types: str = "",
    min_citations: int = 0,
    limit: int = NODE_CAP_DEFAULT,
    q: str = "",
    court: str = "",
    precedent_level: str = "",
    chair: str = "",
    district: str = "",
    year_from: int = 0,
    year_to: int = 0,
) -> CorpusGraph:
    """Assemble the full corpus graph under the given filters.

    The most-cited precedents always survive the cap (``ORDER BY size DESC``),
    so clipping never hides the structurally important nodes. ``truncated`` +
    ``total_available`` let the UI prompt the user to narrow filters. All
    filters are applied server-side in the WHERE clause (G5).
    """
    types = normalize_node_types(node_types)
    cap = max(1, min(int(limit), NODE_CAP_MAX))
    min_cit = max(0, int(min_citations))

    async with pool.acquire() as conn:
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name,
                   c.practice_area, c.source_kind, c.precedent_level,
                   c.court, c.date,
                   COALESCE(p.n, 0) AS size,
                   COUNT(*) OVER () AS total_available
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE ($1 = '' OR c.practice_area = $1)
              AND ($2 = '' OR c.source_kind = $2)
              AND COALESCE(p.n, 0) >= $3
              AND ($4 = '' OR c.case_number ILIKE '%' || $4 || '%'
                           OR c.case_name ILIKE '%' || $4 || '%')
              AND ($6 = '' OR c.court = $6)
              AND ($7 = '' OR c.precedent_level = $7)
              AND ($8 = '' OR c.chair_name = $8)
              AND ($9 = '' OR c.district = $9)
              AND ($10 = 0 OR (c.date IS NOT NULL AND EXTRACT(YEAR FROM c.date) >= $10))
              AND ($11 = 0 OR (c.date IS NOT NULL AND EXTRACT(YEAR FROM c.date) <= $11))
            ORDER BY COALESCE(p.n, 0) DESC, c.case_number
            LIMIT $5
            """,
            practice_area,
            source,
            min_cit,
            q.strip(),
            cap,
            court,
            precedent_level,
            chair,
            district,
            max(0, int(year_from)),
            max(0, int(year_to)),
        )
        total_available = int(prec_rows[0]["total_available"]) if prec_rows else 0
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, types)
        nodes.extend(hub_nodes)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=total_available > len(prec_rows),
        total_available=total_available,
    )


async def build_node_neighborhood(
    pool: asyncpg.Pool,
    node_id: str,
    *,
    depth: int = 1,
    node_types: str = "",
) -> CorpusGraph:
    """Local-graph focus: the seed node + its neighbors out to ``depth`` (1-2).

    Naturally bounded (one seed, BFS depth ≤ 2), so it is the recommended way to
    "see everything around a node" when the full graph is clipped. Seeds:
      - ``cl:<uuid>`` — a precedent; BFS expands ``depth`` levels.
      - ``tag:<text>`` — a topic hub; its members are level 1, BFS ``depth-1`` more.
      - ``pa:<token>`` — a practice-area hub; same as topic.
    """
    types = normalize_node_types(node_types)
    depth = max(1, min(int(depth), 2))
    prefix, _, rest = node_id.partition(":")
    rest = rest.strip()
    if prefix not in {"cl", "tag", "pa"} or not rest:
        return CorpusGraph(nodes=[], edges=[])

    async with pool.acquire() as conn:
        # Seed the precedent id set + remaining BFS levels.
        if prefix == "cl":
            try:
                seed_uuid = UUID(rest)
            except ValueError:
                return CorpusGraph(nodes=[], edges=[])
            current: set = {seed_uuid}
            levels_left = depth
            # The seed hub types are whatever the caller asked for.
            forced_types = types
        elif prefix == "tag":
            rows = await conn.fetch(
                """
                SELECT c.id
                FROM case_law c, jsonb_array_elements_text(c.subject_tags) AS t(tag)
                WHERE btrim(t.tag) = $1
                LIMIT $2
                """,
                rest,
                NODE_CAP_MAX,
            )
            current = {r["id"] for r in rows}
            levels_left = depth - 1
            forced_types = types | {"topic"}  # ensure the focused hub renders
        else:  # pa
            rows = await conn.fetch(
                "SELECT id FROM case_law WHERE practice_area = $1 LIMIT $2",
                rest,
                NODE_CAP_MAX,
            )
            current = {r["id"] for r in rows}
            levels_left = depth - 1
            forced_types = types | {"practice_area"}

        if not current:
            return CorpusGraph(nodes=[], edges=[])

        # BFS over citation + same-chain edges (undirected for traversal).
        all_ids = set(current)
        frontier = set(current)
        truncated = False
        while levels_left > 0 and frontier:
            if len(all_ids) >= NODE_CAP_MAX:
                truncated = True
                break
            nb_rows = await conn.fetch(
                """
                SELECT cited_case_law_id AS nb FROM precedent_internal_citations
                WHERE cited_case_law_id IS NOT NULL AND source_case_law_id = ANY($1::uuid[])
                UNION
                SELECT source_case_law_id AS nb FROM precedent_internal_citations
                WHERE cited_case_law_id = ANY($1::uuid[])
                UNION
                SELECT related_id AS nb FROM case_law_relations WHERE case_law_id = ANY($1::uuid[])
                UNION
                SELECT case_law_id AS nb FROM case_law_relations WHERE related_id = ANY($1::uuid[])
                """,
                list(frontier),
            )
            nbs = {r["nb"] for r in nb_rows} - all_ids
            all_ids |= nbs
            frontier = nbs
            levels_left -= 1

        # A single BFS level can overshoot the cap; flag the clip so the
        # slice below never drops nodes while ``truncated`` stays False.
        if len(all_ids) > NODE_CAP_MAX:
            truncated = True
        ids = list(all_ids)[:NODE_CAP_MAX]
        prec_rows = await conn.fetch(
            _PREC_INDEG_CTE
            + """
            SELECT c.id, c.case_number, c.case_name,
                   c.practice_area, c.source_kind, c.precedent_level,
                   c.court, c.date,
                   COALESCE(p.n, 0) AS size
            FROM case_law c
            LEFT JOIN prec_indeg p ON p.id = c.id
            WHERE c.id = ANY($1::uuid[])
            """,
            ids,
        )
        nodes = [_precedent_node(r) for r in prec_rows]
        hub_nodes, edges = await _edges_and_hubs(conn, prec_rows, forced_types)
        nodes.extend(hub_nodes)

    return CorpusGraph(
        nodes=nodes,
        edges=edges,
        truncated=truncated,
        total_available=len(nodes),
    )


async def build_facets(pool: asyncpg.Pool) -> GraphFacets:
    """Distinct, non-empty filter values from ``case_law`` for the UI dropdowns.

    Keeps the closed-vs-open-enum problem server-side so the frontend never
    hardcodes Hebrew court / chair strings (a UI1 source-of-truth concern).
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT 'court' AS kind, court AS v FROM case_law WHERE court <> ''
            UNION
            SELECT 'level', precedent_level FROM case_law WHERE precedent_level <> ''
            UNION
            SELECT 'chair', chair_name FROM case_law WHERE chair_name <> ''
            UNION
            SELECT 'district', district FROM case_law WHERE district <> ''
            ORDER BY 1, 2
            """
        )
    buckets: dict[str, list[str]] = {"court": [], "level": [], "chair": [], "district": []}
    for r in rows:
        buckets[r["kind"]].append(r["v"])
    return GraphFacets(
        courts=buckets["court"],
        precedent_levels=buckets["level"],
        chairs=buckets["chair"],
        districts=buckets["district"],
    )
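
The bucketing step at the end of `build_facets` can be exercised without a database. This is a minimal sketch, assuming the query result is represented as plain `(kind, value)` tuples rather than `asyncpg.Record` objects. The helper name `bucket_facets` and the sample values are illustrative only and do not come from the live corpus.

```python
from __future__ import annotations


def bucket_facets(rows: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Group (kind, value) rows into one list per facet, preserving row order."""
    buckets: dict[str, list[str]] = {"court": [], "level": [], "chair": [], "district": []}
    for kind, value in rows:
        buckets[kind].append(value)
    return buckets


# A set literal stands in for UNION (deduplication) and sorted() for ORDER BY 1, 2.
sample_rows = sorted(
    {
        ("court", "District Court"),
        ("level", "supreme"),
        ("court", "Appeals Committee"),
        ("court", "District Court"),  # duplicate, removed by the set
    }
)
facets = bucket_facets(sample_rows)
print(facets["court"])
```

Facets with no rows stay as empty lists, so the response model always receives all four keys.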