Files
legal-ai/mcp-server/src/legal_mcp/services/chunker.py
Chaim 6fcfdc76db fix(#79): chunker never emits sub-50-char fragment chunks (#55 follow-up)
A section that opens with a short header line ('דיון', 'טענות המשיבים')
followed by a paragraph larger than chunk_size flushed the header alone as a
tiny chunk. #55 added a query-time >=50 filter to hide these; this removes
them at the source.

_split_section: (1) don't flush a buffer still below MIN_CHUNK_CHARS — let it
absorb the next paragraph even if that overflows chunk_size, so a short header
rides with its following content; (2) fold a trailing tiny chunk back into its
predecessor.

Verified: re-chunked the 4 corpus docs that still had a tiny chunk
(ע"א 5138/04, בר"מ 2340/02, בג"ץ 6525/15, 403-17) — corpus-wide chunks<50
went 4 -> 0; all 4 stay embedded/searchable and rank top in a relevant search
(נווה שלום #1 for the s.19(ג)(1) exemption query). No regression.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 08:10:10 +00:00

375 lines
15 KiB
Python

"""Legal document chunker - splits text into sections and chunks for RAG.
The default :func:`chunk_document` emits a single tier of overlapping
chunks (legacy single-tier indexing). :func:`chunk_document_hierarchical`
emits two tiers — small "child" chunks for retrieval matching, plus
larger "parent" chunks that supply broader context to the LLM (parent-
doc retrieval, TaskMaster #48). The hierarchical variant lives
alongside the legacy one so callers can opt in via
``config.PARENT_DOC_RETRIEVAL_ENABLED`` without breaking existing
single-tier code paths.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from legal_mcp import config
# Hebrew legal section headers.
# Covers both appeals committee decisions and external court rulings —
# court rulings use slightly different vocabulary (פסק דין, נימוקים, סוף דבר).
SECTION_PATTERNS = [
(r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע", "facts"),
(r"טענות\s*העוררי[םן]|טענות\s*המערערי[םן]|עיקר\s*טענות\s*העוררי[םן]", "appellant_claims"),
(r"טענות\s*המשיבי[םן]|תשובת\s*המשיבי[םן]|עיקר\s*טענות\s*המשיבי[םן]", "respondent_claims"),
(r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|נימוקים", "legal_analysis"),
(r"מסקנ[הות]|סיכום|סוף\s*דבר", "conclusion"),
(r"פסק[- ]?דין|החלטה|לפיכך\s*אני\s*מחליט|התוצאה", "ruling"),
(r"מבוא|פתיחה|לפניי", "intro"),
]
@dataclass
class Chunk:
content: str
section_type: str = "other"
page_number: int | None = None
chunk_index: int = 0
def chunk_document(
text: str,
chunk_size: int = config.CHUNK_SIZE_TOKENS,
overlap: int = config.CHUNK_OVERLAP_TOKENS,
page_offsets: list[int] | None = None,
) -> list[Chunk]:
"""Split a legal document into chunks, respecting section boundaries.
When ``page_offsets`` is supplied (from a PDF extraction), each chunk
is tagged with the page number of its first character — used by the
multimodal hybrid retriever to join (text chunk, image at same page)
and surface text+image matches.
"""
if not text.strip():
return []
sections = _split_into_sections(text)
chunks: list[Chunk] = []
idx = 0
for section_type, section_text in sections:
section_chunks = _split_section(section_text, chunk_size, overlap)
for chunk_text in section_chunks:
chunks.append(Chunk(
content=chunk_text,
section_type=section_type,
chunk_index=idx,
))
idx += 1
if page_offsets:
_assign_pages(chunks, text, page_offsets)
return chunks
def _assign_pages(chunks: list[Chunk], text: str, page_offsets: list[int]) -> None:
"""Locate each chunk's first character in ``text`` and tag with the
page that contains that offset. Mutates chunks in-place.
Chunks have overlap so we search forward from a position slightly
past the previous chunk's start. Falls back to a global search if
the forward scan misses (rare — happens only when overlap is bigger
than the advance distance below).
"""
from legal_mcp.services.extractor import page_at_offset
pos = 0
for c in chunks:
idx = text.find(c.content, pos)
if idx < 0:
idx = text.find(c.content)
if idx < 0:
continue
c.page_number = page_at_offset(idx, page_offsets)
# advance past the chunk's halfway point — overlap is < 50% so
# the next chunk's starting point will be after this cursor.
pos = idx + max(1, len(c.content) // 2)
# A section shorter than this (stripped chars) is not a real section — it's
# an artifact of a header keyword matched mid-text. Such a fragment is merged
# into the preceding section rather than emitted as its own chunk. See #55:
# unanchored keywords like "דיון"/"החלטה"/"מסקנה" appearing inside a sentence
# used to carve tiny boundary chunks ("דיון). במסגרת ה") that polluted search.
MIN_SECTION_CHARS = 60
# A split chunk shorter than this (stripped chars) must not stand alone — it
# rides with adjacent content instead. This is the chunk-level analogue of
# MIN_SECTION_CHARS and matches the query-time filter that hides <50-char
# chunks. Without it, a section that opens with a short header line ("דיון",
# "טענות המשיבים") followed by a paragraph larger than chunk_size flushed the
# header as its own tiny chunk (#79, follow-up to #55).
MIN_CHUNK_CHARS = 50
def _split_into_sections(text: str) -> list[tuple[str, str]]:
"""Split text into (section_type, text) pairs based on Hebrew headers.
Header keywords are matched only at the **start of a line** (after
optional whitespace / list numbering like ``5.`` or ``ג.``). A real
section header in these decisions sits on its own line; anchoring to
the line start prevents common words ("דיון", "החלטה", "מסקנה") that
appear mid-sentence from being treated as section boundaries — which
previously produced tiny fragment chunks (#55).
"""
# Find all section headers and their positions
markers: list[tuple[int, str]] = []
for pattern, section_type in SECTION_PATTERNS:
# ^ + MULTILINE: line start only. Optional leading spaces/tabs and an
# optional ordinal prefix ("5.", "5)", "ג.") before the keyword.
anchored = rf"^[ \t]*(?:\d+[.)]\s*|[א-ת][.)]\s*)?(?:{pattern})"
for match in re.finditer(anchored, text, re.MULTILINE):
markers.append((match.start(), section_type))
if not markers:
# No sections found - treat as single block
return [("other", text)]
markers.sort(key=lambda x: x[0])
sections: list[tuple[str, str]] = []
# Text before first section
if markers[0][0] > 0:
intro_text = text[: markers[0][0]].strip()
if intro_text:
sections.append(("intro", intro_text))
# Each section. A section whose text is too short to stand alone is
# merged into the previous section (keeping the previous type) so a
# near-adjacent pair of headers can't produce a fragment chunk.
for i, (pos, section_type) in enumerate(markers):
end = markers[i + 1][0] if i + 1 < len(markers) else len(text)
section_text = text[pos:end].strip()
if not section_text:
continue
if len(section_text) < MIN_SECTION_CHARS and sections:
prev_type, prev_text = sections[-1]
sections[-1] = (prev_type, f"{prev_text}\n{section_text}")
else:
sections.append((section_type, section_text))
return sections
def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
"""Split a section into overlapping chunks by paragraphs.
Uses approximate token counting (Hebrew ~1.5 chars per token).
"""
if not text.strip():
return []
paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
chunks: list[str] = []
current: list[str] = []
current_tokens = 0
current_chars = 0
for para in paragraphs:
para_tokens = _estimate_tokens(para)
# Don't flush a buffer that is still below MIN_CHUNK_CHARS — let it
# absorb this paragraph even if that overflows chunk_size. A short
# header line ("דיון") must ride with the following paragraph rather
# than be emitted as a tiny fragment chunk (#79).
if (
current_tokens + para_tokens > chunk_size
and current
and current_chars >= MIN_CHUNK_CHARS
):
chunks.append("\n".join(current))
# Keep overlap
overlap_paras: list[str] = []
overlap_tokens = 0
for p in reversed(current):
pt = _estimate_tokens(p)
if overlap_tokens + pt > overlap:
break
overlap_paras.insert(0, p)
overlap_tokens += pt
current = overlap_paras
current_tokens = overlap_tokens
current_chars = sum(len(p) for p in current)
current.append(para)
current_tokens += para_tokens
current_chars += len(para)
if current:
chunks.append("\n".join(current))
# Fold a trailing tiny chunk back into its predecessor — a short trailing
# line (e.g. a stray quote fragment) shouldn't stand alone either (#79).
if len(chunks) >= 2 and len(chunks[-1].strip()) < MIN_CHUNK_CHARS:
tail = chunks.pop()
chunks[-1] = f"{chunks[-1]}\n{tail}"
return chunks
def _estimate_tokens(text: str) -> int:
"""Rough token estimate for Hebrew text (~1.5 chars per token)."""
return max(1, len(text) // 2)
# ── Parent-doc retrieval (TaskMaster #48) ────────────────────────────
# Hierarchical chunker — emits a list of (child, parent) pairs:
# * each "child" carries the smaller text used for embedding/search
# * each "parent" is shared by ~5 consecutive children (1500/300)
# The list is FLAT — both parents and children live in the same return
# list, distinguished by ``role``. A child's ``parent_local_id`` points
# back to its parent's ``local_id``, so the ingest pipeline can resolve
# the FK after the parent row is INSERTed and its DB UUID is known.
#
# Parents are built FIRST (one window of ``parent_size`` tokens per
# section, sliding by the parent window — no overlap between parents),
# then each parent is sub-divided into overlapping children. This keeps
# the parent boundary aligned with semantic sections (so a "discussion"
# parent doesn't contain stray "ruling" prose) while still allowing
# child overlap for recall.
@dataclass
class HierarchicalChunk:
"""One chunk in the two-tier hierarchy.
Both children and parents share this shape; ``role`` distinguishes
them. Children get an embedding at ingest time; parents do not —
they exist only to carry context back to the LLM at retrieval time.
``local_id`` is a stable in-batch identifier (sequential int) used
only by the ingest pipeline to wire children to their parent's DB
UUID after the parent INSERT returns. It is NOT persisted.
"""
content: str
role: str # 'child' | 'parent'
section_type: str = "other"
page_number: int | None = None
chunk_index: int = 0
local_id: int = -1
parent_local_id: int | None = None
def chunk_document_hierarchical(
text: str,
child_size: int = config.PARENT_DOC_CHILD_SIZE_TOKENS,
parent_size: int = config.PARENT_DOC_PARENT_SIZE_TOKENS,
overlap: int = config.PARENT_DOC_CHILD_OVERLAP_TOKENS,
page_offsets: list[int] | None = None,
) -> list[HierarchicalChunk]:
"""Split a document into a two-tier (child, parent) hierarchy.
Returns a flat list where each element is either a parent or a
child. Children carry ``parent_local_id`` pointing back to their
parent's ``local_id``. Caller (ingest pipeline) must insert parents
first, capture their DB UUIDs by ``local_id``, then insert children
with the resolved UUID in ``parent_chunk_id``.
Args:
text: full document text.
child_size: child chunk size in tokens (≈ 300 by default).
parent_size: parent chunk size in tokens (≈ 1500 by default).
Parents contain ``parent_size // child_size`` children on
average.
overlap: child-to-child overlap inside a parent (≈ 50 tokens).
Parents themselves do not overlap each other.
page_offsets: PDF page offsets for tagging chunks with page #.
Notes:
* Parents respect section boundaries (header detection from
:data:`SECTION_PATTERNS`). A "facts" parent will not include
"ruling" text.
* Empty text returns an empty list.
* Both child and parent rows are tagged with the page of their
first character.
"""
if not text.strip():
return []
if child_size <= 0 or parent_size <= 0:
raise ValueError("child_size and parent_size must be positive")
if child_size > parent_size:
raise ValueError("child_size must be <= parent_size")
sections = _split_into_sections(text)
out: list[HierarchicalChunk] = []
parent_idx = 0 # global parent ordinal (chunk_index for parents)
child_idx = 0 # global child ordinal (chunk_index for children)
local_id = 0 # sequential id within this document
for section_type, section_text in sections:
# Step 1: split section into parent-sized windows (no overlap).
parent_texts = _split_section(section_text, parent_size, overlap=0)
for parent_text in parent_texts:
parent_local = local_id
local_id += 1
parent_chunk = HierarchicalChunk(
content=parent_text,
role="parent",
section_type=section_type,
chunk_index=parent_idx,
local_id=parent_local,
parent_local_id=None,
)
out.append(parent_chunk)
parent_idx += 1
# Step 2: sub-divide this parent into overlapping children.
child_texts = _split_section(parent_text, child_size, overlap)
for ch_text in child_texts:
ch = HierarchicalChunk(
content=ch_text,
role="child",
section_type=section_type,
chunk_index=child_idx,
local_id=local_id,
parent_local_id=parent_local,
)
out.append(ch)
local_id += 1
child_idx += 1
if page_offsets:
_assign_pages_hierarchical(out, text, page_offsets)
return out
def _assign_pages_hierarchical(
chunks: list[HierarchicalChunk],
text: str,
page_offsets: list[int],
) -> None:
"""Page-tag both children and parents.
Same forward-scan strategy as :func:`_assign_pages` but works on
the hierarchical list. Parents may span pages; we tag them with
the page of their first character (matches how the multimodal
retriever joins on page numbers).
"""
from legal_mcp.services.extractor import page_at_offset
pos = 0
for c in chunks:
idx = text.find(c.content, pos)
if idx < 0:
idx = text.find(c.content)
if idx < 0:
continue
c.page_number = page_at_offset(idx, page_offsets)
# Advance past halfway — children share text with their parent
# and with each other (overlap), so a small forward step lets
# the next find() still pick up the right occurrence.
pos = idx + max(1, len(c.content) // 4)