feat(digests): קורפוס יומונים כשכבת-גילוי (radar) — X12

מאגר חדש ליומוני "כל יום" (עפר טויסטר) כשכבת-גילוי מעל קורפוסי-הפסיקה:
מקור-משני המצביע על פסק הדין המקורי, נקלט לטבלה נפרדת `digests`, נחפש
סמנטית, ומקושר לפסק המקורי בספריית הפסיקה — אך לעולם אינו מצוטט בהחלטה
ואינו מחלץ הלכות.

Phase 0 (spec):
- docs/spec/X12-digests-radar.md — INV-DIG1 (מצביע לא מצוטט) /
  INV-DIG2 (מסלול-קליטה נפרד, לא מקביל — מקיים G2) / INV-DIG3 (קישור-לפסק
  הוא הגשר; חוסר-קישור = פער גלוי). עדכון אינדקס 00/03/README.

Phase 1 (MVP):
- SCHEMA_V30: טבלת `digests` (HNSW על embedding — לא ivfflat, להימנע מ-recall
  cliff בקורפוס קטן/צומח) + GIN/FTS + UNIQUE חלקי ל-idempotent.
- services/digest_metadata_extractor.py — חילוץ-LLM (claude_session local-only,
  ייבוא lazy): תג-מושג, כותרת-הלכה, מראה-מקום, שני-תאריכים מובחנים, תגיות.
- services/digest_library.py — מסלול קצר עצמאי (INV-DIG2): extract→hash→LLM→
  embedding יחיד→autolink. לא משתמש ב-ingest.ingest_document.
- tools/digests.py + רישום 7 כלים ב-server.py (digest_upload/list/get/link/
  relink/delete + search_digests).
- scripts/ingest_digests_batch.py — קליטה ידנית מ-data/digests/incoming.
- legal-researcher.md: שלב 2ב.0 (סריקת-radar לפני אימות) + סעיף-דוח ט +
  3 כלים ב-frontmatter. HEARTBEAT §8: ניתוב יומון→digest_upload.

אומת end-to-end: 4 יומונים נקלטו (מטא-דאטה מדויק), חיפוש סמנטי מדרג נכון
("היטל השבחה"→5160, "תמא 38"→5158), link/relink/autolink/revert + מעטפת-MCP.

Invariants: מוסיף INV-DIG1/2/3 (X12). מקיים G2 (bounded context נפרד, לא
מסלול מקביל), G3 (idempotent upsert), G4 (אין בליעה שקטה — פער-קישור מוצף),
G9 (עקיבוּת — היומון מצביע על מקור עקיב). נוגע G7 (RRF) — נדחה, חיפוש
סמנטי-בלבד בשלב 1 (FTS index מוכן).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 17:49:00 +00:00
parent 9eaabffba4
commit 8171572cdd
13 changed files with 1353 additions and 5 deletions

View File

