From 2f094b8d84c9748bf17e10aaaeeb1ec3b6320084 Mon Sep 17 00:00:00 2001 From: Chaim Date: Thu, 11 Jun 2026 13:26:30 +0000 Subject: [PATCH] =?UTF-8?q?feat(operations):=20=D7=9E=D7=A1=D7=9A=20"?= =?UTF-8?q?=D7=A1=D7=95=D7=9B=D7=A0=D7=99=D7=9D=20=D7=A4=D7=A2=D7=99=D7=9C?= =?UTF-8?q?=D7=99=D7=9D"=20+=20=D7=A0=D7=99=D7=94=D7=95=D7=9C=20=D7=A8?= =?UTF-8?q?=D7=99=D7=A6=D7=95=D7=AA=20(live-runs/log/cancel)=20(G12/X15,?= =?UTF-8?q?=20#119)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit פאנל ב-/operations שמראה אילו סוכני Paperclip עובדים כעת (רצים+בתור), הפלט החי שלהם, ושליטה מבוקרת: עצירת ריצה, איפוס session. סוגר את הנקודה-העיוורת שבה drain מונע-סוכן (למשל ריקון תור הלכות ע"י ה-CEO heartbeat) עוקף את בקרת /operations שמכירה רק שירותי pm2, והפלט הגולמי נגיש רק ב-Paperclip UI. מקור-נתונים: Paperclip heartbeat-runs API (אומת חי): GET /api/companies/{cid}/live-runs — רצים+בתור (agentName/status/issue/outputSilence) GET /api/heartbeat-runs/{id}/log — NDJSON של פלט הסוכן GET /api/heartbeat-runs/{id}/events — timeline POST /api/heartbeat-runs/{id}/cancel — עצירה מבוקרת (לא kill — מכבד watchdog+checkpoint) POST /api/agents/{id}/runtime-state/reset-session ארכיטקטורה (G12/INV-PORT1): כל המגע החדש עם Paperclip דרך השער בלבד — web/paperclip_client.py (shell) → re-export ב-web/agent_platform_port.py → web/app.py צורך מהשער. leak_guard.py עובר (seam שלם). אסור kill ישיר על process_pid (עוקף את השער). Backend: - paperclip_client: list_live_runs / get_run_log / get_run_events / cancel_run / reset_agent_session - agent_platform_port: re-export pc_list_live_runs / pc_get_run_log / pc_get_run_events / pc_cancel_run / pc_reset_agent_session - app.py: GET /api/operations/agents (אגרגציה CMP+CMPA, עמיד לכשל-חברה), GET .../runs/{id}/log, GET .../runs/{id}/events, POST .../runs/{id}/cancel, POST .../agents/{id}/reset-session Frontend: פאנל "סוכנים פעילים" ב-/operations (polling 4s) + dialog ללוג חי (פרסור NDJSON→טקסט קריא) + כפתורי עצור/אפס. 
הוספת hooks ל-operations.ts. בטיחות: cancel על דריינר הלכות בטוח — חילוץ checkpointed per-chunk + resumable + self-heal לשורות processing. Invariants: מקיים G12/INV-PORT1 (שער-הפלטפורמה). נוגע X6 (UI↔API). api:types יורץ אחרי deploy (openapi.json חי). Co-Authored-By: Claude Opus 4.8 (1M context) --- web-ui/src/app/operations/page.tsx | 198 +++++++++++++++++++++++++++++ web-ui/src/lib/api/operations.ts | 85 +++++++++++++ web/agent_platform_port.py | 11 ++ web/app.py | 112 ++++++++++++++++ web/paperclip_client.py | 71 +++++++++++ 5 files changed, 477 insertions(+) diff --git a/web-ui/src/app/operations/page.tsx b/web-ui/src/app/operations/page.tsx index 96625ac..3fc752f 100644 --- a/web-ui/src/app/operations/page.tsx +++ b/web-ui/src/app/operations/page.tsx @@ -1,5 +1,6 @@ "use client"; +import { useState } from "react"; import Link from "next/link"; import { AppShell } from "@/components/app-shell"; import { Card, CardContent } from "@/components/ui/card"; @@ -7,13 +8,26 @@ import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { Switch } from "@/components/ui/switch"; import { Skeleton } from "@/components/ui/skeleton"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, + DialogDescription, +} from "@/components/ui/dialog"; import { useOperations, useServiceAction, useDrainToggle, + useAgentRuns, + useRunLog, + useCancelRun, + useResetAgentSession, type OpsService, type OperationsSnapshot, type PipelineStats, + type AgentRun, } from "@/lib/api/operations"; function mb(bytes: number): string { @@ -337,6 +351,188 @@ function PipelineCard({ ); } +// ── Live agents — who's working now + output + controls ──────────────────── + +// The platform's own liveness signal → a Hebrew label + tone. 
+const SILENCE_HE: Record = { + ok: { label: "פעיל", tone: "text-emerald-600" }, + suspicion: { label: "שקט חשוד", tone: "text-gold-deep" }, + critical: { label: "תקוע?", tone: "text-destructive" }, +}; + +/** Best-effort: turn the captured NDJSON stream into readable lines (tail). */ +function parseRunLog(content: string, maxLines = 400): string { + if (!content) return ""; + const out: string[] = []; + for (const raw of content.split("\n")) { + const line = raw.trim(); + if (!line) continue; + let chunk = line; + try { + const wrap = JSON.parse(line); + chunk = typeof wrap.chunk === "string" ? wrap.chunk : line; + } catch { + // not a wrapper line — keep raw + } + // The chunk is often a claude stream-json event; extract the human bits. + for (const part of chunk.split("\n")) { + const p = part.trim(); + if (!p) continue; + try { + const ev = JSON.parse(p); + if (ev?.type === "assistant" && ev?.message?.content) { + const txt = (ev.message.content as Array<{ type: string; text?: string }>) + .filter((c) => c.type === "text" && c.text) + .map((c) => c.text) + .join(""); + if (txt) out.push(txt); + } else if (ev?.type === "result" && typeof ev.result === "string") { + out.push(`▸ ${ev.result}`); + } else if (ev?.type === "system" && ev?.subtype) { + out.push(`· [${ev.subtype}]`); + } else { + out.push(p); + } + } catch { + out.push(p); + } + } + } + return out.slice(-maxLines).join("\n"); +} + +function RunLogDialog({ run, onClose }: { run: AgentRun | null; onClose: () => void }) { + const { data, isLoading, error } = useRunLog(run?.run_id ?? null); + const text = data ? parseRunLog(data.content) : ""; + return ( + !o && onClose()}> + + + פלט הסוכן — {run?.agent_name} + + {run?.company_label} · ריצה {run?.run_id?.slice(0, 8)} · מתעדכן חי + + + {isLoading ? ( + + ) : error ? ( +

שגיאה בטעינת הלוג: {String(error)}

+ ) : ( + +
+              {text || "אין פלט עדיין."}
+            
+
+ )} +
+
+ ); +} + +function LiveAgentsPanel() { + const { data, isLoading } = useAgentRuns(); + const cancel = useCancelRun(); + const reset = useResetAgentSession(); + const [logRun, setLogRun] = useState(null); + const busy = cancel.isPending || reset.isPending; + + return ( + + +
+

סוכנים פעילים

+ {data ? ( +
+ רצים {data.running} + בתור {data.queued} +
+ ) : null} +
+

+ מי מבין סוכני-הוועדה עובד כרגע ומה הפלט שלו — כולל עבודה שלא קשורה לתיק (כמו + ריקון תור הלכות ע״י ה-CEO). עצירה היא מבוקרת דרך הפלטפורמה (לא kill). +

+ {isLoading || !data ? ( + + ) : data.runs.length === 0 ? ( +

אין סוכן פעיל כרגע.

+ ) : ( +
+ {data.errors.length > 0 ? ( +

+ לא ניתן לטעון חלק מהחברות: {data.errors.join(" · ")} +

+ ) : null} + {data.runs.map((r) => { + const sil = SILENCE_HE[r.silence_level]; + const startMs = r.started_at ? Date.parse(r.started_at) : 0; + return ( +
+
+
+ + {r.status === "running" ? "רץ" : "בתור"} + + {r.agent_name} + {sil ? ● {sil.label} : null} +
+
+ {r.company_label} + {r.status === "running" && startMs ? החל {ago(startMs)} : null} + {r.invocation_source ? ( + {r.invocation_source} + ) : null} + {r.continuation_attempt > 0 ? ניסיון #{r.continuation_attempt + 1} : null} +
+
+
+ + + +
+
+ ); + })} +
+ )} +
+ setLogRun(null)} /> +
+ ); +} + export default function OperationsPage() { const { data, isLoading, error } = useOperations(); @@ -369,6 +565,8 @@ export default function OperationsPage() { ) : ( <> + +
diff --git a/web-ui/src/lib/api/operations.ts b/web-ui/src/lib/api/operations.ts
index abf76c5..d884cd5 100644
--- a/web-ui/src/lib/api/operations.ts
+++ b/web-ui/src/lib/api/operations.ts
@@ -103,3 +103,88 @@ export function useDrainToggle() {
     onError: (e) => toast.error(`העדכון נכשל: ${String(e)}`),
   });
 }
