feat(qa): citation→corpus resolution as non-blocking warning (GAP-20, FU-7)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 21:35:24 +00:00
parent 769f5020eb
commit 7e2f4b2872

View File

@@ -287,6 +287,50 @@ def check_sequential_numbering(blocks: list[dict]) -> dict:
}
async def check_citation_resolution(case_id: UUID, decision_id=None) -> dict:
"""GAP-20/INV-AUD3: every cited case_law_id must resolve to the corpus.
Reads case_law_ids from the decision's write_block audit provenance and
verifies each resolves. Unresolvable → NON-BLOCKING warning + audit event.
"""
from legal_mcp.services import audit
rows = await audit.get_audit_log(case_id=case_id, action="write_block", limit=200)
ids = set()
for r in rows:
details = r.get("details") or {}
if isinstance(details, str):
try:
details = json.loads(details)
except (ValueError, TypeError):
details = {}
for raw in (details.get("sources") or {}).get("case_law_ids", []):
try:
ids.add(UUID(str(raw)))
except (ValueError, TypeError):
pass
if not ids:
return {"name": "citation_resolution", "passed": True, "errors": [], "severity": "warning"}
res = await db.resolve_citation_case_law_ids(list(ids))
if not res["unresolved"]:
return {"name": "citation_resolution", "passed": True, "errors": [], "severity": "warning"}
await audit.log_action_safe(
"citation_unresolved", case_id=case_id,
details={"unresolved": [str(x) for x in res["unresolved"]]},
)
return {
"name": "citation_resolution",
"passed": False,
"severity": "warning",
"errors": [
f"{len(res['unresolved'])} ציטוטים אינם פתירים לקורפוס — דורש אימות יו\"ר",
],
}
# ── Main validation ───────────────────────────────────────────────
async def validate_decision(case_id: UUID) -> dict:
@@ -334,6 +378,8 @@ async def validate_decision(case_id: UUID) -> dict:
check_no_duplication(blocks),
check_sequential_numbering(blocks),
])
# Async, non-blocking warning: citation→corpus resolution (GAP-20/INV-AUD3)
results.append(await check_citation_resolution(case_id, decision["id"]))
critical_failures = sum(1 for r in results if not r["passed"] and r["severity"] == "critical")
all_passed = all(r["passed"] for r in results)