@@ -1287,6 +1287,71 @@ ALTER TABLE halacha_goldset ADD COLUMN IF NOT EXISTS ai_generated_at TIMESTAMPTZ
"""
SCHEMA_V30_SQL = """
-- digests (X12): Ofer Toister daily "כל יום" one-pagers. A SECONDARY,
-- discovery-layer ("radar") source — NOT authoritative law. Kept in its OWN
-- table (never case_law) so it cannot pollute the precedent corpus, never
-- enters the halacha pipeline (INV-DIG2), and is never cited directly in a
-- decision (INV-DIG1). Its only job is to point the researcher at the
-- UNDERLYING ruling, which is ingested separately into case_law and cited from
-- there. linked_case_law_id is the bridge (INV-DIG3): filled once the
-- underlying ruling is in the library; NULL = an open knowledge gap.
CREATE TABLE IF NOT EXISTS digests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
yomon_number TEXT NOT NULL DEFAULT '', -- "5163"
digest_date DATE, -- date of the yomon ISSUE
publication TEXT NOT NULL DEFAULT 'כל יום',
source_firm TEXT NOT NULL DEFAULT 'עפר טויסטר, עורכי דין',
concept_tag TEXT NOT NULL DEFAULT '', -- "שיקול הדעת המצומצם"
headline_holding TEXT NOT NULL DEFAULT '', -- bold subtitle = the holding
analysis_text TEXT NOT NULL DEFAULT '', -- the 1-2 page body (raw text)
summary TEXT NOT NULL DEFAULT '', -- 2-3 sentence LLM summary
underlying_citation TEXT NOT NULL DEFAULT '', -- 'עת"מ 46111-12-22 יכין-אפק...'
underlying_court TEXT NOT NULL DEFAULT '',
underlying_date DATE, -- date the RULING was given (≠ digest_date)
underlying_judge TEXT NOT NULL DEFAULT '',
practice_area TEXT NOT NULL DEFAULT '', -- rishuy_uvniya/betterment_levy/compensation_197
appeal_subtype TEXT NOT NULL DEFAULT '',
subject_tags TEXT[] NOT NULL DEFAULT '{}',
linked_case_law_id UUID REFERENCES case_law(id) ON DELETE SET NULL,
embedding vector(1024), -- single vector of concept+headline+summary+analysis
source_document_path TEXT NOT NULL DEFAULT '', -- staged PDF path (rel to DATA_DIR)
content_hash TEXT NOT NULL DEFAULT '', -- sha256 of extracted text — idempotent upload
extraction_status TEXT NOT NULL DEFAULT 'pending', -- pending/processing/completed/failed
content_tsv tsvector GENERATED ALWAYS AS (
to_tsvector('simple',
coalesce(concept_tag,'') || ' ' || coalesce(headline_holding,'') || ' ' ||
coalesce(summary,'') || ' ' || coalesce(analysis_text,''))
) STORED,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- Idempotent re-upload (INV-G3): same yomon number = same digest. yomon_number
-- can be '' transiently (before extraction), so the unique index is partial.
CREATE UNIQUE INDEX IF NOT EXISTS uq_digests_yomon_number
ON digests(yomon_number) WHERE yomon_number <> '';
-- Secondary dedup key when yomon_number couldn't be parsed.
CREATE UNIQUE INDEX IF NOT EXISTS uq_digests_content_hash
ON digests(content_hash) WHERE content_hash <> '';
-- HNSW (not ivfflat): the digests radar is a small, slowly-growing corpus
-- (~1/day). ivfflat trains `lists` centroids and probes a subset at query time,
-- so on a small table a single probe can hit an empty list and return 0 rows
-- (recall cliff). HNSW has no list-training/probe step — correct recall from
-- the first row — so it is the right index for a corpus that starts ~empty.
DROP INDEX IF EXISTS idx_digests_embedding; -- drop any pre-existing ivfflat
CREATE INDEX IF NOT EXISTS idx_digests_embedding_hnsw
ON digests USING hnsw (embedding vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_digests_linked ON digests(linked_case_law_id);
CREATE INDEX IF NOT EXISTS idx_digests_practice_area ON digests(practice_area);
CREATE INDEX IF NOT EXISTS idx_digests_concept_tag ON digests(concept_tag);
CREATE INDEX IF NOT EXISTS idx_digests_subject_tags ON digests USING gin(subject_tags);
-- Lexical half of a future hybrid (Phase-1 search is semantic-only; index is ready).
CREATE INDEX IF NOT EXISTS idx_digests_content_tsv ON digests USING gin(content_tsv);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
@@ -1319,7 +1384,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
await conn.execute(SCHEMA_V27_SQL)
await conn.execute(SCHEMA_V28_SQL)
await conn.execute(SCHEMA_V29_SQL)
logger.info("Database schema initialized (v1-v29)")
await conn.execute(SCHEMA_V30_SQL)
logger.info("Database schema initialized (v1-v30)")
async def init_schema() -> None:
@@ -3494,6 +3560,311 @@ async def delete_case_law(case_law_id: UUID) -> bool:
return result == "DELETE 1"
# ── Digests (X12 — radar layer; separate table, INV-DIG1/2/3) ────────
_DIGEST_COLS = (
"id, yomon_number, digest_date, publication, source_firm, concept_tag, "
"headline_holding, analysis_text, summary, underlying_citation, "
"underlying_court, underlying_date, underlying_judge, practice_area, "
"appeal_subtype, subject_tags, linked_case_law_id, source_document_path, "
"content_hash, extraction_status, created_at, updated_at"
)
_DIGEST_UPDATE_ALLOWED = {
"yomon_number", "digest_date", "publication", "source_firm", "concept_tag",
"headline_holding", "analysis_text", "summary", "underlying_citation",
"underlying_court", "underlying_date", "underlying_judge", "practice_area",
"appeal_subtype", "subject_tags", "source_document_path", "content_hash",
"extraction_status",
}
def _row_to_digest(row: asyncpg.Record | dict | None) -> dict | None:
"""Normalize a digests row: ISO-format dates, ensure subject_tags is a list."""
if row is None:
return None
d = dict(row)
for k in ("digest_date", "underlying_date", "created_at", "updated_at"):
if d.get(k) is not None and hasattr(d[k], "isoformat"):
d[k] = d[k].isoformat()
if d.get("subject_tags") is None:
d["subject_tags"] = []
if d.get("id") is not None:
d["id"] = str(d["id"])
if d.get("linked_case_law_id") is not None:
d["linked_case_law_id"] = str(d["linked_case_law_id"])
return d
async def create_digest(
*,
analysis_text: str,
yomon_number: str = "",
digest_date: date | None = None,
publication: str = "כל יום",
source_firm: str = "עפר טויסטר, עורכי דין",
concept_tag: str = "",
headline_holding: str = "",
summary: str = "",
underlying_citation: str = "",
underlying_court: str = "",
underlying_date: date | None = None,
underlying_judge: str = "",
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
source_document_path: str = "",
extraction_status: str = "processing",
) -> dict:
"""Upsert a digest (X12). Idempotent on yomon_number (INV-G3): a repeat
upload of the same yomon updates in place. content_hash is the secondary
dedup key for digests whose number couldn't be parsed."""
pool = await get_pool()
content_hash = _content_hash(analysis_text)
async with pool.acquire() as conn:
# Upsert on the partial unique index uq_digests_yomon_number
# (yomon_number WHERE yomon_number <> ''). Predicate repeated in
# ON CONFLICT as required for partial indexes.
row = await conn.fetchrow(
f"""
INSERT INTO digests (
yomon_number, digest_date, publication, source_firm, concept_tag,
headline_holding, analysis_text, summary, underlying_citation,
underlying_court, underlying_date, underlying_judge, practice_area,
appeal_subtype, subject_tags, source_document_path,
content_hash, extraction_status
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18
)
ON CONFLICT (yomon_number) WHERE yomon_number <> ''
DO UPDATE SET
digest_date = COALESCE(EXCLUDED.digest_date, digests.digest_date),
publication = EXCLUDED.publication,
source_firm = EXCLUDED.source_firm,
concept_tag = EXCLUDED.concept_tag,
headline_holding = EXCLUDED.headline_holding,
analysis_text = EXCLUDED.analysis_text,
summary = EXCLUDED.summary,
underlying_citation = EXCLUDED.underlying_citation,
underlying_court = EXCLUDED.underlying_court,
underlying_date = COALESCE(EXCLUDED.underlying_date, digests.underlying_date),
underlying_judge = EXCLUDED.underlying_judge,
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
source_document_path = COALESCE(NULLIF(EXCLUDED.source_document_path, ''), digests.source_document_path),
content_hash = EXCLUDED.content_hash,
extraction_status = EXCLUDED.extraction_status,
updated_at = now()
RETURNING {_DIGEST_COLS}
""",
yomon_number, digest_date, publication, source_firm, concept_tag,
headline_holding, analysis_text, summary, underlying_citation,
underlying_court, underlying_date, underlying_judge, practice_area,
appeal_subtype, list(subject_tags or []), source_document_path,
content_hash, extraction_status,
)
return _row_to_digest(row)
async def get_digest(digest_id: UUID | str) -> dict | None:
pool = await get_pool()
cid = digest_id if isinstance(digest_id, UUID) else UUID(str(digest_id))
row = await pool.fetchrow(
f"SELECT {_DIGEST_COLS} FROM digests WHERE id = $1", cid,
)
return _row_to_digest(row)
async def get_digest_by_content_hash(content_hash: str) -> dict | None:
if not content_hash:
return None
pool = await get_pool()
row = await pool.fetchrow(
f"SELECT {_DIGEST_COLS} FROM digests WHERE content_hash = $1", content_hash,
)
return _row_to_digest(row)
async def update_digest(digest_id: UUID | str, **fields) -> dict | None:
"""Patch metadata fields on a digest row. Whitelist via _DIGEST_UPDATE_ALLOWED."""
cid = digest_id if isinstance(digest_id, UUID) else UUID(str(digest_id))
updates = {k: v for k, v in fields.items() if k in _DIGEST_UPDATE_ALLOWED}
if not updates:
return await get_digest(cid)
pool = await get_pool()
set_parts = []
params: list = [cid]
for i, (k, v) in enumerate(updates.items(), start=2):
if k == "subject_tags":
v = list(v or [])
set_parts.append(f"{k} = ${i}")
params.append(v)
set_parts.append("updated_at = now()")
sql = f"UPDATE digests SET {', '.join(set_parts)} WHERE id = $1 RETURNING {_DIGEST_COLS}"
row = await pool.fetchrow(sql, *params)
return _row_to_digest(row)
async def store_digest_embedding(digest_id: UUID | str, vector: list[float]) -> None:
pool = await get_pool()
cid = digest_id if isinstance(digest_id, UUID) else UUID(str(digest_id))
await pool.execute(
"UPDATE digests SET embedding = $2, updated_at = now() WHERE id = $1",
cid, vector,
)
async def link_digest_to_case_law(
digest_id: UUID | str, case_law_id: UUID | str | None,
) -> dict | None:
"""Set (or clear, with None) the bridge to the underlying ruling (INV-DIG3)."""
pool = await get_pool()
cid = digest_id if isinstance(digest_id, UUID) else UUID(str(digest_id))
clid = None
if case_law_id is not None:
clid = case_law_id if isinstance(case_law_id, UUID) else UUID(str(case_law_id))
row = await pool.fetchrow(
f"UPDATE digests SET linked_case_law_id = $2, updated_at = now() "
f"WHERE id = $1 RETURNING {_DIGEST_COLS}",
cid, clid,
)
return _row_to_digest(row)
async def delete_digest(digest_id: UUID | str) -> bool:
pool = await get_pool()
cid = digest_id if isinstance(digest_id, UUID) else UUID(str(digest_id))
result = await pool.execute("DELETE FROM digests WHERE id = $1", cid)
return result == "DELETE 1"
async def list_digests(
practice_area: str = "",
concept_tag: str = "",
linked: bool | None = None,
search: str = "",
limit: int = 100,
offset: int = 0,
) -> list[dict]:
"""List digests with simple filters. linked=True/False filters on whether
the underlying ruling is in the library yet (INV-DIG3 gap surfacing)."""
pool = await get_pool()
conditions: list[str] = []
params: list = []
idx = 1
if practice_area:
conditions.append(f"practice_area = ${idx}")
params.append(practice_area)
idx += 1
if concept_tag:
conditions.append(f"concept_tag ILIKE ${idx}")
params.append(f"%{concept_tag}%")
idx += 1
if linked is True:
conditions.append("linked_case_law_id IS NOT NULL")
elif linked is False:
conditions.append("linked_case_law_id IS NULL")
if search:
conditions.append(
f"(yomon_number ILIKE ${idx} OR concept_tag ILIKE ${idx} "
f"OR headline_holding ILIKE ${idx} OR underlying_citation ILIKE ${idx} "
f"OR summary ILIKE ${idx})"
)
params.append(f"%{search}%")
idx += 1
where_sql = (" WHERE " + " AND ".join(conditions)) if conditions else ""
params.extend([limit, offset])
sql = (
f"SELECT {_DIGEST_COLS} FROM digests{where_sql} "
f"ORDER BY digest_date DESC NULLS LAST, created_at DESC "
f"LIMIT ${idx} OFFSET ${idx + 1}"
)
rows = await pool.fetch(sql, *params)
return [_row_to_digest(r) for r in rows]
async def search_digests_semantic(
query_embedding: list[float],
practice_area: str = "",
subject_tag: str = "",
concept_tag: str = "",
limit: int = 10,
) -> list[dict]:
"""Pure-semantic search over the digests radar (X12). Single vector per row
(no chunks/halachot), so no RRF here — see X12 §6. Joins the linked ruling's
citation when present so the researcher sees the pointer target directly."""
pool = await get_pool()
conditions = ["d.embedding IS NOT NULL"]
params: list = [query_embedding, limit]
idx = 3
if practice_area:
conditions.append(f"d.practice_area = ${idx}")
params.append(practice_area)
idx += 1
if subject_tag:
conditions.append(f"${idx} = ANY(d.subject_tags)")
params.append(subject_tag)
idx += 1
if concept_tag:
conditions.append(f"d.concept_tag ILIKE ${idx}")
params.append(f"%{concept_tag}%")
idx += 1
sql = f"""
SELECT {', '.join('d.' + c for c in _DIGEST_COLS.split(', '))},
cl.case_number AS linked_case_number,
cl.case_name AS linked_case_name,
cl.searchable AS linked_searchable,
1 - (d.embedding <=> $1) AS score
FROM digests d
LEFT JOIN case_law cl ON cl.id = d.linked_case_law_id
WHERE {' AND '.join(conditions)}
ORDER BY d.embedding <=> $1
LIMIT $2
"""
rows = await pool.fetch(sql, *params)
out = []
for r in rows:
d = _row_to_digest(r)
d["linked_case_number"] = r["linked_case_number"]
d["linked_case_name"] = r["linked_case_name"]
d["linked_searchable"] = r["linked_searchable"]
d["score"] = float(r["score"])
d["type"] = "digest"
out.append(d)
return out
async def find_case_law_by_citation_fuzzy(citation: str) -> dict | None:
"""Best-effort match of a digest's underlying_citation to a case_law row,
for autolink (INV-DIG3). Tries: (1) exact case_number; (2) canonical docket
substring (e.g. '46111-12-22') contained in a case_law.case_number. Returns
the first match or None — never raises, never mutates."""
citation = (citation or "").strip()
if not citation:
return None
pool = await get_pool()
row = await pool.fetchrow(
"SELECT * FROM case_law WHERE case_number = $1 LIMIT 1",
citation,
)
if row:
return _row_to_case_law(row)
# Extract a docket-like token: digits with '-' or '/' separators, e.g.
# 46111-12-22 or 3975/22. Match it as a substring of case_number.
m = re.search(r"\d+[-/]\d+(?:[-/]\d+)?", citation)
if not m:
return None
docket = m.group(0)
row = await pool.fetchrow(
"SELECT * FROM case_law "
"WHERE case_number ILIKE $1 ORDER BY created_at LIMIT 1",
f"%{docket}%",
)
return _row_to_case_law(row) if row else None
async def store_precedent_chunks(
case_law_id: UUID, chunks: list[dict],
) -> int: