Files
legal-ai/mcp-server/src/legal_mcp/services/research_md.py
Chaim 0d8cc31a2b
All checks were successful
G12 Leak-Guard / leak-guard (pull_request) Successful in 5s
feat(storage): seal INV-STG1 write path — 15 dual-write seals + CI leak-guard + tripwire
אחרי ה-cutover ל-s3-only, אודיט מצא 15 אתרי-כתיבת-בלוב שעוקפים את storage.py (uploads/
finalize/exports/training/research-backup/precedents/bulletins/draft) — קובץ ינחת
בתיקיות-הישנות אך **לא** ב-MinIO → יאבד בניקוי, לא מוגש, לא מגובה. ה-pipeline (ingest/
extract) עדיין קורא לפי file_path מהדיסק, אז ביטול-מוחלט של כתיבה-לדיסק דורש read-wiring
מלא (Phase 2, משימה נפרדת). תיקון בטוח עכשיו = **dual-write seal**.

- storage.py: `mirror`/`mirror_file` (+ sync) — best-effort persist ל-S3 כשה-backend
  s3/dual (no-op ב-filesystem; כשל S3 נרשם, לא שובר request — DualBackend philosophy).
- web/app.py: helpers `_seal_blob`/`_seal_blob_file` + 14 אתרים אטומים (storage.mirror
  אחרי כתיבת-הדיסק; הדיסק נשאר ל-pipeline). block_writer.py: draft אטום (async).
- **CI leak-guard** (test_storage_write_leak_guard): נכשל על כל כתיבת-בלוב-לדיסק
  (write_bytes/write_text/shutil.copy*/open(wb)) ב-web/+services ללא מרקר `# noqa: STG1`.
  כל ה-benign (fallbacks/tmp/staging/git-metadata/flag/state) מסומנים עם נימוק. storage.py
  מוחרג (הוא המימוש).
- **tripwire** (scripts/storage_leak_tripwire.py): ניטור-ריצה — בלובים בדיסק שלא ב-MinIO
  (json-key match, bucket per-file). אומת חי: 0 דליפות.

invariants: INV-STG1 (כל I/O דרך storage / ממורר אליו) · INV-STG6 · feedback_silent_swallow
(mirror רושם warning, לא bare-except). Phase 2 (read-wire ה-pipeline → להפיל את עותק-הדיסק)
= follow-up. tests: 4 mirror + 1 leak-guard + 6 serve_blob + 18 storage קיימות עוברות.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 19:57:12 +00:00

