feat(ops): /operations dashboard — everything running in the background
A single live page for all the background work that downloads/analyses, so the chair can see what's running instead of guessing. - court_fetch_service: GET /pm2 (unauthenticated, host-only) → trimmed pm2 jlist for the legal-* services (status, restarts, mem, cron schedule). - FastAPI GET /api/operations: aggregates the DB-backed pipelines (court_fetch jobs, metadata + halacha extraction queues, halacha review gate, missing_precedents, digests, recent court ingests) and proxies the host /pm2 over the docker bridge (graceful if the host service is down). - web-ui /operations page (+ src/lib/api/operations.ts hook, nav entry under admin): services grid (with Hebrew labels + schedules) + pipeline cards + recent-fetch / recent-ingest lists. Auto-refreshes every 5s. tsc --noEmit clean; pm2 status carries nothing sensitive and the bind (10.0.1.1) is host/container-only, so /pm2 needs no secret. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
88
web/app.py
88
web/app.py
@@ -6008,6 +6008,94 @@ async def digest_queue_pending(limit: int = 20):
|
||||
return {"items": items, "count": len(items)}
|
||||
|
||||
|
||||
# ── Operations dashboard (/operations) ────────────────────────────────────
|
||||
# One snapshot of everything running in the background that downloads or
|
||||
# analyses: the host pm2 services/crons + the DB-backed pipelines & queues.
|
||||
_COURT_FETCH_SERVICE_URL = os.environ.get(
|
||||
"COURT_FETCH_SERVICE_URL", "http://10.0.1.1:8771"
|
||||
)
|
||||
|
||||
|
||||
async def _ops_pm2_services() -> dict:
|
||||
"""Proxy the host court-fetch-service /pm2 (host-only capability)."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=8.0) as client:
|
||||
r = await client.get(f"{_COURT_FETCH_SERVICE_URL}/pm2")
|
||||
if r.status_code == 200:
|
||||
return {"services": r.json().get("services", []), "error": None}
|
||||
return {"services": [], "error": f"pm2 bridge {r.status_code}"}
|
||||
except Exception as e: # host service down / unreachable
|
||||
return {"services": [], "error": f"לא ניתן להגיע לשירות-המארח: {e}"}
|
||||
|
||||
|
||||
@app.get("/api/operations")
|
||||
async def operations_snapshot():
|
||||
"""Everything running in the background: services + pipelines/queues."""
|
||||
pool = await db.get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
async def counts(sql: str) -> dict:
|
||||
return {r[0]: r[1] for r in await conn.fetch(sql)}
|
||||
|
||||
court_fetch = await counts(
|
||||
"SELECT status, count(*) FROM court_fetch_jobs GROUP BY 1"
|
||||
)
|
||||
court_recent = [
|
||||
dict(r) for r in await conn.fetch(
|
||||
"SELECT case_number_norm, citation_raw, tier, status, error, "
|
||||
"updated_at FROM court_fetch_jobs ORDER BY updated_at DESC LIMIT 15"
|
||||
)
|
||||
]
|
||||
meta = await counts(
|
||||
"SELECT coalesce(metadata_extraction_status,'unknown'), count(*) "
|
||||
"FROM case_law GROUP BY 1"
|
||||
)
|
||||
meta_queued = await conn.fetchval(
|
||||
"SELECT count(*) FROM case_law WHERE metadata_extraction_requested_at IS NOT NULL"
|
||||
)
|
||||
hal_ext = await counts(
|
||||
"SELECT coalesce(halacha_extraction_status,'unknown'), count(*) "
|
||||
"FROM case_law GROUP BY 1"
|
||||
)
|
||||
hal_queued = await conn.fetchval(
|
||||
"SELECT count(*) FROM case_law WHERE halacha_extraction_requested_at IS NOT NULL"
|
||||
)
|
||||
review = await counts("SELECT review_status, count(*) FROM halachot GROUP BY 1")
|
||||
missing = await counts("SELECT status, count(*) FROM missing_precedents GROUP BY 1")
|
||||
digests_total = await conn.fetchval("SELECT count(*) FROM digests")
|
||||
digests_linked = await conn.fetchval(
|
||||
"SELECT count(*) FROM digests WHERE linked_case_law_id IS NOT NULL"
|
||||
)
|
||||
ingested_recent = [
|
||||
dict(r) for r in await conn.fetch(
|
||||
"SELECT case_number, court, source_url, created_at FROM case_law "
|
||||
"WHERE source_url LIKE '%court.gov.il%' ORDER BY created_at DESC LIMIT 12"
|
||||
)
|
||||
]
|
||||
|
||||
pm2 = await _ops_pm2_services()
|
||||
|
||||
def _iso(rows: list[dict]) -> list[dict]:
|
||||
for d in rows:
|
||||
for k, v in list(d.items()):
|
||||
if hasattr(v, "isoformat"):
|
||||
d[k] = v.isoformat()
|
||||
return rows
|
||||
|
||||
return {
|
||||
"services": pm2["services"],
|
||||
"services_error": pm2["error"],
|
||||
"pipelines": {
|
||||
"court_fetch": {"by_status": court_fetch, "recent": _iso(court_recent)},
|
||||
"metadata_extraction": {"by_status": meta, "queued": meta_queued},
|
||||
"halacha_extraction": {"by_status": hal_ext, "queued": hal_queued},
|
||||
"halacha_review": {"by_status": review},
|
||||
"missing_precedents": {"by_status": missing},
|
||||
"digests": {"total": digests_total, "linked": digests_linked},
|
||||
"ingested_recent": _iso(ingested_recent),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@app.get("/api/digests/{digest_id}")
|
||||
async def digest_get(digest_id: str):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user