#!/usr/bin/env python3 """sync_agents_across_companies.py — Mirror agent configs from CMP (1xxx) to CMPA (8xxx). Gap #25: Paperclip enforces ``agents.company_id NOT NULL``, so we have 14 agents (7 × 2 companies). Without sync, settings drift between the master (CMP, 1xxx) and the mirror (CMPA, 8xxx). This script copies the relevant fields one-way: CMP → CMPA. Design: "אל-כשל" — backup before apply, idempotent, dry-run by default, clear field-level diff, rollback path printed on failure. Synced fields: - adapter_config.{model, effort, timeoutSec, maxTurnsPerRun, instructionsBundleMode, instructionsRootPath, instructionsEntryFile, instructionsFilePath, dangerouslySkipPermissions, extraArgs, cwd} - adapter_config.paperclipSkillSync.desiredSkills (filtered for skills that exist in the mirror company — local skills like ``local/eba6210d5a/legal-decision`` only exist in CMP) - runtime_config (full replace — heartbeat config) - budget_monthly_cents - metadata, icon, title, role Not synced (intentionally per-company): - id, company_id, name, reports_to, default_environment_id - adapter_type, agent_api_keys - status, pause_reason, paused_at, last_heartbeat_at - spent_monthly_cents (separate usage) - permissions (per-company access policies) Usage: python sync_agents_across_companies.py --verify # show drift only python sync_agents_across_companies.py --dry-run # show plan python sync_agents_across_companies.py --apply # backup + apply Requires: PAPERCLIP_BOARD_API_KEY (Infisical: /paperclip @ nautilus) """ from __future__ import annotations import argparse import asyncio import json import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import Any import asyncpg import httpx PAPERCLIP_DB_URL = os.environ.get( "PAPERCLIP_DB_URL", "postgresql://paperclip:paperclip@127.0.0.1:54329/paperclip" ) PAPERCLIP_API_URL = os.environ.get("PAPERCLIP_API_URL", "http://localhost:3100") PAPERCLIP_BOARD_API_KEY = os.environ.get("PAPERCLIP_BOARD_API_KEY", "") BACKUP_DIR = Path("/home/chaim/.paperclip/instances/default/data/backups/manual") CMP_COMPANY_ID = "42a7acd0-30c5-4cbd-ac97-7424f65df294" # MASTER (1xxx) CMPA_COMPANY_ID = "8639e837-4c9d-47fa-a76b-95788d651896" # MIRROR (8xxx) # adapter_config keys to sync (top-level only; paperclipSkillSync handled separately) ADAPTER_CONFIG_SYNC_KEYS = [ "model", "effort", "timeoutSec", "maxTurnsPerRun", "instructionsBundleMode", "instructionsRootPath", "instructionsEntryFile", "instructionsFilePath", "dangerouslySkipPermissions", "extraArgs", "cwd", ] # Top-level agent fields to sync TOP_LEVEL_SYNC_FIELDS = [ "budget_monthly_cents", "metadata", "icon", "title", "role", ] def fail(msg: str) -> None: print(f"❌ {msg}", file=sys.stderr) sys.exit(1) async def fetch_agents(conn: asyncpg.Connection, company_id: str) -> list[dict[str, Any]]: rows = await conn.fetch( """ SELECT id::text, name, role, title, icon, adapter_type, adapter_config, runtime_config, metadata, budget_monthly_cents FROM agents WHERE company_id = $1::uuid ORDER BY name """, company_id, ) out = [] for r in rows: d = dict(r) # asyncpg returns jsonb as str; parse for k in ("adapter_config", "runtime_config", "metadata"): if isinstance(d.get(k), str): d[k] = json.loads(d[k]) if d[k] else None out.append(d) return out async def fetch_company_skills(conn: asyncpg.Connection, company_id: str) -> set[str]: rows = await conn.fetch( "SELECT key FROM company_skills WHERE company_id = $1::uuid", company_id, ) return {r["key"] for r in rows} def _get(d: dict | None, key: str, default=None): return d.get(key, default) if isinstance(d, dict) else default def compute_diff(master: dict, mirror: dict, mirror_skills: set[str]) -> dict[str, Any]: """Return a dict describing what would change in mirror to match master. Empty dict = in sync.""" diff: dict[str, Any] = {} # Top-level fields for field in TOP_LEVEL_SYNC_FIELDS: if master.get(field) != mirror.get(field): diff[field] = {"from": mirror.get(field), "to": master.get(field)} # adapter_config (per key) m_ac = master.get("adapter_config") or {} r_ac = mirror.get("adapter_config") or {} ac_changes = {} for key in ADAPTER_CONFIG_SYNC_KEYS: if _get(m_ac, key) != _get(r_ac, key): ac_changes[key] = {"from": _get(r_ac, key), "to": _get(m_ac, key)} if ac_changes: diff["adapter_config"] = ac_changes # paperclipSkillSync.desiredSkills — compare as a SUBSET check. # The Paperclip API auto-adds company-level required runtime skills # (e.g. paperclip-dev) to the desiredSkills list, so the mirror can # legitimately have MORE skills than master. We only need master's # filtered skills to be a subset of mirror's actual list. master_desired = list((_get(m_ac, "paperclipSkillSync") or {}).get("desiredSkills") or []) mirror_desired = list((_get(r_ac, "paperclipSkillSync") or {}).get("desiredSkills") or []) master_filtered = [s for s in master_desired if s in mirror_skills] skipped = [s for s in master_desired if s not in mirror_skills] missing_in_mirror = set(master_filtered) - set(mirror_desired) if missing_in_mirror: diff["paperclipSkillSync.desiredSkills"] = { "from": mirror_desired, "to": master_filtered, "missing_in_mirror": sorted(missing_in_mirror), "skipped_unavailable_in_mirror": skipped, } # runtime_config (full replace) if (master.get("runtime_config") or {}) != (mirror.get("runtime_config") or {}): diff["runtime_config"] = {"from": mirror.get("runtime_config"), "to": master.get("runtime_config")} return diff def backup_agents_table() -> Path: BACKUP_DIR.mkdir(parents=True, exist_ok=True) stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") out = BACKUP_DIR / f"agents-pre-cross-company-sync-{stamp}.sql" env = {**os.environ, "PGPASSWORD": "paperclip"} subprocess.run( ["pg_dump", "-h", "127.0.0.1", "-p", "54329", "-U", "paperclip", "-d", "paperclip", "-t", "agents", "--data-only", "-f", str(out)], check=True, env=env, ) return out def _short(value, max_len=80) -> str: s = json.dumps(value, ensure_ascii=False, default=str) if not isinstance(value, str) else value if len(s) > max_len: return s[:max_len] + "..." return s def print_diff(agent_name: str, diff: dict, master_id: str, mirror_id: str) -> None: if not diff: print(f" ✓ {agent_name:14s} — in sync (no changes)") return print(f" ⚠ {agent_name:14s} — {len(diff)} change(s): master={master_id[:8]}… → mirror={mirror_id[:8]}…") for key, change in diff.items(): if key == "adapter_config": for ac_key, ac_change in change.items(): print(f" adapter_config.{ac_key}: {_short(ac_change['from'])} → {_short(ac_change['to'])}") elif key == "paperclipSkillSync.desiredSkills": print(f" paperclipSkillSync.desiredSkills: {len(change['from'])} → {len(change['to'])} skills") for s in change.get("skipped_unavailable_in_mirror", []): print(f" (skipped, not in mirror company: {s})") elif key == "runtime_config": print(f" runtime_config: full replace") print(f" from: {_short(change['from'], 100)}") print(f" to: {_short(change['to'], 100)}") else: print(f" {key}: {_short(change['from'])} → {_short(change['to'])}") async def call_patch(agent_id: str, body: dict) -> tuple[int, dict]: if not PAPERCLIP_BOARD_API_KEY: fail("PAPERCLIP_BOARD_API_KEY not set") headers = { "Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}", "X-Paperclip-Run-Id": "", "Content-Type": "application/json", } url = f"{PAPERCLIP_API_URL}/api/agents/{agent_id}" async with httpx.AsyncClient(timeout=30) as client: resp = await client.patch(url, headers=headers, json=body) try: data = resp.json() except Exception: data = {"raw": resp.text[:500]} return resp.status_code, data async def call_skill_sync(agent_id: str, desired_skills: list[str]) -> tuple[int, dict]: if not PAPERCLIP_BOARD_API_KEY: fail("PAPERCLIP_BOARD_API_KEY not set") headers = { "Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}", "X-Paperclip-Run-Id": "", "Content-Type": "application/json", } url = f"{PAPERCLIP_API_URL}/api/agents/{agent_id}/skills/sync" async with httpx.AsyncClient(timeout=30) as client: resp = await client.post(url, headers=headers, json={"desiredSkills": desired_skills}) try: data = resp.json() except Exception: data = {"raw": resp.text[:500]} return resp.status_code, data async def apply_diff(mirror_id: str, agent_name: str, diff: dict) -> list[str]: """Apply the computed diff to the mirror agent. Returns list of error strings.""" errors: list[str] = [] # Build PATCH body for top-level + adapter_config (skills handled separately) patch_body: dict[str, Any] = {} for field in TOP_LEVEL_SYNC_FIELDS: if field in diff: # snake_case → camelCase for the API api_key = { "budget_monthly_cents": "budgetMonthlyCents", "metadata": "metadata", "icon": "icon", "title": "title", "role": "role", }[field] patch_body[api_key] = diff[field]["to"] if "adapter_config" in diff: patch_body["adapterConfig"] = {k: v["to"] for k, v in diff["adapter_config"].items()} if "runtime_config" in diff: patch_body["runtimeConfig"] = diff["runtime_config"]["to"] # Stamp claude_md_mtime + last_synced into metadata mtime = diff.get("_claude_md_mtime") if mtime: current_meta = dict(patch_body.get("metadata") or {}) current_meta["claude_md_mtime"] = mtime current_meta["claude_md_last_synced"] = datetime.now(timezone.utc).isoformat() patch_body["metadata"] = current_meta if patch_body: status, data = await call_patch(mirror_id, patch_body) if status >= 400: errors.append(f"PATCH HTTP {status}: {json.dumps(data)[:300]}") else: print(f" ✓ PATCH applied ({len(patch_body)} top-level keys)") # Skills via dedicated endpoint (creates 'skill-sync' revision) if "paperclipSkillSync.desiredSkills" in diff: desired = diff["paperclipSkillSync.desiredSkills"]["to"] status, data = await call_skill_sync(mirror_id, desired) if status >= 400: errors.append(f"skills/sync HTTP {status}: {json.dumps(data)[:300]}") else: print(f" ✓ skills/sync applied ({len(desired)} skills)") return errors def get_claude_md_mtime(adapter_config: dict) -> str | None: """Return Unix mtime of the agent's instructionsFilePath, or None if file missing.""" path = adapter_config.get("instructionsFilePath", "") if not path or not os.path.exists(path): return None return str(int(os.path.getmtime(path))) async def check_instructions(agents: list[dict]) -> bool: """Print a report of all agents' instruction files. Returns True if all OK.""" from datetime import datetime all_ok = True print(f"\n{'Agent':<30} {'File':<55} {'Status':<12} {'Size':>7} {'Modified'}") print("-" * 115) for agent in agents: adapter_cfg = agent.get("adapter_config") or {} if isinstance(adapter_cfg, str): adapter_cfg = json.loads(adapter_cfg) file_path = adapter_cfg.get("instructionsFilePath", "") name = (agent.get("name") or agent.get("id") or "?")[:29] if not file_path: print(f"{name:<30} {'(none)':<55} {'⚠ NOT SET':<12}") continue if not os.path.exists(file_path): print(f"{name:<30} {file_path[-54:]:<55} {'❌ MISSING':<12}") all_ok = False continue stat = os.stat(file_path) size_kb = stat.st_size // 1024 mtime = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M") # Check for drift vs DB metadata metadata = agent.get("metadata") or {} if isinstance(metadata, str): metadata = json.loads(metadata) db_mtime = metadata.get("claude_md_mtime", "") actual_mtime = str(int(stat.st_mtime)) drift = " ⚠ DRIFT" if db_mtime and db_mtime != actual_mtime else "" print(f"{name:<30} {file_path[-54:]:<55} {'✅ OK':<12} {size_kb:>5}KB {mtime}{drift}") print() return all_ok async def main() -> None: p = argparse.ArgumentParser() g = p.add_mutually_exclusive_group(required=True) g.add_argument("--verify", action="store_true", help="Show current drift, no changes") g.add_argument("--dry-run", action="store_true", help="Show what would change") g.add_argument("--apply", action="store_true", help="Backup + apply changes") g.add_argument("--check-instructions", action="store_true", help="Scan all agents' instructionsFilePath and report missing/outdated files") p.add_argument("--only", help="Sync only the named agent (e.g., 'עוזר משפטי')") args = p.parse_args() conn = await asyncpg.connect(PAPERCLIP_DB_URL) try: master_agents = await fetch_agents(conn, CMP_COMPANY_ID) mirror_agents = await fetch_agents(conn, CMPA_COMPANY_ID) mirror_skills = await fetch_company_skills(conn, CMPA_COMPANY_ID) finally: await conn.close() if args.check_instructions: all_agents = master_agents + mirror_agents all_ok = await check_instructions(all_agents) sys.exit(0 if all_ok else 1) mirror_by_name = {a["name"]: a for a in mirror_agents} print(f"\n=== Master (CMP, 1xxx): {len(master_agents)} agents ===") print(f"=== Mirror (CMPA, 8xxx): {len(mirror_agents)} agents ===") print(f"=== Mirror has {len(mirror_skills)} local skills available ===\n") print(f"=== Drift report ===") plan: list[tuple[dict, dict, dict]] = [] # (master, mirror, diff) for m in master_agents: if args.only and m["name"] != args.only: continue mirror = mirror_by_name.get(m["name"]) if not mirror: print(f" ⚠ {m['name']:14s} — NOT FOUND in mirror (skipping; we never auto-create)") continue if m["adapter_type"] != mirror["adapter_type"]: print(f" ⚠ {m['name']:14s} — adapter_type mismatch ({m['adapter_type']} vs {mirror['adapter_type']}) — SKIPPING") continue diff = compute_diff(m, mirror, mirror_skills) print_diff(m["name"], diff, m["id"], mirror["id"]) if diff: plan.append((m, mirror, diff)) if args.verify: print(f"\n(verify mode — exiting without changes)") print(f"\nSummary: {len(plan)} agent(s) need sync, {len(master_agents) - len(plan)} in sync") return if not plan: print(f"\n✓ All agents in sync — nothing to do.") return if args.dry_run: print(f"\n(dry-run mode — exiting without changes)\nRe-run with --apply to execute.") return # APPLY # Pre-flight: abort if any master agent is missing its instructions file print("🔍 Pre-flight: checking instruction files...") all_ok = await check_instructions(master_agents) if not all_ok: print("❌ Abort: one or more instruction files are missing. Fix before --apply.") sys.exit(1) print("✅ Pre-flight passed.\n") print(f"\n=== Backup ===") backup_path = backup_agents_table() print(f" ✓ {backup_path}") print(f"\n=== Applying ({len(plan)} agents) ===") all_errors: list[str] = [] for master, mirror, diff in plan: print(f"\n → {master['name']} ({mirror['id']})") # Inject mtime into diff so apply_diff can stamp metadata master_ac = master.get("adapter_config") or {} mtime = get_claude_md_mtime(master_ac) if mtime: diff["_claude_md_mtime"] = mtime errors = await apply_diff(mirror["id"], master["name"], diff) if errors: for e in errors: print(f" ❌ {e}") all_errors.extend([f"{master['name']}: {e}" for e in errors]) if all_errors: print(f"\n=== ⚠️ {len(all_errors)} error(s) ===") print(f"Rollback option: psql ... -f {backup_path}") sys.exit(1) print(f"\n=== ✓ Sync complete — re-running --verify to confirm ===\n") # Re-verify conn = await asyncpg.connect(PAPERCLIP_DB_URL) try: master_agents = await fetch_agents(conn, CMP_COMPANY_ID) mirror_agents = await fetch_agents(conn, CMPA_COMPANY_ID) mirror_skills = await fetch_company_skills(conn, CMPA_COMPANY_ID) finally: await conn.close() mirror_by_name = {a["name"]: a for a in mirror_agents} still_drifting = 0 for m in master_agents: mirror = mirror_by_name.get(m["name"]) if not mirror or m["adapter_type"] != mirror["adapter_type"]: continue diff = compute_diff(m, mirror, mirror_skills) if diff: still_drifting += 1 print(f" ⚠ {m['name']:14s} — STILL has {len(diff)} change(s) after apply (review!)") if still_drifting == 0: print(f" ✓ All {len(master_agents)} agents in sync.") else: print(f"\n⚠️ {still_drifting} agents still drifting — investigate.") if __name__ == "__main__": asyncio.run(main())