#!/usr/bin/env python3 """migrate_agent_adapter.py — safely migrate ANY Paperclip agent to ANY adapter. Why: flipping an agent's adapter in the raw Paperclip dropdown crashes it. The three failure axes (model↔provider, instructions transport, tool gating) are described in adapter_profiles.py. On top of those, INV-MC1 requires the two companies (CMP 1xxx / CMPA 8xxx) to stay symmetric — a manual flip touches one and the cross-company sync then *skips* the agent, leaving silent drift. This tool reconciles all three axes, applies to BOTH company records together, snapshots the prior state for an exact revert, and refuses unsafe migrations in preflight (closing the FU-8a "no adapter-switch validation gate" gap). It is an operations tool in the declared SHELL layer (like sync_agents_across_companies.py) — it talks Paperclip via the documented PATCH /api/agents/{id} path, never direct DB writes (G12). Runtime artifacts (frontmatter-stripped instruction copies, the revert snapshot) are written to the MAIN working tree absolute paths, because that is where Paperclip actually launches the agents (shared cwd /home/chaim/legal-ai). Usage: # preflight only — reports what would change + warnings, no mutation python migrate_agent_adapter.py --check --agent "עוזר משפטי" --to gemini_local # migrate one agent (both companies) — degraded-fallback warning printed python migrate_agent_adapter.py --apply --agent "עוזר משפטי" --to gemini_local [--model X] [--relax-tools] # migrate the whole team python migrate_agent_adapter.py --apply --agent all --to gemini_local --relax-tools # undo: restore each migrated agent to its pre-migration state python migrate_agent_adapter.py --revert --agent "עוזר משפטי" # or --agent all # health gate: flag any agent in an incompatible / asymmetric state python migrate_agent_adapter.py --verify Requires: PAPERCLIP_BOARD_API_KEY (Infisical: /paperclip @ nautilus). Run with the interpreter that has asyncpg+httpx: /home/chaim/legal-ai/mcp-server/.venv/bin/python """ from __future__ import annotations import argparse import asyncio import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any import asyncpg sys.path.insert(0, str(Path(__file__).resolve().parent)) from adapter_profiles import ( # noqa: E402 ADAPTER_PROFILES, UnknownAdapter, get_profile, model_matches_provider, ) from sync_agents_across_companies import ( # noqa: E402 (reuse SHELL helpers) CMP_COMPANY_ID, CMPA_COMPANY_ID, PAPERCLIP_DB_URL, call_patch, fetch_agents, ) # Absolute MAIN-tree paths — agents run there, not in any worktree. AGENTS_DIR = Path("/home/chaim/legal-ai/.claude/agents") GENERATED_DIR = AGENTS_DIR / ".generated" SIDECAR = Path("/home/chaim/legal-ai/data/adapter-migration-state.json") GEMINI_SETTINGS = Path.home() / ".gemini" / "settings.json" NOFM_SUFFIX = ".nofm.md" COMPANY_IDS = [CMP_COMPANY_ID, CMPA_COMPANY_ID] COMPANY_LABELS = { CMP_COMPANY_ID: "CMP (רישוי ובניה)", CMPA_COMPANY_ID: "CMPA (היטלי השבחה)", } def fail(msg: str) -> None: print(f"❌ {msg}", file=sys.stderr) sys.exit(1) # ─────────────────────────── sidecar (revert state) ─────────────────────────── def load_sidecar() -> dict: if SIDECAR.exists(): try: return json.loads(SIDECAR.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError) as e: fail(f"sidecar פגום ({SIDECAR}): {e}") return {"agents": {}, "gemini_excludetools_baseline": None} def save_sidecar(data: dict) -> None: SIDECAR.parent.mkdir(parents=True, exist_ok=True) SIDECAR.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") # ─────────────────────────── instructions handling ─────────────────────────── def canonical_instructions_path(current_path: str) -> str: """Resolve the frontmatter-bearing canonical .md, even if currently pointed at a generated .nofm.md copy.""" if not current_path: return "" p = Path(current_path) if p.parent == GENERATED_DIR and p.name.endswith(NOFM_SUFFIX): base = p.name[: -len(NOFM_SUFFIX)] return str(AGENTS_DIR / f"{base}.md") return current_path def strip_frontmatter(text: str) -> str: """Remove a leading YAML `---...---` block so the content never starts with `--` (which content_arg adapters parse as a CLI flag).""" m = re.match(r"^---\n.*?\n---\n", text, re.DOTALL) if m: text = text[m.end():] text = text.lstrip("\n") if not text or text.lstrip().startswith("--"): text = "# הוראות סוכן\n\n" + text return text def render_nofm_text(canonical_path: str) -> str: return strip_frontmatter(Path(canonical_path).read_text(encoding="utf-8")) def write_nofm(canonical_path: str) -> str: """Generate the frontmatter-free copy and return its absolute path.""" GENERATED_DIR.mkdir(parents=True, exist_ok=True) base = Path(canonical_path).stem out = GENERATED_DIR / f"{base}{NOFM_SUFFIX}" out.write_text(render_nofm_text(canonical_path), encoding="utf-8") return str(out) def frontmatter_tools(canonical_path: str) -> list[str]: """Parse the `tools:` list from an agent .md frontmatter (bare tool names, `mcp__legal-ai__` prefix stripped).""" try: lines = Path(canonical_path).read_text(encoding="utf-8").splitlines() except OSError: return [] tools: list[str] = [] in_tools = False for line in lines: if not in_tools: if re.match(r"^tools:\s*$", line): in_tools = True continue m = re.match(r"^\s+-\s+(.+?)\s*$", line) if m: tools.append(m.group(1).strip()) elif line.strip() and not line.startswith(" "): break # left the tools: block return [t.replace("mcp__legal-ai__", "") for t in tools] # ─────────────────────────── gemini excludeTools ─────────────────────────── def read_gemini_excludetools() -> list[str]: if not GEMINI_SETTINGS.exists(): return [] try: cfg = json.loads(GEMINI_SETTINGS.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return [] return list((((cfg.get("mcpServers") or {}).get("legal-ai") or {}).get("excludeTools")) or []) def write_gemini_excludetools(tools: list[str]) -> None: cfg = json.loads(GEMINI_SETTINGS.read_text(encoding="utf-8")) cfg.setdefault("mcpServers", {}).setdefault("legal-ai", {})["excludeTools"] = sorted(set(tools)) GEMINI_SETTINGS.write_text(json.dumps(cfg, ensure_ascii=False, indent=2), encoding="utf-8") def recompute_gemini_excludetools(sidecar: dict) -> list[str] | None: """excludeTools = baseline minus the union of tools relaxed for agents still targeted at gemini_local. Returns the list to write, or None if no baseline was ever captured (nothing to recompute).""" baseline = sidecar.get("gemini_excludetools_baseline") if baseline is None: return None relaxed: set[str] = set() for entry in sidecar["agents"].values(): if entry.get("current_target") == "gemini_local": relaxed.update(entry.get("relaxed_tools") or []) return [t for t in baseline if t not in relaxed] # ─────────────────────────── agent loading ─────────────────────────── async def load_pairs() -> dict[str, dict[str, dict]]: """Return {agent_name: {company_id: agent_row}}.""" conn = await asyncpg.connect(PAPERCLIP_DB_URL) try: rows = [] for cid in COMPANY_IDS: for a in await fetch_agents(conn, cid): a["_company_id"] = cid rows.append(a) finally: await conn.close() pairs: dict[str, dict[str, dict]] = {} for a in rows: pairs.setdefault(a["name"], {})[a["_company_id"]] = a return pairs def adapter_of(agent: dict) -> str: return agent.get("adapter_type") or "" def model_of(agent: dict) -> str: ac = agent.get("adapter_config") or {} return ac.get("model") or "" def instr_of(agent: dict) -> str: ac = agent.get("adapter_config") or {} return ac.get("instructionsFilePath") or "" # ─────────────────────────── planning / preflight ─────────────────────────── def plan_migration(name: str, records: dict[str, dict], target: str, model_override: str | None) -> dict: """Compute the safe migration for one agent (both companies). Returns a dict with target_model, target_instr, nofm_needed, tool_conflicts, errors[], warnings[].""" errors: list[str] = [] warnings: list[str] = [] try: profile = get_profile(target) except UnknownAdapter as e: return {"errors": [str(e)], "warnings": []} # INV-MC1: both company records must exist to flip symmetrically. missing = [COMPANY_LABELS[c] for c in COMPANY_IDS if c not in records] if missing: errors.append(f"חסרה רשומת-סוכן בחברות: {', '.join(missing)} — אי-אפשר להפוך סימטרית.") return {"errors": errors, "warnings": warnings} target_model = (model_override or profile["default_model"] or "").strip() if not model_matches_provider(target_model, target): errors.append( f"מודל {target_model!r} אינו תואם ל-provider {profile['provider']!r} של {target}. " f"השתמש ב---model או הסר אותו לקבלת ברירת-המחדל ({profile['default_model']})." ) # instructions transport canonical = canonical_instructions_path(instr_of(records[CMP_COMPANY_ID])) if not canonical or not os.path.exists(canonical): errors.append(f"קובץ-הוראות קנוני חסר: {canonical or '(לא מוגדר)'}") nofm_needed = not profile["frontmatter_safe"] target_instr = canonical elif profile["frontmatter_safe"]: nofm_needed = False target_instr = canonical else: nofm_needed = True target_instr = str(GENERATED_DIR / f"{Path(canonical).stem}{NOFM_SUFFIX}") # validate the (in-memory) stripped content really is flag-safe stripped = render_nofm_text(canonical) if stripped.lstrip().startswith("--"): errors.append("גריסת-ה-frontmatter עדיין פותחת ב-'--' — לא בטוח ל-content_arg adapter.") # tool gating (gemini global excludeTools) tool_conflicts: list[str] = [] if profile["tool_config"] == "gemini_global" and canonical and os.path.exists(canonical): required = set(frontmatter_tools(canonical)) excluded = set(read_gemini_excludetools()) tool_conflicts = sorted(required & excluded) if tool_conflicts: warnings.append( "כלי-כתיבה שייחסמו תחת gemini (excludeTools גלובלי): " f"{', '.join(tool_conflicts)} — הוסף --relax-tools כדי לשחרר אותם." ) return { "target_model": target_model, "target_instr": target_instr, "nofm_needed": nofm_needed, "canonical": canonical, "tool_conflicts": tool_conflicts, "errors": errors, "warnings": warnings, } def print_plan(name: str, records: dict, target: str, plan: dict) -> None: cur = adapter_of(records[CMP_COMPANY_ID]) print(f"\n • {name} — {cur or '(none)'} → {target}") if plan.get("errors"): for e in plan["errors"]: print(f" ❌ {e}") return print(f" model: {model_of(records[CMP_COMPANY_ID]) or '(default)'} → {plan['target_model']}") print(f" instructions: {plan['target_instr']}" f"{' (frontmatter-stripped copy)' if plan['nofm_needed'] else ''}") for w in plan["warnings"]: print(f" ⚠ {w}") # ─────────────────────────── apply / revert ─────────────────────────── async def apply_one(name: str, records: dict, target: str, plan: dict, sidecar: dict, relax_tools: bool) -> list[str]: errors: list[str] = [] # 1. snapshot ORIGINAL state once (first time we ever touch this agent), # so --revert always returns to the true pre-migration config. agents_state = sidecar["agents"] if name not in agents_state: agents_state[name] = { "original": { cid: { "adapter_type": adapter_of(records[cid]), "model": model_of(records[cid]), "instructionsFilePath": instr_of(records[cid]), } for cid in COMPANY_IDS }, "relaxed_tools": [], } agents_state[name]["current_target"] = target # 2. generate the frontmatter-free instructions copy if needed if plan["nofm_needed"]: write_nofm(plan["canonical"]) # 3. PATCH both company records (adapterType + adapterConfig merge) for cid in COMPANY_IDS: body = { "adapterType": target, "adapterConfig": { "model": plan["target_model"], "instructionsFilePath": plan["target_instr"], }, } status, data = await call_patch(records[cid]["id"], body) if status >= 400: errors.append(f"{COMPANY_LABELS[cid]}: PATCH HTTP {status}: {json.dumps(data)[:200]}") else: print(f" ✓ {COMPANY_LABELS[cid]} → {target} / {plan['target_model']}") # 4. relax global gemini excludeTools if requested if relax_tools and plan.get("tool_conflicts"): if sidecar.get("gemini_excludetools_baseline") is None: sidecar["gemini_excludetools_baseline"] = read_gemini_excludetools() agents_state[name]["relaxed_tools"] = plan["tool_conflicts"] new_excl = recompute_gemini_excludetools(sidecar) if new_excl is not None: write_gemini_excludetools(new_excl) print(f" ✓ excludeTools שוחרר: {', '.join(plan['tool_conflicts'])} (גלובלי לכל gemini)") return errors async def revert_one(name: str, records: dict, sidecar: dict) -> list[str]: errors: list[str] = [] entry = sidecar["agents"].get(name) if not entry: print(f" ⏭ {name} — אין snapshot לשחזור (לא הועבר דרך הכלי).") return errors for cid in COMPANY_IDS: if cid not in records: errors.append(f"{name}: חסרה רשומה ב-{COMPANY_LABELS[cid]} — דלג.") continue orig = entry["original"][cid] body = { "adapterType": orig["adapter_type"], "adapterConfig": { "model": orig["model"], "instructionsFilePath": orig["instructionsFilePath"], }, } status, data = await call_patch(records[cid]["id"], body) if status >= 400: errors.append(f"{COMPANY_LABELS[cid]}: PATCH HTTP {status}: {json.dumps(data)[:200]}") else: print(f" ✓ {COMPANY_LABELS[cid]} ← {orig['adapter_type']} / {orig['model'] or '(default)'}") if not errors: sidecar["agents"].pop(name, None) # recompute global excludeTools now that this agent no longer needs relaxing new_excl = recompute_gemini_excludetools(sidecar) if new_excl is not None: write_gemini_excludetools(new_excl) return errors # ─────────────────────────── verify ─────────────────────────── def verify_pairs(pairs: dict[str, dict]) -> int: """Flag any agent whose live state is incompatible or asymmetric. Returns the number of flagged agents (0 = healthy).""" flagged = 0 print("\n=== verify — תאימות מצב חי ===") for name in sorted(pairs): records = pairs[name] problems: list[str] = [] # INV-MC1 — same adapter_type in both companies adapters = {adapter_of(records[c]) for c in records} if len(records) == 2 and len(adapters) > 1: problems.append(f"adapter_type א-סימטרי בין החברות: {sorted(adapters)}") for cid, agent in records.items(): at = adapter_of(agent) if at not in ADAPTER_PROFILES: problems.append(f"{COMPANY_LABELS.get(cid, cid)}: adapter לא מוכר {at!r}") continue profile = ADAPTER_PROFILES[at] if not model_matches_provider(model_of(agent), at): problems.append( f"{COMPANY_LABELS.get(cid, cid)}: מודל {model_of(agent)!r} לא תואם ל-{at}") if not profile["frontmatter_safe"]: instr = instr_of(agent) if instr and os.path.exists(instr): head = Path(instr).read_text(encoding="utf-8", errors="ignore").lstrip() if head.startswith("--"): problems.append( f"{COMPANY_LABELS.get(cid, cid)}: קובץ-הוראות פותח ב-'--' → יקרוס תחת {at}") elif instr: problems.append(f"{COMPANY_LABELS.get(cid, cid)}: קובץ-הוראות חסר: {instr}") if problems: flagged += 1 print(f" ❌ {name}") for p in problems: print(f" {p}") else: print(f" ✓ {name} — {sorted(adapters)[0] if adapters else '—'}") print(f"\nSummary: {flagged} סוכנים בעייתיים → {'DRIFT' if flagged else 'תקין'}") return flagged # ─────────────────────────── main ─────────────────────────── async def main() -> None: p = argparse.ArgumentParser(description="Migrate any Paperclip agent to any adapter (both companies).") g = p.add_mutually_exclusive_group(required=True) g.add_argument("--check", action="store_true", help="Preflight only — no mutation") g.add_argument("--apply", action="store_true", help="Perform the migration") g.add_argument("--revert", action="store_true", help="Restore pre-migration state") g.add_argument("--verify", action="store_true", help="Flag incompatible/asymmetric agents") p.add_argument("--agent", help="Agent name (e.g. 'עוזר משפטי') or 'all'") p.add_argument("--to", dest="target", help="Target adapter (for --check/--apply)") p.add_argument("--model", help="Override target model (else adapter default)") p.add_argument("--relax-tools", action="store_true", help="Remove conflicting write tools from the global gemini excludeTools") args = p.parse_args() pairs = await load_pairs() if args.verify: sys.exit(1 if verify_pairs(pairs) else 0) if not args.agent: fail("--agent נדרש (שם-סוכן או 'all').") if args.agent == "all": names = sorted(pairs) elif args.agent in pairs: names = [args.agent] else: fail(f"סוכן לא נמצא: {args.agent!r}. קיימים: {', '.join(sorted(pairs))}") # ── revert ── if args.revert: sidecar = load_sidecar() all_errors: list[str] = [] print(f"=== revert ({len(names)} סוכנים) ===") for name in names: print(f"\n ↩ {name}") errs = await revert_one(name, pairs[name], sidecar) all_errors += [f"{name}: {e}" for e in errs] save_sidecar(sidecar) if all_errors: print(f"\n⚠️ {len(all_errors)} שגיאות:") for e in all_errors: print(f" {e}") sys.exit(1) print("\n✓ revert הושלם.") return # ── check / apply ── if not args.target: fail("--to נדרש עבור --check/--apply.") if args.target not in ADAPTER_PROFILES: fail(f"אדפטר לא מוכר: {args.target}. רשומים: {', '.join(sorted(ADAPTER_PROFILES))}") plans = {n: plan_migration(n, pairs[n], args.target, args.model) for n in names} print(f"=== {'check' if args.check else 'apply'} → {args.target} ({len(names)} סוכנים) ===") for n in names: print_plan(n, pairs[n], args.target, plans[n]) hard_errors = {n: pl["errors"] for n, pl in plans.items() if pl.get("errors")} if hard_errors: print(f"\n❌ {len(hard_errors)} סוכנים עם שגיאות-preflight — לא בטוח להעביר.") if args.check: sys.exit(1) fail("תקן את השגיאות לפני --apply.") if args.check: print("\n✓ preflight עבר. הרץ עם --apply לביצוע.") return # degraded-fallback warning profile = ADAPTER_PROFILES[args.target] if profile["provider"] != "claude": print(f"\n⚠️ מעבר ל-{args.target} הוא fallback-חירום (degraded), לא שקול-איכות ל-Claude.") print(" החזר ל-claude_local (--revert) כשטוקני-Claude חוזרים.") sidecar = load_sidecar() all_errors: list[str] = [] print(f"\n=== applying ===") for n in names: print(f"\n → {n}") errs = await apply_one(n, pairs[n], args.target, plans[n], sidecar, args.relax_tools) all_errors += [f"{n}: {e}" for e in errs] save_sidecar(sidecar) if all_errors: print(f"\n⚠️ {len(all_errors)} שגיאות:") for e in all_errors: print(f" {e}") sys.exit(1) print(f"\n✓ הועברו {len(names)} סוכנים ל-{args.target}. הרץ --verify לאישור, ו-sync_agents_across_companies.py --verify לוודא אפס drift.") if __name__ == "__main__": asyncio.run(main())