From 638eef680362a3885cf9592e04f0177d8875692f Mon Sep 17 00:00:00 2001 From: Chaim Date: Mon, 8 Jun 2026 08:57:23 +0000 Subject: [PATCH] =?UTF-8?q?feat(ops):=20/operations=20=E2=80=94=20=D7=9E?= =?UTF-8?q?=D7=95=D7=A0=D7=99-=D7=AA=D7=95=D7=A8=20=D7=90=D7=97=D7=99?= =?UTF-8?q?=D7=93=D7=99=D7=9D,=20"=D7=9E=D7=94=20=D7=A8=D7=A5=20=D7=A2?= =?UTF-8?q?=D7=9B=D7=A9=D7=99=D7=95",=20=D7=95=D7=A0=D7=99=D7=94=D7=95?= =?UTF-8?q?=D7=9C-=D7=AA=D7=94=D7=9C=D7=99=D7=9B=D7=99=D7=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit הדף הציג את התורים באופן לא-אחיד (by_status גולמי), בלי הבחנה בין "ממתין" (בקלוג: status=pending) ל"בתור" (התור הפעיל: requested_at IS NOT NULL), בלי הצגת הפריט שרץ כרגע, ובלי שום שליטה בתהליכים. מה נוסף: 1. כרטיסי-תור אחידים — בתור / ממתין(בקלוג) / בעיבוד / הושלם / נכשל + "רץ עכשיו" (citation/case_number של הפריט בעיבוד) לכל drain (אחזור-פסיקה, מטא-דאטה, הלכות, יומונים). שערי-אנוש (אישור-הלכות, פסיקה-חסרה) נשארים מוני-סטטוס. 2. פאנל ניהול-תהליכים בסגנון "שירותי Windows": - דמון (court-fetch-service/xvfb/chat/reaper): הפעל-מחדש / עצור / הפעל. - cron drain: "הרץ עכשיו" (pm2 restart) + מתג הפעל/כבה תזמון. 3. כל תגי-הסטטוס מתורגמים לעברית. מנגנון: - הפעל/כבה תזמון = דגל ב-DB (טבלה drain_controls). pm2 cron_restart מחיה תהליך שעוצר ב-stop, לכן ה"כיבוי" האמין הוא דגל שכל drain בודק ב-startup (no-op מיידי כשכבוי). הקונטיינר כותב/קורא ישירות מ-DB. - הרץ-עכשיו + restart/stop/start = proxy ל-pm2 דרך endpoint חדש בגשר-המארח (court_fetch_service /pm2/control), מאובטח Bearer + whitelist ל-legal-* בלבד. - יומונים: drain_digests הועבר מ-crontab ל-pm2 (legal-digest-drain.config.cjs) כדי שיופיע ויהיה שליט כמו כל drain. drain_halacha_queue.py הובא לבקרת-גרסאות. Invariants: מקיים G2 (הרחבת /operations + הגשר הקיים, לא מסלול מקביל) ו-G1 (drain_controls = מקור-אמת יחיד לכיבוי, נורמליזציה במקור ולא תיקון-בקריאה). אין בליעת שגיאות שקטה (הגשר מחזיר {ok,error}; המוטציות מציגות toast). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../legal_mcp/court_fetch_service/server.py | 112 +++++-- mcp-server/src/legal_mcp/services/db.py | 49 ++- scripts/SCRIPTS.md | 3 +- scripts/drain_court_fetch.py | 6 + scripts/drain_digests.py | 11 +- scripts/drain_halacha_queue.py | 47 +++ scripts/drain_metadata_queue.py | 6 + scripts/legal-digest-drain.config.cjs | 37 +++ web-ui/src/app/operations/page.tsx | 307 ++++++++++++++---- web-ui/src/lib/api/operations.ts | 62 +++- web/app.py | 134 +++++++- 11 files changed, 676 insertions(+), 98 deletions(-) create mode 100644 scripts/drain_halacha_queue.py create mode 100644 scripts/legal-digest-drain.config.cjs diff --git a/mcp-server/src/legal_mcp/court_fetch_service/server.py b/mcp-server/src/legal_mcp/court_fetch_service/server.py index a07f9ee..a7e455a 100644 --- a/mcp-server/src/legal_mcp/court_fetch_service/server.py +++ b/mcp-server/src/legal_mcp/court_fetch_service/server.py @@ -9,6 +9,9 @@ Endpoints: → {ok, content_b64, filename, source_url, court, reason} REQUIRES Authorization: Bearer . GET /health liveness (no auth); reports camofox + VNC URL if available. + GET /pm2 read-only pm2 status of legal-* / paperclip services (no auth). + POST /pm2/control body {name, action: restart|stop|start} → run pm2 on a + whitelisted legal-* process. REQUIRES Bearer (mutating). 
Run with pm2: pm2 start scripts/legal-court-fetch-service.config.cjs @@ -63,17 +66,38 @@ async def health(request: web.Request) -> web.Response: _PM2_PREFIXES = ("legal-", "paperclip") -async def pm2_status(request: web.Request) -> web.Response: - """Return a trimmed ``pm2 jlist`` for the legal-ai background services.""" +def _trim_service(a: dict) -> dict: + """Project a pm2 jlist app entry into the fields the dashboard needs.""" + env = a.get("pm2_env", {}) or {} + return { + "name": a.get("name", ""), + "status": env.get("status", ""), + "restarts": env.get("restart_time", 0), + "uptime_ms": env.get("pm_uptime", 0), + "cpu": (a.get("monit") or {}).get("cpu", 0), + "memory_bytes": (a.get("monit") or {}).get("memory", 0), + "cron": env.get("cron_restart") or "", + "autorestart": env.get("autorestart", True), + } + + +async def _pm2_run(*args: str, timeout: float = 10) -> tuple[int, bytes, bytes]: + """Run a pm2 subcommand; returns (returncode, stdout, stderr).""" import asyncio as _asyncio + proc = await _asyncio.create_subprocess_exec( + "pm2", *args, + stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE, + ) + out, err = await _asyncio.wait_for(proc.communicate(), timeout=timeout) + return proc.returncode or 0, out, err + + +async def pm2_status(request: web.Request) -> web.Response: + """Return a trimmed ``pm2 jlist`` for the legal-ai background services.""" try: - proc = await _asyncio.create_subprocess_exec( - "pm2", "jlist", - stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE, - ) - out, err = await _asyncio.wait_for(proc.communicate(), timeout=10) - if proc.returncode != 0: + rc, out, err = await _pm2_run("jlist") + if rc != 0: return web.json_response( {"error": f"pm2 jlist failed: {err.decode('utf-8','replace')[:200]}"}, status=502, @@ -84,26 +108,65 @@ async def pm2_status(request: web.Request) -> web.Response: except Exception as e: # never throw return web.json_response({"error": f"pm2 error: {e}"}, status=502) - services = 
[] - for a in apps: - name = a.get("name", "") - if not any(name.startswith(p) for p in _PM2_PREFIXES): - continue - env = a.get("pm2_env", {}) or {} - services.append({ - "name": name, - "status": env.get("status", ""), - "restarts": env.get("restart_time", 0), - "uptime_ms": env.get("pm_uptime", 0), - "cpu": (a.get("monit") or {}).get("cpu", 0), - "memory_bytes": (a.get("monit") or {}).get("memory", 0), - "cron": env.get("cron_restart") or "", - "autorestart": env.get("autorestart", True), - }) + services = [ + _trim_service(a) for a in apps + if any(str(a.get("name", "")).startswith(p) for p in _PM2_PREFIXES) + ] services.sort(key=lambda s: s["name"]) return web.json_response({"services": services}) +# Process control (restart/stop/start) for the dashboard's "Windows-services" +# panel. Mutating, so it requires the Bearer secret (unlike read-only /pm2). +# Whitelisted to ``legal-`` names only — never paperclip or arbitrary processes. +_PM2_ACTIONS = {"restart", "stop", "start"} + + +async def pm2_control(request: web.Request) -> web.Response: + """Run ``pm2 `` for a whitelisted legal-* process.""" + unauth = _check_bearer(request) + if unauth is not None: + return unauth + try: + body = await request.json() + except json.JSONDecodeError: + return web.json_response({"error": "invalid JSON body"}, status=400) + + name = str(body.get("name", "")).strip() + action = str(body.get("action", "")).strip() + if action not in _PM2_ACTIONS: + return web.json_response( + {"error": f"action must be one of {sorted(_PM2_ACTIONS)}"}, status=400 + ) + if not name.startswith("legal-"): + return web.json_response( + {"error": "name must be a legal-* process"}, status=403 + ) + + try: + rc, out, err = await _pm2_run(action, name, "--silent", timeout=30) + if rc != 0: + return web.json_response( + {"ok": False, + "error": f"pm2 {action} {name} failed: " + f"{err.decode('utf-8','replace')[:200]}"}, + status=502, + ) + # Re-read just this process so the UI settles on the real new 
state. + rc2, out2, _ = await _pm2_run("jlist") + svc = None + if rc2 == 0: + for a in json.loads(out2.decode("utf-8", "replace")): + if a.get("name") == name: + svc = _trim_service(a) + break + return web.json_response({"ok": True, "action": action, "service": svc}) + except FileNotFoundError: + return web.json_response({"error": "pm2 not found on PATH"}, status=502) + except Exception as e: # never throw + return web.json_response({"ok": False, "error": f"pm2 error: {e}"}, status=502) + + def _check_bearer(request: web.Request) -> web.Response | None: auth = request.headers.get("Authorization", "") expected = "Bearer " + _SHARED_SECRET @@ -156,6 +219,7 @@ def build_app() -> web.Application: app = web.Application(client_max_size=64 * 1024 * 1024) app.router.add_get("/health", health) app.router.add_get("/pm2", pm2_status) + app.router.add_post("/pm2/control", pm2_control) app.router.add_post("/fetch", fetch) return app diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 43816e8..d32e662 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -1401,6 +1401,21 @@ UPDATE digests SET digest_kind = CREATE INDEX IF NOT EXISTS idx_digests_kind ON digests(digest_kind); """ +SCHEMA_V33_SQL = """ +-- drain_controls: a per-drain "startup type" switch for the /operations +-- dashboard's process-management panel. pm2 cron_restart resurrects a stopped +-- cron job at the next tick, so `pm2 stop` is NOT a durable "disable" for the +-- drains. Instead each drain checks this flag at startup and no-ops when +-- disabled (like a Windows service set to Disabled). The container writes it +-- directly (no host roundtrip); the drains read it. name = the pm2 process +-- name (e.g. 'legal-metadata-drain'). 
+CREATE TABLE IF NOT EXISTS drain_controls ( + name TEXT PRIMARY KEY, + disabled BOOLEAN NOT NULL DEFAULT false, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +""" + async def _run_schema_migrations(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: @@ -1437,7 +1452,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None: await conn.execute(SCHEMA_V30_SQL) await conn.execute(SCHEMA_V31_SQL) await conn.execute(SCHEMA_V32_SQL) - logger.info("Database schema initialized (v1-v32)") + await conn.execute(SCHEMA_V33_SQL) + logger.info("Database schema initialized (v1-v33)") async def init_schema() -> None: @@ -6144,3 +6160,34 @@ async def court_fetch_job_list(status: str | None = None, limit: int = 100) -> l limit, ) return [_row_to_court_fetch_job(r) for r in rows] + + +# ── Drain controls (/operations process-management panel) ────────────────── +async def is_drain_disabled(name: str) -> bool: + """True if the named drain is switched off (drains check this at startup).""" + pool = await get_pool() + async with pool.acquire() as conn: + val = await conn.fetchval( + "SELECT disabled FROM drain_controls WHERE name = $1", name + ) + return bool(val) + + +async def set_drain_disabled(name: str, disabled: bool) -> None: + """Switch a drain on/off (upsert). 
name = pm2 process name.""" + pool = await get_pool() + async with pool.acquire() as conn: + await conn.execute( + "INSERT INTO drain_controls (name, disabled, updated_at) " + "VALUES ($1, $2, now()) " + "ON CONFLICT (name) DO UPDATE SET disabled = $2, updated_at = now()", + name, disabled, + ) + + +async def get_drain_controls() -> dict[str, bool]: + """Map of drain name → disabled flag (only rows that were ever toggled).""" + pool = await get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch("SELECT name, disabled FROM drain_controls") + return {r["name"]: bool(r["disabled"]) for r in rows} diff --git a/scripts/SCRIPTS.md b/scripts/SCRIPTS.md index 8bfa891..9b1fb48 100644 --- a/scripts/SCRIPTS.md +++ b/scripts/SCRIPTS.md @@ -98,7 +98,8 @@ | `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני | | `legal-halacha-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_halacha_queue.py`** (cron `47 */2 * * *`, `HALACHA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-ההלכות. קצב שמרני (Claude איטי + כל ריצה מוסיפה לתור-אישור-היו"ר). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | | `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. 
| ידני, per-batch (חלופה ל-MCP `digest_upload`) | -| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. | **cron יומי** (10:00, אחרי ה-poll של n8n; flock למניעת חפיפה → `data/digests/drain.log`) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | +| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` | +| `legal-digest-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_digests.py`** (cron `0 */2 * * *`, `DIGEST_DRAIN_CRON` לעקיפה) — הועבר מ-crontab של המערכת ל-pm2 כדי שיופיע ויהיה שליט בדף `/operations` (הרץ-עכשיו/הפעל/כבה). `autorestart:false` (one-shot per tick). דורש claude CLI + `VOYAGE_API_KEY`. התקנה: `pm2 start scripts/legal-digest-drain.config.cjs && pm2 save`. | pm2 cron (host-side) | ## סקריפטים שנמחקו (git history בלבד) diff --git a/scripts/drain_court_fetch.py b/scripts/drain_court_fetch.py index 41f26c6..96149fd 100644 --- a/scripts/drain_court_fetch.py +++ b/scripts/drain_court_fetch.py @@ -22,9 +22,15 @@ import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) from legal_mcp.services import court_fetch_orchestrator as orch +from legal_mcp.services import db async def main() -> int: + # /operations "disable" switch — no-op immediately if turned off (pm2 + # cron_restart can still fire a stopped job, so the gate lives in the DB). 
+ if await db.is_drain_disabled("legal-court-fetch-drain"): + print("===SKIP=== legal-court-fetch-drain disabled via /operations", flush=True) + return 0 limit = int(sys.argv[1]) if len(sys.argv) > 1 else 5 res = await orch.drain_pending(limit=limit) print(f"===court-fetch drain=== processed={res.get('processed', 0)} " diff --git a/scripts/drain_digests.py b/scripts/drain_digests.py index 9b8fc32..12dc9df 100644 --- a/scripts/drain_digests.py +++ b/scripts/drain_digests.py @@ -12,8 +12,9 @@ rows still 'pending'; safe to re-run. The DB is the single source of truth. Used two ways: 1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py - 2. Daily cron (after the n8n 09:30 Gmail poll) — see crontab; runs under flock - so a slow run never overlaps the next. Logs to data/digests/drain.log. + 2. pm2 cron ``legal-digest-drain`` (scripts/legal-digest-drain.config.cjs) — + one-shot per tick. Controllable from the /operations dashboard (run-now / + enable / disable). Logs to pm2's process logs (``pm2 logs legal-digest-drain``). claude CLI must be on PATH (the cron line prepends ~/.local/bin). Config (POSTGRES_URL, VOYAGE_API_KEY) auto-loads from ~/.env via legal_mcp.config. @@ -36,6 +37,12 @@ CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3")) async def main() -> int: pool = await db.get_pool() + # /operations "disable" switch — no-op immediately if turned off (pm2 + # cron_restart can still fire a stopped job, so the gate lives in the DB). + if await db.is_drain_disabled("legal-digest-drain"): + print("===SKIP=== legal-digest-drain disabled via /operations", flush=True) + await db.close_pool() + return 0 # get_pool() runs schema migrations first — incl. the V32 digest_kind backfill # that classifies legacy rows — so the failure check below is safe from the # very first run (no legacy row has digest_kind='').
diff --git a/scripts/drain_halacha_queue.py b/scripts/drain_halacha_queue.py new file mode 100644 index 0000000..334f45f --- /dev/null +++ b/scripts/drain_halacha_queue.py @@ -0,0 +1,47 @@ +"""Drain the halacha extraction queue for the incoming batch. + +Calls the canonical process_pending_extractions(kind='halacha') in small batches +until the queue is empty (two consecutive zero-progress rounds). Serial + global +advisory-lock coordinated inside the service — avoids concurrent Claude load spikes. + +Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py +""" + +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) + +from legal_mcp.services import db +from legal_mcp.services import precedent_library as pl + + +async def main(): + # /operations "disable" switch — no-op immediately if turned off (pm2 + # cron_restart can still fire a stopped job, so the gate lives in the DB). + if await db.is_drain_disabled("legal-halacha-drain"): + print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True) + return + total = 0 + empty_rounds = 0 + rnd = 0 + while empty_rounds < 2: + rnd += 1 + out = await pl.process_pending_extractions(kind="halacha", limit=4) + processed = out.get("processed", 0) + total_pending = out.get("total_pending", 0) + total += processed + print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True) + for r in out.get("results", []): + print(f" {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True) + if processed == 0: + empty_rounds += 1 + await asyncio.sleep(5) + else: + empty_rounds = 0 + print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/drain_metadata_queue.py b/scripts/drain_metadata_queue.py index bd836f4..2b9b07c 
100644 --- a/scripts/drain_metadata_queue.py +++ b/scripts/drain_metadata_queue.py @@ -18,10 +18,16 @@ import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src")) +from legal_mcp.services import db from legal_mcp.services import precedent_library as pl async def main() -> int: + # /operations "disable" switch — no-op immediately if turned off (pm2 + # cron_restart can still fire a stopped job, so the gate lives in the DB). + if await db.is_drain_disabled("legal-metadata-drain"): + print("===SKIP=== legal-metadata-drain disabled via /operations", flush=True) + return 0 batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10 total = 0 empty_rounds = 0 diff --git a/scripts/legal-digest-drain.config.cjs b/scripts/legal-digest-drain.config.cjs new file mode 100644 index 0000000..83ed966 --- /dev/null +++ b/scripts/legal-digest-drain.config.cjs @@ -0,0 +1,37 @@ +/** + * pm2 ecosystem entry for legal-digest-drain — scheduled (every 2 h) drain of + * the digest-enrichment queue (X12: "כל יום" yomonim → Sonnet enrichment + + * embedding + autolink). Migrated from a bare system crontab line to pm2 so it + * appears in — and is controllable from — the /operations dashboard (run-now / + * enable / disable) like every other drain. + * + * Pattern: cron_restart fires the script on schedule; autorestart:false → runs + * once and exits (pm2 shows "stopped" between ticks — expected). The script + * already serialises itself (it self-heals stale 'processing' rows), so no flock + * is needed under pm2's one-shot model. + * + * Requires (host ~/.env via legal_mcp.config): POSTGRES_URL, VOYAGE_API_KEY, and + * the local `claude` CLI on PATH (the script prepends ~/.local/bin). + * + * Install (once): + * pm2 start /home/chaim/legal-ai/scripts/legal-digest-drain.config.cjs + * pm2 save + * Run now (manual): mcp-server/.venv/bin/python scripts/drain_digests.py + * Schedule override: DIGEST_DRAIN_CRON (default every 2 h at :00). 
+ */ +const cron = process.env.DIGEST_DRAIN_CRON || "0 */2 * * *"; + +module.exports = { + apps: [ + { + name: "legal-digest-drain", + cwd: "/home/chaim/legal-ai", + script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python", + args: "scripts/drain_digests.py", + env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" }, + autorestart: false, // one-shot per cron tick + cron_restart: cron, + max_memory_restart: "800M", + }, + ], +}; diff --git a/web-ui/src/app/operations/page.tsx b/web-ui/src/app/operations/page.tsx index 4b2f307..96625ac 100644 --- a/web-ui/src/app/operations/page.tsx +++ b/web-ui/src/app/operations/page.tsx @@ -4,46 +4,81 @@ import Link from "next/link"; import { AppShell } from "@/components/app-shell"; import { Card, CardContent } from "@/components/ui/card"; import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { Switch } from "@/components/ui/switch"; import { Skeleton } from "@/components/ui/skeleton"; import { useOperations, + useServiceAction, + useDrainToggle, type OpsService, type OperationsSnapshot, + type PipelineStats, } from "@/lib/api/operations"; function mb(bytes: number): string { return `${Math.round((bytes || 0) / 1024 / 1024)}MB`; } -function uptime(ms: number): string { +function ago(ms: number): string { if (!ms) return "—"; const secs = Math.floor((Date.now() - ms) / 1000); - if (secs < 60) return `${secs}ש׳`; - if (secs < 3600) return `${Math.floor(secs / 60)}ד׳`; - if (secs < 86400) return `${Math.floor(secs / 3600)}ש׳`; - return `${Math.floor(secs / 86400)}י׳`; + if (secs < 60) return `לפני ${secs}ש׳`; + if (secs < 3600) return `לפני ${Math.floor(secs / 60)}ד׳`; + if (secs < 86400) return `לפני ${Math.floor(secs / 3600)}ש׳`; + return `לפני ${Math.floor(secs / 86400)}י׳`; +} + +// Hebrew labels for the raw status strings the backend reports. 
+const STATUS_HE: Record = { + online: "פעיל", + stopped: "עצור", + errored: "שגיאה", + launching: "עולה", + pending: "ממתין", + processing: "בעיבוד", + running: "רץ", + done: "הושלם", + completed: "הושלם", + failed: "נכשל", + manual: "ידני", + approved: "אושר", + pending_review: "ממתין לאישור", + rejected: "נדחה", + published: "פורסם", + deferred: "נדחה זמנית", + open: "פתוח", + closed: "סגור", + unknown: "לא ידוע", +}; + +function he(status: string): string { + return STATUS_HE[status] ?? status; } const STATUS_VARIANT: Record = { online: "default", done: "default", - approved: "default", completed: "default", + approved: "default", + published: "default", stopped: "secondary", pending: "secondary", pending_review: "secondary", + open: "secondary", running: "outline", processing: "outline", + launching: "outline", failed: "destructive", errored: "destructive", manual: "destructive", - open: "secondary", + rejected: "destructive", }; function StatusBadge({ value, count }: { value: string; count?: number }) { return ( - {value} + {he(value)} {count !== undefined ? 
{count} : null} ); @@ -52,56 +87,151 @@ function StatusBadge({ value, count }: { value: string; count?: number }) { const SERVICE_LABELS: Record = { "legal-court-fetch-service": "שירות אחזור פסיקה (דפדפן נט המשפט)", "legal-court-fetch-xvfb": "צג וירטואלי (Xvfb) לדפדפן", - "legal-court-fetch-drain": "תזמון: ניקוז תור אחזור (שעתי)", + "legal-court-fetch-drain": "תזמון: ניקוז תור אחזור פסיקה (שעתי)", "legal-metadata-drain": "תזמון: חילוץ מטא-דאטה (Gemini, ×15 דק׳)", "legal-halacha-drain": "תזמון: חילוץ הלכות (Claude, ×שעתיים)", + "legal-digest-drain": "תזמון: העשרת יומונים (Sonnet, ×שעתיים)", "legal-reaper": "מנקה תהליכים-יתומים (נגד דליפות זיכרון)", "legal-chat-service": "שירות צ׳אט אימון (גשר ל-claude CLI)", }; -function ServicesSection({ data }: { data: OperationsSnapshot }) { +// ── Process-management panel (the "Windows services" view) ───────────────── +function ServiceControls({ + s, + busy, + onAction, + onToggle, +}: { + s: OpsService; + busy: boolean; + onAction: (action: "restart" | "stop" | "start" | "run-now") => void; + onToggle: (disabled: boolean) => void; +}) { + const isCron = !!s.cron; + + if (isCron) { + // Cron drain: "run now" (pm2 restart) + an enable/disable switch (DB flag). + return ( +
+ + +
+ ); + } + + // Daemon: restart / stop / start. + const online = s.status === "online"; + return ( +
+ + {online ? ( + + ) : ( + + )} +
+ ); +} + +function ServicesPanel({ data }: { data: OperationsSnapshot }) { + const action = useServiceAction(); + const toggle = useDrainToggle(); + const busy = action.isPending || toggle.isPending; + return ( -

שירותי רקע (pm2)

+

ניהול תהליכי-רקע (pm2)

- דמונים ומשימות-תזמון על שרת המארח. "cron" = רץ לפי לוח-זמנים (מציג - "stopped" בין הרצות — תקין). + כמו "שירותים" ב-Windows. דמון = שירות רץ-תמיד (הפעל-מחדש/עצור/הפעל). + תזמון (cron) = רץ לפי לוח-זמנים ("הרץ עכשיו" להרצה מיידית, ומתג + הפעלה/כיבוי של התזמון).

{data.services_error ? (

{data.services_error}

) : data.services.length === 0 ? (

אין שירותים.

) : ( -
- {data.services.map((s: OpsService) => ( -
-
-
- - {s.cron ? ( - - {s.cron} +
+ {data.services.map((s: OpsService) => { + const isCron = !!s.cron; + return ( +
+
+
+ {isCron ? ( + + {s.disabled ? "כבוי" : "פעיל (מתוזמן)"} + + ) : ( + + )} + {s.cron ? ( + + {s.cron} + + ) : null} +
+
+ {SERVICE_LABELS[s.name] ?? s.name} +
+
+ + {s.name} - ) : null} -
-
- {SERVICE_LABELS[s.name] ?? s.name} -
-
- {s.name} + {mb(s.memory_bytes)} + ↻{s.restarts} + {isCron ? `ריצה אחרונה ${ago(s.uptime_ms)}` : ago(s.uptime_ms)} +
+ action.mutate({ name: s.name, action: a })} + onToggle={(disabled) => toggle.mutate({ name: s.name, disabled })} + />
-
-
{mb(s.memory_bytes)}
-
↻{s.restarts}
- {!s.cron ?
{uptime(s.uptime_ms)}
: null} -
-
- ))} + ); + })}
)} @@ -109,6 +239,68 @@ function ServicesSection({ data }: { data: OperationsSnapshot }) { ); } +// ── Uniform queue stats ──────────────────────────────────────────────────── +function StatTile({ + label, + value, + tone, + title, +}: { + label: string; + value: number; + tone: "navy" | "muted" | "amber" | "green" | "red"; + title?: string; +}) { + const toneClass = { + navy: "text-navy", + muted: "text-ink-muted", + amber: "text-gold-deep", + green: "text-emerald-600", + red: "text-destructive", + }[tone]; + return ( +
+ {value} + {label} +
+ ); +} + +function UniformStats({ p }: { p: PipelineStats }) { + return ( +
+
+ + + + + {p.failed > 0 ? : null} +
+ {p.running_now.length > 0 ? ( +
+ רץ עכשיו: + {p.running_now.join(" · ")} +
+ ) : ( +
אין פריט בעיבוד כרגע
+ )} +
+ ); +} + function StatusRow({ by }: { by: Record }) { const entries = Object.entries(by).filter(([, n]) => n > 0); if (entries.length === 0) return ריק; @@ -134,7 +326,7 @@ function PipelineCard({ }) { return ( - +

{title}

{desc}

@@ -160,7 +352,7 @@ export default function OperationsPage() {

תפעול — מה רץ ברקע

כל מה שמוריד ומנתח אוטומטית: שירותי-המארח, משימות-התזמון, ותורי-העבודה. - מתרענן כל 5 שניות. + ניתן להפעיל-מחדש / לעצור / להריץ-עכשיו כל תהליך. מתרענן כל 5 שניות.

@@ -177,39 +369,44 @@ export default function OperationsPage() {
) : ( <> - +
- + - -

- בתור לחילוץ: {data.pipelines.metadata_extraction.queued} -

+
- + + + + +

- בתור לחילוץ: {data.pipelines.halacha_extraction.queued} + {data.pipelines.digests.linked}/{data.pipelines.digests.total} מקושרים + לפסיקה

@@ -220,16 +417,6 @@ export default function OperationsPage() { > - - -

- {data.pipelines.digests.linked}/{data.pipelines.digests.total} מקושרים - לפסיקה -

-
diff --git a/web-ui/src/lib/api/operations.ts b/web-ui/src/lib/api/operations.ts index be740c4..abf76c5 100644 --- a/web-ui/src/lib/api/operations.ts +++ b/web-ui/src/lib/api/operations.ts @@ -1,4 +1,5 @@ -import { useQuery } from "@tanstack/react-query"; +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { toast } from "sonner"; import { apiRequest } from "./client"; export type OpsService = { @@ -10,6 +11,7 @@ export type OpsService = { memory_bytes: number; cron: string; autorestart: boolean; + disabled?: boolean; // cron drain switched off via the dashboard }; export type CourtFetchJob = { @@ -28,16 +30,27 @@ export type IngestedRow = { created_at: string; }; +/** The uniform per-pipeline shape every background drain reports. */ +export type PipelineStats = { + pending: number; // backlog: rows not yet processed (status default) + processing: number; // being worked right now + done: number; // completed + failed: number; // terminal failures (court_fetch folds in 'manual') + queued: number; // explicitly enqueued for the next drain run + running_now: string[]; // human labels of the items currently processing + by_status: Record; // raw counts, for the curious +}; + export type OperationsSnapshot = { services: OpsService[]; services_error: string | null; pipelines: { - court_fetch: { by_status: Record; recent: CourtFetchJob[] }; - metadata_extraction: { by_status: Record; queued: number }; - halacha_extraction: { by_status: Record; queued: number }; + court_fetch: PipelineStats & { recent: CourtFetchJob[] }; + metadata_extraction: PipelineStats; + halacha_extraction: PipelineStats; + digests: PipelineStats & { total: number; linked: number }; halacha_review: { by_status: Record }; missing_precedents: { by_status: Record }; - digests: { total: number; linked: number }; ingested_recent: IngestedRow[]; }; }; @@ -51,3 +64,42 @@ export function useOperations() { staleTime: 3000, }); } + +export type ServiceAction = "restart" | 
"stop" | "start" | "run-now"; + +/** Control a background service (daemon restart/stop/start, or run a drain now). */ +export function useServiceAction() { + const qc = useQueryClient(); + return useMutation({ + mutationFn: ({ name, action }: { name: string; action: ServiceAction }) => + apiRequest(`/api/operations/services/${name}/${action}`, { method: "POST" }), + onSuccess: (_d, { action }) => { + const labels: Record = { + "run-now": "הופעל עכשיו", + restart: "הופעל מחדש", + stop: "נעצר", + start: "הופעל", + }; + toast.success(labels[action]); + qc.invalidateQueries({ queryKey: ["operations"] }); + }, + onError: (e) => toast.error(`הפעולה נכשלה: ${String(e)}`), + }); +} + +/** Switch a cron drain on/off (its "startup type"). */ +export function useDrainToggle() { + const qc = useQueryClient(); + return useMutation({ + mutationFn: ({ name, disabled }: { name: string; disabled: boolean }) => + apiRequest(`/api/operations/drains/${name}/disabled`, { + method: "POST", + body: { disabled }, + }), + onSuccess: (_d, { disabled }) => { + toast.success(disabled ? 
"התזמון כובה" : "התזמון הופעל"); + qc.invalidateQueries({ queryKey: ["operations"] }); + }, + onError: (e) => toast.error(`העדכון נכשל: ${String(e)}`), + }); +} diff --git a/web/app.py b/web/app.py index 2319493..2c63921 100644 --- a/web/app.py +++ b/web/app.py @@ -22,7 +22,7 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / " import zipfile -from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile +from fastapi import BackgroundTasks, Body, FastAPI, File, Form, HTTPException, UploadFile from fastapi.responses import FileResponse, StreamingResponse from typing import Any, Literal from pydantic import BaseModel @@ -6030,6 +6030,49 @@ async def _ops_pm2_services() -> dict: return {"services": [], "error": f"לא ניתן להגיע לשירות-המארח: {e}"} +async def _ops_pm2_control(name: str, action: str) -> dict: + """Proxy a mutating pm2 action to the host bridge (Bearer-authenticated).""" + secret = os.environ.get("COURT_FETCH_SHARED_SECRET", "").strip() + headers = {"Authorization": f"Bearer {secret}"} if secret else {} + async with httpx.AsyncClient(timeout=35.0) as client: + r = await client.post( + f"{_COURT_FETCH_SERVICE_URL}/pm2/control", + json={"name": name, "action": action}, headers=headers, + ) + try: + payload = r.json() + except Exception: + payload = {"error": r.text[:200]} + if r.status_code != 200: + raise HTTPException(r.status_code, payload.get("error", "pm2 control failed")) + return payload + + +def _norm_pipeline( + by_status: dict, *, + pending: tuple[str, ...], processing: tuple[str, ...], + done: tuple[str, ...], failed: tuple[str, ...], + queued: int, running_now: list[str], extra: dict | None = None, +) -> dict: + """Project a raw status-count map into the dashboard's uniform shape: + pending / processing / done / failed / queued + the live running items.""" + def total(keys: tuple[str, ...]) -> int: + return sum(int(by_status.get(k, 0) or 0) for k in keys) + + out = { + "pending": 
total(pending), + "processing": total(processing), + "done": total(done), + "failed": total(failed), + "queued": queued, + "running_now": running_now, + "by_status": by_status, + } + if extra: + out.update(extra) + return out + + @app.get("/api/operations") async def operations_snapshot(): """Everything running in the background: services + pipelines/queues.""" @@ -6038,6 +6081,9 @@ async def operations_snapshot(): async def counts(sql: str) -> dict: return {r[0]: r[1] for r in await conn.fetch(sql)} + async def col(sql: str) -> list[str]: + return [r[0] for r in await conn.fetch(sql) if r[0]] + court_fetch = await counts( "SELECT status, count(*) FROM court_fetch_jobs GROUP BY 1" ) @@ -6047,6 +6093,10 @@ async def operations_snapshot(): "updated_at FROM court_fetch_jobs ORDER BY updated_at DESC LIMIT 15" ) ] + court_running = await col( + "SELECT coalesce(nullif(citation_raw,''), case_number_norm) " + "FROM court_fetch_jobs WHERE status = 'running' ORDER BY updated_at LIMIT 5" + ) meta = await counts( "SELECT coalesce(metadata_extraction_status,'unknown'), count(*) " "FROM case_law GROUP BY 1" @@ -6054,6 +6104,10 @@ async def operations_snapshot(): meta_queued = await conn.fetchval( "SELECT count(*) FROM case_law WHERE metadata_extraction_requested_at IS NOT NULL" ) + meta_running = await col( + "SELECT case_number FROM case_law WHERE metadata_extraction_status = 'processing' " + "ORDER BY metadata_extraction_requested_at NULLS LAST LIMIT 5" + ) hal_ext = await counts( "SELECT coalesce(halacha_extraction_status,'unknown'), count(*) " "FROM case_law GROUP BY 1" @@ -6061,12 +6115,24 @@ async def operations_snapshot(): hal_queued = await conn.fetchval( "SELECT count(*) FROM case_law WHERE halacha_extraction_requested_at IS NOT NULL" ) + hal_running = await col( + "SELECT case_number FROM case_law WHERE halacha_extraction_status = 'processing' " + "ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5" + ) review = await counts("SELECT review_status, count(*) 
FROM halachot GROUP BY 1") missing = await counts("SELECT status, count(*) FROM missing_precedents GROUP BY 1") + digest_ext = await counts( + "SELECT coalesce(extraction_status,'unknown'), count(*) FROM digests GROUP BY 1" + ) digests_total = await conn.fetchval("SELECT count(*) FROM digests") digests_linked = await conn.fetchval( "SELECT count(*) FROM digests WHERE linked_case_law_id IS NOT NULL" ) + digest_running = await col( + "SELECT coalesce(nullif(underlying_citation,''), nullif(concept_tag,''), " + "'יומון '||yomon_number) FROM digests WHERE extraction_status = 'processing' " + "ORDER BY updated_at LIMIT 5" + ) ingested_recent = [ dict(r) for r in await conn.fetch( "SELECT case_number, court, source_url, created_at FROM case_law " @@ -6075,6 +6141,9 @@ async def operations_snapshot(): ] pm2 = await _ops_pm2_services() + controls = await db.get_drain_controls() + for svc in pm2["services"]: + svc["disabled"] = controls.get(svc.get("name", ""), False) def _iso(rows: list[dict]) -> list[dict]: for d in rows: @@ -6087,17 +6156,72 @@ async def operations_snapshot(): "services": pm2["services"], "services_error": pm2["error"], "pipelines": { - "court_fetch": {"by_status": court_fetch, "recent": _iso(court_recent)}, - "metadata_extraction": {"by_status": meta, "queued": meta_queued}, - "halacha_extraction": {"by_status": hal_ext, "queued": hal_queued}, + "court_fetch": { + **_norm_pipeline( + court_fetch, + pending=("pending",), processing=("running",), done=("done",), + failed=("failed", "manual"), + queued=int(court_fetch.get("pending", 0)) + int(court_fetch.get("failed", 0)), + running_now=court_running, + ), + "recent": _iso(court_recent), + }, + "metadata_extraction": _norm_pipeline( + meta, + pending=("pending",), processing=("processing",), done=("completed",), + failed=("failed",), queued=meta_queued, running_now=meta_running, + ), + "halacha_extraction": _norm_pipeline( + hal_ext, + pending=("pending",), processing=("processing",), done=("completed",), + 
failed=("failed",), queued=hal_queued, running_now=hal_running, + ), + "digests": _norm_pipeline( + digest_ext, + pending=("pending",), processing=("processing",), done=("completed",), + failed=("failed",), + queued=int(digest_ext.get("pending", 0)), running_now=digest_running, + extra={"total": digests_total, "linked": digests_linked}, + ), + # Human gates, not background drains — surfaced as status counts only. "halacha_review": {"by_status": review}, "missing_precedents": {"by_status": missing}, - "digests": {"total": digests_total, "linked": digests_linked}, "ingested_recent": _iso(ingested_recent), }, } +_OPS_SERVICE_ACTIONS = {"restart", "stop", "start", "run-now"} + + +@app.post("/api/operations/services/{name}/{action}") +async def operations_service_action(name: str, action: str): + """Control a background service (restart/stop/start) or run a drain now. + + 'run-now' maps to pm2 restart — for a one-shot cron drain that fires the + job immediately. Whitelisted to legal-* (enforced again on the host).""" + if action not in _OPS_SERVICE_ACTIONS: + raise HTTPException(400, f"action חייב להיות אחד מ-{sorted(_OPS_SERVICE_ACTIONS)}") + if not name.startswith("legal-"): + raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*") + pm2_action = "restart" if action == "run-now" else action + return await _ops_pm2_control(name, pm2_action) + + +@app.post("/api/operations/drains/{name}/disabled") +async def operations_drain_toggle(name: str, body: dict = Body(...)): + """Switch a cron drain on/off (the 'startup type' in the services panel). 
+ + Written straight to drain_controls — no host roundtrip; the drain reads the + flag at startup and no-ops when disabled (pm2 cron_restart can't be trusted + to stay stopped).""" + if not name.startswith("legal-"): + raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*") + disabled = bool(body.get("disabled")) + await db.set_drain_disabled(name, disabled) + return {"ok": True, "name": name, "disabled": disabled} + + @app.get("/api/digests/{digest_id}") async def digest_get(digest_id: str): try: -- 2.49.1