+
+// ── Live agents — which agent is working now + its output + controls ───────
+
+export type AgentRun = {
+  run_id: string;
+  agent_id: string;
+  agent_name: string;
+  company_id: string;
+  company_label: string;
+  status: string; // running | queued | ...
+  invocation_source: string;
+  trigger_detail: string;
+  issue_id: string | null;
+  adapter_type: string;
+  started_at: string | null;
+  created_at: string | null;
+  last_output_at: string | null;
+  continuation_attempt: number;
+  silence_level: string; // "" | ok | suspicion | critical
+  silence_age_ms: number;
+};
+
+export type AgentRunsResponse = {
+  runs: AgentRun[];
+  running: number;
+  queued: number;
+  errors: string[];
+};
+
+export type RunLog = {
+  runId: string;
+  store: string;
+  logRef: string;
+  content: string; // NDJSON stream the adapter captured
+};
+
+/** Polls the queued + running heartbeat runs of all companies every 4s. */
+export function useAgentRuns() {
+  return useQuery<AgentRunsResponse>({
+    queryKey: ["operations", "agents"],
+    queryFn: ({ signal }) =>
+      apiRequest("/api/operations/agents", { signal }),
+    refetchInterval: 4000, // live view of who's working now
+    staleTime: 2000,
+  });
+}
+
+/** Output log of one run — idle while `runId` is null, polled while the dialog is open. */
+export function useRunLog(runId: string | null) {
+  return useQuery<RunLog>({
+    queryKey: ["operations", "agents", "log", runId],
+    queryFn: ({ signal }) =>
+      apiRequest(`/api/operations/agents/runs/${encodeURIComponent(runId ?? "")}/log`, { signal }),
+    enabled: !!runId,
+    refetchInterval: runId ? 4000 : false, // live tail while open
+  });
+}
+
+/** Gracefully cancel a queued/running run (not a raw kill); refreshes the agents list. */
+export function useCancelRun() {
+  const qc = useQueryClient();
+  return useMutation({
+    mutationFn: (runId: string) =>
+      apiRequest(`/api/operations/agents/runs/${encodeURIComponent(runId)}/cancel`, { method: "POST" }),
+    onSuccess: () => {
+      toast.success("בקשת עצירה נשלחה");
+      qc.invalidateQueries({ queryKey: ["operations", "agents"] });
+    },
+    onError: (e) => toast.error(`העצירה נכשלה: ${String(e)}`),
+  });
+}
+
+/** Reset a wedged agent session so its next wakeup starts clean; refreshes the agents list. */
+export function useResetAgentSession() {
+  const qc = useQueryClient();
+  return useMutation({
+    mutationFn: (agentId: string) =>
+      apiRequest(`/api/operations/agents/${encodeURIComponent(agentId)}/reset-session`, { method: "POST" }),
+    onSuccess: () => {
+      toast.success("ה-session אופס");
+      qc.invalidateQueries({ queryKey: ["operations", "agents"] });
+    },
+    onError: (e) => toast.error(`האיפוס נכשל: ${String(e)}`),
+  });
+}
diff --git a/web/agent_platform_port.py b/web/agent_platform_port.py
index 86a2ae8..4a02de6 100644
--- a/web/agent_platform_port.py
+++ b/web/agent_platform_port.py
@@ -35,6 +35,7 @@ from web.paperclip_client import (
     COMPANIES as PAPERCLIP_COMPANIES,
     accept_interaction as pc_accept_interaction,
     archive_project as pc_archive_project,
+    cancel_run as pc_cancel_run,
     create_project as pc_create_project,
     create_workflow_issue as pc_create_workflow_issue,
     get_agents_for_case as pc_get_agents_for_case,
@@ -43,8 +44,12 @@ from web.paperclip_client import (
     get_issue_comments as pc_get_issue_comments,
     get_issue_interactions as pc_get_issue_interactions,
     get_project_url,
+    get_run_events as pc_get_run_events,
+    get_run_log as pc_get_run_log,
+    list_live_runs as pc_list_live_runs,
     post_comment as pc_post_comment,
     reject_interaction as pc_reject_interaction,
+    reset_agent_session as pc_reset_agent_session,
     respond_to_interaction as pc_respond_to_interaction,
     restore_project as pc_restore_project,
     wake_analyst_for_appraiser_facts as pc_wake_analyst_for_appraiser_facts,
@@ -93,4 +98,10 @@ __all__ = [
     "pc_accept_interaction",
     "pc_reject_interaction",
     "pc_respond_to_interaction",
+    # agent-run observability + control (live view + smart management)
+    "pc_list_live_runs",
+    "pc_get_run_log",
+    "pc_get_run_events",
+    "pc_cancel_run",
+    "pc_reset_agent_session",
 ]
diff --git a/web/app.py b/web/app.py
index 5451f59..e31f2fd 100644
--- a/web/app.py
+++ b/web/app.py
@@ -55,6 +55,7 @@ from web.agent_platform_port import (
     get_project_url,
     pc_accept_interaction,
     pc_archive_project,
+    pc_cancel_run,
     pc_create_project,
     pc_create_workflow_issue,
     pc_get_agents,
@@ -62,9 +63,13 @@ from web.agent_platform_port import (
     pc_get_case_issues,
     pc_get_issue_comments,
     pc_get_issue_interactions,
+    pc_get_run_events,
+    pc_get_run_log,
+    pc_list_live_runs,
     pc_post_comment,
     pc_reject_interaction,
     pc_request,
+    pc_reset_agent_session,
     pc_respond_to_interaction,
     pc_restore_project,
     pc_wake_analyst_for_appraiser_facts,
@@ -6565,6 +6570,113 @@ async def operations_drain_toggle(name: str, body: dict = Body(...)):
     return {"ok": True, "name": name, "disabled": disabled}
 
 
+# ── Live agents (/operations "סוכנים פעילים") ──────────────────────────────
+# What the pm2/queue panels can't show: WHICH agent is doing the work right now
+# and its live output. An agent-driven drain (e.g. the CEO heartbeat draining
+# the halacha queue) is neither a pm2 service nor visible per-case, so this
+# pulls Paperclip's own heartbeat-run view through the platform Port (G12) and
+# adds the controls to manage a stuck/runaway run.
+
+_OPS_COMPANY_LABELS = {
+    PAPERCLIP_COMPANIES["licensing"]: "CMP — רישוי ובניה",
+    PAPERCLIP_COMPANIES["betterment"]: "CMPA — היטלי השבחה",
+}
+
+
+def _shape_live_run(raw: dict, company_id: str) -> dict:
+    """Flatten one Paperclip live-run into the dashboard's snake_case shape."""
+    silence = raw.get("outputSilence") or {}
+    return {
+        "run_id": raw.get("id"),
+        "agent_id": raw.get("agentId"),
+        "agent_name": raw.get("agentName") or "—",
+        "company_id": company_id,
+        "company_label": _OPS_COMPANY_LABELS.get(company_id, ""),
+        "status": raw.get("status") or "unknown",
+        "invocation_source": raw.get("invocationSource") or "",
+        "trigger_detail": raw.get("triggerDetail") or "",
+        "issue_id": raw.get("issueId"),
+        "adapter_type": raw.get("adapterType") or "",
+        "started_at": raw.get("startedAt"),
+        "created_at": raw.get("createdAt"),
+        "last_output_at": raw.get("lastOutputAt") or silence.get("lastOutputAt"),
+        "continuation_attempt": raw.get("continuationAttempt") or 0,
+        # Platform's own liveness signal: ok | suspicion | critical.
+        "silence_level": silence.get("level") or "",
+        "silence_age_ms": silence.get("silenceAgeMs") or 0,
+    }
+
+
+@app.get("/api/operations/agents")
+async def operations_agents():
+    """Queued + running heartbeat runs across all companies (read-only).
+
+    Tolerates a per-company Paperclip hiccup: returns whatever it could fetch
+    plus an ``errors`` list, so one company's outage never blanks the panel."""
+    company_ids = list(_OPS_COMPANY_LABELS.keys())
+    results = await asyncio.gather(
+        *(pc_list_live_runs(cid) for cid in company_ids),
+        return_exceptions=True,
+    )
+    runs: list[dict] = []
+    errors: list[str] = []
+    for cid, res in zip(company_ids, results):
+        if isinstance(res, BaseException):  # also CancelledError, which is not an Exception
+            logger.warning("live-runs fetch failed for company %s: %s", cid, res)
+            errors.append(f"{_OPS_COMPANY_LABELS.get(cid, cid)}: {type(res).__name__}")
+            continue
+        # Skip malformed (non-dict) rows instead of failing the whole panel.
+        runs.extend(_shape_live_run(raw, cid) for raw in res if isinstance(raw, dict))
+
+    # Running first, then queued, then anything else; within each, oldest start first.
+    order = {"running": 0, "queued": 1}
+    runs.sort(key=lambda r: (order.get(r["status"], 2), r["started_at"] or r["created_at"] or ""))
+    return {
+        "runs": runs,
+        "running": sum(1 for r in runs if r["status"] == "running"),
+        "queued": sum(1 for r in runs if r["status"] == "queued"),
+        "errors": errors,
+    }
+
+
+@app.get("/api/operations/agents/runs/{run_id}/log")
+async def operations_agent_run_log(run_id: str):
+    """Full output log (NDJSON stream) of one heartbeat run; 502 on a Paperclip HTTP error."""
+    try:
+        return await pc_get_run_log(run_id)
+    except httpx.HTTPError as e:
+        raise HTTPException(502, f"שגיאת Paperclip בשליפת לוג: {type(e).__name__}") from e
+
+
+@app.get("/api/operations/agents/runs/{run_id}/events")
+async def operations_agent_run_events(run_id: str):
+    """Lifecycle/event timeline of one heartbeat run; 502 on a Paperclip HTTP error."""
+    try:
+        return {"events": await pc_get_run_events(run_id)}
+    except httpx.HTTPError as e:
+        raise HTTPException(502, f"שגיאת Paperclip בשליפת אירועים: {type(e).__name__}") from e
+
+
+@app.post("/api/operations/agents/runs/{run_id}/cancel")
+async def operations_agent_run_cancel(run_id: str):
+    """Gracefully cancel a queued/running heartbeat run (not a raw kill)."""
+    try:
+        result = await pc_cancel_run(run_id)
+    except httpx.HTTPError as e:
+        raise HTTPException(502, f"שגיאת Paperclip בעצירת ריצה: {type(e).__name__}") from e
+    return {"ok": True, "run_id": run_id, "result": result}
+
+
+@app.post("/api/operations/agents/{agent_id}/reset-session")
+async def operations_agent_reset_session(agent_id: str):
+    """Reset a wedged agent session so its next wakeup starts clean."""
+    try:
+        result = await pc_reset_agent_session(agent_id)
+    except httpx.HTTPError as e:
+        raise HTTPException(502, f"שגיאת Paperclip באיפוס session: {type(e).__name__}") from e
+    return {"ok": True, "agent_id": agent_id, "result": result}
+
+
 @app.get("/api/digests/{digest_id}")
 async def digest_get(digest_id: str):
     try:
diff --git a/web/paperclip_client.py b/web/paperclip_client.py
index c56d7cd..56678fe 100644
--- a/web/paperclip_client.py
+++ b/web/paperclip_client.py
@@ -720,6 +720,77 @@ async def get_issue_interactions(issue_ids: list[str]) -> list[dict]:
         await conn.close()
 
 
+# ── Agent-run observability + control ───────────────────────────────────────
+# Live view of which agents are actually working right now + their output, and
+# the controls to manage a stuck/runaway run. These wrap Paperclip's own
+# heartbeat-run API (verified live): we use the *graceful* platform endpoints
+# (cancel / reset-session) — never a raw kill on the run's process_pid, which
+# would bypass the platform's watchdog and our extractors' per-chunk
+# checkpointing. The only seam allowed to call these is web/agent_platform_port.
+
+async def list_live_runs(company_id: str) -> list[dict]:
+    """Queued + running heartbeat runs for a company (GET .../live-runs).
+
+    Each row carries ``agentName``/``status``/``issueId``/``startedAt`` and an
+    ``outputSilence`` block (``level`` ok|suspicion|critical) — the platform's
+    own liveness signal, surfaced so the UI can flag a stalled run.
+    """
+    resp = await pc_request(
+        "GET", f"/api/companies/{company_id}/live-runs", raise_on_error=True,
+    )
+    data = resp.json()
+    return data if isinstance(data, list) else []
+
+
+async def get_run_log(run_id: str) -> dict:
+    """Full output log of a heartbeat run (GET /api/heartbeat-runs/{id}/log).
+
+    Returns the platform payload as-is: ``{runId, store, logRef, content}``
+    where ``content`` is the NDJSON stream the adapter captured.
+    """
+    resp = await pc_request(
+        "GET", f"/api/heartbeat-runs/{run_id}/log", raise_on_error=True,
+    )
+    return resp.json()
+
+
+async def get_run_events(run_id: str) -> list[dict]:
+    """Lifecycle/event timeline of a heartbeat run (.../events)."""
+    resp = await pc_request(
+        "GET", f"/api/heartbeat-runs/{run_id}/events", raise_on_error=True,
+    )
+    data = resp.json()
+    return data if isinstance(data, list) else []
+
+
+async def cancel_run(run_id: str) -> dict:
+    """Gracefully cancel a queued/running heartbeat run (POST .../cancel).
+
+    The platform stops the run cleanly (process-group teardown + status flip),
+    respecting the watchdog. Safe for the halacha drain: its extractor is
+    checkpointed per-chunk and resumes on the next drain — a cancel loses at
+    most the in-flight chunk.
+    """
+    resp = await pc_request(
+        "POST", f"/api/heartbeat-runs/{run_id}/cancel", json={},
+        raise_on_error=True,
+    )
+    return resp.json()
+
+
+async def reset_agent_session(agent_id: str) -> dict:
+    """Reset an agent's runtime session (.../runtime-state/reset-session).
+
+    Clears a wedged session so the next wakeup starts clean — the smart
+    alternative to cancelling individual runs when an agent loops.
+    """
+    resp = await pc_request(
+        "POST", f"/api/agents/{agent_id}/runtime-state/reset-session", json={},
+        raise_on_error=True,
+    )
+    return resp.json()
+
+
 async def respond_to_interaction(
     issue_id: str, interaction_id: str, payload: dict,
 ) -> dict: