Files
legal-ai/docs/superpowers/plans/2026-05-30-fu7-audit-provenance.md
2026-05-30 21:26:30 +00:00

22 KiB
Raw Blame History

FU-7: Audit-Trail + Provenance — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Turn audit_log into an end-to-end audit trail, attach source-provenance to generated blocks, enforce citation→corpus resolution, and flag DOCX↔blocks drift — all forward-only, no data migration.

Architecture: Reuse audit_log.log_action with a details JSONB payload (X5 §4 — no new table) via a non-fatal log_action_safe wrapper. Provenance is an append-only write_block audit event carrying the source ids that fed the generation. GAP-17 drift is a deterministic cases.blocks_stale flag (V22) set at the known divergence points + a health-check count — not a fragile DOCX→blocks reparse. GAP-20 is a structural case_law_id resolver surfaced as a QA warning.

Tech Stack: Python 3.12, asyncpg, PostgreSQL@localhost:5433, pytest offline, .venv at mcp-server/.venv.

Spec: docs/superpowers/specs/2026-05-30-fu7-audit-provenance-design.md

Run tests: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v


File Structure

  • Modify mcp-server/src/legal_mcp/services/audit.py — add log_action_safe(...).
  • Modify mcp-server/src/legal_mcp/services/db.py — V22 migration (cases.blocks_stale), mark_blocks_stale, resolve_citation_case_law_ids.
  • Modify mcp-server/src/legal_mcp/tools/documents.py — audit in document_upload, extract_claims.
  • Modify mcp-server/src/legal_mcp/services/block_writer.py — collect source ids; audit write_block; clear blocks_stale on save.
  • Modify mcp-server/src/legal_mcp/tools/drafting.py — audit export_docx; set/clear blocks_stale in export_docx/revise_draft/apply_user_edit.
  • Modify QA path (services/qa_validator.py) — citation→corpus warning.
  • Modify mcp-server/src/legal_mcp/services/metrics.pycases_with_stale_blocks count.
  • Create mcp-server/tests/test_audit_provenance.py.

Task 1: Failing tests

Files: Create mcp-server/tests/test_audit_provenance.py

  • Step 1: Write the failing tests
"""FU-7: audit-trail + provenance (offline, monkeypatched I/O)."""
from __future__ import annotations

import asyncio
from uuid import uuid4

import pytest

from legal_mcp.services import audit, db


def _run(coro):
    return asyncio.run(coro)


# ── GAP-18: log_action_safe is non-fatal ───────────────────────────────
def test_log_action_safe_swallows_db_error(monkeypatch):
    async def _boom(*a, **k):
        raise RuntimeError("db down")
    monkeypatch.setattr(audit, "log_action", _boom)
    # must NOT raise
    _run(audit.log_action_safe("write_block", details={"x": 1}))


def test_log_action_safe_forwards_args(monkeypatch):
    seen = {}
    async def _capture(action, case_id=None, document_id=None, details=None, user="system"):
        seen.update(action=action, details=details)
    monkeypatch.setattr(audit, "log_action", _capture)
    _run(audit.log_action_safe("export_docx", details={"path": "/x"}))
    assert seen["action"] == "export_docx" and seen["details"] == {"path": "/x"}


# ── GAP-20: structural citation resolver ────────────────────────────────
def test_resolve_citation_case_law_ids_splits(monkeypatch):
    good = uuid4()
    bad = uuid4()

    class _Conn:
        async def fetchval(self, q, cid):
            return cid == good
        async def __aenter__(self): return self
        async def __aexit__(self, *a): return False

    class _Pool:
        def acquire(self): return _Conn()

    async def _pool():
        return _Pool()
    monkeypatch.setattr(db, "get_pool", _pool)

    out = _run(db.resolve_citation_case_law_ids([good, bad]))
    assert good in out["resolved"] and bad in out["unresolved"]


