# FU-7: Audit-Trail + Provenance — Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Turn `audit_log` into an end-to-end audit trail, attach source-provenance to generated blocks, enforce citation→corpus resolution, and flag DOCX↔blocks drift — all forward-only, no data migration.

**Architecture:** Reuse `audit_log.log_action` with a `details` JSONB payload (X5 §4 — no new table) via a non-fatal `log_action_safe` wrapper. Provenance is an append-only `write_block` audit event carrying the source ids that fed the generation. GAP-17 drift is a deterministic `cases.blocks_stale` flag (V22) set at the known divergence points + a health-check count — not a fragile DOCX→blocks reparse. GAP-20 is a structural `case_law_id` resolver surfaced as a QA warning.

**Tech Stack:** Python 3.12, asyncpg, PostgreSQL@localhost:5433, pytest offline, `.venv` at `mcp-server/.venv`.

**Spec:** [docs/superpowers/specs/2026-05-30-fu7-audit-provenance-design.md](../specs/2026-05-30-fu7-audit-provenance-design.md)

**Run tests:** `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v`

---

## File Structure

- **Modify** `mcp-server/src/legal_mcp/services/audit.py` — add `log_action_safe(...)`.
- **Modify** `mcp-server/src/legal_mcp/services/db.py` — V22 migration (`cases.blocks_stale`), `mark_blocks_stale`, `resolve_citation_case_law_ids`.
- **Modify** `mcp-server/src/legal_mcp/tools/documents.py` — audit in `document_upload`, `extract_claims`.
- **Modify** `mcp-server/src/legal_mcp/services/block_writer.py` — collect source ids; audit `write_block`; clear `blocks_stale` on save.
- **Modify** `mcp-server/src/legal_mcp/tools/drafting.py` — audit `export_docx`; set/clear `blocks_stale` in `export_docx`/`revise_draft`/`apply_user_edit`.
- **Modify** QA path (`services/qa_validator.py`) — citation→corpus warning.
- **Modify** `mcp-server/src/legal_mcp/services/metrics.py` — `cases_with_stale_blocks` count.
- **Create** `mcp-server/tests/test_audit_provenance.py`.

---

## Task 1: Failing tests

**Files:** Create `mcp-server/tests/test_audit_provenance.py`

- [ ] **Step 1: Write the failing tests**

```python
"""FU-7: audit-trail + provenance (offline, monkeypatched I/O)."""
from __future__ import annotations

import asyncio
from uuid import uuid4

import pytest

from legal_mcp.services import audit, db


def _run(coro):
    return asyncio.run(coro)


# ── GAP-18: log_action_safe is non-fatal ───────────────────────────────
def test_log_action_safe_swallows_db_error(monkeypatch):
    async def _boom(*a, **k):
        raise RuntimeError("db down")
    monkeypatch.setattr(audit, "log_action", _boom)
    # must NOT raise
    _run(audit.log_action_safe("write_block", details={"x": 1}))


def test_log_action_safe_forwards_args(monkeypatch):
    seen = {}

    async def _capture(action, case_id=None, document_id=None, details=None, user="system"):
        seen.update(action=action, details=details)
    monkeypatch.setattr(audit, "log_action", _capture)
    _run(audit.log_action_safe("export_docx", details={"path": "/x"}))
    assert seen["action"] == "export_docx" and seen["details"] == {"path": "/x"}


# ── GAP-20: structural citation resolver ────────────────────────────────
def test_resolve_citation_case_law_ids_splits(monkeypatch):
    good = uuid4()
    bad = uuid4()

    class _Conn:
        async def fetchval(self, q, cid):
            return cid == good

        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

    class _Pool:
        def acquire(self):
            return _Conn()

    async def _pool():
        return _Pool()
    monkeypatch.setattr(db, "get_pool", _pool)
    out = _run(db.resolve_citation_case_law_ids([good, bad]))
    assert good in out["resolved"] and bad in out["unresolved"]


# ── GAP-17: blocks_stale helper ────────────────────────────────────────
def test_mark_blocks_stale_executes_update(monkeypatch):
    seen = {}

    class _Conn:
        async def execute(self, q, *a):
            seen["q"] = q
            seen["args"] = a

        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

    class _Pool:
        def acquire(self):
            return _Conn()

    async def _pool():
        return _Pool()
    monkeypatch.setattr(db, "get_pool", _pool)
    cid = uuid4()
    _run(db.mark_blocks_stale(cid, True))
    assert "blocks_stale" in seen["q"] and seen["args"][0] is True and seen["args"][1] == cid
```

- [ ] **Step 2: Run to verify failure**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v`
Expected: FAIL — `AttributeError: ... has no attribute 'log_action_safe'` / `resolve_citation_case_law_ids` / `mark_blocks_stale`.

- [ ] **Step 3: Commit**

```bash
cd ~/legal-ai
git add mcp-server/tests/test_audit_provenance.py
git commit -m "test(audit): failing tests for audit-trail + provenance (FU-7)"
```

---

## Task 2: V22 migration + core helpers

**Files:** Modify `mcp-server/src/legal_mcp/services/audit.py`, `mcp-server/src/legal_mcp/services/db.py`

- [ ] **Step 1: Add `log_action_safe` to audit.py (after `log_action`)**

```python
async def log_action_safe(
    action: str,
    case_id: "UUID | None" = None,
    document_id: "UUID | None" = None,
    details: dict | None = None,
    user: str = "system",
) -> None:
    """Non-fatal audit: never let an audit-log failure break the caller's action.

    The authoritative integrity trail is git (X5 §2.1); audit_log is the
    'who/what/when' observability layer, so a write failure is logged as a
    warning and swallowed.
    """
    try:
        await log_action(action, case_id=case_id, document_id=document_id,
                         details=details, user=user)
    except Exception as e:  # noqa: BLE001 — observability must not break the op
        logger.warning("audit log_action failed (non-fatal) for %s: %s", action, e)
```

- [ ] **Step 2: Add `SCHEMA_V22_SQL` after `SCHEMA_V21_SQL` in db.py + wire it**

READ db.py near `SCHEMA_V21_SQL` (~line 1097-1133).
Add after the V21 block:

```python
# ── V22: cases.blocks_stale — DOCX↔blocks drift flag (GAP-17 / INV-EX1) ──
# Set true when revise_draft/apply_user_edit make active_draft_path the live
# source-of-truth without re-syncing decision_blocks; cleared when blocks are
# re-exported or re-saved. Surfaced by health-check. Source-of-truth remains
# decision_blocks — this only flags known drift (no fragile DOCX→blocks reparse).
SCHEMA_V22_SQL = """
ALTER TABLE cases ADD COLUMN IF NOT EXISTS blocks_stale boolean NOT NULL DEFAULT false;
"""
```

After `await conn.execute(SCHEMA_V21_SQL)` add `await conn.execute(SCHEMA_V22_SQL)` and bump the log line to `v1-v22`.

- [ ] **Step 3: Add `mark_blocks_stale` + `resolve_citation_case_law_ids` to db.py (near the case helpers, after `get_active_draft_path`)**

```python
async def mark_blocks_stale(case_id: UUID, stale: bool) -> None:
    """Flag/clear DOCX↔blocks drift for a case (GAP-17)."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET blocks_stale = $1, updated_at = now() WHERE id = $2",
            stale, case_id,
        )


async def resolve_citation_case_law_ids(ids) -> dict:
    """Structural citation→corpus resolution (GAP-20 / INV-AUD3).

    Given case_law_id values referenced by a decision's citations/provenance,
    split into resolvable (exist in case_law) vs unresolvable.
    """
    resolved, unresolved = [], []
    pool = await get_pool()
    async with pool.acquire() as conn:
        for cid in ids:
            try:
                exists = await conn.fetchval(
                    "SELECT EXISTS(SELECT 1 FROM case_law WHERE id = $1)", cid)
            except Exception:
                exists = False
            (resolved if exists else unresolved).append(cid)
    return {"resolved": resolved, "unresolved": unresolved}
```

- [ ] **Step 4: Run Task-1 tests for these helpers**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v`
Expected: all 4 tests PASS.
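
The resolver's contract (split the ids, keep input order, never raise) can be illustrated without PostgreSQL. The sketch below is only an in-memory model: `split_by_existence` and the `existing` set are hypothetical names that stand in for the `case_law` lookup, not part of the codebase.

```python
from uuid import UUID, uuid4


def split_by_existence(ids: list[UUID], existing: set[UUID]) -> dict[str, list[UUID]]:
    """Partition ids into resolved/unresolved, preserving input order.

    `existing` stands in for the rows of `case_law`; the real helper asks
    PostgreSQL instead of consulting a set.
    """
    resolved: list[UUID] = []
    unresolved: list[UUID] = []
    for cid in ids:
        (resolved if cid in existing else unresolved).append(cid)
    return {"resolved": resolved, "unresolved": unresolved}


# One id that "exists" in the stand-in corpus and one that does not.
known, missing = uuid4(), uuid4()
result = split_by_existence([known, missing], existing={known})
```

Every input id lands in exactly one of the two lists, which is the property the QA warning in Task 5 relies on.
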
- [ ] **Step 5: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/audit.py mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(audit): log_action_safe + V22 blocks_stale + citation resolver (FU-7)"
```

---

## Task 3: GAP-18 — audit calls on upload / extract_claims / export

**Files:** Modify `mcp-server/src/legal_mcp/tools/documents.py`, `mcp-server/src/legal_mcp/tools/drafting.py`

- [ ] **Step 1: `document_upload` — audit after processing (documents.py)**

READ `document_upload` (lines ~14-94). It computes `case_id`, `doc` (with `doc["id"]`), `actual_doc_type`, and `result` (with `result["classification"]`). Ensure `from legal_mcp.services import audit` is imported (add if missing). Immediately BEFORE the final `return json.dumps({...})`, add:

```python
    await audit.log_action_safe(
        "document_upload", case_id=case_id, document_id=UUID(doc["id"]),
        details={"title": title, "doc_type": actual_doc_type},
    )
```

- [ ] **Step 2: `extract_claims` — audit before return (documents.py)**

In `extract_claims` (lines ~300-348), before the final `return json.dumps(results, ...)`, add:

```python
    await audit.log_action_safe(
        "extract_claims", case_id=case_id,
        details={"docs_processed": len(docs), "results": len(results)},
    )
```

- [ ] **Step 3: `export_docx` — audit after export (drafting.py)**

READ `export_docx` in `drafting.py` (around lines 384-439). It resolves `case_id`, builds `path`, and calls `db.set_active_draft_path(case_id, path)`. Ensure `audit` is imported. After the `set_active_draft_path` call, add:

```python
        await audit.log_action_safe(
            "export_docx", case_id=case_id, details={"path": str(path)},
        )
```

- [ ] **Step 4: Verify imports**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import documents, drafting; print('clean')"`
Expected: `clean`.
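
Why every call site in this task goes through `log_action_safe` rather than `log_action` is easiest to see with the audit sink failing. The sketch below is a self-contained model, not the project's code: `failing_log_action` and `export_docx_sketch` are hypothetical stand-ins, and the wrapper is simplified to `**fields`.

```python
import asyncio
import logging

logger = logging.getLogger("audit_sketch")


async def failing_log_action(action: str, **fields) -> None:
    """Hypothetical stand-in for audit.log_action with the database down."""
    raise RuntimeError("db down")


async def log_action_safe(action: str, **fields) -> None:
    """Same shape as the plan's wrapper: log a warning, never re-raise."""
    try:
        await failing_log_action(action, **fields)
    except Exception as exc:  # observability must not break the operation
        logger.warning("audit failed (non-fatal) for %s: %s", action, exc)


async def export_docx_sketch(path: str) -> dict:
    """Hypothetical tool body: do the work, audit it, return the result."""
    result = {"status": "exported", "path": path}
    await log_action_safe("export_docx", details={"path": path})
    return result


# The audit write fails, yet the caller still gets its result back.
outcome = asyncio.run(export_docx_sketch("/tmp/decision.docx"))
```

The only visible trace of the failure is the warning in the log; the tool's return value is unchanged.
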
- [ ] **Step 5: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/documents.py mcp-server/src/legal_mcp/tools/drafting.py
git commit -m "feat(audit): log document_upload/extract_claims/export_docx (GAP-18, FU-7)"
```

---

## Task 4: GAP-19 — block→source provenance

**Files:** Modify `mcp-server/src/legal_mcp/services/block_writer.py`

- [ ] **Step 1: Make `_build_precedents_context` also return the case_law ids it used**

READ `_build_precedents_context` (lines ~671-716). Change the `caselaw_rows` SELECT to also fetch `cl.id`: replace `"""SELECT cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,` with `"""SELECT cl.id, cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,`. Collect ids and change the function to return a tuple. At the function's two `return` points:

- replace `return "\n\n".join(parts) if parts else "(אין תקדימים)"` with `return ("\n\n".join(parts) if parts else "(אין תקדימים)"), case_law_ids`
- ensure `case_law_ids = []` is initialized at the top, and inside the caselaw loop append `str(r["id"])`.

If there is an early/exception return path that returns a bare string, make it return `("(אין תקדימים)", [])` too.

- [ ] **Step 2: Update the caller in `write_block` + collect document/claim ids**

READ `write_block` (lines ~280-394). Line ~321 currently: `precedents_context = await _build_precedents_context(case_id, block_id)`
Change to: `precedents_context, _precedent_case_law_ids = await _build_precedents_context(case_id, block_id)`

Add a helper `_collect_block_sources` (after `_build_result`, ~line 408):

```python
async def _collect_block_sources(case_id: UUID, block_id: str) -> dict:
    """Deterministic source ids available to a block's generation (GAP-19).

    document_ids: case documents matching the block's allowed doc-types.
    claim_ids: extracted claims for the case.
    (case_law_ids are captured separately from the precedent search inside write_block.)
    """
    allowed = _BLOCK_DOC_TYPES.get(block_id, [])
    docs = await db.list_documents(case_id)
    if allowed:
        docs = [d for d in docs if d.get("doc_type") in allowed]
    claims = await db.get_claims(case_id)
    return {
        "document_ids": [str(d["id"]) for d in docs],
        "claim_ids": [str(c["id"]) for c in claims],
    }
```

In `write_block`, just before the final `return _build_result(block_id, content, block_cfg)` (the non-template path, ~line 394), build the sources and attach to the result:

```python
    sources = await _collect_block_sources(case_id, block_id)
    sources["case_law_ids"] = _precedent_case_law_ids
    result = _build_result(block_id, content, block_cfg)
    result["sources"] = sources
    return result
```

(For the template path return at ~line 308, attach an empty sources dict: `r = _build_result(...); r["sources"] = {"document_ids": [], "claim_ids": [], "case_law_ids": []}; return r`.)

- [ ] **Step 3: Write the provenance audit in `write_and_store_block` and `save_block_content`**

In `write_and_store_block` (~line 1039), after `await store_block(UUID(decision["id"]), result)`, add:

```python
    await audit.log_action_safe(
        "write_block", case_id=case_id,
        details={
            "decision_id": str(decision["id"]),
            "block_id": block_id,
            "model_used": result.get("model_used"),
            "generation_type": result.get("generation_type"),
            "sources": result.get("sources", {}),
        },
    )
    await db.mark_blocks_stale(case_id, False)
```

In `save_block_content` (~line 905), after `await store_block(...)` add the same `mark_blocks_stale(case_id, False)` (a saved block means DB blocks are current). Ensure `from legal_mcp.services import audit` is imported in block_writer.py (add if missing).

- [ ] **Step 4: Smoke-import + targeted check**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import block_writer; print('clean')"`
Expected: `clean`.
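
Beyond the smoke import, the shape of `result["sources"]` can be checked against plain dictionaries. The sketch below is a pure-function model of the filtering described in Step 2, under the assumption that documents and claims are dict-like rows with `id` and `doc_type` keys. `build_sources` and the sample rows are hypothetical and do not exist in `block_writer.py`.

```python
def build_sources(
    docs: list[dict],
    claims: list[dict],
    allowed_doc_types: list[str],
    case_law_ids: list[str],
) -> dict[str, list[str]]:
    """Mirror the plan's filtering: an empty allow-list keeps every document."""
    if allowed_doc_types:
        docs = [d for d in docs if d.get("doc_type") in allowed_doc_types]
    return {
        "document_ids": [str(d["id"]) for d in docs],
        "claim_ids": [str(c["id"]) for c in claims],
        "case_law_ids": list(case_law_ids),
    }


# Made-up rows; real ones would come from db.list_documents / db.get_claims.
docs = [
    {"id": "d1", "doc_type": "appeal"},
    {"id": "d2", "doc_type": "protocol"},
]
claims = [{"id": "c1"}]

filtered = build_sources(docs, claims, ["appeal"], ["cl-9"])
unfiltered = build_sources(docs, claims, [], [])
```

All three keys are always present, so the `write_block` audit payload has a stable shape even when a block used no sources.
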
- [ ] **Step 5: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/block_writer.py
git commit -m "feat(audit): block→source provenance via write_block audit event (GAP-19, FU-7)"
```

---

## Task 5: GAP-20 — citation→corpus validation as QA warning

**Files:** Modify `mcp-server/src/legal_mcp/services/qa_validator.py`

- [ ] **Step 1: Read the QA validator structure**

READ `mcp-server/src/legal_mcp/services/qa_validator.py` — find the function that runs the QA checks and returns findings (look for the list of checks / findings dicts with severity like `warning`/`critical`). Identify the findings structure (keys, how a check is appended).

- [ ] **Step 2: Add a citation-resolution check**

Add a check that gathers `case_law_id`s referenced by the decision's provenance/citations and resolves them. Concretely, add a function in qa_validator.py:

```python
async def _check_citation_resolution(case_id, decision_id) -> list[dict]:
    """GAP-20/INV-AUD3: every cited case_law_id must resolve to the corpus.

    Reads case_law_ids from the decision's write_block audit provenance
    (audit_log details.sources.case_law_ids) and verifies each resolves.
    Unresolvable ids → non-blocking warning + audit('citation_unresolved').
    """
    import json
    from uuid import UUID

    from legal_mcp.services import audit, db

    rows = await audit.get_audit_log(case_id=case_id, action="write_block", limit=200)
    ids = set()
    for r in rows:
        details = r.get("details") or {}
        if isinstance(details, str):
            # details may arrive as a JSON string rather than a dict
            try:
                details = json.loads(details)
            except (ValueError, TypeError):
                details = {}
        for raw in (details.get("sources") or {}).get("case_law_ids", []):
            try:
                ids.add(UUID(str(raw)))
            except (ValueError, TypeError):
                pass  # skip malformed ids instead of failing the QA run
    if not ids:
        return []
    res = await db.resolve_citation_case_law_ids(list(ids))
    findings = []
    if res["unresolved"]:
        await audit.log_action_safe(
            "citation_unresolved", case_id=case_id,
            details={"unresolved": [str(x) for x in res["unresolved"]]},
        )
        findings.append({
            "check": "citation_resolution",
            "severity": "warning",
            "passed": False,
            # Hebrew UI text: "N citations do not resolve to the corpus; chair verification required"
            "message": f"{len(res['unresolved'])} ציטוטים אינם פתירים לקורפוס — דורש אימות יו\"ר",
        })
    return findings
```

Then wire `_check_citation_resolution` into the validator's main run function so its findings are appended to the result list (match the existing findings shape — adjust the dict keys to the validator's actual schema discovered in Step 1). It must be a **warning**, never a critical gate (does not block export).

- [ ] **Step 3: Smoke-import**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import qa_validator; print('clean')"`
Expected: `clean`.

- [ ] **Step 4: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/qa_validator.py
git commit -m "feat(qa): citation→corpus resolution as non-blocking warning (GAP-20, FU-7)"
```

---

## Task 6: GAP-17 — blocks_stale wiring + health-check

**Files:** Modify `mcp-server/src/legal_mcp/tools/drafting.py`, `mcp-server/src/legal_mcp/services/metrics.py`

- [ ] **Step 1: Set `blocks_stale=true` in `revise_draft` and `apply_user_edit`**

READ `revise_draft` (~647-733) and `apply_user_edit` (~569-613) in drafting.py. Each ends by calling `db.set_active_draft_path(case_id, ...)`.
Immediately after that call in EACH function, add:

```python
        await db.mark_blocks_stale(case_id, True)
```

- [ ] **Step 2: Clear `blocks_stale=false` in `export_docx`**

In `export_docx` (after the `set_active_draft_path` + the audit added in Task 3), add:

```python
        await db.mark_blocks_stale(case_id, False)
```

(export_docx renders FROM the blocks, so the DOCX matches blocks → not stale.)

- [ ] **Step 3: Health-check count in metrics.py**

READ `mcp-server/src/legal_mcp/services/metrics.py` — find the aggregation that already runs counts (the one FU-2a added `non_searchable_case_law` to). Add a sibling count:

```python
        cases_with_stale_blocks = await conn.fetchval(
            "SELECT COUNT(*) FROM cases WHERE blocks_stale")
```

and expose it in the returned summary dict as `"cases_with_stale_blocks": cases_with_stale_blocks`.

- [ ] **Step 4: Smoke-import**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import drafting; from legal_mcp.services import metrics; print('clean')"`
Expected: `clean`.

- [ ] **Step 5: Commit**

```bash
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/drafting.py mcp-server/src/legal_mcp/services/metrics.py
git commit -m "feat(audit): blocks_stale drift flag + health-check visibility (GAP-17, FU-7)"
```

---

## Task 7: Full suite + DB smoke + lint + TaskMaster

- [ ] **Step 1: Full offline suite**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q`
Expected: all pass (FU-1/2a + new FU-7 tests). Report the summary line. If a pre-existing test fails because a newly-audited function now calls `audit`/`mark_blocks_stale` without a stub, fix that test's fixture to stub the new boundary (same pattern as the FU-2a `recompute_searchable` fixture fix).
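
Stubbing the new boundary might look roughly like the sketch below. It uses `unittest.mock` from the standard library instead of the suite's pytest `monkeypatch` fixture so that it runs on its own. The `audit` and `db` namespaces and `revise_draft_sketch` are hypothetical stand-ins for the real modules and tool function.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

# Hypothetical stand-ins for legal_mcp.services.audit and legal_mcp.services.db.
audit = SimpleNamespace(log_action_safe=None)
db = SimpleNamespace(mark_blocks_stale=None)


async def revise_draft_sketch(case_id: str) -> str:
    """Hypothetical function under test that now crosses both boundaries."""
    await db.mark_blocks_stale(case_id, True)
    await audit.log_action_safe("revise_draft", case_id=case_id)
    return "revised"


def run_with_stubs() -> tuple[str, AsyncMock, AsyncMock]:
    """Stub both boundaries so the test stays offline, then run the function."""
    with patch.object(audit, "log_action_safe", new=AsyncMock()) as audit_stub:
        with patch.object(db, "mark_blocks_stale", new=AsyncMock()) as stale_stub:
            outcome = asyncio.run(revise_draft_sketch("case-1"))
    return outcome, audit_stub, stale_stub


outcome, audit_stub, stale_stub = run_with_stubs()
```

With pytest, the equivalent is a `monkeypatch.setattr(...)` call per boundary inside the affected fixture; the assertions on the awaited arguments stay the same.
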
- [ ] **Step 2: DB smoke (real Postgres — applies V22, exercises helpers)**

```bash
cd ~/legal-ai && set -a && source ~/.env 2>/dev/null && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio, uuid
from legal_mcp.services import db, audit
async def main():
    await db.get_pool()  # applies V22
    pool = await db.get_pool()
    async with pool.acquire() as c:
        col = await c.fetchval(\"SELECT 1 FROM information_schema.columns WHERE table_name='cases' AND column_name='blocks_stale'\")
        print('V22 blocks_stale present:', bool(col))
    # citation resolver: random id is unresolved
    out = await db.resolve_citation_case_law_ids([uuid.uuid4()])
    print('resolver unresolved count:', len(out['unresolved']))
    # log_action_safe never raises
    await audit.log_action_safe('fu7_smoke', details={'ok': True})
    print('log_action_safe ok')
asyncio.run(main())
" 2>&1 | grep -vE 'INFO|WARNING|httpx|deprecat|command not found|\^\^\^' | tail -5
```

Expected: `V22 blocks_stale present: True`, `resolver unresolved count: 1`, `log_action_safe ok`. (Optionally clean the smoke row: `DELETE FROM audit_log WHERE action='fu7_smoke'`.)

- [ ] **Step 3: Lint**

Run: `cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/audit.py src/legal_mcp/services/db.py src/legal_mcp/services/block_writer.py 2>/dev/null; echo "exit=$?"`
Expected: clean (`exit=0`). If ruff is not installed, stderr is suppressed, so the only sign is a non-zero `exit=` line with no findings above it.

- [ ] **Step 4: Mark TaskMaster #65 done** — controller edits `.taskmaster/tasks/tasks.json` + verifies via MCP get_task.

---

## Self-Review Notes

- **GAP-18** → Task 3 (+ write_block audit in Task 4). **GAP-19** → Task 4 (provenance event). **GAP-20** → Task 5 (resolver + QA warning). **GAP-17** → Tasks 2+6 (V22 flag + wiring + health).
- **No new table** (audit_log reused, X5 §4). **No data migration** (V22 additive; provenance forward-only).
- **Non-fatal audit:** all calls via `log_action_safe`. **GAP-20 is warning-only** (never a critical gate — doesn't block export, consistent with FU-6 gates).
- **Type consistency:** `log_action_safe`, `mark_blocks_stale(case_id, stale)`, `resolve_citation_case_law_ids(ids)->{resolved,unresolved}`, `result["sources"]={document_ids,claim_ids,case_law_ids}` — names identical across tasks + tests.
- **Offline-test limit:** real audit_log INSERT / V22 verified by Task 7 Step 2 smoke; offline tests cover the pure wrappers/resolver logic.
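
The GAP-17 wiring from Tasks 4 and 6 can be sanity-checked as a small transition table. The sketch below is only a model of the flag, assuming the column default of `false` as the starting state. `STALE_AFTER` and `replay` are hypothetical names, not part of the codebase.

```python
# Which action leaves cases.blocks_stale in which state, per Tasks 4 and 6.
STALE_AFTER = {
    "revise_draft": True,
    "apply_user_edit": True,
    "export_docx": False,
    "write_and_store_block": False,
    "save_block_content": False,
}


def replay(actions: list[str], initial: bool = False) -> bool:
    """Return the flag value after applying the actions in order."""
    stale = initial
    for action in actions:
        # Actions outside the table (e.g. document_upload) leave the flag alone.
        stale = STALE_AFTER.get(action, stale)
    return stale


after_edit = replay(["export_docx", "revise_draft"])
after_reexport = replay(["export_docx", "revise_draft", "export_docx"])
```

A case counts toward `cases_with_stale_blocks` exactly when its most recent flag-changing action was `revise_draft` or `apply_user_edit`.
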