feat(ops): /operations — מוני-תור אחידים, "מה רץ עכשיו", וניהול-תהליכים

הדף הציג את התורים באופן לא-אחיד (by_status גולמי), בלי הבחנה בין "ממתין"
(בקלוג: status=pending) ל"בתור" (התור הפעיל: requested_at IS NOT NULL), בלי
הצגת הפריט שרץ כרגע, ובלי שום שליטה בתהליכים.

מה נוסף:
1. כרטיסי-תור אחידים — בתור / ממתין(בקלוג) / בעיבוד / הושלם / נכשל + "רץ עכשיו"
   (citation/case_number של הפריט בעיבוד) לכל drain (אחזור-פסיקה, מטא-דאטה,
   הלכות, יומונים). שערי-אנוש (אישור-הלכות, פסיקה-חסרה) נשארים מוני-סטטוס.
2. פאנל ניהול-תהליכים בסגנון "שירותי Windows":
   - דמון (court-fetch-service/xvfb/chat/reaper): הפעל-מחדש / עצור / הפעל.
   - cron drain: "הרץ עכשיו" (pm2 restart) + מתג הפעל/כבה תזמון.
3. כל תגי-הסטטוס מתורגמים לעברית.

מנגנון:
- הפעל/כבה תזמון = דגל ב-DB (טבלה drain_controls). pm2 cron_restart מחיה תהליך
  שעוצר ב-stop, לכן ה"כיבוי" האמין הוא דגל שכל drain בודק ב-startup (no-op מיידי
  כשכבוי). הקונטיינר כותב/קורא ישירות מ-DB.
- הרץ-עכשיו + restart/stop/start = proxy ל-pm2 דרך endpoint חדש בגשר-המארח
  (court_fetch_service /pm2/control), מאובטח Bearer + whitelist ל-legal-* בלבד.
- יומונים: drain_digests הועבר מ-crontab ל-pm2 (legal-digest-drain.config.cjs)
  כדי שיופיע ויהיה שליט כמו כל drain. drain_halacha_queue.py הובא לבקרת-גרסאות.

Invariants: מקיים G2 (הרחבת /operations + הגשר הקיים, לא מסלול מקביל) ו-G1
(drain_controls = מקור-אמת יחיד לכיבוי, נורמליזציה במקור ולא תיקון-בקריאה).
אין בליעת שגיאות שקטה (הגשר מחזיר {ok,error}; המוטציות מציגות toast).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 08:57:23 +00:00
parent 6647aa92e6
commit 638eef6803
11 changed files with 676 additions and 98 deletions

View File

@@ -9,6 +9,9 @@ Endpoints:
{ok, content_b64, filename, source_url, court, reason}
REQUIRES Authorization: Bearer <COURT_FETCH_SHARED_SECRET>.
GET /health liveness (no auth); reports camofox + VNC URL if available.
GET /pm2 read-only pm2 status of legal-* / paperclip services (no auth).
POST /pm2/control body {name, action: restart|stop|start} → run pm2 on a
whitelisted legal-* process. REQUIRES Bearer (mutating).
Run with pm2:
pm2 start scripts/legal-court-fetch-service.config.cjs
@@ -63,17 +66,38 @@ async def health(request: web.Request) -> web.Response:
_PM2_PREFIXES = ("legal-", "paperclip")
async def pm2_status(request: web.Request) -> web.Response:
"""Return a trimmed ``pm2 jlist`` for the legal-ai background services."""
def _trim_service(a: dict) -> dict:
"""Project a pm2 jlist app entry into the fields the dashboard needs."""
env = a.get("pm2_env", {}) or {}
return {
"name": a.get("name", ""),
"status": env.get("status", ""),
"restarts": env.get("restart_time", 0),
"uptime_ms": env.get("pm_uptime", 0),
"cpu": (a.get("monit") or {}).get("cpu", 0),
"memory_bytes": (a.get("monit") or {}).get("memory", 0),
"cron": env.get("cron_restart") or "",
"autorestart": env.get("autorestart", True),
}
async def _pm2_run(*args: str, timeout: float = 10) -> tuple[int, bytes, bytes]:
    """Run a pm2 subcommand and capture its output.

    Args:
        *args: Arguments passed to the ``pm2`` executable (no shell involved).
        timeout: Seconds to wait for the command to finish.

    Returns:
        ``(returncode, stdout, stderr)``; a missing return code is reported
        as 0.

    Raises:
        FileNotFoundError: ``pm2`` is not on PATH.
        asyncio.TimeoutError: The command did not finish within ``timeout``.
            The child process is killed and reaped before this propagates.
    """
    import asyncio as _asyncio

    proc = await _asyncio.create_subprocess_exec(
        "pm2", *args,
        stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE,
    )
    try:
        out, err = await _asyncio.wait_for(proc.communicate(), timeout=timeout)
    except _asyncio.TimeoutError:
        # wait_for() only cancels communicate(); without an explicit kill the
        # pm2 child would keep running (and its pipes stay open) after we
        # report a timeout to the caller.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the child exited between the timeout and the kill
        await proc.wait()
        raise
    return proc.returncode or 0, out, err
