From 34d80a39e5ae15565bc08254455327617a60b38e Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 07:28:41 +0000 Subject: [PATCH] =?UTF-8?q?feat(ops):=20/operations=20dashboard=20?= =?UTF-8?q?=E2=80=94=20everything=20running=20in=20the=20background?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A single live page for all the background work that downloads/analyses, so the chair can see what's running instead of guessing. - court_fetch_service: GET /pm2 (unauthenticated, host-only) → trimmed pm2 jlist for the legal-* services (status, restarts, mem, cron schedule). - FastAPI GET /api/operations: aggregates the DB-backed pipelines (court_fetch jobs, metadata + halacha extraction queues, halacha review gate, missing_precedents, digests, recent court ingests) and proxies the host /pm2 over the docker bridge (graceful if the host service is down). - web-ui /operations page (+ src/lib/api/operations.ts hook, nav entry under admin): services grid (with Hebrew labels + schedules) + pipeline cards + recent-fetch / recent-ingest lists. Auto-refreshes every 5s. tsc --noEmit clean; pm2 status carries nothing sensitive and the bind (10.0.1.1) is host/container-only, so /pm2 needs no secret. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../legal_mcp/court_fetch_service/server.py | 50 +++ web-ui/src/app/operations/page.tsx | 296 ++++++++++++++++++ web-ui/src/components/app-shell.tsx | 1 + web-ui/src/lib/api/operations.ts | 53 ++++ web/app.py | 88 ++++++ 5 files changed, 488 insertions(+) create mode 100644 web-ui/src/app/operations/page.tsx create mode 100644 web-ui/src/lib/api/operations.ts diff --git a/mcp-server/src/legal_mcp/court_fetch_service/server.py b/mcp-server/src/legal_mcp/court_fetch_service/server.py index c6b6136..a07f9ee 100644 --- a/mcp-server/src/legal_mcp/court_fetch_service/server.py +++ b/mcp-server/src/legal_mcp/court_fetch_service/server.py @@ -55,6 +55,55 @@ async def health(request: web.Request) -> web.Response: return web.json_response(info) +# Background services we surface on the /operations dashboard. pm2 jlist is a +# host-only capability (the legal-ai container can't run pm2), so the container's +# FastAPI proxies this read-only endpoint over the docker bridge. No secret: +# pm2 status (names/cpu/mem) carries nothing sensitive and the bind (10.0.1.1) +# is already host/container-only. +_PM2_PREFIXES = ("legal-", "paperclip") + + +async def pm2_status(request: web.Request) -> web.Response: + """Return a trimmed ``pm2 jlist`` for the legal-ai background services.""" + import asyncio as _asyncio + + try: + proc = await _asyncio.create_subprocess_exec( + "pm2", "jlist", + stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE, + ) + out, err = await _asyncio.wait_for(proc.communicate(), timeout=10) + if proc.returncode != 0: + return web.json_response( + {"error": f"pm2 jlist failed: {err.decode('utf-8','replace')[:200]}"}, + status=502, + ) + apps = json.loads(out.decode("utf-8", "replace")) + except FileNotFoundError: + return web.json_response({"error": "pm2 not found on PATH"}, status=502) + except Exception as e: # never throw + return web.json_response({"error": f"pm2 error: {e}"}, status=502) + + services = [] + for a in apps: + name = a.get("name", "") + if not any(name.startswith(p) for p in _PM2_PREFIXES): + continue + env = a.get("pm2_env", {}) or {} + services.append({ + "name": name, + "status": env.get("status", ""), + "restarts": env.get("restart_time", 0), + "uptime_ms": env.get("pm_uptime", 0), + "cpu": (a.get("monit") or {}).get("cpu", 0), + "memory_bytes": (a.get("monit") or {}).get("memory", 0), + "cron": env.get("cron_restart") or "", + "autorestart": env.get("autorestart", True), + }) + services.sort(key=lambda s: s["name"]) + return web.json_response({"services": services}) + + def _check_bearer(request: web.Request) -> web.Response | None: auth = request.headers.get("Authorization", "") expected = "Bearer " + _SHARED_SECRET @@ -106,6 +155,7 @@ async def fetch(request: web.Request) -> web.Response: def build_app() -> web.Application: app = web.Application(client_max_size=64 * 1024 * 1024) app.router.add_get("/health", health) + app.router.add_get("/pm2", pm2_status) app.router.add_post("/fetch", fetch) return app diff --git a/web-ui/src/app/operations/page.tsx b/web-ui/src/app/operations/page.tsx new file mode 100644 index 0000000..4b2f307 --- /dev/null +++ b/web-ui/src/app/operations/page.tsx @@ -0,0 +1,296 @@ +"use client"; + +import Link from "next/link"; +import { AppShell } from "@/components/app-shell"; +import { Card, CardContent } from "@/components/ui/card"; +import { Badge } from "@/components/ui/badge"; +import { Skeleton } from "@/components/ui/skeleton"; +import { + useOperations, + type OpsService, + type OperationsSnapshot, +} from "@/lib/api/operations"; + +function mb(bytes: number): string { + return `${Math.round((bytes || 0) / 1024 / 1024)}MB`; +} + +function uptime(ms: number): string { + if (!ms) return "—"; + const secs = Math.floor((Date.now() - ms) / 1000); + if (secs < 60) return `${secs}ש׳`; + if (secs < 3600) return `${Math.floor(secs / 60)}ד׳`; + if (secs < 86400) return `${Math.floor(secs / 3600)}ש׳`; + return `${Math.floor(secs / 86400)}י׳`; +} + +const STATUS_VARIANT: Record = { + online: "default", + done: "default", + approved: "default", + completed: "default", + stopped: "secondary", + pending: "secondary", + pending_review: "secondary", + running: "outline", + processing: "outline", + failed: "destructive", + errored: "destructive", + manual: "destructive", + open: "secondary", +}; + +function StatusBadge({ value, count }: { value: string; count?: number }) { + return ( + + {value} + {count !== undefined ? {count} : null} + + ); +} + +const SERVICE_LABELS: Record = { + "legal-court-fetch-service": "שירות אחזור פסיקה (דפדפן נט המשפט)", + "legal-court-fetch-xvfb": "צג וירטואלי (Xvfb) לדפדפן", + "legal-court-fetch-drain": "תזמון: ניקוז תור אחזור (שעתי)", + "legal-metadata-drain": "תזמון: חילוץ מטא-דאטה (Gemini, ×15 דק׳)", + "legal-halacha-drain": "תזמון: חילוץ הלכות (Claude, ×שעתיים)", + "legal-reaper": "מנקה תהליכים-יתומים (נגד דליפות זיכרון)", + "legal-chat-service": "שירות צ׳אט אימון (גשר ל-claude CLI)", +}; + +function ServicesSection({ data }: { data: OperationsSnapshot }) { + return ( + + +

