All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m27s
Adds a third corpus of legal authority distinct from style_corpus (Daphna's prior decisions for voice) and case_precedents (chair-attached quotes per case). The new corpus holds chair-uploaded court rulings and other appeals committee decisions, with binding rules (הלכות) extracted automatically and queued for chair approval. Pipeline (web/app.py + services/precedent_library.py): file → extract → chunk → Voyage embed → halacha_extractor → store + publish progress over the existing Redis SSE channel. Schema V7 (services/db.py): extends case_law with source_kind + extraction status fields under a CHECK constraint pinning practice_area to the three appeals committee domains (rishuy_uvniya, betterment_levy, compensation_197). New precedent_chunks (vector(1024)) and halachot tables (vector(1024) over rule_statement, IVFFlat indexes, gin on practice_areas/subject_tags). Halachot start as pending_review; only approved/published rows are visible to search_precedent_library. Agents: legal-writer, legal-researcher, legal-analyst, legal-ceo, legal-qa get search_precedent_library. legal-writer prompt explains the three-corpus distinction and CREAC use; legal-qa now verifies that every cited halacha resolves to an approved row in the corpus. UI: /precedents page with four tabs — library / semantic search / pending review (J/K nav, A/R/E shortcuts, badge count) / stats. Reuses the existing upload-sheet progress + SSE pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
133 lines
4.2 KiB
Python
133 lines
4.2 KiB
Python
"""Legal document chunker - splits text into sections and chunks for RAG."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
|
|
from legal_mcp import config
|
|
|
|
# Regexes for the section headings used in Hebrew legal decisions, each paired
# with the section label it introduces.
# The vocabulary covers appeals committee decisions as well as external court
# rulings, whose headings differ slightly (judgment, reasoning, closing words).
SECTION_PATTERNS: list[tuple[str, str]] = [
    # Factual / general background.
    (r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע", "facts"),
    # Claims of the appellants.
    (r"טענות\s*העוררי[םן]|טענות\s*המערערי[םן]|עיקר\s*טענות\s*העוררי[םן]", "appellant_claims"),
    # Claims / reply of the respondents.
    (r"טענות\s*המשיבי[םן]|תשובת\s*המשיבי[םן]|עיקר\s*טענות\s*המשיבי[םן]", "respondent_claims"),
    # Discussion, decision, legal framework, reasoning.
    (r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|נימוקים", "legal_analysis"),
    # Conclusion, summary, closing words.
    (r"מסקנ[הות]|סיכום|סוף\s*דבר", "conclusion"),
    # Judgment, decision, operative outcome.
    (r"פסק[- ]?דין|החלטה|לפיכך\s*אני\s*מחליט|התוצאה", "ruling"),
    # Introduction / opening.
    (r"מבוא|פתיחה|לפניי", "intro"),
]
|
|
|
|
|
|
@dataclass
|
|
class Chunk:
|
|
content: str
|
|
section_type: str = "other"
|
|
page_number: int | None = None
|
|
chunk_index: int = 0
|
|
|
|
|
|
def chunk_document(
    text: str,
    chunk_size: int = config.CHUNK_SIZE_TOKENS,
    overlap: int = config.CHUNK_OVERLAP_TOKENS,
) -> list[Chunk]:
    """Turn a legal document into an ordered list of chunks.

    The text is first cut into labelled sections, then every section is cut
    into overlapping pieces, so no chunk spans two sections.

    Args:
        text: Full document text.
        chunk_size: Target chunk size in estimated tokens.
        overlap: Estimated tokens repeated between consecutive chunks of the
            same section.

    Returns:
        Chunks in document order, numbered consecutively from 0 across all
        sections. Empty or whitespace-only input yields an empty list.
        ``page_number`` is left at its default.
    """
    if not text.strip():
        return []

    # Flatten (section label, piece of that section) pairs in document order.
    labelled_pieces = (
        (label, piece)
        for label, body in _split_into_sections(text)
        for piece in _split_section(body, chunk_size, overlap)
    )

    return [
        Chunk(content=piece, section_type=label, chunk_index=position)
        for position, (label, piece) in enumerate(labelled_pieces)
    ]
|
|
|
|
|
|
def _split_into_sections(text: str) -> list[tuple[str, str]]:
    """Split text into (section_type, text) pairs based on Hebrew headers.

    Every regex in ``SECTION_PATTERNS`` is searched across the whole of
    *text*. Each hit opens a new section that runs up to the next hit (or the
    end of the text). Text that precedes the first hit is returned as an
    ``"intro"`` section. When nothing matches, the whole text is returned as
    one ``"other"`` section.

    A hit that begins in the middle of a word (Hebrew one-letter prefixes are
    written attached to the following word) is moved back to the start of that
    word, so a section boundary never cuts a word in two.

    NOTE(review): the patterns are not anchored to line starts, so ordinary
    occurrences of these words in running text also open sections. Confirm
    that this is intended.

    Args:
        text: Full document text.

    Returns:
        Non-empty, whitespace-stripped sections in document order.
    """
    boundaries: list[tuple[int, str]] = []

    for pattern, section_type in SECTION_PATTERNS:
        for match in re.finditer(pattern, text):
            start = match.start()
            # Snap to the beginning of the enclosing word so an attached
            # prefix letter is not stranded at the end of the previous section.
            while start > 0 and text[start - 1].isalnum():
                start -= 1
            boundaries.append((start, section_type))

    if not boundaries:
        # No headers found: the document is a single unlabelled block.
        return [("other", text)]

    # Stable sort: hits at the same offset keep SECTION_PATTERNS order.
    boundaries.sort(key=lambda boundary: boundary[0])

    sections: list[tuple[str, str]] = []

    # Anything before the first header is treated as introductory text.
    leading_text = text[: boundaries[0][0]].strip()
    if leading_text:
        sections.append(("intro", leading_text))

    # Each section ends where the next boundary starts.
    section_ends = [start for start, _ in boundaries[1:]] + [len(text)]
    for (start, section_type), end in zip(boundaries, section_ends):
        body = text[start:end].strip()
        if body:
            sections.append((section_type, body))

    return sections
|
|
|
|
|
|
def _split_long_paragraph(paragraph: str, chunk_size: int) -> list[str]:
    """Break an oversized paragraph at whitespace into pieces that fit a chunk.

    A paragraph whose estimated token count is within ``chunk_size`` (or any
    paragraph when ``chunk_size`` is not positive) is returned unchanged as a
    one-element list. Otherwise words are packed greedily into pieces of at
    most ``chunk_size`` estimated tokens; whitespace runs inside such a
    paragraph collapse to single spaces. A single whitespace-free run that is
    itself larger than ``chunk_size`` is kept whole.
    """
    if chunk_size <= 0 or _estimate_tokens(paragraph) <= chunk_size:
        return [paragraph]

    pieces: list[str] = []
    words: list[str] = []
    for word in paragraph.split():
        if words and _estimate_tokens(" ".join([*words, word])) > chunk_size:
            pieces.append(" ".join(words))
            words = []
        words.append(word)
    if words:
        pieces.append(" ".join(words))
    return pieces


def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split a section into overlapping chunks by paragraphs.

    Paragraphs are the non-blank, stripped lines of *text*. They are packed
    in order into a chunk until adding the next paragraph would push the
    chunk past ``chunk_size`` estimated tokens (see ``_estimate_tokens``).
    The chunk is then closed and its trailing paragraphs that fit within
    ``overlap`` estimated tokens are repeated at the head of the next chunk.
    Because of that carried-over text a chunk can exceed ``chunk_size`` by up
    to ``overlap`` tokens.

    A single paragraph larger than ``chunk_size`` is first broken at
    whitespace into pieces that fit, so one very long paragraph no longer
    produces one arbitrarily large chunk.

    Args:
        text: Text of one section.
        chunk_size: Target chunk size in estimated tokens.
        overlap: Estimated tokens of trailing context repeated in the next
            chunk.

    Returns:
        Chunk strings in order, paragraphs joined with newlines. Empty or
        whitespace-only input yields an empty list.
    """
    if not text.strip():
        return []

    paragraphs = [
        piece
        for line in text.split("\n")
        if line.strip()
        for piece in _split_long_paragraph(line.strip(), chunk_size)
    ]

    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0

    for para in paragraphs:
        para_tokens = _estimate_tokens(para)

        if current and current_tokens + para_tokens > chunk_size:
            chunks.append("\n".join(current))

            # Walk backwards over the chunk just closed and carry over as many
            # trailing paragraphs as fit in the overlap budget.
            carried: list[str] = []
            carried_tokens = 0
            for previous in reversed(current):
                previous_tokens = _estimate_tokens(previous)
                if carried_tokens + previous_tokens > overlap:
                    break
                carried.append(previous)
                carried_tokens += previous_tokens
            carried.reverse()  # restore document order

            current = carried
            current_tokens = carried_tokens

        current.append(para)
        current_tokens += para_tokens

    if current:
        chunks.append("\n".join(current))

    return chunks
|
|
|
|
|
|
def _estimate_tokens(text: str) -> int:
|
|
"""Rough token estimate for Hebrew text (~1.5 chars per token)."""
|
|
return max(1, len(text) // 2)
|