Files
legal-ai/mcp-server/src/legal_mcp/services/research_md.py
Chaim 0c4886afe6 Wire legal-writer to chair directions from analysis-and-research.md
Closes the loop so דפנה's positions (written inline in the UI and
saved to analysis-and-research.md) automatically become binding
direction for the legal-writer agent — no manual copy-paste,
no bypass.

Backend:
- research_md.extract_chair_directions(path) returns a compact dict
  with status (missing/empty/partial/complete), filled_count,
  empty_count, and a reduced list of threshold_claims + issues each
  with {id, number, title, direction}. Designed to be directly usable
  as direction_doc by the writer.
- New MCP tool: drafting.get_chair_directions(case_number) wraps the
  helper, resolves the case research file path via config.find_case_dir,
  returns formatted JSON.
- Registered in server.py as mcp__legal-ai__get_chair_directions.

legal-writer agent update:
- Adds get_chair_directions to the tools list.
- New mandatory "שלב 1ב" before any block writing: call
  get_chair_directions, branch on status.
  - missing → halt, report "legal-analyst לא רץ עדיין"
  - empty → halt, instruct Dafna to fill positions via the UI URL
  - partial → halt unless user confirms; write only filled sections
  - complete → proceed
- New "שלב 1ג" constructs an internal direction_doc from the
  received chair rulings before writing block י.
- Block י section expanded with 5 binding rules:
  1. Open each discussion with Dafna's ruling as the thesis
  2. Frame the reasoning in her style (use get_style_guide phrases)
  3. Match her tone (decisive vs nuanced)
  4. Must NOT contradict her position — if she disagreed with your
     own inclination, her position rules
  5. Use legal_questions from the analysis file as the analytical
     structure (principle question first, concrete application second)
- New bullet section for block יא: summarize each chair ruling
  briefly, state final outcome, close with the signed date formula.

Verified all four status paths (missing/empty/partial/complete) via
local test. Now Dafna's workflow is fully end-to-end: she reads the
analyst report in the UI, fills "עמדת ועדת הערר" in each card, moves
focus out of the field (the blur event auto-saves), then triggers legal-writer — which picks up her
positions as direction without any file shuffle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 13:04:30 +00:00

437 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Parser for analysis-and-research.md produced by the legal-analyst agent.
Extracts the structured content (threshold claims, issues, sections) into
a JSON-serializable dict for UI rendering, and supports atomic in-place
updates of the "עמדת ועדת הערר" (chair position) field in each subsection.
The parser is intentionally tolerant: the file format is under active
development, so we extract what we find rather than enforcing a strict
schema. Missing sections return empty/None values.
"""
from __future__ import annotations
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
# Placeholder strings — a chair-position value that contains any of these
# (or is blank) is treated as "not yet filled"; see _is_placeholder().
# The first entry is also what update_chair_position() writes back when the
# field is cleared.
CHAIR_POSITION_PLACEHOLDERS = (
    "[ימולא ע\"י יו\"ר הוועדה]",
    "[ימולא ע'י יו'ר הוועדה]",
    "[ימולא על ידי יו\"ר הוועדה]",
    "[לא מולא]",
    "[טרם מולא]",
)
# Exact label text (without the surrounding "**" and ":") of the editable
# chair-position field inside each subsection.
CHAIR_POSITION_LABEL = "עמדת ועדת הערר"
# Matches numbered H2 headings only: "## N. title" or "## N title".
# Group 1 is the number, group 2 the title. Unnumbered "## title" headings do
# NOT match this pattern (the digits are mandatory).
# NOTE(review): not referenced by any function in the visible source, which
# uses its own inline H2 pattern instead — confirm before relying on it.
MAIN_SECTION_RE = re.compile(r"^##\s+(\d+)\.?\s+(.+?)$", re.MULTILINE)
# Matches "### title" for subsections (threshold claims, issues).
# Group 1 is the title text.
# NOTE(review): likewise not referenced in the visible source.
SUBSECTION_RE = re.compile(r"^###\s+(.+?)$", re.MULTILINE)
# Matches "**LABEL:**" field markers at the start of a line — handles both
# inline and block variants:
#   "**עמדת המבקשת:** Some text on same line"
#   "**שאלות משפטיות:**\n1. First question"
# The label itself (group 1) must not contain "*" or newlines. Trailing
# spaces/tabs after the marker are consumed, but not the newline.
FIELD_LABEL_RE = re.compile(r"^\*\*([^\n*]+?):\*\*[ \t]*", re.MULTILINE)
# Matches the case number in the H1; group 1 is made of digits, "/" and "-".
CASE_NUMBER_RE = re.compile(r"#\s*ניתוח.*?ערר\s+([\d/\-]+)", re.MULTILINE)
# Matches the date line ("תאריך: ..."); group 1 is the value with surrounding
# whitespace trimmed.
DATE_RE = re.compile(r"^תאריך:\s*(.+?)\s*$", re.MULTILINE)
def _is_placeholder(text: str) -> bool:
    """Report whether *text* counts as an unfilled field value.

    Blank or whitespace-only text is unfilled, and so is any text that
    contains one of ``CHAIR_POSITION_PLACEHOLDERS`` as a substring.
    """
    candidate = text.strip()
    return not candidate or any(
        marker in candidate for marker in CHAIR_POSITION_PLACEHOLDERS
    )
def _normalize_chair_position(text: str) -> str:
    """Collapse placeholder/blank values to ``""``; otherwise trim *text*."""
    return "" if _is_placeholder(text) else text.strip()
def _split_main_sections(content: str) -> list[tuple[str, str, str]]:
"""Split content into (number, title, body) tuples for each H2 section.
Handles both numbered (## 1. title) and unnumbered (## title) H2s.
Body is everything up to the next H2.
"""
# Find all H2 positions
h2_positions = []
for m in re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE):
title = m.group(1).strip()
num_match = re.match(r"^(\d+)\.?\s+(.+)", title)
if num_match:
number = num_match.group(1)
title = num_match.group(2).strip()
else:
number = ""
h2_positions.append((m.start(), m.end(), number, title))
sections = []
for i, (_start, end, number, title) in enumerate(h2_positions):
next_start = h2_positions[i + 1][0] if i + 1 < len(h2_positions) else len(content)
body = content[end:next_start].strip()
sections.append((number, title, body))
return sections
def _split_subsections(body: str) -> list[tuple[str, str]]:
"""Split a section body by H3 subsections.
Returns list of (title, content) — content is everything until next H3.
Leading text before first H3 is discarded at this level.
"""
h3_positions = []
for m in re.finditer(r"^###\s+(.+?)$", body, re.MULTILINE):
h3_positions.append((m.start(), m.end(), m.group(1).strip()))
if not h3_positions:
return []
subs = []
for i, (_start, end, title) in enumerate(h3_positions):
next_start = h3_positions[i + 1][0] if i + 1 < len(h3_positions) else len(body)
content = body[end:next_start].strip()
# Strip trailing horizontal rule "---"
content = re.sub(r"\s*---\s*$", "", content).strip()
subs.append((title, content))
return subs
def _extract_fields(text: str) -> list[dict]:
    """Collect the ``**LABEL:**`` fields of a subsection, in document order.

    Each entry is ``{"label": str, "content": str}``. A field's content
    spans from just after its marker to the next marker (or the end of
    *text*), trimmed, with one trailing ``---`` rule removed. Returns ``[]``
    when *text* holds no field marker.
    """
    markers = list(FIELD_LABEL_RE.finditer(text))
    stops = [following.start() for following in markers[1:]] + [len(text)]

    extracted = []
    for marker, stop in zip(markers, stops):
        value = text[marker.end():stop].strip()
        value = re.sub(r"\s*---\s*$", "", value).strip()
        extracted.append({"label": marker.group(1).strip(), "content": value})
    return extracted
def _build_subsection_dict(
    title: str, body: str, id_prefix: str, number: int
) -> dict:
    """Assemble the card dict for one threshold claim or issue.

    Keys of the result:
    - id: ``f"{id_prefix}_{number}"`` (e.g. 'threshold_1'), the identifier
      accepted by the update function
    - number: the ordinal passed in by the caller
    - title: the H3 title with everything up to the first ": " removed
    - raw_title: the H3 title unchanged
    - fields: ordered {label, content} pairs, excluding the chair position
    - chair_position: the chair-position text, "" when unfilled; if the
      label occurs more than once the last occurrence wins
    """
    _, separator, remainder = title.partition(": ")
    display_title = remainder if separator else title

    chair_position = ""
    other_fields = []
    for field in _extract_fields(body):
        if field["label"] != CHAIR_POSITION_LABEL:
            other_fields.append(field)
            continue
        chair_position = _normalize_chair_position(field["content"])

    return {
        "id": f"{id_prefix}_{number}",
        "number": number,
        "title": display_title,
        "raw_title": title,
        "fields": other_fields,
        "chair_position": chair_position,
    }
def parse(file_path: Path) -> dict[str, Any]:
    """Read analysis-and-research.md and return its structured form.

    The result carries a ``header`` dict (case number, date line, path, size,
    modification time), the plain-text sections, the ``threshold_claims`` and
    ``issues`` card lists, ``conclusions``, and ``other_sections`` for any H2
    whose title matches none of the known keywords. Sections absent from the
    file keep their empty defaults.
    """
    text = file_path.read_text(encoding="utf-8")
    info = file_path.stat()
    case_hit = CASE_NUMBER_RE.search(text)
    date_hit = DATE_RE.search(text)

    result: dict[str, Any] = {
        "header": {
            "case_number": case_hit.group(1) if case_hit else "",
            "date": date_hit.group(1) if date_hit else "",
            "file_path": str(file_path),
            "file_size": info.st_size,
            "modified_at": datetime.fromtimestamp(info.st_mtime).isoformat(),
        },
        "represented_party": "",
        "procedural_background": "",
        "agreed_facts": "",
        "disputed_facts": "",
        "threshold_claims": [],
        "issues": [],
        "conclusions": "",
        "other_sections": [],
    }

    # Ordered routing table: (title keywords, result key, card id prefix).
    # The first row with a keyword contained in the heading wins. A prefix of
    # None means "store the body as plain text"; otherwise the body is split
    # into H3 cards.
    routes = (
        (("צד מיוצג",), "represented_party", None),
        (("רקע דיוני",), "procedural_background", None),
        (("עובדות מוסכמות",), "agreed_facts", None),
        (("עובדות שנויות במחלוקת", "שנויות"), "disputed_facts", None),
        (("טענות סף", "טענות הסף"), "threshold_claims", "threshold"),
        (("סוגיות להכרעה", "סוגיות"), "issues", "issue"),
        (("מסקנות", "סיכום"), "conclusions", None),
    )

    for number, title, body in _split_main_sections(text):
        heading = title.strip()
        route = next(
            (row for row in routes if any(kw in heading for kw in row[0])),
            None,
        )
        if route is None:
            # Unknown section — keep as-is for display
            result["other_sections"].append(
                {"number": number, "title": heading, "body": body}
            )
            continue
        _keywords, key, prefix = route
        if prefix is None:
            result[key] = body
            continue
        result[key].extend(
            _build_subsection_dict(sub_title, sub_body, prefix, ordinal)
            for ordinal, (sub_title, sub_body) in enumerate(
                _split_subsections(body), start=1
            )
        )
    return result
# ── Chair position in-place update ───────────────────────────────
def _find_subsection_by_id(
content: str, section_id: str
) -> tuple[int, int, str] | None:
"""Locate a subsection's body range in the raw content.
Given section_id like 'threshold_2' or 'issue_3', walks the file
structure and returns (body_start, body_end, body_text) for that
subsection. Returns None if not found.
"""
parts = section_id.split("_")
if len(parts) != 2:
return None
kind, idx_str = parts
try:
target_idx = int(idx_str)
except ValueError:
return None
if kind == "threshold":
main_keywords = ("טענות סף", "טענות הסף")
elif kind == "issue":
main_keywords = ("סוגיות להכרעה", "סוגיות")
else:
return None
# Find the main section that contains threshold claims or issues
sections_iter = list(re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE))
for i, m in enumerate(sections_iter):
title = m.group(1).strip()
if not any(kw in title for kw in main_keywords):
continue
body_start = m.end()
body_end = (
sections_iter[i + 1].start() if i + 1 < len(sections_iter) else len(content)
)
section_body = content[body_start:body_end]
# Find H3 subsections within
h3s = list(re.finditer(r"^###\s+.+?$", section_body, re.MULTILINE))
if target_idx < 1 or target_idx > len(h3s):
return None
sub_start_rel = h3s[target_idx - 1].end()
sub_end_rel = (
h3s[target_idx].start() if target_idx < len(h3s) else len(section_body)
)
abs_start = body_start + sub_start_rel
abs_end = body_start + sub_end_rel
return abs_start, abs_end, content[abs_start:abs_end]
return None
def update_chair_position(
    file_path: Path, section_id: str, new_text: str
) -> dict[str, Any]:
    """Atomically update the chair-position field of one subsection.

    The subsection is located with ``_find_subsection_by_id``. If it already
    has a chair-position field, that field's content is replaced; otherwise a
    new field is inserted at the end of the subsection, before a trailing
    ``---`` rule when there is one. Blank *new_text* stores the first
    placeholder string, which the parser reads back as "not filled".

    The new content is written to a sibling ``.tmp`` file that is then moved
    over the original with ``os.replace``.

    Args:
        file_path: Path of analysis-and-research.md.
        section_id: Identifier such as 'threshold_2' or 'issue_3'.
        new_text: The chair's position; surrounding whitespace is trimmed.

    Returns:
        ``{"saved": True, "section_id": ..., "preview": ..., "timestamp": ...}``
        where ``preview`` is the first 120 characters of the trimmed text.

    Raises:
        FileNotFoundError: *file_path* does not exist.
        ValueError: *section_id* does not resolve to a subsection.
    """
    if not file_path.exists():
        raise FileNotFoundError(str(file_path))
    content = file_path.read_text(encoding="utf-8")
    found = _find_subsection_by_id(content, section_id)
    if not found:
        raise ValueError(f"section {section_id} not found")
    abs_start, abs_end, subsection_body = found

    cleaned = new_text.strip()
    # NOTE(review): the text is written verbatim. A line starting with "## ",
    # "### " or a "**label:**" marker would change how the file is parsed —
    # confirm callers sanitize or accept this.
    field_text = cleaned or CHAIR_POSITION_PLACEHOLDERS[0]

    # The field is delimited the same way the parser delimits it: it starts
    # at a line-initial "**<label>:**" and runs to the next line-initial
    # "**...:**" marker, a heading, a "---" rule, or the end of the
    # subsection. The content group must accept "*": excluding it made any
    # position containing emphasis or a "* " bullet unmatchable, so each save
    # appended a duplicate field instead of replacing the existing one.
    label_pattern = re.compile(
        r"^(\*\*" + re.escape(CHAIR_POSITION_LABEL) + r":\*\*)\s*"
        r"(.*?)(?=\n\*\*[^\n*]+?:\*\*|\n##|\n---|\Z)",
        re.DOTALL | re.MULTILINE,
    )
    matches = list(label_pattern.finditer(subsection_body))

    if matches:
        # The parser keeps the LAST chair-position field of a subsection, so
        # that is the one to rewrite if a file already contains duplicates.
        m = matches[-1]
        replacement = f"{m.group(1)}\n{field_text}\n"
        new_subsection = (
            subsection_body[: m.start()] + replacement + subsection_body[m.end():]
        )
    else:
        # Label not present — add it at the end of the subsection, keeping a
        # trailing "---" separator (if any) after the new field.
        stripped = subsection_body.rstrip()
        trailing = subsection_body[len(stripped):]
        if "\n" not in trailing:
            trailing = "\n"
        rule = re.search(r"\n[ \t]*---$", stripped)
        if rule:
            head = stripped[: rule.start()].rstrip()
            tail = "\n\n---"
        else:
            head = stripped
            tail = ""
        new_subsection = (
            f"{head}\n\n**{CHAIR_POSITION_LABEL}:**\n{field_text}{tail}{trailing}"
        )

    new_content = content[:abs_start] + new_subsection + content[abs_end:]

    # Atomic write: write a sibling temp file, then move it into place.
    tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
    tmp_path.write_text(new_content, encoding="utf-8")
    os.replace(tmp_path, file_path)

    return {
        "saved": True,
        "section_id": section_id,
        "preview": cleaned[:120],
        "timestamp": datetime.now().isoformat(),
    }
# ── Chair directions extraction (for downstream agents) ─────────
def extract_chair_directions(file_path: Path) -> dict[str, Any]:
    """Extract only the chair positions from analysis-and-research.md.

    Returns a compact dict that the legal-writer agent can use as direction:
    {
        "case_number": "1033-25",
        "file_path": "...",
        "file_exists": True,
        "total_items": 9,
        "filled_count": 3,
        "empty_count": 6,
        "status": "partial",  # "missing" | "empty" | "partial" | "complete"
        "threshold_claims": [
            {"id": "threshold_1", "number": 1, "title": "...", "direction": "..."},
            ...
        ],
        "issues": [
            {"id": "issue_1", "number": 1, "title": "...", "direction": "..."},
            ...
        ]
    }

    Status values:
    - "missing": the file does not exist (``file_exists`` is False and an
      ``error`` message is included), or it exists but contains no threshold
      claims or issues at all.
    - "empty": items exist but none has a chair position.
    - "partial": some, but not all, items have a chair position.
    - "complete": every item has a chair position.

    Every result carries the same keys, so callers can read ``file_path`` and
    ``case_number`` without first checking ``file_exists``.
    """
    if not file_path.exists():
        return {
            "file_exists": False,
            "file_path": str(file_path),
            "case_number": "",
            "status": "missing",
            "error": "analysis-and-research.md not found",
            "threshold_claims": [],
            "issues": [],
            "total_items": 0,
            "filled_count": 0,
            "empty_count": 0,
        }

    parsed = parse(file_path)

    def reduce_item(item: dict) -> dict:
        """Keep only the keys the writer needs; expose the position as 'direction'."""
        return {
            "id": item["id"],
            "number": item["number"],
            "title": item["title"],
            "direction": item.get("chair_position", "") or "",
        }

    threshold = [reduce_item(t) for t in parsed.get("threshold_claims", [])]
    issues = [reduce_item(i) for i in parsed.get("issues", [])]
    all_items = threshold + issues
    total = len(all_items)
    filled = sum(1 for x in all_items if x["direction"].strip())
    empty = total - filled

    if total == 0:
        # The file exists but holds nothing to rule on.
        status = "missing"
    elif filled == 0:
        status = "empty"
    elif filled == total:
        status = "complete"
    else:
        status = "partial"

    return {
        "file_exists": True,
        "file_path": str(file_path),
        "case_number": parsed.get("header", {}).get("case_number", ""),
        "status": status,
        "total_items": total,
        "filled_count": filled,
        "empty_count": empty,
        "threshold_claims": threshold,
        "issues": issues,
    }