Merge pull request 'feat(bulletins): catalog monthly "עו"ד על נדל"ן" bulletins into the radar (X12)' (#154) from worktree-bulletins-catalog into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m36s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m36s
This commit was merged in pull request #154.
This commit is contained in:
@@ -51,6 +51,19 @@
|
||||
`digest_kind=''`** (מעולם לא סווג) — כך הודעה (kind=`announcement`, בלי citation) **אינה** נחשבת כשל
|
||||
ואינה מנוסה-מחדש לנצח. ההיוריסטיקה הישנה ("שני השדות ריקים") טיפלה בהודעות בטעות כ-retry אינסופי.
|
||||
|
||||
### 2.1 מקור שני ל-radar — העלון החודשי "עו"ד על נדל"ן"
|
||||
|
||||
פרסום **נפרד** מהיומון היומי: עלון חודשי ממוספר (משרדי צבי שוב + רונית אלפר), **רב-נושאי** — מאמר-עומק,
|
||||
עדכוני-חקיקה, וסט מצביעי-פסיקה מקובצים לפי נושא. נקלט **לאותה טבלת `digests`** (לא קורפוס מקביל — G2),
|
||||
מובחן ע"י `publication='עו"ד על נדל"ן'` (מול `'כל יום'`). עלון אחד **מתפצל ל-N שורות** דרך
|
||||
`bulletin_splitter` (LLM, local-only) → `bulletin_library.ingest_bulletin`:
|
||||
- **מצביעי-פסיקה** → `digest_kind='decision'` — מצטרפים ל-radar ומקושרים לפסק (autolink + X13 כמו היומון).
|
||||
- **מאמרים** → `digest_kind='article'` — טקסט-מלא + embedding לחיפוש-עומק; **רקע בלבד, INV-DIG1 חל** (לא מצוטט).
|
||||
- **עדכוני-חקיקה — לא נקלטים** (החלטת יו"ר).
|
||||
|
||||
מפתח-הדדאפ לפריט-עלון הוא **`content_hash` (per-פריט)**, כי `yomon_number` ריק (ה-upsert על yomon-number
|
||||
לא חל; `uq_digests_content_hash` תופס re-runs). אידמפוטנטי. סקריפט: `scripts/ingest_bulletins.py`.
|
||||
|
||||
---
|
||||
|
||||
## 3. למה זה לא קורפוס-ציטוט רביעי (הקושיה המרכזית — G2)
|
||||
|
||||
121
mcp-server/src/legal_mcp/services/bulletin_library.py
Normal file
121
mcp-server/src/legal_mcp/services/bulletin_library.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Ingest a monthly "עו"ד על נדל"ן" bulletin into the digests radar (X12).
|
||||
|
||||
A bulletin PDF is multi-topic: it EXPLODES into several digest rows — one per
|
||||
case-law pointer (digest_kind='decision') and one per article (digest_kind=
|
||||
'article'), all tagged publication='עו"ד על נדל"ן' to distinguish them from the
|
||||
daily "כל יום" issues. This reuses the existing radar (no parallel corpus — G2):
|
||||
the case pointers join search_digests / the /digests page and autolink to the
|
||||
underlying ruling exactly like a daily digest; articles are deep-context only.
|
||||
|
||||
LOCAL-ONLY (LLM split + embedding) — host scripts/MCP, never the container path.
|
||||
Idempotent: each item's content_hash (hash of its analysis_text) is the dedup
|
||||
key, so re-running a bulletin skips already-ingested items.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from legal_mcp.services import db, embeddings, extractor
|
||||
from legal_mcp.services import bulletin_splitter, digest_library
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PUBLICATION = 'עו"ד על נדל"ן'
|
||||
SOURCE_FIRM = "צבי שוב + רונית אלפר, עורכי דין"
|
||||
|
||||
|
||||
async def _store_and_embed(digest_row: dict) -> None:
|
||||
"""Compute + store the single radar embedding for a freshly created item."""
|
||||
emb_text = digest_library._embedding_text(digest_row)
|
||||
if not emb_text:
|
||||
return
|
||||
try:
|
||||
vecs = await embeddings.embed_texts([emb_text], input_type="document")
|
||||
if vecs:
|
||||
await db.store_digest_embedding(digest_row["id"], vecs[0])
|
||||
except Exception as e: # §6 — surfaced, not swallowed
|
||||
logger.warning("bulletin item embedding failed for %s: %s", digest_row.get("id"), e)
|
||||
|
||||
|
||||
async def _create_item(*, analysis_text: str, kind: str, concept_tag: str,
|
||||
headline: str, summary: str, citation: str, court: str,
|
||||
practice_area: str, subject_tags: list[str], src: str) -> dict | None:
|
||||
"""Create one digest row from a bulletin item. Returns the row, or None if it
|
||||
already exists (idempotent skip) or the insert raced on content_hash."""
|
||||
content_hash = db._content_hash(analysis_text)
|
||||
if await db.get_digest_by_content_hash(content_hash):
|
||||
return None
|
||||
try:
|
||||
return await db.create_digest(
|
||||
analysis_text=analysis_text,
|
||||
publication=PUBLICATION,
|
||||
source_firm=SOURCE_FIRM,
|
||||
concept_tag=concept_tag,
|
||||
headline_holding=headline,
|
||||
summary=summary,
|
||||
underlying_citation=citation,
|
||||
underlying_court=court,
|
||||
practice_area=practice_area,
|
||||
subject_tags=subject_tags,
|
||||
source_document_path=src,
|
||||
extraction_status="completed",
|
||||
digest_kind=kind,
|
||||
)
|
||||
except Exception as e:
|
||||
# uq_digests_content_hash race (concurrent run) → treat as already-present.
|
||||
if "uq_digests_content_hash" in str(e):
|
||||
return None
|
||||
raise
|
||||
|
||||
|
||||
async def ingest_bulletin(file_path: str, model: str | None = None) -> dict:
|
||||
"""Split a bulletin PDF into digest rows (case pointers + articles).
|
||||
|
||||
Returns counts: {cases, articles, created, skipped, linked}. Idempotent.
|
||||
"""
|
||||
path = str(file_path)
|
||||
raw_text, _pages, _meta = await extractor.extract_text(path)
|
||||
split = await bulletin_splitter.split(raw_text, model=model)
|
||||
cases, articles = split.get("cases", []), split.get("articles", [])
|
||||
|
||||
out = {"file": Path(path).name, "cases": len(cases), "articles": len(articles),
|
||||
"created": 0, "skipped": 0, "linked": 0}
|
||||
|
||||
for c in cases:
|
||||
# analysis_text bundles the pointer's substance → stable per-item hash.
|
||||
atext = "\n".join(p for p in (
|
||||
c["concept_tag"], c["headline_holding"], c["summary"], c["underlying_citation"]
|
||||
) if p).strip()
|
||||
row = await _create_item(
|
||||
analysis_text=atext, kind="decision", concept_tag=c["concept_tag"],
|
||||
headline=c["headline_holding"], summary=c["summary"],
|
||||
citation=c["underlying_citation"], court=c["underlying_court"],
|
||||
practice_area=c["practice_area"], subject_tags=c["subject_tags"], src=path,
|
||||
)
|
||||
if row is None:
|
||||
out["skipped"] += 1
|
||||
continue
|
||||
out["created"] += 1
|
||||
await _store_and_embed(row)
|
||||
linked = await digest_library.try_autolink(row["id"], c["underlying_citation"])
|
||||
if linked:
|
||||
out["linked"] += 1
|
||||
|
||||
for a in articles:
|
||||
# The article body is the substance; prefix authors into the summary.
|
||||
body = a["body"] or a["summary"]
|
||||
summary = (f"מאת {a['authors']}. " if a["authors"] else "") + (a["summary"] or "")
|
||||
atext = "\n".join(p for p in (a["title"], summary, body) if p).strip()
|
||||
row = await _create_item(
|
||||
analysis_text=atext, kind="article", concept_tag=a["title"],
|
||||
headline=a["title"], summary=summary, citation="", court="",
|
||||
practice_area=a["practice_area"], subject_tags=a["subject_tags"], src=path,
|
||||
)
|
||||
if row is None:
|
||||
out["skipped"] += 1
|
||||
continue
|
||||
out["created"] += 1
|
||||
await _store_and_embed(row)
|
||||
|
||||
return out
|
||||
147
mcp-server/src/legal_mcp/services/bulletin_splitter.py
Normal file
147
mcp-server/src/legal_mcp/services/bulletin_splitter.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""Split a monthly "עו"ד על נדל"ן" bulletin into typed radar items (X12).
|
||||
|
||||
The monthly bulletin (a SEPARATE publication from the daily "כל יום" digest) is
|
||||
multi-topic: it bundles a featured ARTICLE, a list of legislative updates, and a
|
||||
set of CASE-LAW pointers grouped by topic. The chair chose to catalog the
|
||||
**case-law pointers** (each → a digest, like the daily issue) and the
|
||||
**articles** (deep-context background) — legislative updates are skipped.
|
||||
|
||||
This module is the LLM splitter only. ``bulletin_library.ingest_bulletin`` turns
|
||||
its output into digest rows. Like the daily extractor it is LOCAL-ONLY (claude
|
||||
CLI) and MUST NOT be imported from the FastAPI container path.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from legal_mcp import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_VALID_PRACTICE_AREAS = {"rishuy_uvniya", "betterment_levy", "compensation_197"}
|
||||
|
||||
BULLETIN_SPLIT_PROMPT = """\
|
||||
אתה מקבל טקסט מלא של **עלון חודשי "עו"ד על נדל"ן"** (פרסום מקצועי רב-נושאי בתחום
|
||||
תכנון ובנייה, מקרקעין, היטל השבחה, פיצויים והתחדשות עירונית). פצל אותו לפריטים.
|
||||
|
||||
העלון בנוי משלושה חלקים: (א) **מאמר** מקצועי ארוך אחד או יותר; (ב) **עדכוני חקיקה**
|
||||
(תיקוני-חוק, אישורי-תכניות, חוזרים) — **התעלם מהם, אל תחלץ**; (ג) **עדכוני פסיקה**
|
||||
מקובצים לפי נושא — כל פריט = מראה-מקום של פסק דין/החלטה + שורת-תקציר.
|
||||
|
||||
**אל תמציא** — חלץ רק מה שמופיע בטקסט. שדה חסר → מחרוזת ריקה.
|
||||
|
||||
## פלט נדרש
|
||||
החזר JSON אחד (object), ללא markdown:
|
||||
|
||||
{
|
||||
"cases": [
|
||||
{
|
||||
"underlying_citation": "מראה-המקום המלא של הפסק כפי שמופיע, מילה במילה (למשל 'ערר 8018-02-22 הועדה המקומית בת ים נ' קבוצת מזרחי ובניו השקעות בע\\"מ'). השדה הקריטי.",
|
||||
"concept_tag": "הנושא/הכותרת שתחתיה מופיע הפריט (למשל 'היטל השבחה', 'הפקעות', 'פירוק שיתוף').",
|
||||
"headline_holding": "שורת-התקציר/הכותרת של הפריט — מה נקבע/השאלה (למשל 'חוסר וודאות בין תכנית קודמת לבין ההקלה').",
|
||||
"summary": "תקציר ניטרלי קצר אם יש פירוט נוסף בגוף; אחרת חזור על headline_holding.",
|
||||
"underlying_court": "הערכאה אם מצוינת (למשל 'בית המשפט המחוזי', 'ועדת ערר').",
|
||||
"practice_area": "אחד מ: 'rishuy_uvniya' / 'betterment_levy' / 'compensation_197' — אם ברור מהנושא; אחרת ריק.",
|
||||
"subject_tags": ["2-5 תגיות snake_case בעברית"]
|
||||
}
|
||||
],
|
||||
"articles": [
|
||||
{
|
||||
"title": "כותרת המאמר (למשל 'הפקעת קרקעות כיום - על המחוקק לתקן את העיוות שנוצר').",
|
||||
"authors": "שמות המחברים (למשל 'עו\\"ד צבי שוב, עו\\"ד רונית אלפר').",
|
||||
"summary": "2-4 משפטים: על מה המאמר ומה הטענה המרכזית.",
|
||||
"body": "הטקסט המלא של המאמר (כל הפסקאות), לצורך embedding וחיפוש-עומק.",
|
||||
"practice_area": "אחד מ-3 אם ברור; אחרת ריק.",
|
||||
"subject_tags": ["2-5 תגיות snake_case"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
## כללים
|
||||
1. **underlying_citation** — חלץ במלואו ובדיוק; הוא הגשר לפסק. פריט-פסיקה בלי מראה-מקום ברור → דלג עליו.
|
||||
2. **cases** — כל מצביעי-הפסיקה בעלון, גם אם תחת נושאים שונים. אל תאחד פריטים נפרדים.
|
||||
3. **articles** — רק מאמרי-עומק (לא רשימת עדכונים). body = הטקסט המלא.
|
||||
4. **עדכוני חקיקה/אישורי-תכניות/חוזרים — לא לחלץ כלל.**
|
||||
5. אם אין מאמר או אין פסיקה — החזר מערך ריק לאותו מפתח.
|
||||
"""
|
||||
|
||||
|
||||
def _norm_str(d: dict, key: str) -> str:
|
||||
v = d.get(key)
|
||||
return v.strip() if isinstance(v, str) else ""
|
||||
|
||||
|
||||
def _norm_tags(d: dict) -> list[str]:
|
||||
tags = d.get("subject_tags")
|
||||
if not isinstance(tags, list):
|
||||
return []
|
||||
return [str(t).strip() for t in tags if str(t).strip()][:8]
|
||||
|
||||
|
||||
def _norm_pa(d: dict) -> str:
|
||||
pa = _norm_str(d, "practice_area")
|
||||
return pa if pa in _VALID_PRACTICE_AREAS else ""
|
||||
|
||||
|
||||
async def split(raw_text: str, model: str | None = None) -> dict:
|
||||
"""Return ``{"cases": [...], "articles": [...]}`` extracted from a bulletin.
|
||||
|
||||
Empty lists on any failure (surfaced as a warning, never raised) so the
|
||||
batch keeps going. Each item is type-normalized; malformed items are dropped.
|
||||
"""
|
||||
from legal_mcp.services import claude_session
|
||||
|
||||
text = (raw_text or "").strip()
|
||||
if not text:
|
||||
return {"cases": [], "articles": []}
|
||||
|
||||
try:
|
||||
result = await claude_session.query_json(
|
||||
text,
|
||||
system=BULLETIN_SPLIT_PROMPT,
|
||||
model=(model or config.DIGEST_EXTRACT_MODEL or None),
|
||||
tools="", # pure text→JSON; disable tools (avoids error_max_turns)
|
||||
)
|
||||
except Exception as e: # §6 — surfaced, not swallowed
|
||||
logger.warning("bulletin_splitter: query failed: %s", e)
|
||||
return {"cases": [], "articles": []}
|
||||
|
||||
if not isinstance(result, dict):
|
||||
logger.warning("bulletin_splitter: expected dict, got %s", type(result).__name__)
|
||||
return {"cases": [], "articles": []}
|
||||
|
||||
cases: list[dict] = []
|
||||
for c in result.get("cases") or []:
|
||||
if not isinstance(c, dict):
|
||||
continue
|
||||
citation = _norm_str(c, "underlying_citation")
|
||||
if not citation: # rule 1: no anchor → skip
|
||||
continue
|
||||
cases.append({
|
||||
"underlying_citation": citation,
|
||||
"concept_tag": _norm_str(c, "concept_tag"),
|
||||
"headline_holding": _norm_str(c, "headline_holding"),
|
||||
"summary": _norm_str(c, "summary") or _norm_str(c, "headline_holding"),
|
||||
"underlying_court": _norm_str(c, "underlying_court"),
|
||||
"practice_area": _norm_pa(c),
|
||||
"subject_tags": _norm_tags(c),
|
||||
})
|
||||
|
||||
articles: list[dict] = []
|
||||
for a in result.get("articles") or []:
|
||||
if not isinstance(a, dict):
|
||||
continue
|
||||
title = _norm_str(a, "title")
|
||||
body = _norm_str(a, "body")
|
||||
if not (title or body):
|
||||
continue
|
||||
articles.append({
|
||||
"title": title,
|
||||
"authors": _norm_str(a, "authors"),
|
||||
"summary": _norm_str(a, "summary"),
|
||||
"body": body,
|
||||
"practice_area": _norm_pa(a),
|
||||
"subject_tags": _norm_tags(a),
|
||||
})
|
||||
|
||||
return {"cases": cases, "articles": articles}
|
||||
@@ -3667,10 +3667,12 @@ async def create_digest(
|
||||
subject_tags: list[str] | None = None,
|
||||
source_document_path: str = "",
|
||||
extraction_status: str = "processing",
|
||||
digest_kind: str = "",
|
||||
) -> dict:
|
||||
"""Upsert a digest (X12). Idempotent on yomon_number (INV-G3): a repeat
|
||||
upload of the same yomon updates in place. content_hash is the secondary
|
||||
dedup key for digests whose number couldn't be parsed."""
|
||||
dedup key for digests whose number couldn't be parsed (and the primary key
|
||||
for bulletin items, which carry no yomon_number — see uq_digests_content_hash)."""
|
||||
pool = await get_pool()
|
||||
content_hash = _content_hash(analysis_text)
|
||||
async with pool.acquire() as conn:
|
||||
@@ -3684,10 +3686,10 @@ async def create_digest(
|
||||
headline_holding, analysis_text, summary, underlying_citation,
|
||||
underlying_court, underlying_date, underlying_judge, practice_area,
|
||||
appeal_subtype, subject_tags, source_document_path,
|
||||
content_hash, extraction_status
|
||||
content_hash, extraction_status, digest_kind
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
|
||||
$14, $15, $16, $17, $18
|
||||
$14, $15, $16, $17, $18, $19
|
||||
)
|
||||
ON CONFLICT (yomon_number) WHERE yomon_number <> ''
|
||||
DO UPDATE SET
|
||||
@@ -3708,6 +3710,7 @@ async def create_digest(
|
||||
source_document_path = COALESCE(NULLIF(EXCLUDED.source_document_path, ''), digests.source_document_path),
|
||||
content_hash = EXCLUDED.content_hash,
|
||||
extraction_status = EXCLUDED.extraction_status,
|
||||
digest_kind = COALESCE(NULLIF(EXCLUDED.digest_kind, ''), digests.digest_kind),
|
||||
updated_at = now()
|
||||
RETURNING {_DIGEST_COLS}
|
||||
""",
|
||||
@@ -3715,7 +3718,7 @@ async def create_digest(
|
||||
headline_holding, analysis_text, summary, underlying_citation,
|
||||
underlying_court, underlying_date, underlying_judge, practice_area,
|
||||
appeal_subtype, list(subject_tags or []), source_document_path,
|
||||
content_hash, extraction_status,
|
||||
content_hash, extraction_status, digest_kind,
|
||||
)
|
||||
return _row_to_digest(row)
|
||||
|
||||
|
||||
56
scripts/ingest_bulletins.py
Normal file
56
scripts/ingest_bulletins.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Ingest the monthly "עו"ד על נדל"ן" bulletin archive into the digests radar (X12).
|
||||
|
||||
Each staged bulletin PDF (data/bulletins/incoming) is split by LLM into case-law
|
||||
pointers (digest_kind='decision') + articles (digest_kind='article'), all tagged
|
||||
publication='עו"ד על נדל"ן'. Idempotent — per-item content_hash dedup, so re-runs
|
||||
only add new items. Runs on the HOST (LLM is local-only), with the mcp-server venv:
|
||||
|
||||
mcp-server/.venv/bin/python scripts/ingest_bulletins.py [--dir PATH] [--limit N]
|
||||
"""
|
||||
import argparse
|
||||
import asyncio
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "mcp-server" / "src"))
|
||||
|
||||
from legal_mcp.services import db, bulletin_library # noqa: E402
|
||||
|
||||
DEFAULT_DIR = Path(__file__).resolve().parent.parent / "data" / "bulletins" / "incoming"
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--dir", default=str(DEFAULT_DIR), help="folder of bulletin PDFs")
|
||||
ap.add_argument("--limit", type=int, default=0, help="process at most N files (0=all)")
|
||||
args = ap.parse_args()
|
||||
|
||||
folder = Path(args.dir)
|
||||
files = sorted(p for p in folder.glob("*.pdf"))
|
||||
if args.limit:
|
||||
files = files[: args.limit]
|
||||
total = len(files)
|
||||
print(f"ingesting {total} bulletins from {folder}", flush=True)
|
||||
|
||||
await db.get_pool()
|
||||
agg = {"cases": 0, "articles": 0, "created": 0, "skipped": 0, "linked": 0}
|
||||
t0 = time.time()
|
||||
for i, f in enumerate(files, 1):
|
||||
try:
|
||||
r = await bulletin_library.ingest_bulletin(str(f))
|
||||
for k in agg:
|
||||
agg[k] += r.get(k, 0)
|
||||
print(f"[{i}/{total}] {r['file']}: cases={r['cases']} articles={r['articles']} "
|
||||
f"created={r['created']} skipped={r['skipped']} linked={r['linked']}", flush=True)
|
||||
except Exception as e: # one bad bulletin must not abort the batch
|
||||
print(f"[{i}/{total}] FAIL {f.name}: {type(e).__name__}: {e}", flush=True)
|
||||
|
||||
print(f"\nDONE {total} bulletins in {(time.time()-t0)/60:.1f}min | "
|
||||
f"cases={agg['cases']} articles={agg['articles']} created={agg['created']} "
|
||||
f"skipped={agg['skipped']} linked={agg['linked']}", flush=True)
|
||||
await db.close_pool()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user