440 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Parser for analysis-and-research.md produced by the legal-analyst agent.
Extracts the structured content (threshold claims, issues, sections) into
a JSON-serializable dict for UI rendering, and supports atomic in-place
updates of the "עמדת ועדת הערר" (chair position) field in each subsection.
The parser is intentionally tolerant: the file format is under active
development, so we extract what we find rather than enforcing a strict
schema. Missing sections return empty/None values.
"""
from __future__ import annotations
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
# Placeholder strings — a chair-position value that contains any of these is
# treated as "not yet filled" (see _is_placeholder).
CHAIR_POSITION_PLACEHOLDERS = (
    "[ימולא ע\"י יו\"ר הוועדה]",
    "[ימולא ע'י יו'ר הוועדה]",
    "[ימולא על ידי יו\"ר הוועדה]",
    "[לא מולא]",
    "[טרם מולא]",
)
# Text of the bold field label ("**<label>:**") that holds the chair position
# inside each subsection.
CHAIR_POSITION_LABEL = "עמדת ועדת הערר"
# Matches NUMBERED main-section headings only: "## N. title" or "## N title".
# The digit group is mandatory, so an unnumbered "## title" does not match.
# NOTE(review): the functions visible in this file use their own inline
# heading patterns rather than this constant — confirm it is still needed.
MAIN_SECTION_RE = re.compile(r"^##\s+(\d+)\.?\s+(.+?)$", re.MULTILINE)
# Matches "### title" for subsections (threshold claims, issues).
# NOTE(review): likewise not referenced by the functions visible here.
SUBSECTION_RE = re.compile(r"^###\s+(.+?)$", re.MULTILINE)
# Matches "**LABEL:**" field markers at the start of a line, together with any
# spaces/tabs that follow. Handles both inline and block variants:
# "**עמדת המבקשת:** Some text on same line"
# "**שאלות משפטיות:**\n1. First question"
# The label itself must not contain "*" or newlines.
FIELD_LABEL_RE = re.compile(r"^\*\*([^\n*]+?):\*\*[ \t]*", re.MULTILINE)
# Captures the case number (digits, "/" and "-") that follows the word "ערר"
# after a "#" and the word "ניתוח"; intended for the H1 line. The pattern is
# not anchored to the start of a line.
CASE_NUMBER_RE = re.compile(r"#\s*ניתוח.*?ערר\s+([\d/\-]+)", re.MULTILINE)
# Captures the value of a line that begins with "תאריך:" (the date line),
# without surrounding whitespace.
DATE_RE = re.compile(r"^תאריך:\s*(.+?)\s*$", re.MULTILINE)
def _is_placeholder(text: str) -> bool:
    """Tell whether a field value counts as "not yet filled".

    A value is a placeholder when, after trimming, it is blank, it contains
    any entry of CHAIR_POSITION_PLACEHOLDERS, or it opens with "[ימולא"
    (which covers placeholders that carry extra descriptive text).
    """
    value = text.strip()
    if value == "":
        return True
    if any(marker in value for marker in CHAIR_POSITION_PLACEHOLDERS):
        return True
    # Extended form: "[ימולא ... — some additional description]"
    return re.match(r'^\[ימולא\b', value) is not None
def _normalize_chair_position(text: str) -> str:
    """Map placeholder values to "" and return real values trimmed."""
    return "" if _is_placeholder(text) else text.strip()
def _split_main_sections(content: str) -> list[tuple[str, str, str]]:
"""Split content into (number, title, body) tuples for each H2 section.
Handles both numbered (## 1. title) and unnumbered (## title) H2s.
Body is everything up to the next H2.
"""
# Find all H2 positions
h2_positions = []
for m in re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE):
title = m.group(1).strip()
num_match = re.match(r"^(\d+)\.?\s+(.+)", title)
if num_match:
number = num_match.group(1)
title = num_match.group(2).strip()
else:
number = ""
h2_positions.append((m.start(), m.end(), number, title))
sections = []
for i, (_start, end, number, title) in enumerate(h2_positions):
next_start = h2_positions[i + 1][0] if i + 1 < len(h2_positions) else len(content)
body = content[end:next_start].strip()
sections.append((number, title, body))
return sections
def _split_subsections(body: str) -> list[tuple[str, str]]:
"""Split a section body by H3 subsections.
Returns list of (title, content) — content is everything until next H3.
Leading text before first H3 is discarded at this level.
"""
h3_positions = []
for m in re.finditer(r"^###\s+(.+?)$", body, re.MULTILINE):
h3_positions.append((m.start(), m.end(), m.group(1).strip()))
if not h3_positions:
return []
subs = []
for i, (_start, end, title) in enumerate(h3_positions):
next_start = h3_positions[i + 1][0] if i + 1 < len(h3_positions) else len(body)
content = body[end:next_start].strip()
# Strip trailing horizontal rule "---"
content = re.sub(r"\s*---\s*$", "", content).strip()
subs.append((title, content))
return subs
def _extract_fields(text: str) -> list[dict]:
    """Collect the bold-label fields of a subsection body.

    Every FIELD_LABEL_RE match opens a field whose content runs to the next
    match (or to the end of ``text``). Returns {"label", "content"} dicts in
    document order; content is trimmed and loses a trailing "---" rule.
    """
    markers = list(FIELD_LABEL_RE.finditer(text))

    # A field ends where the following marker begins; the last one ends at EOS.
    boundaries = [marker.start() for marker in markers[1:]]
    boundaries.append(len(text))

    extracted = []
    for marker, boundary in zip(markers, boundaries):
        value = text[marker.end():boundary].strip()
        value = re.sub(r"\s*---\s*$", "", value).strip()
        extracted.append({"label": marker.group(1).strip(), "content": value})
    return extracted
def _build_subsection_dict(
    title: str, body: str, id_prefix: str, number: int
) -> dict:
    """Assemble the structured dict for one threshold claim or issue.

    Keys of the result:
    - id: "<id_prefix>_<number>", e.g. "threshold_1"
    - number: the ordinal passed in
    - title: the H3 title with everything up to the first ": " removed
    - raw_title: the H3 title unchanged
    - fields: {label, content} pairs other than the chair position
    - chair_position: the chair-position field, normalized ("" when unfilled)
    """
    # Display title is whatever follows the first ": ", when there is one.
    _prefix, separator, remainder = title.partition(": ")
    display_title = remainder if separator else title

    chair_position = ""
    other_fields = []
    for field in _extract_fields(body):
        if field["label"] == CHAIR_POSITION_LABEL:
            chair_position = _normalize_chair_position(field["content"])
            continue
        other_fields.append(field)

    return {
        "id": f"{id_prefix}_{number}",
        "number": number,
        "title": display_title,
        "raw_title": title,
        "fields": other_fields,
        "chair_position": chair_position,
    }
def parse(file_path: Path) -> dict[str, Any]:
    """Read analysis-and-research.md and return its content as a dict.

    The result holds a "header" (case number, date, path, size, mtime), the
    plain-text sections, the "threshold_claims" and "issues" lists, the
    "conclusions" text and an "other_sections" list for unrecognized H2s.
    Sections that are absent keep their empty default.
    """
    content = file_path.read_text(encoding="utf-8")

    # Header details come from the H1 line and the date line.
    case_found = CASE_NUMBER_RE.search(content)
    date_found = DATE_RE.search(content)
    file_stat = file_path.stat()

    parsed: dict[str, Any] = {
        "header": {
            "case_number": case_found.group(1) if case_found else "",
            "date": date_found.group(1) if date_found else "",
            "file_path": str(file_path),
            "file_size": file_stat.st_size,
            "modified_at": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
        },
        "represented_party": "",
        "procedural_background": "",
        "agreed_facts": "",
        "disputed_facts": "",
        "threshold_claims": [],
        "issues": [],
        "conclusions": "",
        "other_sections": [],
    }

    # Ordered routing table: the first row whose keyword occurs in the H2
    # title wins. Rows with an id prefix are split into H3 subsections;
    # the others store the section body as plain text.
    routes = (
        (("צד מיוצג",), "represented_party", None),
        (("רקע דיוני",), "procedural_background", None),
        (("עובדות מוסכמות",), "agreed_facts", None),
        (("עובדות שנויות במחלוקת", "שנויות"), "disputed_facts", None),
        (("טענות סף", "טענות הסף"), "threshold_claims", "threshold"),
        (("סוגיות להכרעה", "סוגיות"), "issues", "issue"),
        (("מסקנות", "סיכום"), "conclusions", None),
    )

    for number, title, body in _split_main_sections(content):
        heading = title.strip()
        for keywords, key, id_prefix in routes:
            if not any(keyword in heading for keyword in keywords):
                continue
            if id_prefix is None:
                parsed[key] = body
            else:
                for ordinal, (sub_title, sub_body) in enumerate(
                    _split_subsections(body), start=1
                ):
                    parsed[key].append(
                        _build_subsection_dict(sub_title, sub_body, id_prefix, ordinal)
                    )
            break
        else:
            # Unknown section — keep as-is for display
            parsed["other_sections"].append(
                {"number": number, "title": heading, "body": body}
            )
    return parsed
# ── Chair position in-place update ───────────────────────────────
def _find_subsection_by_id(
content: str, section_id: str
) -> tuple[int, int, str] | None:
"""Locate a subsection's body range in the raw content.
Given section_id like 'threshold_2' or 'issue_3', walks the file
structure and returns (body_start, body_end, body_text) for that
subsection. Returns None if not found.
"""
parts = section_id.split("_")
if len(parts) != 2:
return None
kind, idx_str = parts
try:
target_idx = int(idx_str)
except ValueError:
return None
if kind == "threshold":
main_keywords = ("טענות סף", "טענות הסף")
elif kind == "issue":
main_keywords = ("סוגיות להכרעה", "סוגיות")
else:
return None
# Find the main section that contains threshold claims or issues
sections_iter = list(re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE))
for i, m in enumerate(sections_iter):
title = m.group(1).strip()
if not any(kw in title for kw in main_keywords):
continue
body_start = m.end()
body_end = (
sections_iter[i + 1].start() if i + 1 < len(sections_iter) else len(content)
)
section_body = content[body_start:body_end]
# Find H3 subsections within
h3s = list(re.finditer(r"^###\s+.+?$", section_body, re.MULTILINE))
if target_idx < 1 or target_idx > len(h3s):
return None
sub_start_rel = h3s[target_idx - 1].end()
sub_end_rel = (
h3s[target_idx].start() if target_idx < len(h3s) else len(section_body)
)
abs_start = body_start + sub_start_rel
abs_end = body_start + sub_end_rel
return abs_start, abs_end, content[abs_start:abs_end]
return None
def update_chair_position(
    file_path: Path, section_id: str, new_text: str
) -> dict[str, Any]:
    """Rewrite the chair-position field of one subsection and save the file.

    The subsection is located with _find_subsection_by_id. When it already
    has a "**<CHAIR_POSITION_LABEL>:**" field, that field's text is replaced;
    otherwise a new field is inserted at the end of the subsection, ahead of
    a closing "---" rule when there is one. Blank ``new_text`` stores the
    first CHAIR_POSITION_PLACEHOLDERS entry in both cases.

    The new content is written to "<name>.tmp" next to the file and then
    moved over the original with os.replace.

    Args:
        file_path: the analysis-and-research.md file to edit.
        section_id: subsection id such as "threshold_2" or "issue_3".
        new_text: the chair position to store; surrounding whitespace is
            dropped.

    Returns:
        {"saved": True, "section_id": ..., "preview": first 120 characters
        of the stripped text, "timestamp": datetime.now().isoformat()}.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        ValueError: ``section_id`` does not resolve to a subsection.
    """
    if not file_path.exists():
        raise FileNotFoundError(str(file_path))
    content = file_path.read_text(encoding="utf-8")

    found = _find_subsection_by_id(content, section_id)
    if found is None:
        raise ValueError(f"section {section_id} not found")
    abs_start, abs_end, subsection_body = found

    cleaned = new_text.strip()
    # An empty position is stored as the placeholder so the field stays
    # visible in the file, whichever branch below is taken.
    field_text = cleaned or CHAIR_POSITION_PLACEHOLDERS[0]

    # The field runs from its label to the next line that starts with "**",
    # "##" or "---" (or to the end of the subsection).
    # - The text is matched with ".*?" so that a position containing "*"
    #   (bold, italics, bullets) is still recognised. A character class that
    #   excludes "*" makes the search fail on such text, and the update would
    #   then append a second label instead of replacing the first.
    # - Only spaces/tabs are consumed after the label, so the lazy group can
    #   stop on an empty field without running into the next one.
    label_pattern = re.compile(
        r"(\*\*" + re.escape(CHAIR_POSITION_LABEL) + r":\*\*)[ \t]*(.*?)(?=\n\*\*|\n##|\n---|\Z)",
        re.DOTALL,
    )
    m = label_pattern.search(subsection_body)
    if m is None:
        # Label not present — add it at the end of the subsection, before the
        # closing "---" rule if any, and keep the whitespace that separates
        # this subsection from whatever follows it.
        tail = re.search(r"\s*(?:\n[ \t]*---[ \t]*)?\s*\Z", subsection_body)
        head = subsection_body[: tail.start()]
        trailer = tail.group(0)
        if not trailer.endswith("\n"):
            trailer += "\n"
        new_subsection = (
            f"{head}\n\n**{CHAIR_POSITION_LABEL}:**\n{field_text}{trailer}"
        )
    else:
        # Replace the existing content of the chair_position field
        replacement = f"{m.group(1)}\n{field_text}\n"
        new_subsection = (
            subsection_body[: m.start()] + replacement + subsection_body[m.end():]
        )
    new_content = content[:abs_start] + new_subsection + content[abs_end:]

    # Atomic write: fill a sibling temp file, then rename it into place.
    tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
    tmp_path.write_text(new_content, encoding="utf-8")  # noqa: STG1 — atomic .tmp; in-place edit, S3 re-sync in Phase-2 read-wiring
    os.replace(tmp_path, file_path)

    return {
        "saved": True,
        "section_id": section_id,
        "preview": cleaned[:120],
        "timestamp": datetime.now().isoformat(),
    }
# ── Chair directions extraction (for downstream agents) ─────────
def extract_chair_directions(file_path: Path) -> dict[str, Any]:
    """Summarize the chair positions found in analysis-and-research.md.

    The result is a compact dict meant to be handed to the legal-writer agent
    as direction before it generates the blocks of the decision:

        {
            "file_exists": True,
            "file_path": "...",
            "case_number": "1033-25",
            "status": "partial",   # "missing" | "empty" | "partial" | "complete"
            "total_items": 9,
            "filled_count": 3,
            "empty_count": 6,
            "threshold_claims": [
                {"id": "threshold_1", "number": 1, "title": "...", "direction": "..."},
                ...
            ],
            "issues": [
                {"id": "issue_1", "number": 1, "title": "...", "direction": "..."},
                ...
            ],
        }

    When the file does not exist the dict has "file_exists": False,
    "status": "missing", an "error" message, empty lists and zero counts.
    "missing" is also the status when the file holds no threshold claims and
    no issues.
    """
    if not file_path.exists():
        return {
            "file_exists": False,
            "status": "missing",
            "error": "analysis-and-research.md not found",
            "threshold_claims": [],
            "issues": [],
            "total_items": 0,
            "filled_count": 0,
            "empty_count": 0,
        }

    parsed = parse(file_path)

    def to_direction(entry: dict) -> dict:
        # Keep only what the writer needs; an unfilled position becomes "".
        return {
            "id": entry["id"],
            "number": entry["number"],
            "title": entry["title"],
            "direction": entry.get("chair_position", "") or "",
        }

    threshold = [to_direction(claim) for claim in parsed.get("threshold_claims", [])]
    issues = [to_direction(issue) for issue in parsed.get("issues", [])]

    directions = [*threshold, *issues]
    total = len(directions)
    filled = len([d for d in directions if d["direction"].strip()])

    if not total:
        status = "missing"
    elif not filled:
        status = "empty"
    else:
        status = "complete" if filled == total else "partial"

    return {
        "file_exists": True,
        "file_path": str(file_path),
        "case_number": parsed.get("header", {}).get("case_number", ""),
        "status": status,
        "total_items": total,
        "filled_count": filled,
        "empty_count": total - filled,
        "threshold_claims": threshold,
        "issues": issues,
    }