"""FU-8a / GAP-22: fitness function — forbid un-sanctioned Paperclip access. Fails if any scanned source (outside the allowlist) reaches the Paperclip API with a raw HTTP client or inserts directly into agent_wakeup_requests. The sanctioned paths are web/paperclip_api.py::pc_request (Python) and scripts/pc.sh (bash); wakeup must go through POST /api/agents/{id}/wakeup. """ from __future__ import annotations import re from pathlib import Path import pytest REPO = Path(__file__).resolve().parents[2] SCAN_ROOTS = [REPO / "web", REPO / "mcp-server" / "src", REPO / "scripts"] # Exempt ONLY from the raw-HTTP-to-Paperclip rule. Two categories, per the # endorsed "differentiate production code from operational tooling" pattern for # architectural fitness functions (cf. InfoQ fitness-functions; ESLint `overrides`): # (a) the sanctioned helpers themselves (the one place raw HTTP is correct); # (b) standalone operator/admin scripts run manually or by cron with the board # key — a distinct category from app/agent code. Forcing them through the # wrapper is over-engineering (DRY: "duplication is cheaper than the wrong # abstraction"); direct httpx with the board key is acceptable for tooling. # NOTE: the agent_wakeup_requests-INSERT rule is NOT exempted for anyone (below) — # it is a hard invariant for ALL code (a direct insert skips heartbeat creation). HTTP_RULE_ALLOWLIST = { REPO / "web" / "paperclip_api.py", # the sanctioned pc_request helper REPO / "scripts" / "pc.sh", # the sanctioned bash wrapper REPO / "web" / "paperclip_client.py", # legacy: DB reads only REPO / "scripts" / "sync_agents_across_companies.py", # operator tool: CMP→CMPA agent-config sync (CLAUDE.md) REPO / "scripts" / "audit_corpus_integrity.py", # cron audit tool: posts CEO wakeup via the wakeup API REPO / "scripts" / "fix_paperclipai_skills_drift.py", # one-shot operator fix (Gap #28 runbook) REPO / "scripts" / "sync_missing_agent_skills.py", # one-shot operator fix (Gap #28) } # Directories to skip entirely during scan (dead/archived code, virtual envs, test fixtures). _SKIP_PATH_FRAGMENTS = {"/.venv/", "/tests/", "/.archive/"} _PC_URL = re.compile(r"PAPERCLIP_API_URL|127\.0\.0\.1:3100|localhost:3100|pc\.nautilus\.marcusgroup\.org") _HTTP_CLIENT = re.compile(r"\bhttpx\b|\brequests\.(get|post|put|patch|delete)\b|\baiohttp\b|\bcurl\b") _WAKEUP_INSERT = re.compile(r"insert\s+into\s+agent_wakeup_requests", re.IGNORECASE) def _wakeup_violation(text: str) -> str | None: """Universal hard invariant — applies to ALL code (never allowlisted).""" if _WAKEUP_INSERT.search(text): return "direct INSERT INTO agent_wakeup_requests — use the wakeup API (POST /api/agents/{id}/wakeup)" return None def _http_violation(text: str) -> str | None: """Raw HTTP to Paperclip — exempted for HTTP_RULE_ALLOWLIST files only.""" if _PC_URL.search(text) and _HTTP_CLIENT.search(text): return "raw HTTP client + Paperclip URL — use web/paperclip_api.pc_request or scripts/pc.sh" return None def _scan_text(text: str) -> list[str]: """All violation reasons for a file's text, ignoring allowlist (used by unit tests).""" return [r for r in (_wakeup_violation(text), _http_violation(text)) if r] def _iter_source_files(): for root in SCAN_ROOTS: if not root.exists(): continue for ext in ("*.py", "*.sh"): for f in root.rglob(ext): if any(frag in str(f) for frag in _SKIP_PATH_FRAGMENTS): continue yield f def find_violations() -> list[tuple[str, str]]: """Wakeup-INSERT rule applies to every file; HTTP rule respects HTTP_RULE_ALLOWLIST.""" out = [] for f in _iter_source_files(): try: text = f.read_text(encoding="utf-8") except (UnicodeDecodeError, OSError): continue w = _wakeup_violation(text) if w: out.append((str(f.relative_to(REPO)), w)) if f not in HTTP_RULE_ALLOWLIST: h = _http_violation(text) if h: out.append((str(f.relative_to(REPO)), h)) return out def test_scan_flags_raw_http_to_paperclip(): bad = 'import httpx\nasync def f():\n await httpx.post(f"{PAPERCLIP_API_URL}/x")\n' assert _scan_text(bad) def test_scan_flags_wakeup_insert(): bad = "await conn.execute('INSERT INTO agent_wakeup_requests (id) VALUES ($1)', x)" assert _scan_text(bad) def test_scan_ignores_plain_code(): assert _scan_text("def add(a, b):\n return a + b\n") == [] def test_wakeup_insert_rule_is_universal_not_allowlisted(): # The wakeup-INSERT invariant must apply to ALL code; find_violations checks it # for every file regardless of HTTP_RULE_ALLOWLIST. _wakeup_violation is the # standalone check used unconditionally in find_violations (no allowlist branch). assert _wakeup_violation("INSERT INTO agent_wakeup_requests (id) VALUES ($1)") is not None assert _http_violation('httpx.post(f"{PAPERCLIP_API_URL}/x")') is not None def test_repo_has_no_paperclip_access_violations(): violations = find_violations() assert violations == [], "Un-sanctioned Paperclip access found:\n" + "\n".join( f" {f}: {r}" for f, r in violations)