# ── GAP-17: blocks_stale helper ────────────────────────────────────────
def test_mark_blocks_stale_executes_update(monkeypatch):
    seen = {}

    class _Conn:
        async def execute(self, q, *a):
            seen["q"] = q; seen["args"] = a
        async def __aenter__(self): return self
        async def __aexit__(self, *a): return False

    class _Pool:
        def acquire(self): return _Conn()

    async def _pool(): return _Pool()
    monkeypatch.setattr(db, "get_pool", _pool)

    cid = uuid4()
    _run(db.mark_blocks_stale(cid, True))
    assert "blocks_stale" in seen["q"] and seen["args"][0] is True and seen["args"][1] == cid
  • Step 2: Run to verify failure

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v Expected: FAIL — AttributeError: ... has no attribute 'log_action_safe' / resolve_citation_case_law_ids / mark_blocks_stale.

  • Step 3: Commit
cd ~/legal-ai
git add mcp-server/tests/test_audit_provenance.py
git commit -m "test(audit): failing tests for audit-trail + provenance (FU-7)"

Task 2: V22 migration + core helpers

Files: Modify mcp-server/src/legal_mcp/services/audit.py, mcp-server/src/legal_mcp/services/db.py

  • Step 1: Add log_action_safe to audit.py (after log_action)
async def log_action_safe(
    action: str,
    case_id: "UUID | None" = None,
    document_id: "UUID | None" = None,
    details: dict | None = None,
    user: str = "system",
) -> None:
    """Non-fatal audit: never let an audit-log failure break the caller's action.

    The authoritative integrity trail is git (X5 §2.1); audit_log is the
    'who/what/when' observability layer, so a write failure is logged as a
    warning and swallowed.
    """
    try:
        await log_action(action, case_id=case_id, document_id=document_id,
                         details=details, user=user)
    except Exception as e:  # noqa: BLE001 — observability must not break the op
        logger.warning("audit log_action failed (non-fatal) for %s: %s", action, e)
  • Step 2: Add SCHEMA_V22_SQL after SCHEMA_V21_SQL in db.py + wire it

READ db.py near SCHEMA_V21_SQL (~line 1097-1133). Add after the V21 block:

# ── V22: cases.blocks_stale — DOCX↔blocks drift flag (GAP-17 / INV-EX1) ──
# Set true when revise_draft/apply_user_edit make active_draft_path the live
# source-of-truth without re-syncing decision_blocks; cleared when blocks are
# re-exported or re-saved. Surfaced by health-check. Source-of-truth remains
# decision_blocks — this only flags known drift (no fragile DOCX→blocks reparse).
SCHEMA_V22_SQL = """
ALTER TABLE cases ADD COLUMN IF NOT EXISTS blocks_stale boolean NOT NULL DEFAULT false;
"""

After await conn.execute(SCHEMA_V21_SQL) add await conn.execute(SCHEMA_V22_SQL) and bump the log line to v1-v22.

  • Step 3: Add mark_blocks_stale + resolve_citation_case_law_ids to db.py (near the case helpers, after get_active_draft_path)
