feat(digests): Phase 2 — API endpoints + /digests UI (X12)
משטחי-משתמש לקורפוס היומונים: endpoints ב-FastAPI + דף UI נפרד /digests (לדפדוף, חיפוש, העלאה, וקישור לפסק המקורי). היומון נשאר מקור-משני המצביע על הפסק — אינו מצוטט בהחלטה (INV-DIG1) ואינו מחלץ הלכות (INV-DIG2). Backend (container-safe + local split): - digest_library: פוצל ל-create_pending_digest (CONTAINER-SAFE: stage+ extract_text+create row 'pending', בלי LLM) ↔ enrich_digest/ process_pending_digests (local: LLM+embed+autolink). ingest_digest מאחד. - db.list_pending_digests; MCP digest_process_pending (tool+server) — חלופה ל-batch script לריקון התור. - web/app.py: 10 endpoints /api/digests/* (upload/list/search/queue-pending/ get/patch/delete/link/relink/unlink). upload=INSERT-only pending (ה-LLM רץ מקומית — claude_session local-only). כולם מחזירים dict בדפוס precedent. Frontend (Next 16, ללא api:types — hooks עם טיפוסים hand-written כמו precedent-library.ts): - lib/api/digests.ts — hooks (useDigests/useDigestSearch/useDigestPending/ useUploadDigest/useLink/Relink/Unlink/Delete/Update). - דף /digests נפרד (לא כרטיסייה ב-/precedents — לשמור גבול סמכותי/משני, INV-DIG1): טאבים יומונים/חיפוש + DigestCard (badge קישור-לפסק) + DigestUploadDialog + pending badge. nav + header-context. אומת: backend round-trip מלא (create_pending→list_pending→process_pending→ search→restore); web-ui מתקמפל (webpack/tsc נקי, route /digests נוצר). הערה: build דיפולטי (turbopack) נכשל ב-worktree עקב symlink ל-node_modules — ב-CI/Docker (node_modules אמיתי) עובד; אומת עם --webpack. Invariants: מקיים INV-DIG1/2 (upload לא מחלץ הלכות, UI מציג "מצביע לא מצוטט"), INV-DIG3 (link/relink/queue). G4 (אין בליעה — שגיאות→toast/HTTP), G2 (מסלול נפרד, לא מקביל). X6 (חוזה UI↔API — endpoints בדפוס precedent; hooks hand-written כמו שאר ה-domain modules). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,18 +2,23 @@
|
||||
|
||||
A digest ("כל יום" daily one-pager) is a SECONDARY source that POINTS at a
|
||||
ruling — it is never cited in a decision (INV-DIG1) and never enters the
|
||||
precedent/halacha pipeline (INV-DIG2). Ingest is therefore a short, standalone
|
||||
path that reuses only ATOMIC services (extract_text, embeddings), NOT the
|
||||
canonical ``ingest.ingest_document`` (which is bound to case_law):
|
||||
precedent/halacha pipeline (INV-DIG2). Ingest reuses only ATOMIC services
|
||||
(extract_text, embeddings), NOT the canonical ``ingest.ingest_document``.
|
||||
|
||||
file → extract_text → content_hash (idempotent) → LLM metadata extract
|
||||
→ create_digest → single embedding (concept+headline+summary+analysis)
|
||||
→ try_autolink(underlying_citation → case_law) [INV-DIG3]
|
||||
→ extraction_status='completed'
|
||||
Two intake paths share one enrichment core:
|
||||
|
||||
- ``ingest_digest`` (local/MCP, e.g. batch script) — does everything
|
||||
synchronously: stage → extract_text → create →
|
||||
LLM enrich → embed → autolink → completed.
|
||||
- ``create_pending_digest`` (CONTAINER-SAFE — the web upload) — stage →
|
||||
extract_text → create row with status='pending'.
|
||||
No LLM, no embedding. ``process_pending_digests``
|
||||
(local/MCP) drains the queue and enriches.
|
||||
|
||||
claude_session rule: ``digest_metadata_extractor`` (local CLI) is imported
|
||||
LAZILY inside ``ingest_digest`` only, so this module is import-safe from the
|
||||
FastAPI container for the search/list/link/delete paths (DB + voyage only).
|
||||
LAZILY inside the enrichment core only, so this module stays import-safe from
|
||||
the FastAPI container for create_pending / search / list / link / delete
|
||||
(DB + voyage only — voyage embedding only runs in the local enrich path).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -42,13 +47,26 @@ async def _noop_progress(_status: str, _percent: int, _msg: str) -> None:
|
||||
return None
|
||||
|
||||
|
||||
def _embedding_text(fields: dict) -> str:
|
||||
def _coerce_date(v) -> date | None:
|
||||
if v is None or v == "":
|
||||
return None
|
||||
if isinstance(v, date):
|
||||
return v
|
||||
if isinstance(v, str):
|
||||
try:
|
||||
return date.fromisoformat(v[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _embedding_text(row: dict) -> str:
|
||||
"""The single vector indexes the digest as an atomic discovery unit."""
|
||||
parts = [
|
||||
fields.get("concept_tag", ""),
|
||||
fields.get("headline_holding", ""),
|
||||
fields.get("summary", ""),
|
||||
fields.get("analysis_text", ""),
|
||||
row.get("concept_tag", ""),
|
||||
row.get("headline_holding", ""),
|
||||
row.get("summary", ""),
|
||||
row.get("analysis_text", ""),
|
||||
]
|
||||
return "\n".join(p for p in parts if p).strip()
|
||||
|
||||
@@ -70,6 +88,161 @@ async def try_autolink(digest_id: UUID | str, underlying_citation: str) -> str |
|
||||
return str(match["id"])
|
||||
|
||||
|
||||
# ── Container-safe creation (web upload) — no LLM, no embedding ──────
|
||||
|
||||
async def create_pending_digest(
|
||||
*,
|
||||
file_path: str | Path,
|
||||
yomon_number: str = "",
|
||||
digest_date: date | str | None = None,
|
||||
practice_area: str = "",
|
||||
appeal_subtype: str = "",
|
||||
subject_tags: list[str] | None = None,
|
||||
progress: ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Stage the file, extract text (PyMuPDF — container-safe), and create a
|
||||
digest row with extraction_status='pending'. The LLM metadata extraction,
|
||||
embedding, and autolink are deferred to ``process_pending_digests`` (local).
|
||||
|
||||
Returns {status, digest_id, extraction_status} or {status:'exists', ...}.
|
||||
Idempotent on content_hash (INV-G3).
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
if practice_area and practice_area not in _VALID_PRACTICE_AREAS:
|
||||
raise ValueError(f"invalid practice_area: {practice_area!r}")
|
||||
src = Path(file_path)
|
||||
if not src.exists():
|
||||
raise ValueError(f"file not found: {file_path}")
|
||||
|
||||
await progress("staging", 10, "מעתיק קובץ")
|
||||
staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
|
||||
rel_path = str(staged.relative_to(config.DATA_DIR)) \
|
||||
if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
|
||||
|
||||
await progress("extracting_text", 50, "מחלץ טקסט")
|
||||
raw_text, _pc, _off = await extractor.extract_text(str(staged))
|
||||
raw_text = (raw_text or "").strip()
|
||||
if not raw_text:
|
||||
raise ValueError("no text extracted from digest")
|
||||
|
||||
content_hash = db._content_hash(raw_text)
|
||||
existing = await db.get_digest_by_content_hash(content_hash)
|
||||
if existing:
|
||||
await progress("completed", 100, "יומון זהה כבר קיים")
|
||||
return {"status": "exists", "digest_id": existing["id"],
|
||||
"extraction_status": existing.get("extraction_status")}
|
||||
|
||||
record = await db.create_digest(
|
||||
analysis_text=raw_text,
|
||||
yomon_number=yomon_number.strip(),
|
||||
digest_date=_coerce_date(digest_date),
|
||||
practice_area=practice_area,
|
||||
appeal_subtype=appeal_subtype.strip(),
|
||||
subject_tags=list(subject_tags) if subject_tags else [],
|
||||
source_document_path=rel_path,
|
||||
extraction_status="pending",
|
||||
)
|
||||
await progress("queued", 100, "ממתין לעיבוד מקומי (LLM)")
|
||||
return {"status": "pending", "digest_id": record["id"],
|
||||
"extraction_status": "pending"}
|
||||
|
||||
|
||||
# ── Local enrichment core (LLM + embed + autolink) ──────────────────
|
||||
|
||||
async def enrich_digest(digest_id: UUID | str, progress: ProgressCb | None = None) -> dict:
|
||||
"""Run LLM metadata extraction over a digest's analysis_text, fill ONLY
|
||||
empty fields (preserve user-supplied values), embed, autolink, complete.
|
||||
|
||||
**MCP-tool-only path** (uses the local LLM extractor). Idempotent.
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
row = await db.get_digest(digest_id)
|
||||
if not row:
|
||||
raise ValueError("digest not found")
|
||||
analysis = (row.get("analysis_text") or "").strip()
|
||||
if not analysis:
|
||||
await db.update_digest(digest_id, extraction_status="failed")
|
||||
return {"status": "no_text", "digest_id": str(digest_id)}
|
||||
|
||||
await db.update_digest(digest_id, extraction_status="processing")
|
||||
await progress("extracting_metadata", 40, "מחלץ מטא-דאטה (LLM)")
|
||||
from legal_mcp.services import digest_metadata_extractor
|
||||
extracted = await digest_metadata_extractor.extract(analysis)
|
||||
|
||||
# Fill only empty fields (preserve user-supplied values from the form).
|
||||
fields: dict = {}
|
||||
for key in ("yomon_number", "concept_tag", "headline_holding", "summary",
|
||||
"underlying_citation", "underlying_court", "underlying_judge",
|
||||
"practice_area", "appeal_subtype"):
|
||||
if not (row.get(key) or "").strip() and extracted.get(key):
|
||||
fields[key] = extracted[key]
|
||||
if row.get("digest_date") is None and extracted.get("digest_date"):
|
||||
fields["digest_date"] = extracted["digest_date"]
|
||||
if row.get("underlying_date") is None and extracted.get("underlying_date"):
|
||||
fields["underlying_date"] = extracted["underlying_date"]
|
||||
if not (row.get("subject_tags") or []) and extracted.get("subject_tags"):
|
||||
fields["subject_tags"] = extracted["subject_tags"]
|
||||
|
||||
if fields:
|
||||
await db.update_digest(digest_id, **fields)
|
||||
merged = await db.get_digest(digest_id)
|
||||
|
||||
await progress("embedding", 75, "מחשב embedding")
|
||||
emb_text = _embedding_text(merged)
|
||||
if emb_text:
|
||||
try:
|
||||
vecs = await embeddings.embed_texts([emb_text], input_type="document")
|
||||
if vecs:
|
||||
await db.store_digest_embedding(digest_id, vecs[0])
|
||||
except Exception as e: # surfaced, not swallowed (§6)
|
||||
logger.warning("digest embedding failed for %s: %s", digest_id, e)
|
||||
|
||||
await progress("linking", 90, "מנסה לקשר לפסק המקורי")
|
||||
linked_id = None
|
||||
if not merged.get("linked_case_law_id"):
|
||||
linked_id = await try_autolink(digest_id, merged.get("underlying_citation", ""))
|
||||
|
||||
await db.update_digest(digest_id, extraction_status="completed")
|
||||
await progress("completed", 100, "הושלם")
|
||||
return {
|
||||
"status": "completed",
|
||||
"digest_id": str(digest_id),
|
||||
"yomon_number": merged.get("yomon_number", ""),
|
||||
"underlying_citation": merged.get("underlying_citation", ""),
|
||||
"linked_case_law_id": merged.get("linked_case_law_id") or linked_id,
|
||||
"fields_filled": sorted(fields.keys()),
|
||||
}
|
||||
|
||||
|
||||
async def process_pending_digests(limit: int = 20) -> dict:
|
||||
"""Drain the digest extraction queue (rows stamped extraction_status='pending'
|
||||
by the web upload). Local/MCP only — runs the LLM enrichment per row.
|
||||
Sequential (avoids LLM rate-limit storms), mirrors process_pending_extractions."""
|
||||
pending = await db.list_pending_digests(limit=limit)
|
||||
if not pending:
|
||||
return {"status": "no_pending", "processed": 0, "results": []}
|
||||
results = []
|
||||
processed = 0
|
||||
for row in pending:
|
||||
did = row["id"]
|
||||
try:
|
||||
res = await enrich_digest(did)
|
||||
processed += 1
|
||||
results.append({"digest_id": str(did), "status": res.get("status"),
|
||||
"linked": bool(res.get("linked_case_law_id"))})
|
||||
except Exception as e:
|
||||
logger.exception("process_pending_digests failed for %s: %s", did, e)
|
||||
try:
|
||||
await db.update_digest(did, extraction_status="failed")
|
||||
except Exception:
|
||||
logger.exception("could not mark digest %s failed", did)
|
||||
results.append({"digest_id": str(did), "status": "failed", "error": str(e)})
|
||||
return {"status": "completed", "processed": processed,
|
||||
"total_pending": len(pending), "results": results}
|
||||
|
||||
|
||||
# ── Full synchronous ingest (local/MCP, e.g. batch script) ──────────
|
||||
|
||||
async def ingest_digest(
|
||||
*,
|
||||
file_path: str | Path,
|
||||
@@ -80,109 +253,25 @@ async def ingest_digest(
|
||||
subject_tags: list[str] | None = None,
|
||||
progress: ProgressCb | None = None,
|
||||
) -> dict:
|
||||
"""Ingest one digest. **MCP-tool-only** (uses the local LLM extractor).
|
||||
"""Ingest one digest synchronously. **MCP-tool-only** (uses the LLM).
|
||||
|
||||
User-supplied args win over LLM-extracted values for the same field
|
||||
(the chair typed them deliberately); empty args are filled from the LLM.
|
||||
Idempotent on yomon_number / content_hash (INV-G3).
|
||||
Creates the row (with any user-supplied values) then enriches in place.
|
||||
Idempotent on content_hash (INV-G3).
|
||||
"""
|
||||
progress = progress or _noop_progress
|
||||
if practice_area and practice_area not in _VALID_PRACTICE_AREAS:
|
||||
raise ValueError(f"invalid practice_area: {practice_area!r}")
|
||||
created = await create_pending_digest(
|
||||
file_path=file_path, yomon_number=yomon_number, digest_date=digest_date,
|
||||
practice_area=practice_area, appeal_subtype=appeal_subtype,
|
||||
subject_tags=subject_tags, progress=progress,
|
||||
)
|
||||
if created.get("status") == "exists":
|
||||
return created
|
||||
digest_id = created["digest_id"]
|
||||
enriched = await enrich_digest(digest_id, progress=progress)
|
||||
return enriched
|
||||
|
||||
src = Path(file_path)
|
||||
if not src.exists():
|
||||
raise ValueError(f"file not found: {file_path}")
|
||||
|
||||
await progress("staging", 5, "מעתיק קובץ")
|
||||
staged = ingest._stage_file(src, DIGEST_LIBRARY_DIR, "incoming")
|
||||
rel_path = str(staged.relative_to(config.DATA_DIR)) \
|
||||
if str(staged).startswith(str(config.DATA_DIR)) else str(staged)
|
||||
|
||||
await progress("extracting_text", 20, "מחלץ טקסט")
|
||||
raw_text, _page_count, _offsets = await extractor.extract_text(str(staged))
|
||||
raw_text = (raw_text or "").strip()
|
||||
if not raw_text:
|
||||
raise ValueError("no text extracted from digest")
|
||||
|
||||
# Idempotency: identical text already ingested → return existing row.
|
||||
content_hash = db._content_hash(raw_text)
|
||||
existing = await db.get_digest_by_content_hash(content_hash)
|
||||
if existing:
|
||||
await progress("completed", 100, "יומון זהה כבר קיים — לא נוצר כפל")
|
||||
return {
|
||||
"status": "exists",
|
||||
"digest_id": existing["id"],
|
||||
"yomon_number": existing.get("yomon_number", ""),
|
||||
"linked_case_law_id": existing.get("linked_case_law_id"),
|
||||
}
|
||||
|
||||
# LLM metadata extraction (lazy import — keeps this module container-safe).
|
||||
await progress("extracting_metadata", 45, "מחלץ מטא-דאטה (LLM)")
|
||||
from legal_mcp.services import digest_metadata_extractor
|
||||
extracted = await digest_metadata_extractor.extract(raw_text)
|
||||
|
||||
def _coerce_date(v) -> date | None:
|
||||
if v is None or v == "":
|
||||
return None
|
||||
if isinstance(v, date):
|
||||
return v
|
||||
if isinstance(v, str):
|
||||
try:
|
||||
return date.fromisoformat(v[:10])
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
# Merge: explicit user args win; otherwise fall back to LLM extraction.
|
||||
fields = {
|
||||
"analysis_text": raw_text,
|
||||
"yomon_number": yomon_number.strip() or extracted.get("yomon_number", ""),
|
||||
"digest_date": _coerce_date(digest_date) or extracted.get("digest_date"),
|
||||
"concept_tag": extracted.get("concept_tag", ""),
|
||||
"headline_holding": extracted.get("headline_holding", ""),
|
||||
"summary": extracted.get("summary", ""),
|
||||
"underlying_citation": extracted.get("underlying_citation", ""),
|
||||
"underlying_court": extracted.get("underlying_court", ""),
|
||||
"underlying_date": extracted.get("underlying_date"),
|
||||
"underlying_judge": extracted.get("underlying_judge", ""),
|
||||
"practice_area": practice_area or extracted.get("practice_area", ""),
|
||||
"appeal_subtype": appeal_subtype.strip() or extracted.get("appeal_subtype", ""),
|
||||
"subject_tags": list(subject_tags) if subject_tags else extracted.get("subject_tags", []),
|
||||
"source_document_path": rel_path,
|
||||
"extraction_status": "processing",
|
||||
}
|
||||
|
||||
await progress("storing", 70, "שומר רשומה")
|
||||
record = await db.create_digest(**fields)
|
||||
digest_id = record["id"]
|
||||
|
||||
# Single embedding for the whole digest (atomic discovery unit — X12 §6).
|
||||
await progress("embedding", 85, "מחשב embedding")
|
||||
emb_text = _embedding_text(fields)
|
||||
if emb_text:
|
||||
try:
|
||||
vecs = await embeddings.embed_texts([emb_text], input_type="document")
|
||||
if vecs:
|
||||
await db.store_digest_embedding(digest_id, vecs[0])
|
||||
except Exception as e: # surfaced, not swallowed (§6)
|
||||
logger.warning("digest embedding failed for %s: %s", digest_id, e)
|
||||
|
||||
# Bridge to the underlying ruling if it is already in the library (INV-DIG3).
|
||||
await progress("linking", 95, "מנסה לקשר לפסק המקורי")
|
||||
linked_id = await try_autolink(digest_id, fields["underlying_citation"])
|
||||
|
||||
await db.update_digest(digest_id, extraction_status="completed")
|
||||
await progress("completed", 100, "הושלם")
|
||||
return {
|
||||
"status": "completed",
|
||||
"digest_id": digest_id,
|
||||
"yomon_number": fields["yomon_number"],
|
||||
"underlying_citation": fields["underlying_citation"],
|
||||
"linked_case_law_id": linked_id,
|
||||
"fields_extracted": sorted(extracted.keys()),
|
||||
}
|
||||
|
||||
# ── Linking (INV-DIG3) ──────────────────────────────────────────────
|
||||
|
||||
async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict:
|
||||
"""Manually link a digest to an underlying ruling (INV-DIG3). Idempotent."""
|
||||
@@ -205,8 +294,7 @@ async def link_digest(digest_id: UUID | str, case_law_id: UUID | str) -> dict:
|
||||
|
||||
|
||||
async def relink_digest(digest_id: UUID | str) -> dict:
|
||||
"""Re-run autolink for a digest whose underlying ruling may now be in the
|
||||
library. No-op if already linked or no match found."""
|
||||
"""Re-run autolink for an unlinked digest. No-op if already linked / no match."""
|
||||
digest = await db.get_digest(digest_id)
|
||||
if not digest:
|
||||
raise ValueError("digest not found")
|
||||
@@ -222,6 +310,16 @@ async def relink_digest(digest_id: UUID | str) -> dict:
|
||||
}
|
||||
|
||||
|
||||
async def unlink_digest(digest_id: UUID | str) -> dict:
|
||||
"""Clear a digest's link to the underlying ruling."""
|
||||
updated = await db.link_digest_to_case_law(digest_id, None)
|
||||
if updated is None:
|
||||
raise ValueError("digest not found")
|
||||
return {"unlinked": True, "digest_id": str(digest_id)}
|
||||
|
||||
|
||||
# ── Read / search (container-safe: DB + voyage) ─────────────────────
|
||||
|
||||
async def search_digests(
|
||||
query: str,
|
||||
practice_area: str = "",
|
||||
@@ -255,14 +353,14 @@ async def list_digests(
|
||||
offset: int = 0,
|
||||
) -> list[dict]:
|
||||
return await db.list_digests(
|
||||
practice_area=practice_area,
|
||||
concept_tag=concept_tag,
|
||||
linked=linked,
|
||||
search=search,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
practice_area=practice_area, concept_tag=concept_tag, linked=linked,
|
||||
search=search, limit=limit, offset=offset,
|
||||
)
|
||||
|
||||
|
||||
async def update_digest(digest_id: UUID | str, **fields) -> dict | None:
|
||||
return await db.update_digest(digest_id, **fields)
|
||||
|
||||
|
||||
async def delete_digest(digest_id: UUID | str) -> bool:
|
||||
return await db.delete_digest(digest_id)
|
||||
|
||||
Reference in New Issue
Block a user