Files
legal-ai/web/graph_metrics.py
Chaim 2fbc0cd3c2 feat(graph): centrality + cluster analytics (corpus graph PR B)
The Obsidian "Graph Analysis" equivalent — surfaces influence and structure
beyond raw citation count.

Backend (new web/graph_metrics.py — pure, dependency-free, no DB → G2):
- PageRank (power-iteration), betweenness (Brandes), community (deterministic
  label-propagation + connected-components fallback), computed in-memory over
  the precedent citation subgraph that build_corpus_graph already fetched.
  Normalized 0–1; community ints dense-ranked by size (stable colours).
- GraphNode += pagerank/betweenness/community (None unless metrics=true).
- build_corpus_graph + /api/graph/corpus gain metrics=false (default path
  unchanged). Validated on the live corpus: 147 nodes in 13ms.

Frontend:
- graph.ts: GraphNode metrics fields + metrics param.
- graph-canvas: color-by (type | practice_area | precedent_level | community |
  recency) and size-by (in-degree | pagerank | betweenness) via colorForNode /
  radiusForNode; exported palettes.
- graph-view: colorBy/sizeBy controls; metrics requested only when needed;
  global metrics overlaid onto neighborhood nodes by id (a node's PageRank
  shouldn't change when focused); a ranking panel (Tabs: המשפיעות / גשרים,
  click → focus); dynamic legend per color-by.
- graph-filter-panel: "צביעה לפי" + "גודל נקודה לפי" Selects.

web-ui build + lint pass. Invariants: G2 (metrics pure, no DB writes),
UI2 (model grows on explicit Pydantic). api:types post-deploy.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 21:04:47 +00:00

159 lines
5.4 KiB
Python

"""Graph metrics for the corpus graph — dependency-free (no networkx).
Computed in-memory over the precedent citation subgraph that ``graph_api``
already fetched (**G2**: no DB access here — pure functions over data the caller
holds). The corpus graph is tiny (≤ ``NODE_CAP_MAX`` = 1500 nodes, sparse), so
power-iteration PageRank, Brandes betweenness, and label-propagation communities
all run synchronously well under a second.
Edge model: ``cites`` is directional (authority flows citing → cited);
``same_chain`` is non-directional. PageRank uses cites-direction + same_chain
both ways; betweenness and communities treat the whole graph as undirected.
Determinism (stable colors across requests): nodes are processed in sorted
order and ties break by lowest label — no randomness.
"""
from __future__ import annotations
from collections import Counter, defaultdict, deque
def compute(
node_ids: list[str],
directed_edges: list[tuple[str, str]],
undirected_edges: list[tuple[str, str]] | None = None,
) -> dict[str, dict]:
"""Return ``{node_id: {pagerank, betweenness, community}}``.
``pagerank`` / ``betweenness`` are normalized to max = 1.0 (easy client
scaling); ``community`` is a dense int 0..k-1 ordered by descending cluster
size (so the largest cluster is always colour 0).
"""
nodes = list(node_ids)
node_set = set(nodes)
if not nodes:
return {}
undirected_edges = undirected_edges or []
de = [(s, t) for s, t in directed_edges if s in node_set and t in node_set and s != t]
ue = [(s, t) for s, t in undirected_edges if s in node_set and t in node_set and s != t]
pr = _normalize(_pagerank(nodes, de, ue))
bt = _normalize(_betweenness(nodes, de, ue))
comm = _communities(nodes, de, ue)
return {
n: {
"pagerank": round(pr[n], 4),
"betweenness": round(bt[n], 4),
"community": comm[n],
}
for n in nodes
}
def _normalize(d: dict[str, float]) -> dict[str, float]:
m = max(d.values()) if d else 0.0
if m <= 0:
return {k: 0.0 for k in d}
return {k: v / m for k, v in d.items()}
def _undirected_adj(nodes: list[str], de, ue) -> dict[str, set[str]]:
adj: dict[str, set[str]] = {n: set() for n in nodes}
for s, t in de:
adj[s].add(t)
adj[t].add(s)
for s, t in ue:
adj[s].add(t)
adj[t].add(s)
return adj
def _pagerank(nodes, de, ue, d: float = 0.85, iters: int = 100, tol: float = 1e-9):
"""Power-iteration PageRank. cites direction + same_chain both ways."""
out: dict[str, list[str]] = defaultdict(list)
for s, t in de:
out[s].append(t)
for s, t in ue:
out[s].append(t)
out[t].append(s)
n = len(nodes)
pr = {x: 1.0 / n for x in nodes}
for _ in range(iters):
dangling = sum(pr[x] for x in nodes if not out[x])
base = (1.0 - d) / n + d * dangling / n
new = {x: base for x in nodes}
for x in nodes:
deg = len(out[x])
if deg:
share = d * pr[x] / deg
for m in out[x]:
new[m] += share
if sum(abs(new[x] - pr[x]) for x in nodes) < tol:
return new
pr = new
return pr
def _betweenness(nodes, de, ue):
    """Brandes betweenness on the undirected graph. O(V·(V+E)).

    Both edge lists are merged into one undirected adjacency map. For each
    source node a BFS counts shortest paths, then dependencies are accumulated
    walking the BFS order backwards. Returns ``{node: score}`` (raw scores,
    not max-normalized).
    """
    neighbours = _undirected_adj(nodes, de, ue)
    score = dict.fromkeys(nodes, 0.0)
    for source in nodes:
        # Phase 1: BFS from `source`, counting shortest paths to each node and
        # remembering which predecessors lie on them.
        visit_order: list[str] = []
        parents: dict[str, list[str]] = {node: [] for node in nodes}
        path_count = dict.fromkeys(nodes, 0.0)
        depth = dict.fromkeys(nodes, -1)  # -1 marks "not reached yet"
        path_count[source] = 1.0
        depth[source] = 0
        frontier = deque((source,))
        while frontier:
            current = frontier.popleft()
            visit_order.append(current)
            next_depth = depth[current] + 1
            for nb in neighbours[current]:
                if depth[nb] < 0:
                    depth[nb] = next_depth
                    frontier.append(nb)
                if depth[nb] == next_depth:
                    # `current` lies on a shortest path to `nb`.
                    path_count[nb] += path_count[current]
                    parents[nb].append(current)
        # Phase 2: accumulate dependencies from the farthest nodes back
        # towards the source.
        dependency = dict.fromkeys(nodes, 0.0)
        for node in reversed(visit_order):
            for parent in parents[node]:
                if path_count[node]:
                    dependency[parent] += (path_count[parent] / path_count[node]) * (1.0 + dependency[node])
            if node != source:
                score[node] += dependency[node]
    # Undirected: each shortest path counted from both endpoints.
    return {node: total / 2.0 for node, total in score.items()}
def _communities(nodes, de, ue) -> dict[str, int]:
    """Deterministic label propagation followed by dense renumbering.

    Every node starts with its own id as label. Nodes are swept in sorted
    order, and each node with neighbours adopts the label that is most common
    among them (ties -> lowest label). Labels are updated in place, so later
    nodes in a sweep already see the new labels of earlier ones. Sweeping
    stops when a full pass changes nothing, or after 30 passes. Nodes without
    neighbours keep their own singleton label.

    Returns ``{node: community}`` where communities are numbered 0..k-1 by
    descending size (ties -> lowest original label).
    """
    neighbours = _undirected_adj(nodes, de, ue)
    sweep = sorted(nodes)
    membership = dict(zip(nodes, nodes))
    for _ in range(30):
        stable = True
        for node in sweep:
            adjacent = neighbours[node]
            if not adjacent:
                continue
            votes = Counter(membership[nb] for nb in adjacent)
            top = max(votes.values())
            # Among the most frequent labels pick the lowest one.
            winner = min(lab for lab, cnt in votes.items() if cnt == top)
            if winner != membership[node]:
                membership[node] = winner
                stable = False
        if stable:
            break
    cluster_sizes = Counter(membership.values())
    by_size = sorted(cluster_sizes, key=lambda lab: (-cluster_sizes[lab], lab))
    dense = {lab: rank for rank, lab in enumerate(by_size)}
    return {node: dense[membership[node]] for node in nodes}