#!/usr/bin/env python3 """T1 — אכלוס style_exemplars מקורפוס דפנה (style_corpus + internal_committee). מפצל כל החלטה של דפנה לסעיפים (chunker._split_into_sections), ומכל סעיף לפסקאות, מטמיע (Voyage) ושומר ב-style_exemplars עם section/outcome/practice_area — כדי שהכותב יוכל לאחזר פסקאות-בלוק אמיתיות של דפנה בזמן כתיבה (T2/T3). מקור-סגנון בלבד (INV-LRN5) — לא מהות. אידמפוטנטי: מנקה לכל decision לפני הכנסה. שימוש: python3 scripts/backfill_style_exemplars.py # dry-run (סופר בלבד) python3 scripts/backfill_style_exemplars.py --apply # מטמיע ושומר דורש POSTGRES_URL + מפתח Voyage בסביבה (כמו שאר ה-MCP). """ from __future__ import annotations import argparse import asyncio import logging from legal_mcp.services import db, embeddings from legal_mcp.services.chunker import _split_into_sections logging.basicConfig(level=logging.INFO, format="%(message)s") log = logging.getLogger("backfill_exemplars") # chunker section_type → style_exemplars.section _SECTION_MAP = { "facts": "background", "appellant_claims": "claims", "respondent_claims": "claims", "legal_analysis": "discussion", "conclusion": "summary", "ruling": "summary", "intro": "other", "other": "other", } MIN_WORDS = 25 # skip tiny fragments MAX_WORDS = 450 # skip over-long blobs (likely un-split) MAX_PER_SECTION = 15 def _paragraphs(section_text: str) -> list[str]: """Split a section into paragraph units (blank-line separated; fall back to lines).""" raw = [p.strip() for p in section_text.split("\n\n")] if len(raw) <= 1: raw = [p.strip() for p in section_text.split("\n")] out = [] for p in raw: wc = len(p.split()) if MIN_WORDS <= wc <= MAX_WORDS: out.append(p) return out[:MAX_PER_SECTION] async def _gather_sources() -> list[dict]: """All of Dafna's decisions: style_corpus + internal_committee (chair דפנה).""" pool = await db.get_pool() rows: list[dict] = [] async with pool.acquire() as conn: sc = await conn.fetch( "SELECT decision_number, full_text, outcome, practice_area " "FROM style_corpus WHERE full_text <> ''" ) for r in sc: rows.append({ "decision_number": r["decision_number"] or "", "source": "style_corpus", "full_text": r["full_text"], "outcome": r["outcome"] or "", "practice_area": r["practice_area"] or "", }) ic = await conn.fetch( "SELECT case_number, full_text, practice_area FROM case_law " "WHERE source_kind = 'internal_committee' AND coalesce(chair_name,'') LIKE '%דפנה%' " "AND coalesce(full_text,'') <> ''" ) for r in ic: rows.append({ "decision_number": r["case_number"] or "", "source": "internal_committee", "full_text": r["full_text"], "outcome": "", "practice_area": r["practice_area"] or "", }) return rows async def main(apply: bool) -> None: sources = await _gather_sources() log.info("מקורות: %d החלטות של דפנה (style_corpus + internal_committee)", len(sources)) total_paras = 0 for src in sources: units: list[tuple[str, str]] = [] # (section, paragraph) for section_type, section_text in _split_into_sections(src["full_text"]): section = _SECTION_MAP.get(section_type, "other") for para in _paragraphs(section_text): units.append((section, para)) if not units: continue total_paras += len(units) log.info(" %-14s %-16s → %d פסקאות", src["source"], src["decision_number"], len(units)) if not apply: continue await db.delete_style_exemplars(src["decision_number"], src["source"]) texts = [u[1] for u in units] vecs = await embeddings.embed_texts(texts, input_type="document") for (section, para), vec in zip(units, vecs): await db.insert_style_exemplar( decision_number=src["decision_number"], source=src["source"], practice_area=src["practice_area"], outcome=src["outcome"], section=section, paragraph_text=para, word_count=len(para.split()), embedding=vec, ) if apply: cov = await db.count_style_exemplars() log.info("הושלם. style_exemplars: %s", cov) else: log.info("dry-run: %d פסקאות יוטמעו. הרץ --apply לביצוע.", total_paras) if __name__ == "__main__": ap = argparse.ArgumentParser() ap.add_argument("--apply", action="store_true", help="embed + insert (default: dry-run)") args = ap.parse_args() asyncio.run(main(args.apply))