diff --git a/docs/spec/X12-digests-radar.md b/docs/spec/X12-digests-radar.md index 5940f9b..b1c3cab 100644 --- a/docs/spec/X12-digests-radar.md +++ b/docs/spec/X12-digests-radar.md @@ -51,6 +51,19 @@ `digest_kind=''`** (מעולם לא סווג) — כך הודעה (kind=`announcement`, בלי citation) **אינה** נחשבת כשל ואינה מנוסה-מחדש לנצח. ההיוריסטיקה הישנה ("שני השדות ריקים") טיפלה בהודעות בטעות כ-retry אינסופי. +### 2.1 מקור שני ל-radar — העלון החודשי "עו"ד על נדל"ן" + +פרסום **נפרד** מהיומון היומי: עלון חודשי ממוספר (משרדי צבי שוב + רונית אלפר), **רב-נושאי** — מאמר-עומק, +עדכוני-חקיקה, וסט מצביעי-פסיקה מקובצים לפי נושא. נקלט **לאותה טבלת `digests`** (לא קורפוס מקביל — G2), +מובחן ע"י `publication='עו"ד על נדל"ן'` (מול `'כל יום'`). עלון אחד **מתפצל ל-N שורות** דרך +`bulletin_splitter` (LLM, local-only) → `bulletin_library.ingest_bulletin`: +- **מצביעי-פסיקה** → `digest_kind='decision'` — מצטרפים ל-radar ומקושרים לפסק (autolink + X13 כמו היומון). +- **מאמרים** → `digest_kind='article'` — טקסט-מלא + embedding לחיפוש-עומק; **רקע בלבד, INV-DIG1 חל** (לא מצוטט). +- **עדכוני-חקיקה — לא נקלטים** (החלטת יו"ר). + +מפתח-הדדאפ לפריט-עלון הוא **`content_hash` (per-פריט)**, כי `yomon_number` ריק (ה-upsert על yomon-number +לא חל; `uq_digests_content_hash` תופס re-runs). אידמפוטנטי. סקריפט: `scripts/ingest_bulletins.py`. + --- ## 3. למה זה לא קורפוס-ציטוט רביעי (הקושיה המרכזית — G2) diff --git a/mcp-server/src/legal_mcp/services/bulletin_library.py b/mcp-server/src/legal_mcp/services/bulletin_library.py new file mode 100644 index 0000000..af71f5a --- /dev/null +++ b/mcp-server/src/legal_mcp/services/bulletin_library.py @@ -0,0 +1,121 @@ +"""Ingest a monthly "עו"ד על נדל"ן" bulletin into the digests radar (X12). + +A bulletin PDF is multi-topic: it EXPLODES into several digest rows — one per +case-law pointer (digest_kind='decision') and one per article (digest_kind= +'article'), all tagged publication='עו"ד על נדל"ן' to distinguish them from the +daily "כל יום" issues. This reuses the existing radar (no parallel corpus — G2): +the case pointers join search_digests / the /digests page and autolink to the +underlying ruling exactly like a daily digest; articles are deep-context only. + +LOCAL-ONLY (LLM split + embedding) — host scripts/MCP, never the container path. +Idempotent: each item's content_hash (hash of its analysis_text) is the dedup +key, so re-running a bulletin skips already-ingested items. +""" +from __future__ import annotations + +import logging +from pathlib import Path + +from legal_mcp.services import db, embeddings, extractor +from legal_mcp.services import bulletin_splitter, digest_library + +logger = logging.getLogger(__name__) + +PUBLICATION = 'עו"ד על נדל"ן' +SOURCE_FIRM = "צבי שוב + רונית אלפר, עורכי דין" + + +async def _store_and_embed(digest_row: dict) -> None: + """Compute + store the single radar embedding for a freshly created item.""" + emb_text = digest_library._embedding_text(digest_row) + if not emb_text: + return + try: + vecs = await embeddings.embed_texts([emb_text], input_type="document") + if vecs: + await db.store_digest_embedding(digest_row["id"], vecs[0]) + except Exception as e: # §6 — surfaced, not swallowed + logger.warning("bulletin item embedding failed for %s: %s", digest_row.get("id"), e) + + +async def _create_item(*, analysis_text: str, kind: str, concept_tag: str, + headline: str, summary: str, citation: str, court: str, + practice_area: str, subject_tags: list[str], src: str) -> dict | None: + """Create one digest row from a bulletin item. Returns the row, or None if it + already exists (idempotent skip) or the insert raced on content_hash.""" + content_hash = db._content_hash(analysis_text) + if await db.get_digest_by_content_hash(content_hash): + return None + try: + return await db.create_digest( + analysis_text=analysis_text, + publication=PUBLICATION, + source_firm=SOURCE_FIRM, + concept_tag=concept_tag, + headline_holding=headline, + summary=summary, + underlying_citation=citation, + underlying_court=court, + practice_area=practice_area, + subject_tags=subject_tags, + source_document_path=src, + extraction_status="completed", + digest_kind=kind, + ) + except Exception as e: + # uq_digests_content_hash race (concurrent run) → treat as already-present. + if "uq_digests_content_hash" in str(e): + return None + raise + + +async def ingest_bulletin(file_path: str, model: str | None = None) -> dict: + """Split a bulletin PDF into digest rows (case pointers + articles). + + Returns counts: {cases, articles, created, skipped, linked}. Idempotent. + """ + path = str(file_path) + raw_text, _pages, _meta = await extractor.extract_text(path) + split = await bulletin_splitter.split(raw_text, model=model) + cases, articles = split.get("cases", []), split.get("articles", []) + + out = {"file": Path(path).name, "cases": len(cases), "articles": len(articles), + "created": 0, "skipped": 0, "linked": 0} + + for c in cases: + # analysis_text bundles the pointer's substance → stable per-item hash. + atext = "\n".join(p for p in ( + c["concept_tag"], c["headline_holding"], c["summary"], c["underlying_citation"] + ) if p).strip() + row = await _create_item( + analysis_text=atext, kind="decision", concept_tag=c["concept_tag"], + headline=c["headline_holding"], summary=c["summary"], + citation=c["underlying_citation"], court=c["underlying_court"], + practice_area=c["practice_area"], subject_tags=c["subject_tags"], src=path, + ) + if row is None: + out["skipped"] += 1 + continue + out["created"] += 1 + await _store_and_embed(row) + linked = await digest_library.try_autolink(row["id"], c["underlying_citation"]) + if linked: + out["linked"] += 1 + + for a in articles: + # The article body is the substance; prefix authors into the summary. + body = a["body"] or a["summary"] + summary = (f"מאת {a['authors']}. " if a["authors"] else "") + (a["summary"] or "") + atext = "\n".join(p for p in (a["title"], summary, body) if p).strip() + row = await _create_item( + analysis_text=atext, kind="article", concept_tag=a["title"], + headline=a["title"], summary=summary, citation="", court="", + practice_area=a["practice_area"], subject_tags=a["subject_tags"], src=path, + ) + if row is None: + out["skipped"] += 1 + continue + out["created"] += 1 + await _store_and_embed(row) + + return out diff --git a/mcp-server/src/legal_mcp/services/bulletin_splitter.py b/mcp-server/src/legal_mcp/services/bulletin_splitter.py new file mode 100644 index 0000000..d458933 --- /dev/null +++ b/mcp-server/src/legal_mcp/services/bulletin_splitter.py @@ -0,0 +1,147 @@ +"""Split a monthly "עו"ד על נדל"ן" bulletin into typed radar items (X12). + +The monthly bulletin (a SEPARATE publication from the daily "כל יום" digest) is +multi-topic: it bundles a featured ARTICLE, a list of legislative updates, and a +set of CASE-LAW pointers grouped by topic. The chair chose to catalog the +**case-law pointers** (each → a digest, like the daily issue) and the +**articles** (deep-context background) — legislative updates are skipped. + +This module is the LLM splitter only. ``bulletin_library.ingest_bulletin`` turns +its output into digest rows. Like the daily extractor it is LOCAL-ONLY (claude +CLI) and MUST NOT be imported from the FastAPI container path. +""" +from __future__ import annotations + +import logging + +from legal_mcp import config + +logger = logging.getLogger(__name__) + +_VALID_PRACTICE_AREAS = {"rishuy_uvniya", "betterment_levy", "compensation_197"} + +BULLETIN_SPLIT_PROMPT = """\ +אתה מקבל טקסט מלא של **עלון חודשי "עו"ד על נדל"ן"** (פרסום מקצועי רב-נושאי בתחום +תכנון ובנייה, מקרקעין, היטל השבחה, פיצויים והתחדשות עירונית). פצל אותו לפריטים. + +העלון בנוי משלושה חלקים: (א) **מאמר** מקצועי ארוך אחד או יותר; (ב) **עדכוני חקיקה** +(תיקוני-חוק, אישורי-תכניות, חוזרים) — **התעלם מהם, אל תחלץ**; (ג) **עדכוני פסיקה** +מקובצים לפי נושא — כל פריט = מראה-מקום של פסק דין/החלטה + שורת-תקציר. + +**אל תמציא** — חלץ רק מה שמופיע בטקסט. שדה חסר → מחרוזת ריקה. + +## פלט נדרש +החזר JSON אחד (object), ללא markdown: + +{ + "cases": [ + { + "underlying_citation": "מראה-המקום המלא של הפסק כפי שמופיע, מילה במילה (למשל 'ערר 8018-02-22 הועדה המקומית בת ים נ' קבוצת מזרחי ובניו השקעות בע\\"מ'). השדה הקריטי.", + "concept_tag": "הנושא/הכותרת שתחתיה מופיע הפריט (למשל 'היטל השבחה', 'הפקעות', 'פירוק שיתוף').", + "headline_holding": "שורת-התקציר/הכותרת של הפריט — מה נקבע/השאלה (למשל 'חוסר וודאות בין תכנית קודמת לבין ההקלה').", + "summary": "תקציר ניטרלי קצר אם יש פירוט נוסף בגוף; אחרת חזור על headline_holding.", + "underlying_court": "הערכאה אם מצוינת (למשל 'בית המשפט המחוזי', 'ועדת ערר').", + "practice_area": "אחד מ: 'rishuy_uvniya' / 'betterment_levy' / 'compensation_197' — אם ברור מהנושא; אחרת ריק.", + "subject_tags": ["2-5 תגיות snake_case בעברית"] + } + ], + "articles": [ + { + "title": "כותרת המאמר (למשל 'הפקעת קרקעות כיום - על המחוקק לתקן את העיוות שנוצר').", + "authors": "שמות המחברים (למשל 'עו\\"ד צבי שוב, עו\\"ד רונית אלפר').", + "summary": "2-4 משפטים: על מה המאמר ומה הטענה המרכזית.", + "body": "הטקסט המלא של המאמר (כל הפסקאות), לצורך embedding וחיפוש-עומק.", + "practice_area": "אחד מ-3 אם ברור; אחרת ריק.", + "subject_tags": ["2-5 תגיות snake_case"] + } + ] +} + +## כללים +1. **underlying_citation** — חלץ במלואו ובדיוק; הוא הגשר לפסק. פריט-פסיקה בלי מראה-מקום ברור → דלג עליו. +2. **cases** — כל מצביעי-הפסיקה בעלון, גם אם תחת נושאים שונים. אל תאחד פריטים נפרדים. +3. **articles** — רק מאמרי-עומק (לא רשימת עדכונים). body = הטקסט המלא. +4. **עדכוני חקיקה/אישורי-תכניות/חוזרים — לא לחלץ כלל.** +5. אם אין מאמר או אין פסיקה — החזר מערך ריק לאותו מפתח. +""" + + +def _norm_str(d: dict, key: str) -> str: + v = d.get(key) + return v.strip() if isinstance(v, str) else "" + + +def _norm_tags(d: dict) -> list[str]: + tags = d.get("subject_tags") + if not isinstance(tags, list): + return [] + return [str(t).strip() for t in tags if str(t).strip()][:8] + + +def _norm_pa(d: dict) -> str: + pa = _norm_str(d, "practice_area") + return pa if pa in _VALID_PRACTICE_AREAS else "" + + +async def split(raw_text: str, model: str | None = None) -> dict: + """Return ``{"cases": [...], "articles": [...]}`` extracted from a bulletin. + + Empty lists on any failure (surfaced as a warning, never raised) so the + batch keeps going. Each item is type-normalized; malformed items are dropped. + """ + from legal_mcp.services import claude_session + + text = (raw_text or "").strip() + if not text: + return {"cases": [], "articles": []} + + try: + result = await claude_session.query_json( + text, + system=BULLETIN_SPLIT_PROMPT, + model=(model or config.DIGEST_EXTRACT_MODEL or None), + tools="", # pure text→JSON; disable tools (avoids error_max_turns) + ) + except Exception as e: # §6 — surfaced, not swallowed + logger.warning("bulletin_splitter: query failed: %s", e) + return {"cases": [], "articles": []} + + if not isinstance(result, dict): + logger.warning("bulletin_splitter: expected dict, got %s", type(result).__name__) + return {"cases": [], "articles": []} + + cases: list[dict] = [] + for c in result.get("cases") or []: + if not isinstance(c, dict): + continue + citation = _norm_str(c, "underlying_citation") + if not citation: # rule 1: no anchor → skip + continue + cases.append({ + "underlying_citation": citation, + "concept_tag": _norm_str(c, "concept_tag"), + "headline_holding": _norm_str(c, "headline_holding"), + "summary": _norm_str(c, "summary") or _norm_str(c, "headline_holding"), + "underlying_court": _norm_str(c, "underlying_court"), + "practice_area": _norm_pa(c), + "subject_tags": _norm_tags(c), + }) + + articles: list[dict] = [] + for a in result.get("articles") or []: + if not isinstance(a, dict): + continue + title = _norm_str(a, "title") + body = _norm_str(a, "body") + if not (title or body): + continue + articles.append({ + "title": title, + "authors": _norm_str(a, "authors"), + "summary": _norm_str(a, "summary"), + "body": body, + "practice_area": _norm_pa(a), + "subject_tags": _norm_tags(a), + }) + + return {"cases": cases, "articles": articles} diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index 2af1df8..896372d 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -3667,10 +3667,12 @@ async def create_digest( subject_tags: list[str] | None = None, source_document_path: str = "", extraction_status: str = "processing", + digest_kind: str = "", ) -> dict: """Upsert a digest (X12). Idempotent on yomon_number (INV-G3): a repeat upload of the same yomon updates in place. content_hash is the secondary - dedup key for digests whose number couldn't be parsed.""" + dedup key for digests whose number couldn't be parsed (and the primary key + for bulletin items, which carry no yomon_number — see uq_digests_content_hash).""" pool = await get_pool() content_hash = _content_hash(analysis_text) async with pool.acquire() as conn: @@ -3684,10 +3686,10 @@ async def create_digest( headline_holding, analysis_text, summary, underlying_citation, underlying_court, underlying_date, underlying_judge, practice_area, appeal_subtype, subject_tags, source_document_path, - content_hash, extraction_status + content_hash, extraction_status, digest_kind ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, - $14, $15, $16, $17, $18 + $14, $15, $16, $17, $18, $19 ) ON CONFLICT (yomon_number) WHERE yomon_number <> '' DO UPDATE SET @@ -3708,6 +3710,7 @@ async def create_digest( source_document_path = COALESCE(NULLIF(EXCLUDED.source_document_path, ''), digests.source_document_path), content_hash = EXCLUDED.content_hash, extraction_status = EXCLUDED.extraction_status, + digest_kind = COALESCE(NULLIF(EXCLUDED.digest_kind, ''), digests.digest_kind), updated_at = now() RETURNING {_DIGEST_COLS} """, @@ -3715,7 +3718,7 @@ async def create_digest( headline_holding, analysis_text, summary, underlying_citation, underlying_court, underlying_date, underlying_judge, practice_area, appeal_subtype, list(subject_tags or []), source_document_path, - content_hash, extraction_status, + content_hash, extraction_status, digest_kind, ) return _row_to_digest(row) diff --git a/scripts/ingest_bulletins.py b/scripts/ingest_bulletins.py new file mode 100644 index 0000000..ca28eb3 --- /dev/null +++ b/scripts/ingest_bulletins.py @@ -0,0 +1,56 @@ +"""Ingest the monthly "עו"ד על נדל"ן" bulletin archive into the digests radar (X12). + +Each staged bulletin PDF (data/bulletins/incoming) is split by LLM into case-law +pointers (digest_kind='decision') + articles (digest_kind='article'), all tagged +publication='עו"ד על נדל"ן'. Idempotent — per-item content_hash dedup, so re-runs +only add new items. Runs on the HOST (LLM is local-only), with the mcp-server venv: + + mcp-server/.venv/bin/python scripts/ingest_bulletins.py [--dir PATH] [--limit N] +""" +import argparse +import asyncio +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src")) + +from legal_mcp.services import db, bulletin_library # noqa: E402 + +DEFAULT_DIR = Path(__file__).resolve().parent.parent / "data" / "bulletins" / "incoming" + + +async def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--dir", default=str(DEFAULT_DIR), help="folder of bulletin PDFs") + ap.add_argument("--limit", type=int, default=0, help="process at most N files (0=all)") + args = ap.parse_args() + + folder = Path(args.dir) + files = sorted(p for p in folder.glob("*.pdf")) + if args.limit: + files = files[: args.limit] + total = len(files) + print(f"ingesting {total} bulletins from {folder}", flush=True) + + await db.get_pool() + agg = {"cases": 0, "articles": 0, "created": 0, "skipped": 0, "linked": 0} + t0 = time.time() + for i, f in enumerate(files, 1): + try: + r = await bulletin_library.ingest_bulletin(str(f)) + for k in agg: + agg[k] += r.get(k, 0) + print(f"[{i}/{total}] {r['file']}: cases={r['cases']} articles={r['articles']} " + f"created={r['created']} skipped={r['skipped']} linked={r['linked']}", flush=True) + except Exception as e: # one bad bulletin must not abort the batch + print(f"[{i}/{total}] FAIL {f.name}: {type(e).__name__}: {e}", flush=True) + + print(f"\nDONE {total} bulletins in {(time.time()-t0)/60:.1f}min | " + f"cases={agg['cases']} articles={agg['articles']} created={agg['created']} " + f"skipped={agg['skipped']} linked={agg['linked']}", flush=True) + await db.close_pool() + + +if __name__ == "__main__": + asyncio.run(main())