שירותי רקע (pm2)

+

+ דמונים ומשימות-תזמון על שרת המארח. "cron" = רץ לפי לוח-זמנים (מציג + "stopped" בין הרצות — תקין). +

+ {data.services_error ? ( +

{data.services_error}

+ ) : data.services.length === 0 ? ( +

אין שירותים.

+ ) : ( +
+ {data.services.map((s: OpsService) => ( +
+
+
+ + {s.cron ? ( + + {s.cron} + + ) : null} +
+
+ {SERVICE_LABELS[s.name] ?? s.name} +
+
+ {s.name} +
+
+
+
{mb(s.memory_bytes)}
+
↻{s.restarts}
+ {!s.cron ?
{uptime(s.uptime_ms)}
: null} +
+
+ ))} +
+ )} +
+
+ ); +} + +function StatusRow({ by }: { by: Record }) { + const entries = Object.entries(by).filter(([, n]) => n > 0); + if (entries.length === 0) return ריק; + return ( +
+ {entries + .sort((a, b) => b[1] - a[1]) + .map(([k, n]) => ( + + ))} +
+ ); +} + +function PipelineCard({ + title, + desc, + children, +}: { + title: string; + desc: string; + children: React.ReactNode; +}) { + return ( + + +
+

{title}

+

{desc}

+
+ {children} +
+
+ ); +} + +export default function OperationsPage() { + const { data, isLoading, error } = useOperations(); + + return ( + +
+
+ +

תפעול — מה רץ ברקע

+

+ כל מה שמוריד ומנתח אוטומטית: שירותי-המארח, משימות-התזמון, ותורי-העבודה. + מתרענן כל 5 שניות. +

+
+ + {error ? ( + + + שגיאה בטעינת מצב התפעול: {String(error)} + + + ) : isLoading || !data ? ( +
+ + +
+ ) : ( + <> + + +
+ + + + + + +

+ בתור לחילוץ: {data.pipelines.metadata_extraction.queued} +

+
+ + + +

+ בתור לחילוץ: {data.pipelines.halacha_extraction.queued} +

+
+ + + + + + + + + + +

+ {data.pipelines.digests.linked}/{data.pipelines.digests.total} מקושרים + לפסיקה +

+
+
+ +
+ + +

+ אחזורים אחרונים (תור) +

+
+ {data.pipelines.court_fetch.recent.map((j) => ( +
+
+ + + {j.citation_raw?.slice(0, 48) || j.case_number_norm} + + {j.error ? ( +
+ {j.error.slice(0, 80)} +
+ ) : null} +
+ + {(j.tier || "").slice(0, 7)} + +
+ ))} +
+
+
+ + + +

+ נקלטו לאחרונה (מבתי-משפט) +

+
+ {data.pipelines.ingested_recent.map((r) => ( +
+ {r.case_number} + + {r.created_at?.slice(0, 16).replace("T", " ")} + +
+ ))} + {data.pipelines.ingested_recent.length === 0 ? ( +

עדיין אין.

+ ) : null} +
+
+
+
+ + )} +
+
+ ); +} diff --git a/web-ui/src/components/app-shell.tsx b/web-ui/src/components/app-shell.tsx index fd98e9d..66e1886 100644 --- a/web-ui/src/components/app-shell.tsx +++ b/web-ui/src/components/app-shell.tsx @@ -62,6 +62,7 @@ const NAV_GROUPS: NavGroup[] = [ const ADMIN_ITEMS: NavItem[] = [ { href: "/skills", label: "מיומנויות" }, + { href: "/operations", label: "תפעול" }, { href: "/diagnostics", label: "אבחון" }, { href: "/settings", label: "הגדרות" }, ]; diff --git a/web-ui/src/lib/api/operations.ts b/web-ui/src/lib/api/operations.ts new file mode 100644 index 0000000..be740c4 --- /dev/null +++ b/web-ui/src/lib/api/operations.ts @@ -0,0 +1,53 @@ +import { useQuery } from "@tanstack/react-query"; +import { apiRequest } from "./client"; + +export type OpsService = { + name: string; + status: string; + restarts: number; + uptime_ms: number; + cpu: number; + memory_bytes: number; + cron: string; + autorestart: boolean; +}; + +export type CourtFetchJob = { + case_number_norm: string; + citation_raw: string; + tier: string; + status: string; + error: string; + updated_at: string; +}; + +export type IngestedRow = { + case_number: string; + court: string; + source_url: string; + created_at: string; +}; + +export type OperationsSnapshot = { + services: OpsService[]; + services_error: string | null; + pipelines: { + court_fetch: { by_status: Record; recent: CourtFetchJob[] }; + metadata_extraction: { by_status: Record; queued: number }; + halacha_extraction: { by_status: Record; queued: number }; + halacha_review: { by_status: Record }; + missing_precedents: { by_status: Record }; + digests: { total: number; linked: number }; + ingested_recent: IngestedRow[]; + }; +}; + +export function useOperations() { + return useQuery({ + queryKey: ["operations"], + queryFn: ({ signal }) => + apiRequest("/api/operations", { signal }), + refetchInterval: 5000, // live view of background work + staleTime: 3000, + }); +} diff --git a/web/app.py b/web/app.py index 92475d8..7378cd0 100644 --- a/web/app.py +++ b/web/app.py @@ -6008,6 +6008,94 @@ async def digest_queue_pending(limit: int = 20): return {"items": items, "count": len(items)} +# ── Operations dashboard (/operations) ──────────────────────────────────── +# One snapshot of everything running in the background that downloads or +# analyses: the host pm2 services/crons + the DB-backed pipelines & queues. +_COURT_FETCH_SERVICE_URL = os.environ.get( + "COURT_FETCH_SERVICE_URL", "http://10.0.1.1:8771" +) + + +async def _ops_pm2_services() -> dict: + """Proxy the host court-fetch-service /pm2 (host-only capability).""" + try: + async with httpx.AsyncClient(timeout=8.0) as client: + r = await client.get(f"{_COURT_FETCH_SERVICE_URL}/pm2") + if r.status_code == 200: + return {"services": r.json().get("services", []), "error": None} + return {"services": [], "error": f"pm2 bridge {r.status_code}"} + except Exception as e: # host service down / unreachable + return {"services": [], "error": f"לא ניתן להגיע לשירות-המארח: {e}"} + + +@app.get("/api/operations") +async def operations_snapshot(): + """Everything running in the background: services + pipelines/queues.""" + pool = await db.get_pool() + async with pool.acquire() as conn: + async def counts(sql: str) -> dict: + return {r[0]: r[1] for r in await conn.fetch(sql)} + + court_fetch = await counts( + "SELECT status, count(*) FROM court_fetch_jobs GROUP BY 1" + ) + court_recent = [ + dict(r) for r in await conn.fetch( + "SELECT case_number_norm, citation_raw, tier, status, error, " + "updated_at FROM court_fetch_jobs ORDER BY updated_at DESC LIMIT 15" + ) + ] + meta = await counts( + "SELECT coalesce(metadata_extraction_status,'unknown'), count(*) " + "FROM case_law GROUP BY 1" + ) + meta_queued = await conn.fetchval( + "SELECT count(*) FROM case_law WHERE metadata_extraction_requested_at IS NOT NULL" + ) + hal_ext = await counts( + "SELECT coalesce(halacha_extraction_status,'unknown'), count(*) " + "FROM case_law GROUP BY 1" + ) + hal_queued = await conn.fetchval( + "SELECT count(*) FROM case_law WHERE halacha_extraction_requested_at IS NOT NULL" + ) + review = await counts("SELECT review_status, count(*) FROM halachot GROUP BY 1") + missing = await counts("SELECT status, count(*) FROM missing_precedents GROUP BY 1") + digests_total = await conn.fetchval("SELECT count(*) FROM digests") + digests_linked = await conn.fetchval( + "SELECT count(*) FROM digests WHERE linked_case_law_id IS NOT NULL" + ) + ingested_recent = [ + dict(r) for r in await conn.fetch( + "SELECT case_number, court, source_url, created_at FROM case_law " + "WHERE source_url LIKE '%court.gov.il%' ORDER BY created_at DESC LIMIT 12" + ) + ] + + pm2 = await _ops_pm2_services() + + def _iso(rows: list[dict]) -> list[dict]: + for d in rows: + for k, v in list(d.items()): + if hasattr(v, "isoformat"): + d[k] = v.isoformat() + return rows + + return { + "services": pm2["services"], + "services_error": pm2["error"], + "pipelines": { + "court_fetch": {"by_status": court_fetch, "recent": _iso(court_recent)}, + "metadata_extraction": {"by_status": meta, "queued": meta_queued}, + "halacha_extraction": {"by_status": hal_ext, "queued": hal_queued}, + "halacha_review": {"by_status": review}, + "missing_precedents": {"by_status": missing}, + "digests": {"total": digests_total, "linked": digests_linked}, + "ingested_recent": _iso(ingested_recent), + }, + } + + @app.get("/api/digests/{digest_id}") async def digest_get(digest_id: str): try: -- 2.49.1