diff --git a/mcp-server/src/legal_mcp/server.py b/mcp-server/src/legal_mcp/server.py
index aadfd9c..574566e 100644
--- a/mcp-server/src/legal_mcp/server.py
+++ b/mcp-server/src/legal_mcp/server.py
@@ -410,6 +410,12 @@ async def search_digests(
)
+@mcp.tool()
+async def digest_process_pending(limit: int = 20) -> str:
+ """ריקון תור היומונים שהועלו מה-UI וממתינים לעיבוד-LLM מקומי. מריץ חילוץ-מטא-דאטה + embedding + autolink על כל יומון 'pending' (מקומית עם CLI). חלופת-MCP ל-scripts/ingest_digests_batch.py."""
+ return await digest_tools.digest_process_pending(_clamp_limit(limit))
+
+
@mcp.tool()
async def halacha_review(
halacha_id: str,
diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py
index 36e9301..e5d1589 100644
--- a/mcp-server/src/legal_mcp/services/db.py
+++ b/mcp-server/src/legal_mcp/services/db.py
@@ -3785,6 +3785,18 @@ async def list_digests(
return [_row_to_digest(r) for r in rows]
+async def list_pending_digests(limit: int = 20) -> list[dict]:
+ """Digests awaiting local LLM enrichment (web-upload queue, X12). The
+ drainer (digest_library.process_pending_digests) picks these up."""
+ pool = await get_pool()
+ rows = await pool.fetch(
+ f"SELECT {_DIGEST_COLS} FROM digests WHERE extraction_status = 'pending' "
+ f"ORDER BY created_at LIMIT $1",
+ limit,
+ )
+ return [_row_to_digest(r) for r in rows]
+
+
async def search_digests_semantic(
query_embedding: list[float],
practice_area: str = "",
diff --git a/mcp-server/src/legal_mcp/services/digest_library.py b/mcp-server/src/legal_mcp/services/digest_library.py
index dc7488e..6f37799 100644
--- a/mcp-server/src/legal_mcp/services/digest_library.py
+++ b/mcp-server/src/legal_mcp/services/digest_library.py
@@ -2,18 +2,23 @@
A digest ("כל יום" daily one-pager) is a SECONDARY source that POINTS at a
ruling — it is never cited in a decision (INV-DIG1) and never enters the
-precedent/halacha pipeline (INV-DIG2). Ingest is therefore a short, standalone
-path that reuses only ATOMIC services (extract_text, embeddings), NOT the
-canonical ``ingest.ingest_document`` (which is bound to case_law):
+precedent/halacha pipeline (INV-DIG2). Ingest reuses only ATOMIC services
+(extract_text, embeddings), NOT the canonical ``ingest.ingest_document``.
- file → extract_text → content_hash (idempotent) → LLM metadata extract
- → create_digest → single embedding (concept+headline+summary+analysis)
- → try_autolink(underlying_citation → case_law) [INV-DIG3]
- → extraction_status='completed'
+Two intake paths share one enrichment core:
+
+ - ``ingest_digest`` (local/MCP, e.g. batch script) — does everything
+ synchronously: stage → extract_text → create →
+ LLM enrich → embed → autolink → completed.
+ - ``create_pending_digest`` (CONTAINER-SAFE — the web upload) — stage →
+ extract_text → create row with status='pending'.
+ No LLM, no embedding. ``process_pending_digests``
+ (local/MCP) drains the queue and enriches.
claude_session rule: ``digest_metadata_extractor`` (local CLI) is imported
-LAZILY inside ``ingest_digest`` only, so this module is import-safe from the
-FastAPI container for the search/list/link/delete paths (DB + voyage only).
+LAZILY inside the enrichment core only, so this module stays import-safe from
+the FastAPI container for create_pending / search / list / link / delete
+(DB + voyage only — voyage embedding only runs in the local enrich path).
"""
from __future__ import annotations
@@ -42,13 +47,26 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
return None
-def _embedding_text(fields: dict) -> str:
+def _coerce_date(v) -> date | None:
+ if v is None or v == "":
+ return None
+ if isinstance(v, date):
+ return v
+ if isinstance(v, str):
+ try:
+ return date.fromisoformat(v[:10])
+ except ValueError:
+ return None
+ return None
+
+
+def _embedding_text(row: dict) -> str:
"""The single vector indexes the digest as an atomic discovery unit."""
parts = [
- fields.get("concept_tag", ""),
- fields.get("headline_holding", ""),
- fields.get("summary", ""),
- fields.get("analysis_text", ""),
+ row.get("concept_tag", ""),
+ row.get("headline_holding", ""),
+ row.get("summary", ""),
+ row.get("analysis_text", ""),
]
return "\n".join(p for p in parts if p).strip()
@@ -70,6 +88,161 @@ async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str |
return str(match["id"])
+# ── Container-safe creation (web upload) — no LLM, no embedding ──────
+
+async def create_pending_digest(
+ *,
+ file_path: str | Path,
+ yomon_number: str = "",
+ digest_date: date | str | None = None,
+ practice_area: str = "",
+ appeal_subtype: str = "",
+ subject_tags: list[str] | None = None,
+ progress: ProgressCb | None = None,
+) -> dict:
+ """Stage the file, extract text (PyMuPDF — container-safe), and create a
+ digest row with extraction_status='pending'. The LLM metadata extraction,
+ embedding, and autolink are deferred to ``process_pending_digests`` (local).
+
+ Returns {status, digest_id, extraction_status} or {status:'exists', ...}.
+ Idempotent on content_hash (INV-G3).
+ """
+ progress = progress or _noop_progress
+ if practice_area and practice_area not in _VALID_PRACTICE_AREAS:
+ raise ValueError(f"invalid practice_area: {practice_area!r}")
+ src = Path(file_path)
+ if not src.exists():
+ raise ValueError(f"file not found: {file_path}")
+
+ await progress("staging", 10, "מעתיק קובץ")
+ staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
+ rel_path = str(staged.relative_to(config.DATA_DIR)) \
+ if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
+
+ await progress("extracting_text", 50, "מחלץ טקסט")
+ raw_text, _pc, _off = await extractor.extract_text(str(staged))
+ raw_text = (raw_text or "").strip()
+ if not raw_text:
+ raise ValueError("no text extracted from digest")
+
+ content_hash = db._content_hash(raw_text)
+ existing = await db.get_digest_by_content_hash(content_hash)
+ if existing:
+ await progress("completed", 100, "יומון זהה כבר קיים")
+ return {"status": "exists", "digest_id": existing["id"],
+ "extraction_status": existing.get("extraction_status")}
+
+ record = await db.create_digest(
+ analysis_text=raw_text,
+ yomon_number=yomon_number.strip(),
+ digest_date=_coerce_date(digest_date),
+ practice_area=practice_area,
+ appeal_subtype=appeal_subtype.strip(),
+ subject_tags=list(subject_tags) if subject_tags else [],
+ source_document_path=rel_path,
+ extraction_status="pending",
+ )
+ await progress("queued", 100, "ממתין לעיבוד מקומי (LLM)")
+ return {"status": "pending", "digest_id": record["id"],
+ "extraction_status": "pending"}
+
+
+# ── Local enrichment core (LLM + embed + autolink) ──────────────────
+
+async def enrich_digest(digest_id: UUID | str, progress: ProgressCb | None = None) -> dict:
+ """Run LLM metadata extraction over a digest's analysis_text, fill ONLY
+ empty fields (preserve user-supplied values), embed, autolink, complete.
+
+ **MCP-tool-only path** (uses the local LLM extractor). Idempotent.
+ """
+ progress = progress or _noop_progress
+ row = await db.get_digest(digest_id)
+ if not row:
+ raise ValueError("digest not found")
+ analysis = (row.get("analysis_text") or "").strip()
+ if not analysis:
+ await db.update_digest(digest_id, extraction_status="failed")
+ return {"status": "no_text", "digest_id": str(digest_id)}
+
+ await db.update_digest(digest_id, extraction_status="processing")
+ await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (LLM)")
+ from legal_mcp.services import digest_metadata_extractor
+ extracted = await digest_metadata_extractor.extract(analysis)
+
+ # Fill only empty fields (preserve user-supplied values from the form).
+ fields: dict = {}
+ for key in ("yomon_number", "concept_tag", "headline_holding", "summary",
+ "underlying_citation", "underlying_court", "underlying_judge",
+ "practice_area", "appeal_subtype"):
+ if not (row.get(key) or "").strip() and extracted.get(key):
+ fields[key] = extracted[key]
+ if row.get("digest_date") is None and extracted.get("digest_date"):
+ fields["digest_date"] = extracted["digest_date"]
+ if row.get("underlying_date") is None and extracted.get("underlying_date"):
+ fields["underlying_date"] = extracted["underlying_date"]
+ if not (row.get("subject_tags") or []) and extracted.get("subject_tags"):
+ fields["subject_tags"] = extracted["subject_tags"]
+
+ if fields:
+ await db.update_digest(digest_id, **fields)
+ merged = await db.get_digest(digest_id)
+
+ await progress("embedding", 75, "מחשב embedding")
+ emb_text = _embedding_text(merged)
+ if emb_text:
+ try:
+ vecs = await embeddings.embed_texts([emb_text], input_type="document")
+ if vecs:
+ await db.store_digest_embedding(digest_id, vecs[0])
+ except Exception as e: # surfaced, not swallowed (§6)
+ logger.warning("digest embedding failed for %s: %s", digest_id, e)
+
+ await progress("linking", 90, "מנסה לקשר לפסק המקורי")
+ linked_id = None
+ if not merged.get("linked_case_law_id"):
+ linked_id = await try_autolink(digest_id, merged.get("underlying_citation", ""))
+
+ await db.update_digest(digest_id, extraction_status="completed")
+ await progress("completed", 100, "הושלם")
+ return {
+ "status": "completed",
+ "digest_id": str(digest_id),
+ "yomon_number": merged.get("yomon_number", ""),
+ "underlying_citation": merged.get("underlying_citation", ""),
+ "linked_case_law_id": merged.get("linked_case_law_id") or linked_id,
+ "fields_filled": sorted(fields.keys()),
+ }
+
+
+async def process_pending_digests(limit: int = 20) -> dict:
+ """Drain the digest extraction queue (rows stamped extraction_status='pending'
+ by the web upload). Local/MCP only — runs the LLM enrichment per row.
+ Sequential (avoids LLM rate-limit storms), mirrors process_pending_extractions."""
+ pending = await db.list_pending_digests(limit=limit)
+ if not pending:
+ return {"status": "no_pending", "processed": 0, "results": []}
+ results = []
+ processed = 0
+ for row in pending:
+ did = row["id"]
+ try:
+ res = await enrich_digest(did)
+ processed += 1
+ results.append({"digest_id": str(did), "status": res.get("status"),
+ "linked": bool(res.get("linked_case_law_id"))})
+ except Exception as e:
+ logger.exception("process_pending_digests failed for %s: %s", did, e)
+ try:
+ await db.update_digest(did, extraction_status="failed")
+ except Exception:
+ logger.exception("could not mark digest %s failed", did)
+ results.append({"digest_id": str(did), "status": "failed", "error": str(e)})
+ return {"status": "completed", "processed": processed,
+ "total_pending": len(pending), "results": results}
+
+
+# ── Full synchronous ingest (local/MCP, e.g. batch script) ──────────
+
async def ingest_digest(
*,
file_path: str | Path,
@@ -80,109 +253,25 @@ async def ingest_digest(
subject_tags: list[str] | None = None,
progress: ProgressCb | None = None,
) -> dict:
- """Ingest one digest. **MCP-tool-only** (uses the local LLM extractor).
+ """Ingest one digest synchronously. **MCP-tool-only** (uses the LLM).
- User-supplied args win over LLM-extracted values for the same field
- (the chair typed them deliberately); empty args are filled from the LLM.
- Idempotent on yomon_number / content_hash (INV-G3).
+ Creates the row (with any user-supplied values) then enriches in place.
+ Idempotent on content_hash (INV-G3).
"""
progress = progress or _noop_progress
- if practice_area and practice_area not in _VALID_PRACTICE_AREAS:
- raise ValueError(f"invalid practice_area: {practice_area!r}")
+ created = await create_pending_digest(
+ file_path=file_path, yomon_number=yomon_number, digest_date=digest_date,
+ practice_area=practice_area, appeal_subtype=appeal_subtype,
+ subject_tags=subject_tags, progress=progress,
+ )
+ if created.get("status") == "exists":
+ return created
+ digest_id = created["digest_id"]
+ enriched = await enrich_digest(digest_id, progress=progress)
+ return enriched
- src = Path(file_path)
- if not src.exists():
- raise ValueError(f"file not found: {file_path}")
-
- await progress("staging", 5, "מעתיק קובץ")
- staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
- rel_path = str(staged.relative_to(config.DATA_DIR)) \
- if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
-
- await progress("extracting_text", 20, "מחלץ טקסט")
- raw_text, _page_count, _offsets = await extractor.extract_text(str(staged))
- raw_text = (raw_text or "").strip()
- if not raw_text:
- raise ValueError("no text extracted from digest")
-
- # Idempotency: identical text already ingested → return existing row.
- content_hash = db._content_hash(raw_text)
- existing = await db.get_digest_by_content_hash(content_hash)
- if existing:
- await progress("completed", 100, "יומון זהה כבר קיים — לא נוצר כפל")
- return {
- "status": "exists",
- "digest_id": existing["id"],
- "yomon_number": existing.get("yomon_number", ""),
- "linked_case_law_id": existing.get("linked_case_law_id"),
- }
-
- # LLM metadata extraction (lazy import — keeps this module container-safe).
- await progress("extracting_metadata", 45, "מחלץ מטא-דאטה (LLM)")
- from legal_mcp.services import digest_metadata_extractor
- extracted = await digest_metadata_extractor.extract(raw_text)
-
- def _coerce_date(v) -> date | None:
- if v is None or v == "":
- return None
- if isinstance(v, date):
- return v
- if isinstance(v, str):
- try:
- return date.fromisoformat(v[:10])
- except ValueError:
- return None
- return None
-
- # Merge: explicit user args win; otherwise fall back to LLM extraction.
- fields = {
- "analysis_text": raw_text,
- "yomon_number": yomon_number.strip() or extracted.get("yomon_number", ""),
- "digest_date": _coerce_date(digest_date) or extracted.get("digest_date"),
- "concept_tag": extracted.get("concept_tag", ""),
- "headline_holding": extracted.get("headline_holding", ""),
- "summary": extracted.get("summary", ""),
- "underlying_citation": extracted.get("underlying_citation", ""),
- "underlying_court": extracted.get("underlying_court", ""),
- "underlying_date": extracted.get("underlying_date"),
- "underlying_judge": extracted.get("underlying_judge", ""),
- "practice_area": practice_area or extracted.get("practice_area", ""),
- "appeal_subtype": appeal_subtype.strip() or extracted.get("appeal_subtype", ""),
- "subject_tags": list(subject_tags) if subject_tags else extracted.get("subject_tags", []),
- "source_document_path": rel_path,
- "extraction_status": "processing",
- }
-
- await progress("storing", 70, "שומר רשומה")
- record = await db.create_digest(**fields)
- digest_id = record["id"]
-
- # Single embedding for the whole digest (atomic discovery unit — X12 §6).
- await progress("embedding", 85, "מחשב embedding")
- emb_text = _embedding_text(fields)
- if emb_text:
- try:
- vecs = await embeddings.embed_texts([emb_text], input_type="document")
- if vecs:
- await db.store_digest_embedding(digest_id, vecs[0])
- except Exception as e: # surfaced, not swallowed (§6)
- logger.warning("digest embedding failed for %s: %s", digest_id, e)
-
- # Bridge to the underlying ruling if it is already in the library (INV-DIG3).
- await progress("linking", 95, "מנסה לקשר לפסק המקורי")
- linked_id = await try_autolink(digest_id, fields["underlying_citation"])
-
- await db.update_digest(digest_id, extraction_status="completed")
- await progress("completed", 100, "הושלם")
- return {
- "status": "completed",
- "digest_id": digest_id,
- "yomon_number": fields["yomon_number"],
- "underlying_citation": fields["underlying_citation"],
- "linked_case_law_id": linked_id,
- "fields_extracted": sorted(extracted.keys()),
- }
+# ── Linking (INV-DIG3) ──────────────────────────────────────────────
async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict:
"""Manually link a digest to an underlying ruling (INV-DIG3). Idempotent."""
@@ -205,8 +294,7 @@ async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict:
async def relink_digest(digest_id: UUID | str) -> dict:
- """Re-run autolink for a digest whose underlying ruling may now be in the
- library. No-op if already linked or no match found."""
+ """Re-run autolink for an unlinked digest. No-op if already linked / no match."""
digest = await db.get_digest(digest_id)
if not digest:
raise ValueError("digest not found")
@@ -222,6 +310,16 @@ async def relink_digest(digest_id: UUID | str) -> dict:
}
+async def unlink_digest(digest_id: UUID | str) -> dict:
+ """Clear a digest's link to the underlying ruling."""
+ updated = await db.link_digest_to_case_law(digest_id, None)
+ if updated is None:
+ raise ValueError("digest not found")
+ return {"unlinked": True, "digest_id": str(digest_id)}
+
+
+# ── Read / search (container-safe: DB + voyage) ─────────────────────
+
async def search_digests(
query: str,
practice_area: str = "",
@@ -255,14 +353,14 @@ async def list_digests(
offset: int = 0,
) -> list[dict]:
return await db.list_digests(
- practice_area=practice_area,
- concept_tag=concept_tag,
- linked=linked,
- search=search,
- limit=limit,
- offset=offset,
+ practice_area=practice_area, concept_tag=concept_tag, linked=linked,
+ search=search, limit=limit, offset=offset,
)
+async def update_digest(digest_id: UUID | str, **fields) -> dict | None:
+ return await db.update_digest(digest_id, **fields)
+
+
async def delete_digest(digest_id: UUID | str) -> bool:
return await db.delete_digest(digest_id)
diff --git a/mcp-server/src/legal_mcp/tools/digests.py b/mcp-server/src/legal_mcp/tools/digests.py
index 0ff0b19..468e2ef 100644
--- a/mcp-server/src/legal_mcp/tools/digests.py
+++ b/mcp-server/src/legal_mcp/tools/digests.py
@@ -159,3 +159,14 @@ async def search_digests(
if not results:
return empty("לא נמצאו יומונים תואמים.")
return _ok(results)
+
+
+async def digest_process_pending(limit: int = 20) -> str:
+ """ריקון תור היומונים שהועלו מה-UI וממתינים לעיבוד-LLM מקומי. מריץ חילוץ-
+ מטא-דאטה + embedding + autolink על כל יומון בסטטוס 'pending', מקומית עם
+ ה-CLI (claude_session local-only). מנקה לסטטוס 'completed'."""
+ try:
+ result = await digest_library.process_pending_digests(limit=limit)
+ except Exception as e:
+ return _err(str(e))
+ return _ok(result)
diff --git a/web-ui/src/app/digests/page.tsx b/web-ui/src/app/digests/page.tsx
new file mode 100644
index 0000000..ee55e43
--- /dev/null
+++ b/web-ui/src/app/digests/page.tsx
@@ -0,0 +1,82 @@
+"use client";
+
+import Link from "next/link";
+import { AppShell } from "@/components/app-shell";
+import { Card, CardContent } from "@/components/ui/card";
+import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
+import { Badge } from "@/components/ui/badge";
+import { DigestListPanel } from "@/components/digests/digest-list-panel";
+import { DigestSearchPanel } from "@/components/digests/digest-search-panel";
+import { useDigestPending } from "@/lib/api/digests";
+
+/**
+ * Digests radar page (X12) — a SECONDARY discovery layer ABOVE the citation
+ * corpora. Deliberately a SEPARATE page from /precedents to keep the
+ * authoritative/secondary boundary visible: a digest POINTS at a ruling, it is
+ * never cited in a decision (INV-DIG1) and never extracts halachot (INV-DIG2).
+ *
+ * Two tabs:
+ * - יומונים — browse + upload + link to the underlying ruling
+ * - חיפוש — semantic radar search
+ */
+
+function PendingBadge() {
+ const { data } = useDigestPending();
+ const n = data?.count ?? 0;
+ if (!n) return null;
+ return (
+
+ {n} ממתינים
+
+ );
+}
+
+export default function DigestsPage() {
+ return (
+
+
+
+
+
יומונים — רדאר פסיקה
+
+ סיכומי "כל יום" (עפר טויסטר) של פסקי דין והחלטות עדכניים.
+ שכבת-גילוי בלבד: כל יומון מצביע על פסק הדין המקורי —
+ הוא אינו מצוטט בהחלטה ואינו מחלץ הלכות. כשהפסק רלוונטי, מעלים אותו
+ לספריית הפסיקה ומצטטים משם.
+
+
+
+
+
+
+
+
+
+
+ יומונים
+
+
+ חיפוש
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ );
+}
diff --git a/web-ui/src/components/app-shell.tsx b/web-ui/src/components/app-shell.tsx
index 872ff7d..4eeb385 100644
--- a/web-ui/src/components/app-shell.tsx
+++ b/web-ui/src/components/app-shell.tsx
@@ -50,6 +50,7 @@ const NAV_GROUPS: NavGroup[] = [
id: "knowledge",
items: [
{ href: "/precedents", label: "ספריית פסיקה" },
+ { href: "/digests", label: "יומונים" },
{ href: "/missing-precedents", label: "פסיקה חסרה" },
{ href: "/goldset", label: "מדגם-זהב" },
{ href: "/training", label: "אימון סגנון" },
diff --git a/web-ui/src/components/digests/digest-card.tsx b/web-ui/src/components/digests/digest-card.tsx
new file mode 100644
index 0000000..f10de94
--- /dev/null
+++ b/web-ui/src/components/digests/digest-card.tsx
@@ -0,0 +1,103 @@
+"use client";
+
+import Link from "next/link";
+import type { ReactNode } from "react";
+import { Badge } from "@/components/ui/badge";
+import { practiceAreaLabel } from "@/components/precedents/practice-area";
+import type { Digest } from "@/lib/api/digests";
+
+function formatDate(iso: string | null) {
+ if (!iso) return "—";
+ try {
+ return new Date(iso).toLocaleDateString("he-IL");
+ } catch {
+ return iso;
+ }
+}
+
+/**
+ * Presentational card for a single digest ("כל יום" radar entry).
+ *
+ * A digest is a SECONDARY source that POINTS at a ruling — the card makes the
+ * pointer-not-citation nature explicit: the underlying ruling's citation is
+ * shown with a link-status badge (linked → the ruling is in the precedent
+ * library; unlinked → an open knowledge gap, INV-DIG3).
+ */
+export function DigestCard({
+ digest,
+ score,
+ actions,
+}: {
+ digest: Digest;
+ score?: number;
+ actions?: ReactNode;
+}) {
+ const linked = Boolean(digest.linked_case_law_id);
+ return (
+