"""Legal document chunker - splits text into sections and chunks for RAG. The default :func:`chunk_document` emits a single tier of overlapping chunks (legacy single-tier indexing). :func:`chunk_document_hierarchical` emits two tiers — small "child" chunks for retrieval matching, plus larger "parent" chunks that supply broader context to the LLM (parent- doc retrieval, TaskMaster #48). The hierarchical variant lives alongside the legacy one so callers can opt in via ``config.PARENT_DOC_RETRIEVAL_ENABLED`` without breaking existing single-tier code paths. """ from __future__ import annotations import re from dataclasses import dataclass, field from legal_mcp import config # Hebrew legal section headers. # Covers both appeals committee decisions and external court rulings — # court rulings use slightly different vocabulary (פסק דין, נימוקים, סוף דבר). SECTION_PATTERNS = [ (r"רקע\s*עובדתי|רקע\s*כללי|העובדות|הרקע", "facts"), (r"טענות\s*העוררי[םן]|טענות\s*המערערי[םן]|עיקר\s*טענות\s*העוררי[םן]", "appellant_claims"), (r"טענות\s*המשיבי[םן]|תשובת\s*המשיבי[םן]|עיקר\s*טענות\s*המשיבי[םן]", "respondent_claims"), (r"דיון\s*והכרעה|דיון|הכרעה|ניתוח\s*משפטי|המסגרת\s*המשפטית|נימוקים", "legal_analysis"), (r"מסקנ[הות]|סיכום|סוף\s*דבר", "conclusion"), (r"פסק[- ]?דין|החלטה|לפיכך\s*אני\s*מחליט|התוצאה", "ruling"), (r"מבוא|פתיחה|לפניי", "intro"), ] @dataclass class Chunk: content: str section_type: str = "other" page_number: int | None = None chunk_index: int = 0 def chunk_document( text: str, chunk_size: int = config.CHUNK_SIZE_TOKENS, overlap: int = config.CHUNK_OVERLAP_TOKENS, page_offsets: list[int] | None = None, ) -> list[Chunk]: """Split a legal document into chunks, respecting section boundaries. When ``page_offsets`` is supplied (from a PDF extraction), each chunk is tagged with the page number of its first character — used by the multimodal hybrid retriever to join (text chunk, image at same page) and surface text+image matches. 
""" if not text.strip(): return [] sections = _split_into_sections(text) chunks: list[Chunk] = [] idx = 0 for section_type, section_text in sections: section_chunks = _split_section(section_text, chunk_size, overlap) for chunk_text in section_chunks: chunks.append(Chunk( content=chunk_text, section_type=section_type, chunk_index=idx, )) idx += 1 if page_offsets: _assign_pages(chunks, text, page_offsets) return chunks def _assign_pages(chunks: list[Chunk], text: str, page_offsets: list[int]) -> None: """Locate each chunk's first character in ``text`` and tag with the page that contains that offset. Mutates chunks in-place. Chunks have overlap so we search forward from a position slightly past the previous chunk's start. Falls back to a global search if the forward scan misses (rare — happens only when overlap is bigger than the advance distance below). """ from legal_mcp.services.extractor import page_at_offset pos = 0 for c in chunks: idx = text.find(c.content, pos) if idx < 0: idx = text.find(c.content) if idx < 0: continue c.page_number = page_at_offset(idx, page_offsets) # advance past the chunk's halfway point — overlap is < 50% so # the next chunk's starting point will be after this cursor. pos = idx + max(1, len(c.content) // 2) # A section shorter than this (stripped chars) is not a real section — it's # an artifact of a header keyword matched mid-text. Such a fragment is merged # into the preceding section rather than emitted as its own chunk. See #55: # unanchored keywords like "דיון"/"החלטה"/"מסקנה" appearing inside a sentence # used to carve tiny boundary chunks ("דיון). במסגרת ה") that polluted search. MIN_SECTION_CHARS = 60 # A split chunk shorter than this (stripped chars) must not stand alone — it # rides with adjacent content instead. This is the chunk-level analogue of # MIN_SECTION_CHARS and matches the query-time filter that hides <50-char # chunks. 
# Without it, a section that opens with a short header line ("דיון",
# "טענות המשיבים") followed by a paragraph larger than chunk_size flushed the
# header as its own tiny chunk (#79, follow-up to #55).
MIN_CHUNK_CHARS = 50


def _split_into_sections(text: str) -> list[tuple[str, str]]:
    """Split text into (section_type, text) pairs based on Hebrew headers.

    Header keywords are matched only at the **start of a line** (after
    optional whitespace / list numbering like ``5.`` or ``ג.``). A real
    section header in these decisions sits on its own line; anchoring to the
    line start prevents common words ("דיון", "החלטה", "מסקנה") that appear
    mid-sentence from being treated as section boundaries — which previously
    produced tiny fragment chunks (#55).

    Returns:
        Sections in document order. Text before the first header is labelled
        ``"intro"``; when no header matches at all the whole text is returned
        as a single ``"other"`` section.
    """
    # Find all section headers and their positions
    markers: list[tuple[int, str]] = []
    for pattern, section_type in SECTION_PATTERNS:
        # ^ + MULTILINE: line start only. Optional leading spaces/tabs and an
        # optional ordinal prefix ("5.", "5)", "ג.") before the keyword.
        anchored = rf"^[ \t]*(?:\d+[.)]\s*|[א-ת][.)]\s*)?(?:{pattern})"
        for match in re.finditer(anchored, text, re.MULTILINE):
            markers.append((match.start(), section_type))

    if not markers:
        # No sections found - treat as single block
        return [("other", text)]

    # Stable sort: markers at the same offset keep SECTION_PATTERNS order.
    markers.sort(key=lambda x: x[0])

    sections: list[tuple[str, str]] = []

    # Text before first section
    if markers[0][0] > 0:
        intro_text = text[: markers[0][0]].strip()
        if intro_text:
            sections.append(("intro", intro_text))

    # Each section. A section whose text is too short to stand alone is
    # merged into the previous section (keeping the previous type) so a
    # near-adjacent pair of headers can't produce a fragment chunk.
    for i, (pos, section_type) in enumerate(markers):
        end = markers[i + 1][0] if i + 1 < len(markers) else len(text)
        section_text = text[pos:end].strip()
        if not section_text:
            continue
        if len(section_text) < MIN_SECTION_CHARS and sections:
            prev_type, prev_text = sections[-1]
            sections[-1] = (prev_type, f"{prev_text}\n{section_text}")
        else:
            # Also reached for a short *first* section: there is nothing
            # earlier to merge it into.
            sections.append((section_type, section_text))

    return sections


def _split_section(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split a section into overlapping chunks by paragraphs.

    A "paragraph" is one non-empty line of ``text`` with surrounding
    whitespace stripped; chunks are those lines re-joined with a single
    newline, so blank lines and line padding are not preserved. Sizes are
    measured with :func:`_estimate_tokens` (``len(text) // 2``, minimum 1).

    A paragraph is never split: a single paragraph larger than ``chunk_size``
    yields an oversized chunk.

    Args:
        text: section text.
        chunk_size: target chunk size in estimated tokens.
        overlap: maximum estimated tokens of trailing paragraphs carried from
            one chunk into the next; ``0`` disables overlap.

    Returns:
        Chunk strings in order; empty list for blank input.
    """
    if not text.strip():
        return []

    paragraphs = [p.strip() for p in text.split("\n") if p.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0
    current_chars = 0

    for para in paragraphs:
        para_tokens = _estimate_tokens(para)

        # Don't flush a buffer that is still below MIN_CHUNK_CHARS — let it
        # absorb this paragraph even if that overflows chunk_size. A short
        # header line ("דיון") must ride with the following paragraph rather
        # than be emitted as a tiny fragment chunk (#79).
        if (
            current_tokens + para_tokens > chunk_size
            and current
            and current_chars >= MIN_CHUNK_CHARS
        ):
            chunks.append("\n".join(current))

            # Keep overlap: walk backwards collecting whole trailing
            # paragraphs while they still fit in the overlap budget, then
            # restore document order with a single reverse.
            overlap_paras: list[str] = []
            overlap_tokens = 0
            for p in reversed(current):
                pt = _estimate_tokens(p)
                if overlap_tokens + pt > overlap:
                    break
                overlap_paras.append(p)
                overlap_tokens += pt
            overlap_paras.reverse()

            current = overlap_paras
            current_tokens = overlap_tokens
            current_chars = sum(len(p) for p in current)

        current.append(para)
        current_tokens += para_tokens
        current_chars += len(para)

    if current:
        chunks.append("\n".join(current))

    # Fold a trailing tiny chunk back into its predecessor — a short trailing
    # line (e.g. a stray quote fragment) shouldn't stand alone either (#79).
    if len(chunks) >= 2 and len(chunks[-1].strip()) < MIN_CHUNK_CHARS:
        tail = chunks.pop()
        chunks[-1] = f"{chunks[-1]}\n{tail}"

    return chunks


def _estimate_tokens(text: str) -> int:
    """Rough token estimate for Hebrew text: ``len(text) // 2``, at least 1.

    NOTE(review): earlier documentation described the ratio as ~1.5 chars per
    token, but the code divides by 2 — confirm which is intended before
    changing either.
    """
    return max(1, len(text) // 2)


# ── Parent-doc retrieval (TaskMaster #48) ────────────────────────────
# Hierarchical chunker — emits two tiers of chunks:
#   * each "child" carries the smaller text used for embedding/search
#   * each "parent" is shared by ~5 consecutive children (1500/300)
# The list is FLAT — both parents and children live in the same return
# list, distinguished by ``role``. A child's ``parent_local_id`` points
# back to its parent's ``local_id``, so the ingest pipeline can resolve
# the FK after the parent row is INSERTed and its DB UUID is known.
#
# Parents are built FIRST (one window of ``parent_size`` tokens per
# section, sliding by the parent window — no overlap between parents),
# then each parent is sub-divided into overlapping children. This keeps
# the parent boundary aligned with semantic sections (so a "discussion"
# parent doesn't contain stray "ruling" prose) while still allowing
# child overlap for recall.


@dataclass
class HierarchicalChunk:
    """One chunk in the two-tier hierarchy.

    Both children and parents share this shape; ``role`` distinguishes them.
    Children get an embedding at ingest time; parents do not — they exist
    only to carry context back to the LLM at retrieval time.

    ``local_id`` is a stable in-batch identifier (sequential int) used only
    by the ingest pipeline to wire children to their parent's DB UUID after
    the parent INSERT returns. It is NOT persisted.
    """

    content: str
    role: str  # 'child' | 'parent'
    section_type: str = "other"
    page_number: int | None = None
    chunk_index: int = 0
    local_id: int = -1
    parent_local_id: int | None = None


def chunk_document_hierarchical(
    text: str,
    child_size: int = config.PARENT_DOC_CHILD_SIZE_TOKENS,
    parent_size: int = config.PARENT_DOC_PARENT_SIZE_TOKENS,
    overlap: int = config.PARENT_DOC_CHILD_OVERLAP_TOKENS,
    page_offsets: list[int] | None = None,
) -> list[HierarchicalChunk]:
    """Split a document into a two-tier (child, parent) hierarchy.

    Returns a flat list where each element is either a parent or a child.
    Each parent is immediately followed by its children. Children carry
    ``parent_local_id`` pointing back to their parent's ``local_id``. Caller
    (ingest pipeline) must insert parents first, capture their DB UUIDs by
    ``local_id``, then insert children with the resolved UUID in
    ``parent_chunk_id``.

    Args:
        text: full document text.
        child_size: child chunk size in tokens (≈ 300 by default).
        parent_size: parent chunk size in tokens (≈ 1500 by default).
            Parents contain ``parent_size // child_size`` children on
            average.
        overlap: child-to-child overlap inside a parent (≈ 50 tokens).
            Parents themselves do not overlap each other.
        page_offsets: PDF page offsets for tagging chunks with page #.

    Returns:
        Flat list of parents and children in document order.

    Raises:
        ValueError: if ``child_size`` or ``parent_size`` is not positive, or
            if ``child_size`` exceeds ``parent_size``.

    Notes:
        * Parents respect section boundaries (header detection from
          :data:`SECTION_PATTERNS`). A "facts" parent will not include
          "ruling" text.
        * Empty text returns an empty list.
        * Both child and parent rows are tagged with the page of their
          first character.
        * ``chunk_index`` is numbered separately for parents and for
          children; ``local_id`` is one shared sequence across both.
    """
    if not text.strip():
        return []
    if child_size <= 0 or parent_size <= 0:
        raise ValueError("child_size and parent_size must be positive")
    if child_size > parent_size:
        raise ValueError("child_size must be <= parent_size")

    sections = _split_into_sections(text)
    out: list[HierarchicalChunk] = []
    parent_idx = 0  # global parent ordinal (chunk_index for parents)
    child_idx = 0   # global child ordinal (chunk_index for children)
    local_id = 0    # sequential id within this document

    for section_type, section_text in sections:
        # Step 1: split section into parent-sized windows (no overlap).
        parent_texts = _split_section(section_text, parent_size, overlap=0)
        for parent_text in parent_texts:
            parent_local = local_id
            local_id += 1
            parent_chunk = HierarchicalChunk(
                content=parent_text,
                role="parent",
                section_type=section_type,
                chunk_index=parent_idx,
                local_id=parent_local,
                parent_local_id=None,
            )
            out.append(parent_chunk)
            parent_idx += 1

            # Step 2: sub-divide this parent into overlapping children.
            child_texts = _split_section(parent_text, child_size, overlap)
            for ch_text in child_texts:
                ch = HierarchicalChunk(
                    content=ch_text,
                    role="child",
                    section_type=section_type,
                    chunk_index=child_idx,
                    local_id=local_id,
                    parent_local_id=parent_local,
                )
                out.append(ch)
                local_id += 1
                child_idx += 1

    if page_offsets:
        _assign_pages_hierarchical(out, text, page_offsets)

    return out


def _assign_pages_hierarchical(
    chunks: list[HierarchicalChunk],
    text: str,
    page_offsets: list[int],
) -> None:
    """Page-tag both children and parents. Mutates chunks in-place.

    Forward-scan strategy over the flat parent/children list. Parents may
    span pages; we tag them with the page of their first character (matches
    how the multimodal retriever joins on page numbers).

    Two details keep the scan anchored to the right occurrence:

    * After a parent is located the cursor stays AT the parent's offset. The
      parent's first child starts at that same offset, so moving the cursor
      past it would force the child onto the global fallback (first
      occurrence anywhere in the document) or onto a later duplicate.
    * Chunk text is rebuilt from stripped, non-empty lines joined by a single
      newline, so it may not be a verbatim substring of ``text``. When the
      whole-chunk searches miss, the chunk's first line is searched forward
      from the cursor instead.

    Chunks that cannot be located keep ``page_number`` unchanged.
    """
    from legal_mcp.services.extractor import page_at_offset

    pos = 0
    for c in chunks:
        idx = text.find(c.content, pos)
        if idx < 0:
            idx = text.find(c.content)
        if idx < 0:
            # Only the first character matters for page tagging, and the
            # first line is always a verbatim substring of the source text.
            first_line = c.content.split("\n", 1)[0]
            idx = text.find(first_line, pos)
        if idx < 0:
            continue
        c.page_number = page_at_offset(idx, page_offsets)
        if c.role == "parent":
            # The first child begins exactly here — do not step past it.
            pos = idx
        else:
            # Advance a quarter of the child's length — children overlap
            # each other, so a small forward step lets the next find()
            # still pick up the right occurrence.
            pos = idx + max(1, len(c.content) // 4)