feat(ci): G12 leak-guard — enforce the Agent Platform Port seam (R4, #113)
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
המאכף האוטומטי של INV-G12 (docs/spec/X15 §4). שני כללים קשיחים: 1. mcp-server/src (שכבת-האינטליגנציה) ללא סמלי-Paperclip — allowlist מנומק לפי substring ל-6 ההפניות הלגיטימיות (pm2-bridge + הערות-מקור company_id). 2. import seam — רק web/agent_platform_port.py (+ קבצי-המעטפת) מייבאים paperclip_*. מימוש קנוני אחד (scripts/leak_guard.py, stdlib-בלבד), משותף לשלושה אכיפנים (G2): • CI hard gate: .gitea/workflows/leak-guard.yaml (pull_request + push→main) • pytest: mcp-server/tests/test_platform_port_leak_guard.py (כולל self-test שמוודא שה-guard תופס הזרקה — לא ירקב) • hook בזמן-אמת: spec-guard.sh בודק את התוכן-הנכתב (new_string/content) על כתיבה ל-mcp-server/src ומזהיר על הזרקת-Paperclip (לא-deduped); תזכורת-הספ עודכנה ל-G1–G12. מחריג קבצים-נוצרים (web-ui types.ts) ומעטפת מוצהרת; הפרונט מחוץ להיקף-האינטליגנציה (ממצא R3). עודכן scripts/SCRIPTS.md. אימות: סריקה נקייה exit 0; הזרקת pc.sh ל-mcp-server → exit 1; seam-violation ב-web → exit 1; hook מזהיר על mcp-server ומזכיר-ספ על web; pytest 3 passed; bash -n + YAML תקינים. Invariants: G12 (אכיפה), G2 (מאכף יחיד לשלושה צרכנים). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
172
scripts/leak_guard.py
Normal file
172
scripts/leak_guard.py
Normal file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
"""G12 leak-guard — enforce the Agent Platform Port seam (docs/spec/X15 §4 / R4).
|
||||
|
||||
The single, canonical checker for INV-G12. Used by BOTH the interactive
|
||||
PreToolUse hook (``scripts/spec-guard.sh``, warn-only) and the CI fitness-test
|
||||
(``mcp-server/tests/test_platform_port_leak_guard.py``, hard fail) — one
|
||||
implementation, no parallel rule (G2).
|
||||
|
||||
Two HARD rules:
|
||||
|
||||
1. **Intelligence layer is platform-clean.** ``mcp-server/src`` (the MCP tools +
|
||||
decision/RAG/extraction logic) contains ZERO Paperclip-specific symbols.
|
||||
A short, explicit baseline allowlist (``_ALLOW``) covers pre-existing benign
|
||||
prose mentions (the origin of ``company_id``) and the host pm2 bridge that
|
||||
legitimately names the ``paperclip`` service — keyed by substring so it
|
||||
survives line-number shifts.
|
||||
|
||||
2. **Import seam.** Only ``web/agent_platform_port.py`` (the Port) and the
|
||||
declared shell itself (``web/paperclip_client.py`` / ``web/paperclip_api.py``)
|
||||
may import ``web.paperclip_client`` / ``web.paperclip_api``. Any other file
|
||||
in ``web/`` that imports them is a violation (R2 established the seam).
|
||||
|
||||
OUT OF SCOPE (not intelligence): the declared shell (paperclip_client/api,
|
||||
plugin-legal-ai, adapters, web-ui settings paperclip-tab / paperclip-agents,
|
||||
skills/new-company-setup), and AUTO-GENERATED files (web-ui/src/lib/api/types.ts
|
||||
mirrors the backend OpenAPI — governed by the backend, not hand-fixable).
|
||||
|
||||
Usage:
|
||||
leak_guard.py # scan the whole repo; exit 1 on any violation
|
||||
leak_guard.py <file>... # scan only the given files (the spec-guard hook)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO = Path(__file__).resolve().parent.parent
|
||||
|
||||
# Paperclip-specific symbols that must never appear in the intelligence layer.
|
||||
HARD = re.compile(
|
||||
r"paperclip|Paperclip|PAPERCLIP|wakeup|heartbeat|HEARTBEAT|pc_request|"
|
||||
r"pc\.sh|X-Paperclip|agent_wakeup|heartbeat_run|ctx\.agents|issueId"
|
||||
)
|
||||
|
||||
# Intelligence layer — rule 1 applies here (zero hard terms, save the allowlist).
|
||||
PROTECTED_DIRS = ["mcp-server/src"]
|
||||
|
||||
# Baseline allowlist: (path-suffix, substring-in-line). A hard-term hit is allowed
|
||||
# only if its file ends with <path-suffix> AND the line contains <substring>.
|
||||
# Keep this list SHORT and justified — every entry is a documented exception.
|
||||
_ALLOW: list[tuple[str, str]] = [
|
||||
# Host pm2 bridge legitimately lists the 'paperclip' service (ops, not intel).
|
||||
("court_fetch_service/server.py", "pm2 status of legal-* / paperclip services"),
|
||||
("court_fetch_service/server.py", '("legal-", "paperclip")'),
|
||||
("court_fetch_service/server.py", "never paperclip or arbitrary processes"),
|
||||
# Prose comments naming the ORIGIN of a stored field — not code coupling.
|
||||
("services/db.py", "Paperclip company UUID"),
|
||||
("services/db.py", "from a Paperclip issue"),
|
||||
("services/db.py", "The Paperclip project"),
|
||||
]
|
||||
|
||||
# Import-seam — rule 2. Only these web/ files may import the Paperclip client.
|
||||
SEAM_ALLOWED = {
|
||||
"web/agent_platform_port.py", # the Port
|
||||
"web/paperclip_client.py", # the shell itself
|
||||
"web/paperclip_api.py", # the shell itself
|
||||
}
|
||||
SEAM_IMPORT = re.compile(r"^\s*(from\s+web\.paperclip_(client|api)\s+import|"
|
||||
r"import\s+web\.paperclip_(client|api)\b)")
|
||||
|
||||
_SKIP_PARTS = {".venv", "node_modules", "__pycache__", ".git", ".next"}
|
||||
|
||||
|
||||
def _is_test(p: Path) -> bool:
|
||||
return "tests" in p.parts or "test" in p.parts or p.name.startswith("test_")
|
||||
|
||||
|
||||
def _skip(p: Path) -> bool:
|
||||
return any(part in _SKIP_PARTS for part in p.parts)
|
||||
|
||||
|
||||
def _allowed(rel: str, line: str) -> bool:
|
||||
return any(rel.endswith(suf) and sub in line for suf, sub in _ALLOW)
|
||||
|
||||
|
||||
def _iter_py(base: Path):
|
||||
for p in base.rglob("*.py"):
|
||||
if not _skip(p) and not _is_test(p):
|
||||
yield p
|
||||
|
||||
|
||||
def scan(files: list[Path] | None = None) -> list[str]:
|
||||
"""Return a list of violation strings (empty == clean)."""
|
||||
violations: list[str] = []
|
||||
|
||||
# Rule 1 — intelligence layer is platform-clean.
|
||||
if files is None:
|
||||
targets = [p for d in PROTECTED_DIRS for p in _iter_py(REPO / d)]
|
||||
else:
|
||||
prot = [REPO / d for d in PROTECTED_DIRS]
|
||||
targets = [
|
||||
p for p in files
|
||||
if any(prot_d in p.resolve().parents or p.resolve() == prot_d
|
||||
for prot_d in prot)
|
||||
and p.suffix == ".py" and not _is_test(p) and not _skip(p)
|
||||
]
|
||||
for p in targets:
|
||||
rel = p.resolve().relative_to(REPO).as_posix()
|
||||
try:
|
||||
lines = p.read_text(encoding="utf-8").splitlines()
|
||||
except (OSError, UnicodeDecodeError):
|
||||
continue
|
||||
for i, line in enumerate(lines, 1):
|
||||
if HARD.search(line) and not _allowed(rel, line):
|
||||
violations.append(
|
||||
f"{rel}:{i}: Paperclip symbol in the intelligence layer "
|
||||
f"(INV-G12). Route platform access through "
|
||||
f"web/agent_platform_port.py, or add a justified baseline "
|
||||
f"entry in scripts/leak_guard.py if genuinely benign.\n"
|
||||
f" {line.strip()[:120]}"
|
||||
)
|
||||
|
||||
# Rule 2 — import seam (web/ only).
|
||||
web = REPO / "web"
|
||||
seam_targets = (
|
||||
[p for p in _iter_py(web)]
|
||||
if files is None
|
||||
else [p for p in files
|
||||
if p.suffix == ".py" and (web in p.resolve().parents)
|
||||
and not _is_test(p)]
|
||||
)
|
||||
for p in seam_targets:
|
||||
rel = p.resolve().relative_to(REPO).as_posix()
|
||||
if rel in SEAM_ALLOWED:
|
||||
continue
|
||||
try:
|
||||
lines = p.read_text(encoding="utf-8").splitlines()
|
||||
except (OSError, UnicodeDecodeError):
|
||||
continue
|
||||
for i, line in enumerate(lines, 1):
|
||||
if SEAM_IMPORT.search(line):
|
||||
violations.append(
|
||||
f"{rel}:{i}: imports the Paperclip client directly "
|
||||
f"(INV-G12 seam). Import from web.agent_platform_port instead.\n"
|
||||
f" {line.strip()[:120]}"
|
||||
)
|
||||
return violations
|
||||
|
||||
|
||||
def main(argv: list[str]) -> int:
|
||||
files = [Path(a) for a in argv] or None
|
||||
violations = scan(files)
|
||||
if violations:
|
||||
sys.stderr.write(
|
||||
"✗ G12 leak-guard — Agent Platform Port violated "
|
||||
f"({len(violations)} finding(s)):\n\n"
|
||||
)
|
||||
for v in violations:
|
||||
sys.stderr.write(f" • {v}\n")
|
||||
sys.stderr.write(
|
||||
"\nSee docs/spec/X15-agent-platform-port.md (G12).\n"
|
||||
)
|
||||
return 1
|
||||
if files is None:
|
||||
print("✓ G12 leak-guard: intelligence layer is platform-clean; "
|
||||
"import seam intact.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main(sys.argv[1:]))
|
||||
Reference in New Issue
Block a user