22 KiB
FU-7: Audit-Trail + Provenance — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Turn audit_log into an end-to-end audit trail, attach source-provenance to generated blocks, enforce citation→corpus resolution, and flag DOCX↔blocks drift — all forward-only, no data migration.
Architecture: Reuse audit_log.log_action with a details JSONB payload (X5 §4 — no new table) via a non-fatal log_action_safe wrapper. Provenance is an append-only write_block audit event carrying the source ids that fed the generation. GAP-17 drift is a deterministic cases.blocks_stale flag (V22) set at the known divergence points + a health-check count — not a fragile DOCX→blocks reparse. GAP-20 is a structural case_law_id resolver surfaced as a QA warning.
Tech Stack: Python 3.12, asyncpg, PostgreSQL@localhost:5433, pytest offline, .venv at mcp-server/.venv.
Spec: docs/superpowers/specs/2026-05-30-fu7-audit-provenance-design.md
Run tests: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v
File Structure
- Modify
mcp-server/src/legal_mcp/services/audit.py— addlog_action_safe(...). - Modify
mcp-server/src/legal_mcp/services/db.py— V22 migration (cases.blocks_stale),mark_blocks_stale,resolve_citation_case_law_ids. - Modify
mcp-server/src/legal_mcp/tools/documents.py— audit indocument_upload,extract_claims. - Modify
mcp-server/src/legal_mcp/services/block_writer.py— collect source ids; auditwrite_block; clearblocks_staleon save. - Modify
mcp-server/src/legal_mcp/tools/drafting.py— auditexport_docx; set/clearblocks_staleinexport_docx/revise_draft/apply_user_edit. - Modify QA path (
services/qa_validator.py) — citation→corpus warning. - Modify
mcp-server/src/legal_mcp/services/metrics.py—cases_with_stale_blockscount. - Create
mcp-server/tests/test_audit_provenance.py.
Task 1: Failing tests
Files: Create mcp-server/tests/test_audit_provenance.py
- Step 1: Write the failing tests
"""FU-7: audit-trail + provenance (offline, monkeypatched I/O)."""
from __future__ import annotations
import asyncio
from uuid import uuid4
import pytest
from legal_mcp.services import audit, db
def _run(coro):
return asyncio.run(coro)
# ── GAP-18: log_action_safe is non-fatal ───────────────────────────────
def test_log_action_safe_swallows_db_error(monkeypatch):
async def _boom(*a, **k):
raise RuntimeError("db down")
monkeypatch.setattr(audit, "log_action", _boom)
# must NOT raise
_run(audit.log_action_safe("write_block", details={"x": 1}))
def test_log_action_safe_forwards_args(monkeypatch):
seen = {}
async def _capture(action, case_id=None, document_id=None, details=None, user="system"):
seen.update(action=action, details=details)
monkeypatch.setattr(audit, "log_action", _capture)
_run(audit.log_action_safe("export_docx", details={"path": "/x"}))
assert seen["action"] == "export_docx" and seen["details"] == {"path": "/x"}
# ── GAP-20: structural citation resolver ────────────────────────────────
def test_resolve_citation_case_law_ids_splits(monkeypatch):
good = uuid4()
bad = uuid4()
class _Conn:
async def fetchval(self, q, cid):
return cid == good
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
class _Pool:
def acquire(self): return _Conn()
async def _pool():
return _Pool()
monkeypatch.setattr(db, "get_pool", _pool)
out = _run(db.resolve_citation_case_law_ids([good, bad]))
assert good in out["resolved"] and bad in out["unresolved"]
# ── GAP-17: blocks_stale helper ────────────────────────────────────────
def test_mark_blocks_stale_executes_update(monkeypatch):
seen = {}
class _Conn:
async def execute(self, q, *a):
seen["q"] = q; seen["args"] = a
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
class _Pool:
def acquire(self): return _Conn()
async def _pool(): return _Pool()
monkeypatch.setattr(db, "get_pool", _pool)
cid = uuid4()
_run(db.mark_blocks_stale(cid, True))
assert "blocks_stale" in seen["q"] and seen["args"][0] is True and seen["args"][1] == cid
- Step 2: Run to verify failure
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v
Expected: FAIL — AttributeError: ... has no attribute 'log_action_safe' / resolve_citation_case_law_ids / mark_blocks_stale.
- Step 3: Commit
cd ~/legal-ai
git add mcp-server/tests/test_audit_provenance.py
git commit -m "test(audit): failing tests for audit-trail + provenance (FU-7)"
Task 2: V22 migration + core helpers
Files: Modify mcp-server/src/legal_mcp/services/audit.py, mcp-server/src/legal_mcp/services/db.py
- Step 1: Add
log_action_safeto audit.py (afterlog_action)
async def log_action_safe(
action: str,
case_id: "UUID | None" = None,
document_id: "UUID | None" = None,
details: dict | None = None,
user: str = "system",
) -> None:
"""Non-fatal audit: never let an audit-log failure break the caller's action.
The authoritative integrity trail is git (X5 §2.1); audit_log is the
'who/what/when' observability layer, so a write failure is logged as a
warning and swallowed.
"""
try:
await log_action(action, case_id=case_id, document_id=document_id,
details=details, user=user)
except Exception as e: # noqa: BLE001 — observability must not break the op
logger.warning("audit log_action failed (non-fatal) for %s: %s", action, e)
- Step 2: Add
SCHEMA_V22_SQLafterSCHEMA_V21_SQLin db.py + wire it
READ db.py near SCHEMA_V21_SQL (~line 1097-1133). Add after the V21 block:
# ── V22: cases.blocks_stale — DOCX↔blocks drift flag (GAP-17 / INV-EX1) ──
# Set true when revise_draft/apply_user_edit make active_draft_path the live
# source-of-truth without re-syncing decision_blocks; cleared when blocks are
# re-exported or re-saved. Surfaced by health-check. Source-of-truth remains
# decision_blocks — this only flags known drift (no fragile DOCX→blocks reparse).
SCHEMA_V22_SQL = """
ALTER TABLE cases ADD COLUMN IF NOT EXISTS blocks_stale boolean NOT NULL DEFAULT false;
"""
After await conn.execute(SCHEMA_V21_SQL) add await conn.execute(SCHEMA_V22_SQL) and bump the log line to v1-v22.
- Step 3: Add
mark_blocks_stale+resolve_citation_case_law_idsto db.py (near the case helpers, afterget_active_draft_path)
async def mark_blocks_stale(case_id: UUID, stale: bool) -> None:
"""Flag/clear DOCX↔blocks drift for a case (GAP-17)."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE cases SET blocks_stale = $1, updated_at = now() WHERE id = $2",
stale, case_id,
)
async def resolve_citation_case_law_ids(ids) -> dict:
"""Structural citation→corpus resolution (GAP-20 / INV-AUD3).
Given case_law_id values referenced by a decision's citations/provenance,
split into resolvable (exist in case_law) vs unresolvable.
"""
resolved, unresolved = [], []
pool = await get_pool()
async with pool.acquire() as conn:
for cid in ids:
try:
exists = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM case_law WHERE id = $1)", cid)
except Exception:
exists = False
(resolved if exists else unresolved).append(cid)
return {"resolved": resolved, "unresolved": unresolved}
- Step 4: Run Task-1 tests for these helpers
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v
Expected: all 4 tests PASS.
- Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/audit.py mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(audit): log_action_safe + V22 blocks_stale + citation resolver (FU-7)"
Task 3: GAP-18 — audit calls on upload / extract_claims / export
Files: Modify mcp-server/src/legal_mcp/tools/documents.py, mcp-server/src/legal_mcp/tools/drafting.py
- Step 1:
document_upload— audit after processing (documents.py)
READ document_upload (lines ~14-94). It computes case_id, doc (with doc["id"]), actual_doc_type, and result (with result["classification"]). Ensure from legal_mcp.services import audit is imported (add if missing). Immediately BEFORE the final return json.dumps({...}), add:
await audit.log_action_safe(
"document_upload", case_id=case_id, document_id=UUID(doc["id"]),
details={"title": title, "doc_type": actual_doc_type},
)
- Step 2:
extract_claims— audit before return (documents.py)
In extract_claims (lines ~300-348), before the final return json.dumps(results, ...), add:
await audit.log_action_safe(
"extract_claims", case_id=case_id,
details={"docs_processed": len(docs), "results": len(results)},
)
- Step 3:
export_docx— audit after export (drafting.py)
READ export_docx in drafting.py (around lines 384-439). It resolves case_id, builds path, and calls db.set_active_draft_path(case_id, path). Ensure audit is imported. After the set_active_draft_path call, add:
await audit.log_action_safe(
"export_docx", case_id=case_id,
details={"path": str(path)},
)
- Step 4: Verify imports
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import documents, drafting; print('clean')"
Expected: clean.
- Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/documents.py mcp-server/src/legal_mcp/tools/drafting.py
git commit -m "feat(audit): log document_upload/extract_claims/export_docx (GAP-18, FU-7)"
Task 4: GAP-19 — block→source provenance
Files: Modify mcp-server/src/legal_mcp/services/block_writer.py
- Step 1: Make
_build_precedents_contextalso return the case_law ids it used
READ _build_precedents_context (lines ~671-716). Change the caselaw_rows SELECT to also fetch cl.id:
replace """SELECT cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote, with
"""SELECT cl.id, cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,.
Collect ids and change the function to return a tuple. At the function's two return points:
- replace
return "\n\n".join(parts) if parts else "(אין תקדימים)"withreturn ("\n\n".join(parts) if parts else "(אין תקדימים)"), case_law_ids - ensure
case_law_ids = []is initialized at the top, and inside the caselaw loop appendr["id"](str(r["id"])).
If there is an early/exception return path that returns a bare string, make it return ("(אין תקדימים)", []) too.
- Step 2: Update the caller in
write_block+ collect document/claim ids
READ write_block (lines ~280-394). Line ~321 currently:
precedents_context = await _build_precedents_context(case_id, block_id)
Change to:
precedents_context, _precedent_case_law_ids = await _build_precedents_context(case_id, block_id)
Add a helper _collect_block_sources (after _build_result, ~line 408):
async def _collect_block_sources(case_id: UUID, block_id: str) -> dict:
"""Deterministic source ids available to a block's generation (GAP-19).
document_ids: case documents matching the block's allowed doc-types.
claim_ids: extracted claims for the case. (case_law_ids are captured
separately from the precedent search inside write_block.)
"""
allowed = _BLOCK_DOC_TYPES.get(block_id, [])
docs = await db.list_documents(case_id)
if allowed:
docs = [d for d in docs if d.get("doc_type") in allowed]
claims = await db.get_claims(case_id)
return {
"document_ids": [str(d["id"]) for d in docs],
"claim_ids": [str(c["id"]) for c in claims],
}
In write_block, just before the final return _build_result(block_id, content, block_cfg) (the non-template path, ~line 394), build the sources and attach to the result:
sources = await _collect_block_sources(case_id, block_id)
sources["case_law_ids"] = _precedent_case_law_ids
result = _build_result(block_id, content, block_cfg)
result["sources"] = sources
return result
(For the template path return at ~line 308, attach an empty sources dict: r = _build_result(...); r["sources"] = {"document_ids": [], "claim_ids": [], "case_law_ids": []}; return r.)
- Step 3: Write the provenance audit in
write_and_store_blockandsave_block_content
In write_and_store_block (~line 1039), after await store_block(UUID(decision["id"]), result), add:
await audit.log_action_safe(
"write_block", case_id=case_id,
details={
"decision_id": str(decision["id"]),
"block_id": block_id,
"model_used": result.get("model_used"),
"generation_type": result.get("generation_type"),
"sources": result.get("sources", {}),
},
)
await db.mark_blocks_stale(case_id, False)
In save_block_content (~line 905), after await store_block(...) add the same mark_blocks_stale(case_id, False) (a saved block means DB blocks are current). Ensure from legal_mcp.services import audit is imported in block_writer.py (add if missing).
- Step 4: Smoke-import + targeted check
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import block_writer; print('clean')"
Expected: clean.
- Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/block_writer.py
git commit -m "feat(audit): block→source provenance via write_block audit event (GAP-19, FU-7)"
Task 5: GAP-20 — citation→corpus validation as QA warning
Files: Modify mcp-server/src/legal_mcp/services/qa_validator.py
- Step 1: Read the QA validator structure
READ mcp-server/src/legal_mcp/services/qa_validator.py — find the function that runs the QA checks and returns findings (look for the list of checks / findings dicts with severity like warning/critical). Identify the findings structure (keys, how a check is appended).
- Step 2: Add a citation-resolution check
Add a check that gathers case_law_ids referenced by the decision's provenance/citations and resolves them. Concretely, add a function in qa_validator.py:
async def _check_citation_resolution(case_id, decision_id) -> list[dict]:
"""GAP-20/INV-AUD3: every cited case_law_id must resolve to the corpus.
Reads case_law_ids from the decision's write_block audit provenance
(audit_log details.sources.case_law_ids) and verifies each resolves.
Unresolvable ids → non-blocking warning + audit('citation_unresolved').
"""
from legal_mcp.services import db, audit
from uuid import UUID
rows = await audit.get_audit_log(case_id=case_id, action="write_block", limit=200)
ids = set()
for r in rows:
details = r.get("details") or {}
if isinstance(details, str):
import json as _json
try: details = _json.loads(details)
except (ValueError, TypeError): details = {}
for raw in (details.get("sources") or {}).get("case_law_ids", []):
try: ids.add(UUID(str(raw)))
except (ValueError, TypeError): pass
if not ids:
return []
res = await db.resolve_citation_case_law_ids(list(ids))
findings = []
if res["unresolved"]:
await audit.log_action_safe(
"citation_unresolved", case_id=case_id,
details={"unresolved": [str(x) for x in res["unresolved"]]},
)
findings.append({
"check": "citation_resolution",
"severity": "warning",
"passed": False,
"message": f"{len(res['unresolved'])} ציטוטים אינם פתירים לקורפוס — דורש אימות יו\"ר",
})
return findings
Then wire _check_citation_resolution into the validator's main run function so its findings are appended to the result list (match the existing findings shape — adjust the dict keys to the validator's actual schema discovered in Step 1). It must be a warning, never a critical gate (does not block export).
- Step 3: Smoke-import
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import qa_validator; print('clean')"
Expected: clean.
- Step 4: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/qa_validator.py
git commit -m "feat(qa): citation→corpus resolution as non-blocking warning (GAP-20, FU-7)"
Task 6: GAP-17 — blocks_stale wiring + health-check
Files: Modify mcp-server/src/legal_mcp/tools/drafting.py, mcp-server/src/legal_mcp/services/metrics.py
- Step 1: Set
blocks_stale=trueinrevise_draftandapply_user_edit
READ revise_draft (~647-733) and apply_user_edit (~569-613) in drafting.py. Each ends by calling db.set_active_draft_path(case_id, ...). Immediately after that call in EACH function, add:
await db.mark_blocks_stale(case_id, True)
- Step 2: Clear
blocks_stale=falseinexport_docx
In export_docx (after the set_active_draft_path + the audit added in Task 3), add:
await db.mark_blocks_stale(case_id, False)
(export_docx renders FROM the blocks, so the DOCX matches blocks → not stale.)
- Step 3: Health-check count in metrics.py
READ mcp-server/src/legal_mcp/services/metrics.py — find the aggregation that already runs counts (the one FU-2a added non_searchable_case_law to). Add a sibling count:
cases_with_stale_blocks = await conn.fetchval(
"SELECT COUNT(*) FROM cases WHERE blocks_stale")
and expose it in the returned summary dict as "cases_with_stale_blocks": cases_with_stale_blocks.
- Step 4: Smoke-import
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import drafting; from legal_mcp.services import metrics; print('clean')"
Expected: clean.
- Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/drafting.py mcp-server/src/legal_mcp/services/metrics.py
git commit -m "feat(audit): blocks_stale drift flag + health-check visibility (GAP-17, FU-7)"
Task 7: Full suite + DB smoke + lint + TaskMaster
- Step 1: Full offline suite
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q
Expected: all pass (FU-1/2a + new FU-7 tests). Report the summary line. If a pre-existing test fails because a newly-audited function now calls audit/mark_blocks_stale without a stub, fix that test's fixture to stub the new boundary (same pattern as the FU-2a recompute_searchable fixture fix).
- Step 2: DB smoke (real Postgres — applies V22, exercises helpers)
cd ~/legal-ai && set -a && source ~/.env 2>/dev/null && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio, uuid
from legal_mcp.services import db, audit
async def main():
await db.get_pool() # applies V22
pool = await db.get_pool()
async with pool.acquire() as c:
col = await c.fetchval(\"SELECT 1 FROM information_schema.columns WHERE table_name='cases' AND column_name='blocks_stale'\")
print('V22 blocks_stale present:', bool(col))
# citation resolver: random id is unresolved
out = await db.resolve_citation_case_law_ids([uuid.uuid4()])
print('resolver unresolved count:', len(out['unresolved']))
# log_action_safe never raises
await audit.log_action_safe('fu7_smoke', details={'ok': True})
print('log_action_safe ok')
asyncio.run(main())
" 2>&1 | grep -vE 'INFO|WARNING|httpx|deprecat|command not found|\^\^\^' | tail -5
Expected: V22 blocks_stale present: True, resolver unresolved count: 1, log_action_safe ok. (Optionally clean the smoke row: DELETE FROM audit_log WHERE action='fu7_smoke'.)
- Step 3: Lint
Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/audit.py src/legal_mcp/services/db.py src/legal_mcp/services/block_writer.py 2>/dev/null; echo "exit=$?"
Expected: clean or "ruff not available".
- Step 4: Mark TaskMaster #65 done — controller edits
.taskmaster/tasks/tasks.json+ verifies via MCP get_task.
Self-Review Notes
- GAP-18 → Task 3 (+ write_block audit in Task 4). GAP-19 → Task 4 (provenance event). GAP-20 → Task 5 (resolver + QA warning). GAP-17 → Tasks 2+6 (V22 flag + wiring + health).
- No new table (audit_log reused, X5 §4). No data migration (V22 additive; provenance forward-only).
- Non-fatal audit: all calls via
log_action_safe. GAP-20 is warning-only (never a critical gate — doesn't block export, consistent with FU-6 gates). - Type consistency:
log_action_safe,mark_blocks_stale(case_id, stale),resolve_citation_case_law_ids(ids)->{resolved,unresolved},result["sources"]={document_ids,claim_ids,case_law_ids}— names identical across tasks + tests. - Offline-test limit: real audit_log INSERT / V22 verified by Task 7 Step 2 smoke; offline tests cover the pure wrappers/resolver logic.