Files
legal-ai/mcp-server/src/legal_mcp/services/chunker.py
Chaim 7ee90dce31
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m27s
feat: external precedent library with auto halacha extraction
Adds a third corpus of legal authority distinct from style_corpus
(Daphna's prior decisions for voice) and case_precedents (chair-attached
quotes per case). The new corpus holds chair-uploaded court rulings and
other appeals committee decisions, with binding rules (הלכות) extracted
automatically and queued for chair approval.

Pipeline (web/app.py + services/precedent_library.py):
file → extract → chunk → Voyage embed → halacha_extractor → store +
publish progress over the existing Redis SSE channel.

Schema V7 (services/db.py): extends case_law with source_kind +
extraction status fields under a CHECK constraint pinning practice_area
to the three appeals committee domains (rishuy_uvniya, betterment_levy,
compensation_197). New precedent_chunks (vector(1024)) and halachot
tables (vector(1024) over rule_statement, IVFFlat indexes, gin on
practice_areas/subject_tags). Halachot start as pending_review; only
approved/published rows are visible to search_precedent_library.

Agents: legal-writer, legal-researcher, legal-analyst, legal-ceo,
legal-qa get search_precedent_library. legal-writer prompt explains
the three-corpus distinction and CREAC use; legal-qa now verifies that
every cited halacha resolves to an approved row in the corpus.

UI: /precedents page with four tabs — library / semantic search /
pending review (J/K nav, A/R/E shortcuts, badge count) / stats.
Reuses the existing upload-sheet progress + SSE pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 08:38:18 +00:00

133 lines
4.2 KiB
Python

"""Legal document chunker - splits text into sections and chunks for RAG."""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from legal_mcp import config
# Hebrew legal section headers.
# Covers both appeals committee decisions and external court rulings —
# court rulings use slightly different vocabulary (פסק דין, נימוקים, סוף דבר).
SECTION_PATTERNS = [
(r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע", "facts"),
(r"טענות\s*העוררי[םן]|טענות\s*המערערי[םן]|עיקר\s*טענות\s*העוררי[םן]", "appellant_claims"),
(r"טענות\s*המשיבי[םן]|תשובת\s*המשיבי[םן]|עיקר\s*טענות\s*המשיבי[םן]", "respondent_claims"),
(r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|נימוקים", "legal_analysis"),
(r"מסקנ[הות]|סיכום|סוף\s*דבר", "conclusion"),
(r"פסק[- ]?דין|החלטה|לפיכך\s*אני\s*מחליט|התוצאה", "ruling"),
(r"מבוא|פתיחה|לפניי", "intro"),
]
@dataclass
class Chunk:
content: str
section_type: str = "other"
page_number: int | None = None
chunk_index: int = 0
def chunk_document(
text: str,
chunk_size: int = config.CHUNK_SIZE_TOKENS,
overlap: int = config.CHUNK_OVERLAP_TOKENS,
) -> list[Chunk]:
"""Split a legal document into chunks, respecting section boundaries."""
if not text.strip():
return []
sections = _split_into_sections(text)
chunks: list[Chunk] = []
idx = 0
for section_type, section_text in sections:
section_chunks = _split_section(section_text, chunk_size, overlap)
for chunk_text in section_chunks:
chunks.append(Chunk(
content=chunk_text,
section_type=section_type,
chunk_index=idx,
))
idx += 1
return chunks
def _split_into_sections(text: str) -> list[tuple[str, str]]:
"""Split text into (section_type, text) pairs based on Hebrew headers."""
# Find all section headers and their positions
markers: list[tuple[int, str]] = []
for pattern, section_type in SECTION_PATTERNS:
for match in re.finditer(pattern, text):
markers.append((match.start(), section_type))
if not markers:
# No sections found - treat as single block
return [("other", text)]
markers.sort(key=lambda x: x[0])
sections: list[tuple[str, str]] = []
# Text before first section
if markers[0][0] > 0:
intro_text = text[: markers[0][0]].strip()
if intro_text:
sections.append(("intro", intro_text))
# Each section
for i, (pos, section_type) in enumerate(markers):
end = markers[i + 1][0] if i + 1 < len(markers) else len(text)
section_text = text[pos:end].strip()
if section_text:
sections.append((section_type, section_text))
return sections
def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
"""Split a section into overlapping chunks by paragraphs.
Uses approximate token counting (Hebrew ~1.5 chars per token).
"""
if not text.strip():
return []
paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
chunks: list[str] = []
current: list[str] = []
current_tokens = 0
for para in paragraphs:
para_tokens = _estimate_tokens(para)
if current_tokens + para_tokens > chunk_size and current:
chunks.append("\n".join(current))
# Keep overlap
overlap_paras: list[str] = []
overlap_tokens = 0
for p in reversed(current):
pt = _estimate_tokens(p)
if overlap_tokens + pt > overlap:
break
overlap_paras.insert(0, p)
overlap_tokens += pt
current = overlap_paras
current_tokens = overlap_tokens
current.append(para)
current_tokens += para_tokens
if current:
chunks.append("\n".join(current))
return chunks
def _estimate_tokens(text: str) -> int:
"""Rough token estimate for Hebrew text (~1.5 chars per token)."""
return max(1, len(text) // 2)