async def pm2_status(request: web.Request) -> web.Response:
"""Return a trimmed ``pm2 jlist`` for the legal-ai background services."""
try:
proc = await _asyncio.create_subprocess_exec(
"pm2", "jlist",
stdout=_asyncio.subprocess.PIPE, stderr=_asyncio.subprocess.PIPE,
)
out, err = await _asyncio.wait_for(proc.communicate(), timeout=10)
if proc.returncode != 0:
rc, out, err = await _pm2_run("jlist")
if rc != 0:
return web.json_response(
{"error": f"pm2 jlist failed: {err.decode('utf-8','replace')[:200]}"},
status=502,
@@ -84,26 +108,65 @@ async def pm2_status(request: web.Request) -> web.Response:
except Exception as e: # never throw
return web.json_response({"error": f"pm2 error: {e}"}, status=502)
services = []
for a in apps:
name = a.get("name", "")
if not any(name.startswith(p) for p in _PM2_PREFIXES):
continue
env = a.get("pm2_env", {}) or {}
services.append({
"name": name,
"status": env.get("status", ""),
"restarts": env.get("restart_time", 0),
"uptime_ms": env.get("pm_uptime", 0),
"cpu": (a.get("monit") or {}).get("cpu", 0),
"memory_bytes": (a.get("monit") or {}).get("memory", 0),
"cron": env.get("cron_restart") or "",
"autorestart": env.get("autorestart", True),
})
services = [
_trim_service(a) for a in apps
if any(str(a.get("name", "")).startswith(p) for p in _PM2_PREFIXES)
]
services.sort(key=lambda s: s["name"])
return web.json_response({"services": services})
# Process control (restart/stop/start) for the dashboard's "Windows-services"
# panel. Mutating, so it requires the Bearer secret (unlike read-only /pm2).
# Whitelisted to ``legal-`` names only — never paperclip or arbitrary processes.
_PM2_ACTIONS = {"restart", "stop", "start"}
async def pm2_control(request: web.Request) -> web.Response:
    """Run ``pm2 <action> <name>`` for a whitelisted legal-* process.

    Expects a JSON object body ``{"name": ..., "action": ...}`` and a valid
    Bearer token. Every failure response carries ``{"ok": False, "error": ...}``
    so callers can rely on one payload shape.

    Responses:
        200: ``{"ok": True, "action", "service"}`` where ``service`` is the
            trimmed post-action pm2 entry, or ``None`` if it could not be
            re-read.
        400: body is not valid JSON / not an object, or the action is unknown.
        403: ``name`` is not a ``legal-`` process.
        502: pm2 is missing, failed, timed out, or raised.
        The Bearer check's own response is returned unchanged when it fails.
    """
    import asyncio as _asyncio

    unauth = _check_bearer(request)
    if unauth is not None:
        return unauth
    try:
        body = await request.json()
    except json.JSONDecodeError:
        return web.json_response(
            {"ok": False, "error": "invalid JSON body"}, status=400
        )
    # Valid JSON that is not an object (e.g. a list or a string) has no
    # .get(); reject it here instead of failing with an unhandled error.
    if not isinstance(body, dict):
        return web.json_response(
            {"ok": False, "error": "JSON body must be an object"}, status=400
        )
    name = str(body.get("name", "")).strip()
    action = str(body.get("action", "")).strip()
    if action not in _PM2_ACTIONS:
        return web.json_response(
            {"ok": False,
             "error": f"action must be one of {sorted(_PM2_ACTIONS)}"},
            status=400,
        )
    if not name.startswith("legal-"):
        return web.json_response(
            {"ok": False, "error": "name must be a legal-* process"}, status=403
        )
    try:
        rc, out, err = await _pm2_run(action, name, "--silent", timeout=30)
        if rc != 0:
            return web.json_response(
                {"ok": False,
                 "error": f"pm2 {action} {name} failed: "
                          f"{err.decode('utf-8','replace')[:200]}"},
                status=502,
            )
        # Re-read just this process so the UI settles on the real new state.
        rc2, out2, _ = await _pm2_run("jlist")
        svc = None
        if rc2 == 0:
            for a in json.loads(out2.decode("utf-8", "replace")):
                if a.get("name") == name:
                    svc = _trim_service(a)
                    break
        return web.json_response({"ok": True, "action": action, "service": svc})
    except FileNotFoundError:
        return web.json_response(
            {"ok": False, "error": "pm2 not found on PATH"}, status=502
        )
    except _asyncio.TimeoutError:
        # str(TimeoutError()) is empty, so give the caller a real message.
        return web.json_response(
            {"ok": False, "error": f"pm2 {action} {name} timed out"}, status=502
        )
    except Exception as e:  # never throw
        return web.json_response({"ok": False, "error": f"pm2 error: {e}"}, status=502)
def _check_bearer(request: web.Request) -> web.Response | None:
auth = request.headers.get("Authorization", "")
expected = "Bearer " + _SHARED_SECRET
@@ -156,6 +219,7 @@ def build_app() -> web.Application:
app = web.Application(client_max_size=64 * 1024 * 1024)
app.router.add_get("/health", health)
app.router.add_get("/pm2", pm2_status)
app.router.add_post("/pm2/control", pm2_control)
app.router.add_post("/fetch", fetch)
return app

View File

@@ -1401,6 +1401,21 @@ UPDATE digests SET digest_kind =
CREATE INDEX IF NOT EXISTS idx_digests_kind ON digests(digest_kind);
"""
SCHEMA_V33_SQL = """
-- drain_controls: a per-drain "startup type" switch for the /operations
-- dashboard's process-management panel. pm2 cron_restart resurrects a stopped
-- cron job at the next tick, so `pm2 stop` is NOT a durable "disable" for the
-- drains. Instead each drain checks this flag at startup and no-ops when
-- disabled (like a Windows service set to Disabled). The container writes it
-- directly (no host roundtrip); the drains read it. name = the pm2 process
-- name (e.g. 'legal-metadata-drain').
CREATE TABLE IF NOT EXISTS drain_controls (
name TEXT PRIMARY KEY,
disabled BOOLEAN NOT NULL DEFAULT false,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
@@ -1437,7 +1452,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V30_SQL)
await conn.execute(SCHEMA_V31_SQL)
await conn.execute(SCHEMA_V32_SQL)
logger.info("Database schema initialized (v1-v32)")
await conn.execute(SCHEMA_V33_SQL)
logger.info("Database schema initialized (v1-v33)")
async def init_schema() -> None:
@@ -6144,3 +6160,34 @@ async def court_fetch_job_list(status: str | None = None, limit: int = 100) -> l
limit,
)
return [_row_to_court_fetch_job(r) for r in rows]
# ── Drain controls (/operations process-management panel) ──────────────────
async def is_drain_disabled(name: str) -> bool:
    """Report whether the named drain is currently switched off.

    Args:
        name: Key in ``drain_controls`` (the drains pass their pm2 process
            name).

    Returns:
        ``True`` only when a row exists for ``name`` with a truthy
        ``disabled`` value; a missing row yields ``False``.
    """
    query = "SELECT disabled FROM drain_controls WHERE name = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        flag = await conn.fetchval(query, name)
    # fetchval gives None when no row matches, which coerces to False.
    return bool(flag)
