Files
legal-ai/scripts/sync_agents_across_companies.py
Chaim cf5f6fe274 feat(paperclip): close 11 integration gaps (#16-#28)
Brings the legal-ai ↔ Paperclip integration in line with the official
Paperclip skill. Net effect: HEARTBEAT.md -47% (370→195 lines), all 14
agents on uniform runtime_config + budget + instructionsBundleMode, and
two cross-company helpers replacing manual SQL.

Highlights:
- HEARTBEAT.md refactor: project-specific only, delegates to the official
  paperclipai/paperclip skill (loaded per agent). Adds heartbeat-context
  fast-path (§1.7) and PAPERCLIP_WAKE_PAYLOAD_JSON shortcut (§1.5).
- Issue Thread Interactions API: legal-ceo.md now uses
  ask_user_questions / request_confirmation / suggest_tasks instead of
  free-text comments — gives chair structured UI with idempotency keys.
- pc.sh + paperclip_api.pc_request: every API call goes through helpers
  that inject Authorization + X-Paperclip-Run-Id (audit trail).
- sync_agents_across_companies.py: master(CMP)→mirror(CMPA) sync via
  Paperclip API, idempotent, with --verify and --apply modes.
- skills/new-company-setup: 11-step blueprint distilling all 11 gaps
  into a single onboarding runbook for the next company.
- .taskmaster: 12 tasks covering each gap (one already closed: #29).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 17:25:45 +00:00

383 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""sync_agents_across_companies.py — Mirror agent configs from CMP (1xxx) to CMPA (8xxx).
Gap #25: Paperclip enforces ``agents.company_id NOT NULL``, so we have 14
agents (7 × 2 companies). Without sync, settings drift between the master
(CMP, 1xxx) and the mirror (CMPA, 8xxx). This script copies the relevant
fields one-way: CMP → CMPA.
Design: fail-safe ("אל-כשל") — backup before apply, idempotent, a mode flag is
always required and nothing is written without --apply, clear field-level diff,
rollback path printed on failure.
Synced fields:
- adapter_config.{model, effort, timeoutSec, maxTurnsPerRun,
instructionsBundleMode, instructionsRootPath,
instructionsEntryFile, instructionsFilePath,
dangerouslySkipPermissions, extraArgs, cwd}
- adapter_config.paperclipSkillSync.desiredSkills (filtered for skills
that exist in the mirror company — local skills like
``local/eba6210d5a/legal-decision`` only exist in CMP)
- runtime_config (full replace — heartbeat config)
- budget_monthly_cents
- metadata, icon, title, role
Not synced (intentionally per-company):
- id, company_id, name, reports_to, default_environment_id
- adapter_type, agent_api_keys
- status, pause_reason, paused_at, last_heartbeat_at
- spent_monthly_cents (separate usage)
- permissions (per-company access policies)
Usage:
python sync_agents_across_companies.py --verify # show drift only
python sync_agents_across_companies.py --dry-run # show plan
python sync_agents_across_companies.py --apply # backup + apply
Requires:
PAPERCLIP_BOARD_API_KEY (Infisical: /paperclip @ nautilus)
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import asyncpg
import httpx
# --- Connection settings; each one can be overridden through the environment ---
PAPERCLIP_DB_URL = os.getenv(
    "PAPERCLIP_DB_URL",
    "postgresql://paperclip:paperclip@127.0.0.1:54329/paperclip",
)
PAPERCLIP_API_URL = os.getenv("PAPERCLIP_API_URL", "http://localhost:3100")
PAPERCLIP_BOARD_API_KEY = os.getenv("PAPERCLIP_BOARD_API_KEY", "")

# Directory that receives the pre-apply dump of the agents table.
BACKUP_DIR = Path("/home/chaim/.paperclip/instances/default/data/backups/manual")

# Company identifiers: settings flow one-way from master to mirror.
CMP_COMPANY_ID = "42a7acd0-30c5-4cbd-ac97-7424f65df294"  # MASTER (1xxx)
CMPA_COMPANY_ID = "8639e837-4c9d-47fa-a76b-95788d651896"  # MIRROR (8xxx)

# Top-level adapter_config keys that are compared and copied.
# paperclipSkillSync is deliberately absent: it is handled separately.
ADAPTER_CONFIG_SYNC_KEYS = [
    "model",
    "effort",
    "timeoutSec",
    "maxTurnsPerRun",
    "instructionsBundleMode",
    "instructionsRootPath",
    "instructionsEntryFile",
    "instructionsFilePath",
    "dangerouslySkipPermissions",
    "extraArgs",
    "cwd",
]

# Plain agent columns that are compared and copied.
TOP_LEVEL_SYNC_FIELDS = [
    "budget_monthly_cents",
    "metadata",
    "icon",
    "title",
    "role",
]
def fail(msg: str) -> None:
    """Report *msg* on stderr and terminate the process with exit status 1.

    This function never returns normally: it always raises ``SystemExit``.
    """
    print(msg, file=sys.stderr)
    raise SystemExit(1)
async def fetch_agents(conn: asyncpg.Connection, company_id: str) -> list[dict[str, Any]]:
    """Load the sync-relevant columns of every agent belonging to one company.

    Args:
        conn: Open database connection; only its ``fetch`` coroutine is used.
        company_id: Company UUID (as text) to filter the ``agents`` table by.

    Returns:
        One plain dict per agent, ordered by name. The ``adapter_config``,
        ``runtime_config`` and ``metadata`` values are JSON-decoded when they
        arrive as strings; an empty string becomes ``None``.
    """
    records = await conn.fetch(
        """
        SELECT id::text, name, role, title, icon,
               adapter_type, adapter_config, runtime_config, metadata,
               budget_monthly_cents
        FROM agents
        WHERE company_id = $1::uuid
        ORDER BY name
        """,
        company_id,
    )
    json_columns = ("adapter_config", "runtime_config", "metadata")
    agents: list[dict[str, Any]] = []
    for record in records:
        agent = dict(record)
        for column in json_columns:
            raw = agent.get(column)
            # Only string values need decoding; anything else is kept as-is.
            if not isinstance(raw, str):
                continue
            agent[column] = json.loads(raw) if raw else None
        agents.append(agent)
    return agents
async def fetch_company_skills(conn: asyncpg.Connection, company_id: str) -> set[str]:
    """Return the set of skill keys registered for *company_id*.

    Args:
        conn: Open database connection; only its ``fetch`` coroutine is used.
        company_id: Company UUID (as text) to filter ``company_skills`` by.
    """
    skill_rows = await conn.fetch(
        "SELECT key FROM company_skills WHERE company_id = $1::uuid",
        company_id,
    )
    keys: set[str] = set()
    for row in skill_rows:
        keys.add(row["key"])
    return keys
def _get(d: dict | None, key: str, default=None):
    """Look up *key* in *d*, tolerating a *d* that is not a dict.

    Returns *default* when *d* is not a dict or does not contain *key*.
    """
    if not isinstance(d, dict):
        return default
    return d.get(key, default)
def compute_diff(master: dict, mirror: dict, mirror_skills: set[str]) -> dict[str, Any]:
    """Describe what must change in *mirror* for it to match *master*.

    Args:
        master: Agent row of the master company.
        mirror: Agent row of the mirror company.
        mirror_skills: Skill keys that exist in the mirror company; master
            skills outside this set are reported as skipped, not as drift.

    Returns:
        A dict keyed by changed area (top-level field names,
        ``"adapter_config"``, ``"paperclipSkillSync.desiredSkills"``,
        ``"runtime_config"``). Each entry carries ``from``/``to`` values.
        An empty dict means the two agents are in sync.
    """
    changes: dict[str, Any] = {}

    # Plain agent columns are compared by value.
    for name in TOP_LEVEL_SYNC_FIELDS:
        wanted, current = master.get(name), mirror.get(name)
        if wanted != current:
            changes[name] = {"from": current, "to": wanted}

    # adapter_config is compared key by key, restricted to the synced keys.
    master_cfg = master.get("adapter_config") or {}
    mirror_cfg = mirror.get("adapter_config") or {}
    cfg_changes = {
        key: {"from": _get(mirror_cfg, key), "to": _get(master_cfg, key)}
        for key in ADAPTER_CONFIG_SYNC_KEYS
        if _get(master_cfg, key) != _get(mirror_cfg, key)
    }
    if cfg_changes:
        changes["adapter_config"] = cfg_changes

    # desiredSkills is a SUBSET check rather than an equality check: the
    # Paperclip API auto-adds company-level required runtime skills (e.g.
    # paperclip-dev), so the mirror may legitimately list MORE skills than the
    # master. Drift exists only when a master skill that is available in the
    # mirror company is absent from the mirror agent's list.
    wanted_skills = list((_get(master_cfg, "paperclipSkillSync") or {}).get("desiredSkills") or [])
    current_skills = list((_get(mirror_cfg, "paperclipSkillSync") or {}).get("desiredSkills") or [])
    syncable: list = []
    unavailable: list = []
    for skill in wanted_skills:
        (syncable if skill in mirror_skills else unavailable).append(skill)
    absent = set(syncable) - set(current_skills)
    if absent:
        changes["paperclipSkillSync.desiredSkills"] = {
            "from": current_skills,
            "to": syncable,
            "missing_in_mirror": sorted(absent),
            "skipped_unavailable_in_mirror": unavailable,
        }

    # runtime_config is replaced wholesale; None and {} count as equal.
    master_runtime = master.get("runtime_config")
    mirror_runtime = mirror.get("runtime_config")
    if (master_runtime or {}) != (mirror_runtime or {}):
        changes["runtime_config"] = {"from": mirror_runtime, "to": master_runtime}

    return changes
def backup_agents_table() -> Path:
    """Dump the ``agents`` table (data only) to a timestamped SQL file.

    The connection parameters are taken from ``PAPERCLIP_DB_URL`` so that the
    backup targets the same database the drift report was read from. With the
    default URL the resulting ``pg_dump`` invocation is the same as the former
    hard-coded one. URL query parameters (e.g. ``sslmode``) are not forwarded.

    Returns:
        Path of the dump file inside ``BACKUP_DIR``.

    Raises:
        subprocess.CalledProcessError: ``pg_dump`` exited with a non-zero status.
        FileNotFoundError: ``pg_dump`` is not installed / not on ``PATH``.
    """
    from urllib.parse import unquote, urlsplit

    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    out = BACKUP_DIR / f"agents-pre-cross-company-sync-{stamp}.sql"

    parts = urlsplit(PAPERCLIP_DB_URL)
    host = parts.hostname or "127.0.0.1"
    port = str(parts.port or 5432)
    user = unquote(parts.username) if parts.username else "paperclip"
    dbname = unquote(parts.path.lstrip("/")) or "paperclip"

    env = dict(os.environ)
    if parts.password:
        # Pass the password through the environment, not argv, so it does not
        # show up in the process list.
        env["PGPASSWORD"] = unquote(parts.password)

    subprocess.run(
        [
            "pg_dump", "-h", host, "-p", port, "-U", user,
            "-d", dbname, "-t", "agents", "--data-only", "-f", str(out),
        ],
        check=True,
        env=env,
    )
    return out
def _short(value, max_len=80) -> str:
    """Render *value* as a display string of at most *max_len* chars plus ``...``.

    Strings are used verbatim; every other value is JSON-encoded (non-ASCII
    kept as-is, unserialisable objects converted with ``str``).
    """
    if isinstance(value, str):
        text = value
    else:
        text = json.dumps(value, ensure_ascii=False, default=str)
    return text if len(text) <= max_len else text[:max_len] + "..."
def print_diff(agent_name: str, diff: dict, master_id: str, mirror_id: str) -> None:
    """Print a human-readable drift report for one agent.

    Args:
        agent_name: Name shown in the first column (padded to 14 chars).
        diff: Result of ``compute_diff``; an empty dict prints "in sync".
        master_id: Master agent id; only the first 8 characters are shown.
        mirror_id: Mirror agent id; only the first 8 characters are shown.

    Every old/new pair is separated by `` → `` so that the two values cannot
    run together (e.g. skill counts 1 and 2 must not read as "12").
    """
    if not diff:
        print(f"{agent_name:14s} — in sync (no changes)")
        return

    print(f"{agent_name:14s} — {len(diff)} change(s): master={master_id[:8]}… → mirror={mirror_id[:8]}")
    for key, change in diff.items():
        if key == "adapter_config":
            # Nested: one line per changed adapter_config key.
            for ac_key, ac_change in change.items():
                print(f" adapter_config.{ac_key}: {_short(ac_change['from'])} → {_short(ac_change['to'])}")
        elif key == "paperclipSkillSync.desiredSkills":
            print(f" paperclipSkillSync.desiredSkills: {len(change['from'])} → {len(change['to'])} skills")
            for s in change.get("skipped_unavailable_in_mirror", []):
                print(f" (skipped, not in mirror company: {s})")
        elif key == "runtime_config":
            print(" runtime_config: full replace")
            print(f" from: {_short(change['from'], 100)}")
            print(f" to: {_short(change['to'], 100)}")
        else:
            print(f" {key}: {_short(change['from'])} → {_short(change['to'])}")
async def _board_request(method: str, path: str, body: dict) -> tuple[int, dict]:
    """Send one authenticated JSON request to the Paperclip API.

    Shared by ``call_patch`` and ``call_skill_sync`` so both use the same
    headers, timeout and response decoding.

    Returns:
        ``(status_code, payload)``. When the response body is not valid JSON
        the payload is ``{"raw": <first 500 chars of the body>}``.
    """
    if not PAPERCLIP_BOARD_API_KEY:
        fail("PAPERCLIP_BOARD_API_KEY not set")
    headers = {
        "Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}",
        "X-Paperclip-Run-Id": "",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.request(
            method, f"{PAPERCLIP_API_URL}{path}", headers=headers, json=body
        )
    try:
        data = resp.json()
    except ValueError:
        # Body is not JSON (json.JSONDecodeError / UnicodeDecodeError are
        # ValueError subclasses); keep a short excerpt for the error report.
        data = {"raw": resp.text[:500]}
    return resp.status_code, data


async def call_patch(agent_id: str, body: dict) -> tuple[int, dict]:
    """PATCH ``/api/agents/{agent_id}`` with *body*.

    Exits the process via ``fail`` when ``PAPERCLIP_BOARD_API_KEY`` is unset.

    Returns:
        ``(status_code, decoded_response)``.
    """
    return await _board_request("PATCH", f"/api/agents/{agent_id}", body)


async def call_skill_sync(agent_id: str, desired_skills: list[str]) -> tuple[int, dict]:
    """POST *desired_skills* to ``/api/agents/{agent_id}/skills/sync``.

    Exits the process via ``fail`` when ``PAPERCLIP_BOARD_API_KEY`` is unset.

    Returns:
        ``(status_code, decoded_response)``.
    """
    return await _board_request(
        "POST", f"/api/agents/{agent_id}/skills/sync", {"desiredSkills": desired_skills}
    )
async def apply_diff(mirror_id: str, agent_name: str, diff: dict) -> list[str]:
    """Push a computed diff to the mirror agent through the Paperclip API.

    Top-level fields, ``adapter_config`` and ``runtime_config`` travel in one
    PATCH; desired skills go through the dedicated skills/sync endpoint.

    Args:
        mirror_id: Id of the mirror agent to update.
        agent_name: Agent name (not used by this function).
        diff: Result of ``compute_diff`` for this agent.

    Returns:
        One message per failed API call; an empty list means success.
    """
    problems: list[str] = []

    # Database column name -> API (camelCase) field name.
    api_names = {
        "budget_monthly_cents": "budgetMonthlyCents",
        "metadata": "metadata",
        "icon": "icon",
        "title": "title",
        "role": "role",
    }
    payload: dict[str, Any] = {
        api_names[name]: diff[name]["to"]
        for name in TOP_LEVEL_SYNC_FIELDS
        if name in diff
    }
    if "adapter_config" in diff:
        # Only the changed adapter_config keys are sent.
        payload["adapterConfig"] = {
            key: change["to"] for key, change in diff["adapter_config"].items()
        }
    if "runtime_config" in diff:
        payload["runtimeConfig"] = diff["runtime_config"]["to"]

    if payload:
        status, data = await call_patch(mirror_id, payload)
        if status < 400:
            print(f" ✓ PATCH applied ({len(payload)} top-level keys)")
        else:
            problems.append(f"PATCH HTTP {status}: {json.dumps(data)[:300]}")

    # Skills go through their own endpoint (creates a 'skill-sync' revision).
    skills_change = diff.get("paperclipSkillSync.desiredSkills")
    if skills_change is not None:
        desired = skills_change["to"]
        status, data = await call_skill_sync(mirror_id, desired)
        if status < 400:
            print(f" ✓ skills/sync applied ({len(desired)} skills)")
        else:
            problems.append(f"skills/sync HTTP {status}: {json.dumps(data)[:300]}")

    return problems
async def _load_state() -> tuple[list[dict[str, Any]], list[dict[str, Any]], set[str]]:
    """Read master agents, mirror agents and mirror skill keys from the database."""
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        master_agents = await fetch_agents(conn, CMP_COMPANY_ID)
        mirror_agents = await fetch_agents(conn, CMPA_COMPANY_ID)
        mirror_skills = await fetch_company_skills(conn, CMPA_COMPANY_ID)
    finally:
        await conn.close()
    return master_agents, mirror_agents, mirror_skills


async def main() -> None:
    """CLI entry point: report drift and, with ``--apply``, sync master → mirror.

    Modes (exactly one is required):
        --verify   print the drift report and a summary, change nothing
        --dry-run  print the drift report (the plan), change nothing
        --apply    back up the agents table, apply the plan, then re-verify

    ``--only NAME`` restricts the report, the apply step and the post-apply
    verification to the single agent with that name.

    Exits with status 1 when any agent fails to apply; the rollback hint is
    printed first.
    """
    p = argparse.ArgumentParser()
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--verify", action="store_true", help="Show current drift, no changes")
    g.add_argument("--dry-run", action="store_true", help="Show what would change")
    g.add_argument("--apply", action="store_true", help="Backup + apply changes")
    p.add_argument("--only", help="Sync only the named agent (e.g., 'עוזר משפטי')")
    args = p.parse_args()

    master_agents, mirror_agents, mirror_skills = await _load_state()
    mirror_by_name = {a["name"]: a for a in mirror_agents}

    print(f"\n=== Master (CMP, 1xxx): {len(master_agents)} agents ===")
    print(f"=== Mirror (CMPA, 8xxx): {len(mirror_agents)} agents ===")
    print(f"=== Mirror has {len(mirror_skills)} local skills available ===\n")
    print("=== Drift report ===")

    plan: list[tuple[dict, dict, dict]] = []  # (master, mirror, diff)
    in_sync = 0  # agents actually compared and found identical
    skipped = 0  # agents that could not be compared
    for m in master_agents:
        if args.only and m["name"] != args.only:
            continue
        mirror = mirror_by_name.get(m["name"])
        if not mirror:
            print(f"{m['name']:14s} — NOT FOUND in mirror (skipping; we never auto-create)")
            skipped += 1
            continue
        if m["adapter_type"] != mirror["adapter_type"]:
            print(f"{m['name']:14s} — adapter_type mismatch ({m['adapter_type']} vs {mirror['adapter_type']}) — SKIPPING")
            skipped += 1
            continue
        diff = compute_diff(m, mirror, mirror_skills)
        print_diff(m["name"], diff, m["id"], mirror["id"])
        if diff:
            plan.append((m, mirror, diff))
        else:
            in_sync += 1

    if args.verify:
        print("\n(verify mode — exiting without changes)")
        # Count only agents that were really compared; skipped or filtered
        # agents must not be reported as "in sync".
        summary = f"\nSummary: {len(plan)} agent(s) need sync, {in_sync} in sync"
        if skipped:
            summary += f", {skipped} skipped"
        print(summary)
        return
    if not plan:
        print("\n✓ All agents in sync — nothing to do.")
        return
    if args.dry_run:
        print("\n(dry-run mode — exiting without changes)\nRe-run with --apply to execute.")
        return

    # APPLY
    if not PAPERCLIP_BOARD_API_KEY:
        # Check before taking a backup: nothing can be applied without the key.
        fail("PAPERCLIP_BOARD_API_KEY not set")
    print("\n=== Backup ===")
    backup_path = backup_agents_table()
    print(f"{backup_path}")
    print(f"\n=== Applying ({len(plan)} agents) ===")
    all_errors: list[str] = []
    for master, mirror, diff in plan:
        print(f"\n{master['name']} ({mirror['id']})")
        try:
            errors = await apply_diff(mirror["id"], master["name"], diff)
        except httpx.HTTPError as exc:
            # A transport failure must not abort the run before the rollback
            # hint below is printed; record it like any other API error.
            errors = [f"request failed: {exc!r}"]
        if errors:
            for e in errors:
                print(f"{e}")
            all_errors.extend([f"{master['name']}: {e}" for e in errors])
    if all_errors:
        print(f"\n=== ⚠️ {len(all_errors)} error(s) ===")
        print(f"Rollback option: psql ... -f {backup_path}")
        sys.exit(1)
    print("\n=== ✓ Sync complete — re-running --verify to confirm ===\n")

    # Re-verify against fresh database state, honouring --only so that agents
    # that were never part of this run are not reported as failures.
    master_agents, mirror_agents, mirror_skills = await _load_state()
    mirror_by_name = {a["name"]: a for a in mirror_agents}
    still_drifting = 0
    verified = 0
    for m in master_agents:
        if args.only and m["name"] != args.only:
            continue
        mirror = mirror_by_name.get(m["name"])
        if not mirror or m["adapter_type"] != mirror["adapter_type"]:
            continue
        verified += 1
        diff = compute_diff(m, mirror, mirror_skills)
        if diff:
            still_drifting += 1
            print(f"{m['name']:14s} — STILL has {len(diff)} change(s) after apply (review!)")
    if still_drifting == 0:
        print(f" ✓ All {verified} agents in sync.")
    else:
        print(f"\n⚠️ {still_drifting} agents still drifting — investigate.")


if __name__ == "__main__":
    asyncio.run(main())