async def mark_blocks_stale(case_id: UUID, stale: bool) -> None:
    """Flag/clear DOCX↔blocks drift for a case (GAP-17)."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE cases SET blocks_stale = $1, updated_at = now() WHERE id = $2",
            stale, case_id,
        )


async def resolve_citation_case_law_ids(ids) -> dict:
    """Structural citation→corpus resolution (GAP-20 / INV-AUD3).

    Given case_law_id values referenced by a decision's citations/provenance,
    split into resolvable (exist in case_law) vs unresolvable.
    """
    resolved, unresolved = [], []
    pool = await get_pool()
    async with pool.acquire() as conn:
        for cid in ids:
            try:
                exists = await conn.fetchval(
                    "SELECT EXISTS(SELECT 1 FROM case_law WHERE id = $1)", cid)
            except Exception:
                exists = False
            (resolved if exists else unresolved).append(cid)
    return {"resolved": resolved, "unresolved": unresolved}
  • Step 4: Run Task-1 tests for these helpers

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/test_audit_provenance.py -v Expected: all 4 tests PASS.

  • Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/audit.py mcp-server/src/legal_mcp/services/db.py
git commit -m "feat(audit): log_action_safe + V22 blocks_stale + citation resolver (FU-7)"

Task 3: GAP-18 — audit calls on upload / extract_claims / export

Files: Modify mcp-server/src/legal_mcp/tools/documents.py, mcp-server/src/legal_mcp/tools/drafting.py

  • Step 1: document_upload — audit after processing (documents.py)

READ document_upload (lines ~14-94). It computes case_id, doc (with doc["id"]), actual_doc_type, and result (with result["classification"]). Ensure from legal_mcp.services import audit is imported (add if missing). Immediately BEFORE the final return json.dumps({...}), add:

    await audit.log_action_safe(
        "document_upload", case_id=case_id, document_id=UUID(doc["id"]),
        details={"title": title, "doc_type": actual_doc_type},
    )
  • Step 2: extract_claims — audit before return (documents.py)

In extract_claims (lines ~300-348), before the final return json.dumps(results, ...), add:

    await audit.log_action_safe(
        "extract_claims", case_id=case_id,
        details={"docs_processed": len(docs), "results": len(results)},
    )
  • Step 3: export_docx — audit after export (drafting.py)

READ export_docx in drafting.py (around lines 384-439). It resolves case_id, builds path, and calls db.set_active_draft_path(case_id, path). Ensure audit is imported. After the set_active_draft_path call, add:

    await audit.log_action_safe(
        "export_docx", case_id=case_id,
        details={"path": str(path)},
    )
  • Step 4: Verify imports

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import documents, drafting; print('clean')" Expected: clean.

  • Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/documents.py mcp-server/src/legal_mcp/tools/drafting.py
git commit -m "feat(audit): log document_upload/extract_claims/export_docx (GAP-18, FU-7)"

Task 4: GAP-19 — block→source provenance

Files: Modify mcp-server/src/legal_mcp/services/block_writer.py

  • Step 1: Make _build_precedents_context also return the case_law ids it used

READ _build_precedents_context (lines ~671-716). Change the caselaw_rows SELECT to also fetch cl.id: replace """SELECT cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote, with """SELECT cl.id, cl.case_number, cl.case_name, cl.court, cl.summary, cl.key_quote,. Collect ids and change the function to return a tuple. At the function's two return points:

  • replace return "\n\n".join(parts) if parts else "(אין תקדימים)" with return ("\n\n".join(parts) if parts else "(אין תקדימים)"), case_law_ids
  • ensure case_law_ids = [] is initialized at the top, and inside the caselaw loop append r["id"] (str(r["id"])).

If there is an early/exception return path that returns a bare string, make it return ("(אין תקדימים)", []) too.

  • Step 2: Update the caller in write_block + collect document/claim ids

READ write_block (lines ~280-394). Line ~321 currently: precedents_context = await _build_precedents_context(case_id, block_id) Change to: precedents_context, _precedent_case_law_ids = await _build_precedents_context(case_id, block_id)

Add a helper _collect_block_sources (after _build_result, ~line 408):

async def _collect_block_sources(case_id: UUID, block_id: str) -> dict:
    """Deterministic source ids available to a block's generation (GAP-19).

    document_ids: case documents matching the block's allowed doc-types.
    claim_ids: extracted claims for the case. (case_law_ids are captured
    separately from the precedent search inside write_block.)
    """
    allowed = _BLOCK_DOC_TYPES.get(block_id, [])
    docs = await db.list_documents(case_id)
    if allowed:
        docs = [d for d in docs if d.get("doc_type") in allowed]
    claims = await db.get_claims(case_id)
    return {
        "document_ids": [str(d["id"]) for d in docs],
        "claim_ids": [str(c["id"]) for c in claims],
    }

In write_block, just before the final return _build_result(block_id, content, block_cfg) (the non-template path, ~line 394), build the sources and attach to the result:

    sources = await _collect_block_sources(case_id, block_id)
    sources["case_law_ids"] = _precedent_case_law_ids
    result = _build_result(block_id, content, block_cfg)
    result["sources"] = sources
    return result

(For the template path return at ~line 308, attach an empty sources dict: r = _build_result(...); r["sources"] = {"document_ids": [], "claim_ids": [], "case_law_ids": []}; return r.)

  • Step 3: Write the provenance audit in write_and_store_block and save_block_content

In write_and_store_block (~line 1039), after await store_block(UUID(decision["id"]), result), add:

    await audit.log_action_safe(
        "write_block", case_id=case_id,
        details={
            "decision_id": str(decision["id"]),
            "block_id": block_id,
            "model_used": result.get("model_used"),
            "generation_type": result.get("generation_type"),
            "sources": result.get("sources", {}),
        },
    )
    await db.mark_blocks_stale(case_id, False)

In save_block_content (~line 905), after await store_block(...) add the same mark_blocks_stale(case_id, False) (a saved block means DB blocks are current). Ensure from legal_mcp.services import audit is imported in block_writer.py (add if missing).

  • Step 4: Smoke-import + targeted check

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import block_writer; print('clean')" Expected: clean.

  • Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/block_writer.py
git commit -m "feat(audit): block→source provenance via write_block audit event (GAP-19, FU-7)"

Task 5: GAP-20 — citation→corpus validation as QA warning

Files: Modify mcp-server/src/legal_mcp/services/qa_validator.py

  • Step 1: Read the QA validator structure

READ mcp-server/src/legal_mcp/services/qa_validator.py — find the function that runs the QA checks and returns findings (look for the list of checks / findings dicts with severity like warning/critical). Identify the findings structure (keys, how a check is appended).

  • Step 2: Add a citation-resolution check

Add a check that gathers case_law_ids referenced by the decision's provenance/citations and resolves them. Concretely, add a function in qa_validator.py:

async def _check_citation_resolution(case_id, decision_id) -> list[dict]:
    """GAP-20/INV-AUD3: every cited case_law_id must resolve to the corpus.

    Reads case_law_ids from the decision's write_block audit provenance
    (audit_log details.sources.case_law_ids) and verifies each resolves.
    Unresolvable ids → non-blocking warning + audit('citation_unresolved').
    """
    from legal_mcp.services import db, audit
    from uuid import UUID
    rows = await audit.get_audit_log(case_id=case_id, action="write_block", limit=200)
    ids = set()
    for r in rows:
        details = r.get("details") or {}
        if isinstance(details, str):
            import json as _json
            try: details = _json.loads(details)
            except (ValueError, TypeError): details = {}
        for raw in (details.get("sources") or {}).get("case_law_ids", []):
            try: ids.add(UUID(str(raw)))
            except (ValueError, TypeError): pass
    if not ids:
        return []
    res = await db.resolve_citation_case_law_ids(list(ids))
    findings = []
    if res["unresolved"]:
        await audit.log_action_safe(
            "citation_unresolved", case_id=case_id,
            details={"unresolved": [str(x) for x in res["unresolved"]]},
        )
        findings.append({
            "check": "citation_resolution",
            "severity": "warning",
            "passed": False,
            "message": f"{len(res['unresolved'])} ציטוטים אינם פתירים לקורפוס — דורש אימות יו\"ר",
        })
    return findings

Then wire _check_citation_resolution into the validator's main run function so its findings are appended to the result list (match the existing findings shape — adjust the dict keys to the validator's actual schema discovered in Step 1). It must be a warning, never a critical gate (does not block export).

  • Step 3: Smoke-import

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.services import qa_validator; print('clean')" Expected: clean.

  • Step 4: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/services/qa_validator.py
git commit -m "feat(qa): citation→corpus resolution as non-blocking warning (GAP-20, FU-7)"

Task 6: GAP-17 — blocks_stale wiring + health-check

Files: Modify mcp-server/src/legal_mcp/tools/drafting.py, mcp-server/src/legal_mcp/services/metrics.py

  • Step 1: Set blocks_stale=true in revise_draft and apply_user_edit

READ revise_draft (~647-733) and apply_user_edit (~569-613) in drafting.py. Each ends by calling db.set_active_draft_path(case_id, ...). Immediately after that call in EACH function, add:

    await db.mark_blocks_stale(case_id, True)
  • Step 2: Clear blocks_stale=false in export_docx

In export_docx (after the set_active_draft_path + the audit added in Task 3), add:

    await db.mark_blocks_stale(case_id, False)

(export_docx renders FROM the blocks, so the DOCX matches blocks → not stale.)

  • Step 3: Health-check count in metrics.py

READ mcp-server/src/legal_mcp/services/metrics.py — find the aggregation that already runs counts (the one FU-2a added non_searchable_case_law to). Add a sibling count:

    cases_with_stale_blocks = await conn.fetchval(
        "SELECT COUNT(*) FROM cases WHERE blocks_stale")

and expose it in the returned summary dict as "cases_with_stale_blocks": cases_with_stale_blocks.

  • Step 4: Smoke-import

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -c "from legal_mcp.tools import drafting; from legal_mcp.services import metrics; print('clean')" Expected: clean.

  • Step 5: Commit
cd ~/legal-ai
git add mcp-server/src/legal_mcp/tools/drafting.py mcp-server/src/legal_mcp/services/metrics.py
git commit -m "feat(audit): blocks_stale drift flag + health-check visibility (GAP-17, FU-7)"

Task 7: Full suite + DB smoke + lint + TaskMaster

  • Step 1: Full offline suite

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m pytest tests/ -q Expected: all pass (FU-1/2a + new FU-7 tests). Report the summary line. If a pre-existing test fails because a newly-audited function now calls audit/mark_blocks_stale without a stub, fix that test's fixture to stub the new boundary (same pattern as the FU-2a recompute_searchable fixture fix).

  • Step 2: DB smoke (real Postgres — applies V22, exercises helpers)
cd ~/legal-ai && set -a && source ~/.env 2>/dev/null && set +a
cd mcp-server && .venv/bin/python -c "
import asyncio, uuid
from legal_mcp.services import db, audit
async def main():
    await db.get_pool()  # applies V22
    pool = await db.get_pool()
    async with pool.acquire() as c:
        col = await c.fetchval(\"SELECT 1 FROM information_schema.columns WHERE table_name='cases' AND column_name='blocks_stale'\")
    print('V22 blocks_stale present:', bool(col))
    # citation resolver: random id is unresolved
    out = await db.resolve_citation_case_law_ids([uuid.uuid4()])
    print('resolver unresolved count:', len(out['unresolved']))
    # log_action_safe never raises
    await audit.log_action_safe('fu7_smoke', details={'ok': True})
    print('log_action_safe ok')
asyncio.run(main())
" 2>&1 | grep -vE 'INFO|WARNING|httpx|deprecat|command not found|\^\^\^' | tail -5

Expected: V22 blocks_stale present: True, resolver unresolved count: 1, log_action_safe ok. (Optionally clean the smoke row: DELETE FROM audit_log WHERE action='fu7_smoke'.)

  • Step 3: Lint

Run: cd ~/legal-ai/mcp-server && .venv/bin/python -m ruff check src/legal_mcp/services/audit.py src/legal_mcp/services/db.py src/legal_mcp/services/block_writer.py 2>/dev/null; echo "exit=$?" Expected: clean or "ruff not available".

  • Step 4: Mark TaskMaster #65 done — controller edits .taskmaster/tasks/tasks.json + verifies via MCP get_task.

Self-Review Notes

  • GAP-18 → Task 3 (+ write_block audit in Task 4). GAP-19 → Task 4 (provenance event). GAP-20 → Task 5 (resolver + QA warning). GAP-17 → Tasks 2+6 (V22 flag + wiring + health).
  • No new table (audit_log reused, X5 §4). No data migration (V22 additive; provenance forward-only).
  • Non-fatal audit: all calls via log_action_safe. GAP-20 is warning-only (never a critical gate — doesn't block export, consistent with FU-6 gates).
  • Type consistency: log_action_safe, mark_blocks_stale(case_id, stale), resolve_citation_case_law_ids(ids)->{resolved,unresolved}, result["sources"]={document_ids,claim_ids,case_law_ids} — names identical across tasks + tests.
  • Offline-test limit: real audit_log INSERT / V22 verified by Task 7 Step 2 smoke; offline tests cover the pure wrappers/resolver logic.