async def set_drain_disabled(name: str, disabled: bool) -> None:
    """Persist the on/off switch for a drain, creating its row if needed.

    Args:
        name: Key in ``drain_controls`` (the pm2 process name).
        disabled: ``True`` to switch the drain off, ``False`` to switch it on.
    """
    # Upsert: insert a new row, or overwrite the flag and bump updated_at
    # when a row for this name already exists.
    upsert_sql = (
        "INSERT INTO drain_controls (name, disabled, updated_at) "
        "VALUES ($1, $2, now()) "
        "ON CONFLICT (name) DO UPDATE SET disabled = $2, updated_at = now()"
    )
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(upsert_sql, name, disabled)
async def get_drain_controls() -> dict[str, bool]:
    """Load every stored drain switch.

    Returns:
        A dict mapping each ``drain_controls.name`` to its ``disabled`` flag
        as a bool. Drains without a row in the table do not appear.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT name, disabled FROM drain_controls")
    controls: dict[str, bool] = {}
    for row in rows:
        controls[row["name"]] = bool(row["disabled"])
    return controls

View File

@@ -98,7 +98,8 @@
| `drain_halacha_queue.py` | python | ריקון תור חילוץ ההלכות (`process_pending_extractions kind='halacha'`) ב-batches של 4 עד שהתור ריק (2 סבבים ריקים). חילוץ-הלכות נשאר על claude_session (לא Gemini). self-heal ל-orphaned `processing`. ההלכות נוחתות `pending_review` (שער-יו"ר). | דרך `legal-halacha-drain.config.cjs` (pm2 cron) / ידני |
| `legal-halacha-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_halacha_queue.py`** (cron `47 */2 * * *`, `HALACHA_DRAIN_CRON` לעקיפה) — מונע סתימה של תור חילוץ-ההלכות. קצב שמרני (Claude איטי + כל ריצה מוסיפה לתור-אישור-היו"ר). דורש claude CLI. התקנה: `pm2 start scripts/legal-halacha-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
| `ingest_digests_batch.py` | python | קליטת batch של יומוני "כל יום" מ-`data/digests/incoming/` דרך המסלול העצמאי של קורפוס-הגילוי (`digest_library.ingest_digest`) — חילוץ-LLM (תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים), embedding יחיד, ו-autolink לפסק המקורי (X12/INV-DIG3). רצף (לא מקבילי). מזהה-יומון+תאריך נגזרים משם-הקובץ; העלון החודשי מדולג. **לא מעביר קבצים** — ה-DB (content_hash) הוא מקור-האמת היחיד; הרצה חוזרת מדלגת על קיימים (`exists`). config מ-`~/.env`. | ידני, per-batch (חלופה ל-MCP `digest_upload`) |
| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. | **cron יומי** (10:00, אחרי ה-poll של n8n; flock למניעת חפיפה → `data/digests/drain.log`) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` |
| `drain_digests.py` | python | ריקון תור ההעשרה של יומונים (X12): מעבד כל digest בסטטוס `pending` דרך `digest_library.enrich_digest` (חילוץ-LLM Sonnet + embedding + autolink). מקבילי (CONCURRENCY=3, env-tunable), idempotent. מוסיף `~/.local/bin` ל-PATH כדי שה-claude CLI יימצא תחת cron. בודק דגל `drain_controls('legal-digest-drain')` ב-startup → no-op כשכבוי מ-/operations. | דרך `legal-digest-drain.config.cjs` (pm2 cron) + ידני אחרי backfill. חלופת-MCP: `digest_process_pending` |
| `legal-digest-drain.config.cjs` | pm2/js | **תזמון כל שעתיים של `drain_digests.py`** (cron `0 */2 * * *`, `DIGEST_DRAIN_CRON` לעקיפה) — הועבר מ-crontab של המערכת ל-pm2 כדי שיופיע ויהיה שליט בדף `/operations` (הרץ-עכשיו/הפעל/כבה). `autorestart:false` (one-shot per tick). דורש claude CLI + `VOYAGE_API_KEY`. התקנה: `pm2 start scripts/legal-digest-drain.config.cjs && pm2 save`. | pm2 cron (host-side) |
## סקריפטים שנמחקו (git history בלבד)

View File

@@ -22,9 +22,15 @@ import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import court_fetch_orchestrator as orch
from legal_mcp.services import db
async def main() -> int:
# /operations "disable" switch — no-op immediately if turned off (pm2
# cron_restart can still fire a stopped job, so the gate lives in the DB).
if await db.is_drain_disabled("legal-court-fetch-drain"):
print("===SKIP=== legal-court-fetch-drain disabled via /operations", flush=True)
return 0
limit = int(sys.argv[1]) if len(sys.argv) > 1 else 5
res = await orch.drain_pending(limit=limit)
print(f"===court-fetch drain=== processed={res.get('processed', 0)} "

View File

@@ -12,8 +12,9 @@ rows still 'pending'; safe to re-run. The DB is the single source of truth.
Used two ways:
1. Manually after a backfill: mcp-server/.venv/bin/python scripts/drain_digests.py
2. Daily cron (after the n8n 09:30 Gmail poll) — see crontab; runs under flock
so a slow run never overlaps the next. Logs to data/digests/drain.log.
2. pm2 cron ``legal-digest-drain`` (scripts/legal-digest-drain.config.cjs) —
one-shot per tick. Controllable from the /operations dashboard (run-now /
enable / disable). Logs to data/digests/drain.log.
claude CLI must be on PATH (the script prepends ~/.local/bin). Config
(POSTGRES_URL, VOYAGE_API_KEY) auto-loads from ~/.env via legal_mcp.config.
@@ -36,6 +37,12 @@ CONCURRENCY = int(os.environ.get("DIGEST_DRAIN_CONCURRENCY", "3"))
async def main() -> int:
pool = await db.get_pool()
# /operations "disable" switch — no-op immediately if turned off (pm2
# cron_restart can still fire a stopped job, so the gate lives in the DB).
if await db.is_drain_disabled("legal-digest-drain"):
print("===SKIP=== legal-digest-drain disabled via /operations", flush=True)
await db.close_pool()
return 0
# get_pool() runs schema migrations first — incl. the V32 digest_kind backfill
# that classifies legacy rows — so the failure check below is safe from the
# very first run (no legacy row has digest_kind='').

View File

@@ -0,0 +1,47 @@
"""Drain the halacha extraction queue for the incoming batch.
Calls the canonical process_pending_extractions(kind='halacha') in small batches
until the queue is empty (two consecutive zero-progress rounds). Serial + global
advisory-lock coordinated inside the service — avoids concurrent Claude load spikes.
Run: mcp-server/.venv/bin/python scripts/drain_halacha_queue.py
"""
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
async def main():
    """Drain the halacha extraction queue in batches of 4 until it is idle.

    Exits immediately (printing a ``===SKIP===`` line) when the
    ``legal-halacha-drain`` switch is disabled. Otherwise calls
    ``process_pending_extractions(kind="halacha", limit=4)`` repeatedly and
    stops after two consecutive rounds that processed nothing, logging
    per-round and per-case progress to stdout.
    """
    # /operations "disable" switch — no-op immediately if turned off (pm2
    # cron_restart can still fire a stopped job, so the gate lives in the DB).
    if await db.is_drain_disabled("legal-halacha-drain"):
        print("===SKIP=== legal-halacha-drain disabled via /operations", flush=True)
        return
    total = 0
    empty_rounds = 0
    rnd = 0
    while empty_rounds < 2:
        rnd += 1
        out = await pl.process_pending_extractions(kind="halacha", limit=4)
        processed = out.get("processed", 0)
        total_pending = out.get("total_pending", 0)
        total += processed
        print(f"[round {rnd}] processed={processed} total_pending={total_pending} status={out.get('status')}", flush=True)
        for r in out.get("results", []):
            print(f"  {r.get('case_number')}: {r.get('status')} stored={r.get('stored')} retry={r.get('retry_attempts')}", flush=True)
        if processed == 0:
            empty_rounds += 1
            # Pause only when another round will follow; sleeping after the
            # final empty round just delays process exit by 5 seconds.
            if empty_rounds < 2:
                await asyncio.sleep(5)
        else:
            empty_rounds = 0
    print(f"\n===DONE=== total halachot rounds processed; cases handled cumulatively={total}", flush=True)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -18,10 +18,16 @@ import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "mcp-server", "src"))
from legal_mcp.services import db
from legal_mcp.services import precedent_library as pl
async def main() -> int:
# /operations "disable" switch — no-op immediately if turned off (pm2
# cron_restart can still fire a stopped job, so the gate lives in the DB).
if await db.is_drain_disabled("legal-metadata-drain"):
print("===SKIP=== legal-metadata-drain disabled via /operations", flush=True)
return 0
batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10
total = 0
empty_rounds = 0

View File

@@ -0,0 +1,37 @@
/**
* pm2 ecosystem entry for legal-digest-drain — scheduled (every 2 h) drain of
* the digest-enrichment queue (X12: "כל יום" yomonim → Sonnet enrichment +
* embedding + autolink). Migrated from a bare system crontab line to pm2 so it
* appears in — and is controllable from — the /operations dashboard (run-now /
* enable / disable) like every other drain.
*
* Pattern: cron_restart fires the script on schedule; autorestart:false → runs
* once and exits (pm2 shows "stopped" between ticks — expected). The script
* already serialises itself (it self-heals stale 'processing' rows), so no flock
* is needed under pm2's one-shot model.
*
* Requires (host ~/.env via legal_mcp.config): POSTGRES_URL, VOYAGE_API_KEY, and
* the local `claude` CLI on PATH (the script prepends ~/.local/bin).
*
* Install (once):
* pm2 start /home/chaim/legal-ai/scripts/legal-digest-drain.config.cjs
* pm2 save
* Run now (manual): mcp-server/.venv/bin/python scripts/drain_digests.py
* Schedule override: DIGEST_DRAIN_CRON (default every 2 h at :00).
*/
// Cron expression for the schedule. DIGEST_DRAIN_CRON overrides the default,
// which fires at minute 0 of every second hour.
const cron = process.env.DIGEST_DRAIN_CRON || "0 */2 * * *";
module.exports = {
apps: [
{
// pm2 process name; drain_digests.py looks up its drain_controls
// "disabled" flag under this same name.
name: "legal-digest-drain",
// Working directory — the relative script path in `args` resolves from here.
cwd: "/home/chaim/legal-ai",
// The "script" is the venv's Python interpreter; the drain itself is
// passed to it as an argument.
script: "/home/chaim/legal-ai/mcp-server/.venv/bin/python",
args: "scripts/drain_digests.py",
// PYTHONUNBUFFERED=1 so print() output reaches the pm2 logs immediately.
env: { HOME: "/home/chaim", PYTHONUNBUFFERED: "1" },
autorestart: false, // one-shot per cron tick
// Schedule on which pm2 (re)starts the process.
cron_restart: cron,
// Memory ceiling enforced by pm2 for this process.
max_memory_restart: "800M",
},
],
};

View File

@@ -4,46 +4,81 @@ import Link from "next/link";
import { AppShell } from "@/components/app-shell";
import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import { Switch } from "@/components/ui/switch";
import { Skeleton } from "@/components/ui/skeleton";
import {
useOperations,
useServiceAction,
useDrainToggle,
type OpsService,
type OperationsSnapshot,
type PipelineStats,
} from "@/lib/api/operations";
/**
 * Format a byte count as whole mebibytes with an "MB" suffix.
 * A falsy input (0, NaN, undefined at runtime) renders as "0MB".
 */
function mb(bytes: number): string {
  const BYTES_PER_MB = 1024 * 1024;
  const safeBytes = bytes || 0;
  const rounded = Math.round(safeBytes / BYTES_PER_MB);
  return `${rounded}MB`;
}
function uptime(ms: number): string {
function ago(ms: number): string {
if (!ms) return "—";
const secs = Math.floor((Date.now() - ms) / 1000);
if (secs < 60) return `${secs}ש׳`;
if (secs < 3600) return `${Math.floor(secs / 60)}ד׳`;
if (secs < 86400) return `${Math.floor(secs / 3600)}ש׳`;
return `${Math.floor(secs / 86400)}י׳`;
if (secs < 60) return `לפני ${secs}ש׳`;
if (secs < 3600) return `לפני ${Math.floor(secs / 60)}ד׳`;
if (secs < 86400) return `לפני ${Math.floor(secs / 3600)}ש׳`;
return `לפני ${Math.floor(secs / 86400)}י׳`;
}
// Hebrew labels for the raw status strings the backend reports.
const STATUS_HE: Record<string, string> = {
online: "פעיל",
stopped: "עצור",
errored: "שגיאה",
launching: "עולה",
pending: "ממתין",
processing: "בעיבוד",
running: "רץ",
done: "הושלם",
completed: "הושלם",
failed: "נכשל",
manual: "ידני",
approved: "אושר",
pending_review: "ממתין לאישור",
rejected: "נדחה",
published: "פורסם",
deferred: "נדחה זמנית",
open: "פתוח",
closed: "סגור",
unknown: "לא ידוע",
};
/**
 * Translate a raw backend status string to its Hebrew label.
 * Statuses without an entry in STATUS_HE are returned unchanged.
 */
function he(status: string): string {
  const label = STATUS_HE[status];
  return label ?? status;
}
const STATUS_VARIANT: Record<string, "default" | "secondary" | "destructive" | "outline"> = {
online: "default",
done: "default",
approved: "default",
completed: "default",
approved: "default",
published: "default",
stopped: "secondary",
pending: "secondary",
pending_review: "secondary",
open: "secondary",
running: "outline",
processing: "outline",
launching: "outline",
failed: "destructive",
errored: "destructive",
manual: "destructive",
open: "secondary",
rejected: "destructive",
};
function StatusBadge({ value, count }: { value: string; count?: number }) {
return (
<Badge variant={STATUS_VARIANT[value] ?? "outline"} className="font-normal">
{value}
{he(value)}
{count !== undefined ? <span className="ms-1 font-semibold">{count}</span> : null}
</Badge>
);
@@ -52,56 +87,151 @@ function StatusBadge({ value, count }: { value: string; count?: number }) {
const SERVICE_LABELS: Record<string, string> = {
"legal-court-fetch-service": "שירות אחזור פסיקה (דפדפן נט המשפט)",
"legal-court-fetch-xvfb": "צג וירטואלי (Xvfb) לדפדפן",
"legal-court-fetch-drain": "תזמון: ניקוז תור אחזור (שעתי)",
"legal-court-fetch-drain": "תזמון: ניקוז תור אחזור פסיקה (שעתי)",
"legal-metadata-drain": "תזמון: חילוץ מטא-דאטה (Gemini, ×15 דק׳)",
"legal-halacha-drain": "תזמון: חילוץ הלכות (Claude, ×שעתיים)",
"legal-digest-drain": "תזמון: העשרת יומונים (Sonnet, ×שעתיים)",
"legal-reaper": "מנקה תהליכים-יתומים (נגד דליפות זיכרון)",
"legal-chat-service": "שירות צ׳אט אימון (גשר ל-claude CLI)",
};
function ServicesSection({ data }: { data: OperationsSnapshot }) {
// ── Process-management panel (the "Windows services" view) ─────────────────
/**
 * Action buttons for one row of the process-management panel.
 *
 * - Daemon (no cron expression): "restart" plus either "stop" (when online,
 *   behind a confirm dialog) or "start".
 * - Cron drain: "run now" (disabled while the schedule is switched off) plus
 *   an enable/disable switch; switching off asks for confirmation first.
 *
 * `busy` disables every control while a mutation is in flight.
 */
function ServiceControls(props: {
  s: OpsService;
  busy: boolean;
  onAction: (action: "restart" | "stop" | "start" | "run-now") => void;
  onToggle: (disabled: boolean) => void;
}) {
  const { s, busy, onAction, onToggle } = props;
  const label = SERVICE_LABELS[s.name] ?? s.name;

  // Daemon: restart / stop / start.
  if (!s.cron) {
    const handleStop = () => {
      if (confirm(`לעצור את "${label}"?`)) onAction("stop");
    };
    const isOnline = s.status === "online";
    return (
      <div className="flex items-center gap-1.5 shrink-0">
        <Button size="xs" variant="outline" disabled={busy} onClick={() => onAction("restart")}>
          הפעל מחדש
        </Button>
        {isOnline ? (
          <Button
            size="xs"
            variant="ghost"
            disabled={busy}
            className="text-destructive"
            onClick={handleStop}
          >
            עצור
          </Button>
        ) : (
          <Button size="xs" variant="ghost" disabled={busy} onClick={() => onAction("start")}>
            הפעל
          </Button>
        )}
      </div>
    );
  }

  // Cron drain: "run now" (pm2 restart) + an enable/disable switch (DB flag).
  const handleSwitch = (on: boolean) => {
    // Turning the schedule on needs no confirmation; turning it off does.
    if (on || confirm(`לכבות את התזמון "${label}"?`)) {
      onToggle(!on);
    }
  };
  const runNowTitle = s.disabled
    ? "הפעל את התזמון כדי להריץ"
    : "הרץ את ה-drain פעם אחת מיד";
  return (
    <div className="flex items-center gap-2 shrink-0">
      <Button
        size="xs"
        variant="outline"
        disabled={busy || s.disabled}
        onClick={() => onAction("run-now")}
        title={runNowTitle}
      >
        הרץ עכשיו
      </Button>
      <label className="flex items-center gap-1.5 text-[0.7rem] text-ink-muted cursor-pointer">
        <Switch checked={!s.disabled} disabled={busy} onCheckedChange={handleSwitch} />
        {s.disabled ? "כבוי" : "פעיל"}
      </label>
    </div>
  );
}
function ServicesPanel({ data }: { data: OperationsSnapshot }) {
const action = useServiceAction();
const toggle = useDrainToggle();
const busy = action.isPending || toggle.isPending;
return (
<Card className="bg-surface border-rule shadow-sm">
<CardContent className="px-6 py-5">
<h2 className="text-navy text-lg mb-1">שירותי רקע (pm2)</h2>
<h2 className="text-navy text-lg mb-1">ניהול תהליכי-רקע (pm2)</h2>
<p className="text-ink-muted text-xs mb-4">
דמונים ומשימות-תזמון על שרת המארח. &quot;cron&quot; = רץ לפי לוח-זמנים (מציג
&quot;stopped&quot; בין הרצות תקין).
כמו &quot;שירותים&quot; ב-Windows. דמון = שירות רץ-תמיד (הפעל-מחדש/עצור/הפעל).
תזמון (cron) = רץ לפי לוח-זמנים (&quot;הרץ עכשיו&quot; להרצה מיידית, ומתג
הפעלה/כיבוי של התזמון).
</p>
{data.services_error ? (
<p className="text-sm text-destructive">{data.services_error}</p>
) : data.services.length === 0 ? (
<p className="text-sm text-ink-muted">אין שירותים.</p>
) : (
<div className="grid gap-2 sm:grid-cols-2">
{data.services.map((s: OpsService) => (
<div
key={s.name}
className="flex items-center justify-between gap-3 rounded-md border border-rule-soft bg-rule-soft/30 px-3 py-2"
>
<div className="min-w-0">
<div className="flex items-center gap-2">
<StatusBadge value={s.status} />
{s.cron ? (
<span className="text-[0.7rem] text-ink-muted font-mono" dir="ltr">
{s.cron}
<div className="grid gap-2">
{data.services.map((s: OpsService) => {
const isCron = !!s.cron;
return (
<div
key={s.name}
className="flex items-center justify-between gap-3 rounded-md border border-rule-soft bg-rule-soft/30 px-3 py-2"
>
<div className="min-w-0">
<div className="flex items-center gap-2 flex-wrap">
{isCron ? (
<Badge
variant={s.disabled ? "destructive" : "default"}
className="font-normal"
>
{s.disabled ? "כבוי" : "פעיל (מתוזמן)"}
</Badge>
) : (
<StatusBadge value={s.status} />
)}
{s.cron ? (
<span className="text-[0.7rem] text-ink-muted font-mono" dir="ltr">
{s.cron}
</span>
) : null}
</div>
<div className="text-[0.8rem] text-navy truncate mt-0.5">
{SERVICE_LABELS[s.name] ?? s.name}
</div>
<div className="text-[0.66rem] text-ink-muted flex items-center gap-2 flex-wrap">
<span className="font-mono" dir="ltr">
{s.name}
</span>
) : null}
</div>
<div className="text-[0.78rem] text-navy truncate mt-0.5">
{SERVICE_LABELS[s.name] ?? s.name}
</div>
<div className="text-[0.68rem] text-ink-muted font-mono" dir="ltr">
{s.name}
<span>{mb(s.memory_bytes)}</span>
<span>{s.restarts}</span>
<span>{isCron ? `ריצה אחרונה ${ago(s.uptime_ms)}` : ago(s.uptime_ms)}</span>
</div>
</div>
<ServiceControls
s={s}
busy={busy}
onAction={(a) => action.mutate({ name: s.name, action: a })}
onToggle={(disabled) => toggle.mutate({ name: s.name, disabled })}
/>
</div>
<div className="text-end text-[0.7rem] text-ink-muted shrink-0">
<div>{mb(s.memory_bytes)}</div>
<div>{s.restarts}</div>
{!s.cron ? <div>{uptime(s.uptime_ms)}</div> : null}
</div>
</div>
))}
);
})}
</div>
)}
</CardContent>
@@ -109,6 +239,68 @@ function ServicesSection({ data }: { data: OperationsSnapshot }) {
);
}
// ── Uniform queue stats ────────────────────────────────────────────────────
/**
 * One small counter tile: a coloured number above a caption.
 * `tone` picks the number's text colour; `title` becomes the hover tooltip.
 */
function StatTile(props: {
  label: string;
  value: number;
  tone: "navy" | "muted" | "amber" | "green" | "red";
  title?: string;
}) {
  const { label, value, tone, title } = props;
  // Tone name → text-colour utility class.
  const colourByTone: Record<typeof tone, string> = {
    navy: "text-navy",
    muted: "text-ink-muted",
    amber: "text-gold-deep",
    green: "text-emerald-600",
    red: "text-destructive",
  };
  const valueClass = `text-base font-semibold leading-none ${colourByTone[tone]}`;
  return (
    <div
      title={title}
      className="flex flex-col items-center rounded-md border border-rule-soft px-2.5 py-1.5 min-w-[68px]"
    >
      <span className={valueClass}>{value}</span>
      <span className="text-[0.66rem] text-ink-muted mt-1 text-center">{label}</span>
    </div>
  );
}
/**
 * The uniform counter row shown for every drain pipeline: queued, pending
 * (backlog), processing, done, and — only when non-zero — failed. Below the
 * tiles it lists the items currently processing, or a placeholder line when
 * there are none.
 */
function UniformStats({ p }: { p: PipelineStats }) {
  const hasRunning = p.running_now.length > 0;
  const runningLine = hasRunning ? (
    <div className="text-[0.74rem] text-navy">
      <span className="text-ink-muted">רץ עכשיו: </span>
      {p.running_now.join(" · ")}
    </div>
  ) : (
    <div className="text-[0.72rem] text-ink-muted">אין פריט בעיבוד כרגע</div>
  );
  // The failure tile is hidden entirely while there are no failures.
  const failedTile = p.failed > 0 ? <StatTile label="נכשל" value={p.failed} tone="red" /> : null;

  return (
    <div className="space-y-2">
      <div className="flex flex-wrap gap-2">
        <StatTile
          label="בתור"
          value={p.queued}
          tone="navy"
          title="ממתינים שנדרשו במפורש לעיבוד — אלה שה-drain הבא יטפל בהם"
        />
        <StatTile
          label="ממתין (בקלוג)"
          value={p.pending}
          tone="muted"
          title="כל הפריטים שטרם עובדו (ברירת-מחדל) — לאו דווקא בתור הפעיל"
        />
        <StatTile label="בעיבוד" value={p.processing} tone="amber" />
        <StatTile label="הושלם" value={p.done} tone="green" />
        {failedTile}
      </div>
      {runningLine}
    </div>
  );
}
function StatusRow({ by }: { by: Record<string, number> }) {
const entries = Object.entries(by).filter(([, n]) => n > 0);
if (entries.length === 0) return <span className="text-ink-muted text-sm">ריק</span>;
@@ -134,7 +326,7 @@ function PipelineCard({
}) {
return (
<Card className="bg-surface border-rule shadow-sm">
<CardContent className="px-5 py-4 space-y-2">
<CardContent className="px-5 py-4 space-y-2.5">
<div>
<h3 className="text-navy text-sm font-semibold mb-0.5">{title}</h3>
<p className="text-ink-muted text-[0.72rem]">{desc}</p>
@@ -160,7 +352,7 @@ export default function OperationsPage() {
<h1 className="text-navy mb-0">תפעול מה רץ ברקע</h1>
<p className="text-ink-muted text-sm mt-1 max-w-3xl">
כל מה שמוריד ומנתח אוטומטית: שירותי-המארח, משימות-התזמון, ותורי-העבודה.
מתרענן כל 5 שניות.
ניתן להפעיל-מחדש / לעצור / להריץ-עכשיו כל תהליך. מתרענן כל 5 שניות.
</p>
</header>
@@ -177,39 +369,44 @@ export default function OperationsPage() {
</div>
) : (
<>
<ServicesSection data={data} />
<ServicesPanel data={data} />
<div className="grid gap-4 md:grid-cols-2 xl:grid-cols-3">
<PipelineCard
title="אחזור פסיקה (X13)"
desc="הורדה אוטומטית מנט-המשפט / פורטל-העליון → קורפוס"
>
<StatusRow by={data.pipelines.court_fetch.by_status} />
<UniformStats p={data.pipelines.court_fetch} />
</PipelineCard>
<PipelineCard
title="חילוץ מטא-דאטה"
desc="Gemini Flash — שם/תקציר/תגיות לכל פסיקה"
>
<StatusRow by={data.pipelines.metadata_extraction.by_status} />
<p className="text-[0.72rem] text-ink-muted">
בתור לחילוץ: {data.pipelines.metadata_extraction.queued}
</p>
<UniformStats p={data.pipelines.metadata_extraction} />
</PipelineCard>
<PipelineCard
title="חילוץ הלכות"
desc="Claude — הלכות מכל פסיקה (→ אישור יו״ר)"
>
<StatusRow by={data.pipelines.halacha_extraction.by_status} />
<UniformStats p={data.pipelines.halacha_extraction} />
</PipelineCard>
<PipelineCard
title="העשרת יומונים (רדאר)"
desc="Sonnet — תקציר/תגיות/קישור-לפסיקה לכל יומון"
>
<UniformStats p={data.pipelines.digests} />
<p className="text-[0.72rem] text-ink-muted">
בתור לחילוץ: {data.pipelines.halacha_extraction.queued}
{data.pipelines.digests.linked}/{data.pipelines.digests.total} מקושרים
לפסיקה
</p>
</PipelineCard>
<PipelineCard
title="אישור הלכות (שער יו״ר)"
desc="הלכות שחולצו, ממתינות להכרעת דפנה ב-/approvals"
desc="הלכות שחולצו, ממתינות להכרעת דפנה ב-/approvals — שער-אנושי, לא תהליך"
>
<StatusRow by={data.pipelines.halacha_review.by_status} />
</PipelineCard>
@@ -220,16 +417,6 @@ export default function OperationsPage() {
>
<StatusRow by={data.pipelines.missing_precedents.by_status} />
</PipelineCard>
<PipelineCard
title="יומונים (רדאר)"
desc="מצביעים על פסקי-דין → מזניקים אחזור אוטומטי"
>
<p className="text-sm text-navy">
{data.pipelines.digests.linked}/{data.pipelines.digests.total} מקושרים
לפסיקה
</p>
</PipelineCard>
</div>
<div className="grid gap-4 lg:grid-cols-2">

View File

@@ -1,4 +1,5 @@
import { useQuery } from "@tanstack/react-query";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { toast } from "sonner";
import { apiRequest } from "./client";
export type OpsService = {
@@ -10,6 +11,7 @@ export type OpsService = {
memory_bytes: number;
cron: string;
autorestart: boolean;
disabled?: boolean; // cron drain switched off via the dashboard
};
export type CourtFetchJob = {
@@ -28,16 +30,27 @@ export type IngestedRow = {
created_at: string;
};
/** The uniform per-pipeline shape every background drain reports. */
export type PipelineStats = {
pending: number; // backlog: rows not yet processed (status default)
processing: number; // being worked right now
done: number; // completed
failed: number; // terminal failures (court_fetch folds in 'manual')
queued: number; // explicitly enqueued for the next drain run
running_now: string[]; // human labels of the items currently processing
by_status: Record<string, number>; // raw counts, for the curious
};
export type OperationsSnapshot = {
services: OpsService[];
services_error: string | null;
pipelines: {
court_fetch: { by_status: Record<string, number>; recent: CourtFetchJob[] };
metadata_extraction: { by_status: Record<string, number>; queued: number };
halacha_extraction: { by_status: Record<string, number>; queued: number };
court_fetch: PipelineStats & { recent: CourtFetchJob[] };
metadata_extraction: PipelineStats;
halacha_extraction: PipelineStats;
digests: PipelineStats & { total: number; linked: number };
halacha_review: { by_status: Record<string, number> };
missing_precedents: { by_status: Record<string, number> };
digests: { total: number; linked: number };
ingested_recent: IngestedRow[];
};
};
@@ -51,3 +64,42 @@ export function useOperations() {
staleTime: 3000,
});
}
export type ServiceAction = "restart" | "stop" | "start" | "run-now";
/** Control a background service (daemon restart/stop/start, or run a drain now). */
/**
 * Mutation hook for controlling a background service: POSTs to
 * `/api/operations/services/{name}/{action}`. On success it shows a toast
 * naming the action and invalidates the "operations" query; on failure it
 * shows an error toast.
 */
export function useServiceAction() {
  const queryClient = useQueryClient();
  // Success-toast text per action.
  const successText: Record<ServiceAction, string> = {
    "run-now": "הופעל עכשיו",
    restart: "הופעל מחדש",
    stop: "נעצר",
    start: "הופעל",
  };
  return useMutation({
    mutationFn: (vars: { name: string; action: ServiceAction }) =>
      apiRequest(`/api/operations/services/${vars.name}/${vars.action}`, { method: "POST" }),
    onSuccess: (_result, vars) => {
      toast.success(successText[vars.action]);
      queryClient.invalidateQueries({ queryKey: ["operations"] });
    },
    onError: (err) => toast.error(`הפעולה נכשלה: ${String(err)}`),
  });
}
/**
 * Mutation hook that switches a cron drain's schedule on or off (its
 * "startup type").
 *
 * Sends the desired `disabled` flag to the drain's endpoint, then shows a
 * toast and invalidates the "operations" query so the panel refetches.
 */
export function useDrainToggle() {
  const queryClient = useQueryClient();

  // POSTs the new flag for the named drain; resolves with the API response.
  const postDisabled = (vars: { name: string; disabled: boolean }) => {
    const url = `/api/operations/drains/${vars.name}/disabled`;
    return apiRequest(url, { method: "POST", body: { disabled: vars.disabled } });
  };

  return useMutation({
    mutationFn: postDisabled,
    onSuccess(_data, variables) {
      const message = variables.disabled ? "התזמון כובה" : "התזמון הופעל";
      toast.success(message);
      queryClient.invalidateQueries({ queryKey: ["operations"] });
    },
    onError(err) {
      toast.error(`העדכון נכשל: ${String(err)}`);
    },
  });
}

View File

@@ -22,7 +22,7 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "
import zipfile
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from fastapi import BackgroundTasks, Body, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
from typing import Any, Literal
from pydantic import BaseModel
@@ -6030,6 +6030,49 @@ async def _ops_pm2_services() -> dict:
return {"services": [], "error": f"לא ניתן להגיע לשירות-המארח: {e}"}
async def _ops_pm2_control(name: str, action: str) -> dict:
    """Proxy a mutating pm2 action to the host bridge (Bearer-authenticated).

    Args:
        name: pm2 process name to act on.
        action: pm2 action forwarded verbatim to the bridge.

    Returns:
        The bridge's decoded JSON response when it answers with HTTP 200.

    Raises:
        HTTPException: 502 when the bridge cannot be reached at all; otherwise
            the bridge's own non-200 status, with its error text as detail.
    """
    secret = os.environ.get("COURT_FETCH_SHARED_SECRET", "").strip()
    # Without a configured secret the request goes out unauthenticated and the
    # bridge is left to reject it.
    headers = {"Authorization": f"Bearer {secret}"} if secret else {}
    try:
        async with httpx.AsyncClient(timeout=35.0) as client:
            r = await client.post(
                f"{_COURT_FETCH_SERVICE_URL}/pm2/control",
                json={"name": name, "action": action},
                headers=headers,
            )
    except httpx.HTTPError as e:
        # Connection refused / timeout etc.: report a gateway error with a
        # readable message instead of letting it surface as a bare 500.
        raise HTTPException(502, f"לא ניתן להגיע לשירות-המארח: {e}") from e

    try:
        payload = r.json()
    except ValueError:
        # Non-JSON body (e.g. a proxy error page): keep a short excerpt.
        payload = {"error": r.text[:200]}

    if r.status_code != 200:
        message = None
        if isinstance(payload, dict):
            # "error" is the key this function has always read; "detail" is a
            # fallback in case the bridge replies with a framework-style error
            # body — NOTE(review): confirm against the bridge implementation.
            message = payload.get("error") or payload.get("detail")
        raise HTTPException(r.status_code, message or "pm2 control failed")
    return payload
def _norm_pipeline(
by_status: dict, *,
pending: tuple[str, ...], processing: tuple[str, ...],
done: tuple[str, ...], failed: tuple[str, ...],
queued: int, running_now: list[str], extra: dict | None = None,
) -> dict:
"""Project a raw status-count map into the dashboard's uniform shape:
pending / processing / done / failed / queued + the live running items."""
def total(keys: tuple[str, ...]) -> int:
return sum(int(by_status.get(k, 0) or 0) for k in keys)
out = {
"pending": total(pending),
"processing": total(processing),
"done": total(done),
"failed": total(failed),
"queued": queued,
"running_now": running_now,
"by_status": by_status,
}
if extra:
out.update(extra)
return out
@app.get("/api/operations")
async def operations_snapshot():
"""Everything running in the background: services + pipelines/queues."""
@@ -6038,6 +6081,9 @@ async def operations_snapshot():
async def counts(sql: str) -> dict:
return {r[0]: r[1] for r in await conn.fetch(sql)}
async def col(sql: str) -> list[str]:
return [r[0] for r in await conn.fetch(sql) if r[0]]
court_fetch = await counts(
"SELECT status, count(*) FROM court_fetch_jobs GROUP BY 1"
)
@@ -6047,6 +6093,10 @@ async def operations_snapshot():
"updated_at FROM court_fetch_jobs ORDER BY updated_at DESC LIMIT 15"
)
]
court_running = await col(
"SELECT coalesce(nullif(citation_raw,''), case_number_norm) "
"FROM court_fetch_jobs WHERE status = 'running' ORDER BY updated_at LIMIT 5"
)
meta = await counts(
"SELECT coalesce(metadata_extraction_status,'unknown'), count(*) "
"FROM case_law GROUP BY 1"
@@ -6054,6 +6104,10 @@ async def operations_snapshot():
meta_queued = await conn.fetchval(
"SELECT count(*) FROM case_law WHERE metadata_extraction_requested_at IS NOT NULL"
)
meta_running = await col(
"SELECT case_number FROM case_law WHERE metadata_extraction_status = 'processing' "
"ORDER BY metadata_extraction_requested_at NULLS LAST LIMIT 5"
)
hal_ext = await counts(
"SELECT coalesce(halacha_extraction_status,'unknown'), count(*) "
"FROM case_law GROUP BY 1"
@@ -6061,12 +6115,24 @@ async def operations_snapshot():
hal_queued = await conn.fetchval(
"SELECT count(*) FROM case_law WHERE halacha_extraction_requested_at IS NOT NULL"
)
hal_running = await col(
"SELECT case_number FROM case_law WHERE halacha_extraction_status = 'processing' "
"ORDER BY halacha_extraction_requested_at NULLS LAST LIMIT 5"
)
review = await counts("SELECT review_status, count(*) FROM halachot GROUP BY 1")
missing = await counts("SELECT status, count(*) FROM missing_precedents GROUP BY 1")
digest_ext = await counts(
"SELECT coalesce(extraction_status,'unknown'), count(*) FROM digests GROUP BY 1"
)
digests_total = await conn.fetchval("SELECT count(*) FROM digests")
digests_linked = await conn.fetchval(
"SELECT count(*) FROM digests WHERE linked_case_law_id IS NOT NULL"
)
digest_running = await col(
"SELECT coalesce(nullif(underlying_citation,''), nullif(concept_tag,''), "
"'יומון '||yomon_number) FROM digests WHERE extraction_status = 'processing' "
"ORDER BY updated_at LIMIT 5"
)
ingested_recent = [
dict(r) for r in await conn.fetch(
"SELECT case_number, court, source_url, created_at FROM case_law "
@@ -6075,6 +6141,9 @@ async def operations_snapshot():
]
pm2 = await _ops_pm2_services()
controls = await db.get_drain_controls()
for svc in pm2["services"]:
svc["disabled"] = controls.get(svc.get("name", ""), False)
def _iso(rows: list[dict]) -> list[dict]:
for d in rows:
@@ -6087,17 +6156,72 @@ async def operations_snapshot():
"services": pm2["services"],
"services_error": pm2["error"],
"pipelines": {
"court_fetch": {"by_status": court_fetch, "recent": _iso(court_recent)},
"metadata_extraction": {"by_status": meta, "queued": meta_queued},
"halacha_extraction": {"by_status": hal_ext, "queued": hal_queued},
"court_fetch": {
**_norm_pipeline(
court_fetch,
pending=("pending",), processing=("running",), done=("done",),
failed=("failed", "manual"),
queued=int(court_fetch.get("pending", 0)) + int(court_fetch.get("failed", 0)),
running_now=court_running,
),
"recent": _iso(court_recent),
},
"metadata_extraction": _norm_pipeline(
meta,
pending=("pending",), processing=("processing",), done=("completed",),
failed=("failed",), queued=meta_queued, running_now=meta_running,
),
"halacha_extraction": _norm_pipeline(
hal_ext,
pending=("pending",), processing=("processing",), done=("completed",),
failed=("failed",), queued=hal_queued, running_now=hal_running,
),
"digests": _norm_pipeline(
digest_ext,
pending=("pending",), processing=("processing",), done=("completed",),
failed=("failed",),
queued=int(digest_ext.get("pending", 0)), running_now=digest_running,
extra={"total": digests_total, "linked": digests_linked},
),
# Human gates, not background drains — surfaced as status counts only.
"halacha_review": {"by_status": review},
"missing_precedents": {"by_status": missing},
"digests": {"total": digests_total, "linked": digests_linked},
"ingested_recent": _iso(ingested_recent),
},
}
_OPS_SERVICE_ACTIONS = {"restart", "stop", "start", "run-now"}


@app.post("/api/operations/services/{name}/{action}")
async def operations_service_action(name: str, action: str):
    """Restart / stop / start a background service, or run a drain right away.

    'run-now' is translated to a pm2 restart, which for a one-shot cron drain
    fires the job immediately. Only legal-* services may be controlled (the
    host enforces the same whitelist again).

    Raises:
        HTTPException: 400 for an unknown action, 403 for a non legal-* name.
    """
    action_is_known = action in _OPS_SERVICE_ACTIONS
    if not action_is_known:
        allowed = sorted(_OPS_SERVICE_ACTIONS)
        raise HTTPException(400, f"action חייב להיות אחד מ-{allowed}")

    name_is_allowed = name.startswith("legal-")
    if not name_is_allowed:
        raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*")

    # Every action maps to itself except 'run-now', which pm2 knows as restart.
    pm2_action = {"run-now": "restart"}.get(action, action)
    result = await _ops_pm2_control(name, pm2_action)
    return result
@app.post("/api/operations/drains/{name}/disabled")
async def operations_drain_toggle(name: str, body: dict = Body(...)):
    """Switch a cron drain on/off (the 'startup type' in the services panel).

    Written straight to drain_controls — no host roundtrip; the drain reads the
    flag at startup and no-ops when disabled (pm2 cron_restart can't be trusted
    to stay stopped).

    Args:
        name: Drain (pm2 process) name; must start with "legal-".
        body: JSON object carrying a boolean ``disabled`` field.

    Returns:
        ``{"ok": True, "name": ..., "disabled": ...}`` echoing the stored flag.

    Raises:
        HTTPException: 403 for a non legal-* name; 422 when ``disabled`` is
            missing or is not a JSON boolean.
    """
    if not name.startswith("legal-"):
        raise HTTPException(403, "ניתן לשלוט רק בשירותי legal-*")
    disabled = body.get("disabled")
    # Require a real boolean: coercing with bool() would turn a missing or
    # misspelled key into False (silently re-enabling the drain) and any
    # non-empty string, including "false", into True.
    if not isinstance(disabled, bool):
        raise HTTPException(422, "disabled חייב להיות ערך בוליאני (true/false)")
    await db.set_drain_disabled(name, disabled)
    return {"ok": True, "name": name, "disabled": disabled}
@app.get("/api/digests/{digest_id}")
async def digest_get(digest_id: str):
try: