Research analysis cards with inline chair-position editor
New feature on case view: the analysis-and-research.md produced by the
legal-analyst agent is now rendered as structured cards in the UI,
with inline editing of "עמדת ועדת הערר" that writes directly back to
the markdown file (atomic rename).
Backend (research_md.py):
- parse(Path) → dict with header, prose sections, threshold_claims[],
issues[], conclusions, other_sections
- Tolerant field extractor handles both block ("**LABEL:**\ncontent")
and inline ("**LABEL:** content") variants
- Detects [ימולא ע"י יו"ר הוועדה] placeholder → empty chair_position
- update_chair_position(path, section_id, text) locates the exact
subsection by ordinal, replaces or appends the chair field, writes
atomically via temp file + os.replace
- Section IDs: threshold_N / issue_N (1-based)
Endpoints:
- GET /api/cases/{n}/research/analysis — returns parsed JSON or 404
- PATCH /api/cases/{n}/research/analysis/chair-position — {section_id, position}
Frontend (#page-case):
- New card "ניתוח משפטי ומחקר" below local-files card
- Prose sections as justified text panels (background + gold border)
- Threshold claims and issues as collapsible <details> items with
gold right-border on open, numbered pills
- Each item shows all extracted fields with label above content
- Chair position editor: gold-wash background, 📝 icon label, textarea
with placeholder prompt
- onblur → PATCH with save indicator: ⏳ שומר → ✓ נשמר HH:MM → fade
- Status pill next to each item title: "ממתין לעמדה" / "✓ עמדה נקבעה"
- First threshold claim opens by default, rest closed
- Card hidden entirely when no analysis file exists (404)
Tested against real file: case 1033-25 with 3 threshold claims and
6 issues, all chair positions correctly empty, update writes only the
targeted section, atomic rewrite preserves all other content.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
355
mcp-server/src/legal_mcp/services/research_md.py
Normal file
355
mcp-server/src/legal_mcp/services/research_md.py
Normal file
@@ -0,0 +1,355 @@
|
||||
"""Parser for analysis-and-research.md produced by the legal-analyst agent.
|
||||
|
||||
Extracts the structured content (threshold claims, issues, sections) into
|
||||
a JSON-serializable dict for UI rendering, and supports atomic in-place
|
||||
updates of the "עמדת ועדת הערר" (chair position) field in each subsection.
|
||||
|
||||
The parser is intentionally tolerant: the file format is under active
|
||||
development, so we extract what we find rather than enforcing a strict
|
||||
schema. Missing sections return empty/None values.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# Placeholder strings — any of these means "not yet filled".
# _is_placeholder() tests them by substring containment, so a field value
# that merely contains one of them is treated as empty.
CHAIR_POSITION_PLACEHOLDERS = (
    "[ימולא ע\"י יו\"ר הוועדה]",
    "[ימולא ע'י יו'ר הוועדה]",
    "[ימולא על ידי יו\"ר הוועדה]",
    "[לא מולא]",
    "[טרם מולא]",
)

# Text of the bold field label (without the surrounding "**" and the ":")
# that marks the chair-position field inside a subsection.
CHAIR_POSITION_LABEL = "עמדת ועדת הערר"

# Matches NUMBERED main sections only ("## N. title" or "## N title"):
# group 1 is the number, group 2 the title. An unnumbered "## title" does
# not match, because the digits are mandatory in this pattern.
MAIN_SECTION_RE = re.compile(r"^##\s+(\d+)\.?\s+(.+?)$", re.MULTILINE)

# Matches "### title" for subsections (threshold claims, issues);
# group 1 is the title text.
SUBSECTION_RE = re.compile(r"^###\s+(.+?)$", re.MULTILINE)

# Matches "**LABEL:**" field markers at the start of a line and consumes any
# spaces/tabs that follow, so it handles both inline and block variants:
#   "**LABEL:** some text on the same line"
#   "**LABEL:**\n1. first item"
# The label itself (group 1) must not contain "*" or newlines.
FIELD_LABEL_RE = re.compile(r"^\*\*([^\n*]+?):\*\*[ \t]*", re.MULTILINE)

# Captures the case number (digits, "/" and "-") from the analysis title
# line. The pattern is not anchored to the start of a line, so it matches the
# first "#" heading text that fits, whatever its level.
CASE_NUMBER_RE = re.compile(r"#\s*ניתוח.*?ערר\s+([\d/\-]+)", re.MULTILINE)

# Matches the date line; group 1 is the value with surrounding whitespace
# trimmed.
DATE_RE = re.compile(r"^תאריך:\s*(.+?)\s*$", re.MULTILINE)
|
||||
|
||||
|
||||
def _is_placeholder(text: str) -> bool:
    """Tell whether a field value counts as "not yet filled".

    Blank or whitespace-only text is a placeholder, and so is any text that
    contains one of CHAIR_POSITION_PLACEHOLDERS as a substring.
    """
    candidate = text.strip()
    return not candidate or any(
        marker in candidate for marker in CHAIR_POSITION_PLACEHOLDERS
    )
|
||||
|
||||
|
||||
def _normalize_chair_position(text: str) -> str:
    """Collapse placeholder values to "" and trim everything else."""
    return "" if _is_placeholder(text) else text.strip()
|
||||
|
||||
|
||||
def _split_main_sections(content: str) -> list[tuple[str, str, str]]:
|
||||
"""Split content into (number, title, body) tuples for each H2 section.
|
||||
|
||||
Handles both numbered (## 1. title) and unnumbered (## title) H2s.
|
||||
Body is everything up to the next H2.
|
||||
"""
|
||||
# Find all H2 positions
|
||||
h2_positions = []
|
||||
for m in re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE):
|
||||
title = m.group(1).strip()
|
||||
num_match = re.match(r"^(\d+)\.?\s+(.+)", title)
|
||||
if num_match:
|
||||
number = num_match.group(1)
|
||||
title = num_match.group(2).strip()
|
||||
else:
|
||||
number = ""
|
||||
h2_positions.append((m.start(), m.end(), number, title))
|
||||
|
||||
sections = []
|
||||
for i, (_start, end, number, title) in enumerate(h2_positions):
|
||||
next_start = h2_positions[i + 1][0] if i + 1 < len(h2_positions) else len(content)
|
||||
body = content[end:next_start].strip()
|
||||
sections.append((number, title, body))
|
||||
return sections
|
||||
|
||||
|
||||
def _split_subsections(body: str) -> list[tuple[str, str]]:
|
||||
"""Split a section body by H3 subsections.
|
||||
|
||||
Returns list of (title, content) — content is everything until next H3.
|
||||
Leading text before first H3 is discarded at this level.
|
||||
"""
|
||||
h3_positions = []
|
||||
for m in re.finditer(r"^###\s+(.+?)$", body, re.MULTILINE):
|
||||
h3_positions.append((m.start(), m.end(), m.group(1).strip()))
|
||||
|
||||
if not h3_positions:
|
||||
return []
|
||||
|
||||
subs = []
|
||||
for i, (_start, end, title) in enumerate(h3_positions):
|
||||
next_start = h3_positions[i + 1][0] if i + 1 < len(h3_positions) else len(body)
|
||||
content = body[end:next_start].strip()
|
||||
# Strip trailing horizontal rule "---"
|
||||
content = re.sub(r"\s*---\s*$", "", content).strip()
|
||||
subs.append((title, content))
|
||||
return subs
|
||||
|
||||
|
||||
def _extract_fields(text: str) -> list[dict]:
    """Pull the "**LABEL:**" fields out of a subsection body.

    Each field spans from its label marker to the next marker (or the end of
    the text). Returns {"label": str, "content": str} dicts in document
    order; content is stripped and loses a trailing "---" horizontal rule.
    """
    markers = list(FIELD_LABEL_RE.finditer(text))
    # A field's content ends where the next label begins.
    stops = [following.start() for following in markers[1:]]
    stops.append(len(text))

    extracted = []
    for marker, stop in zip(markers, stops):
        value = text[marker.end():stop].strip()
        value = re.sub(r"\s*---\s*$", "", value).strip()
        extracted.append({"label": marker.group(1).strip(), "content": value})
    return extracted
|
||||
|
||||
|
||||
def _build_subsection_dict(
    title: str, body: str, id_prefix: str, number: int
) -> dict:
    """Assemble the dict describing one threshold claim or issue.

    Keys of the returned dict:
    - id: "<id_prefix>_<number>", the identifier the update path expects
    - number: the 1-based ordinal passed in
    - title: the H3 title with everything up to the first ": " removed
    - raw_title: the H3 title as written
    - fields: ordered {label, content} pairs, chair position excluded
    - chair_position: the chair field's text, "" when it is a placeholder
    """
    # Show only what follows the first ": "; keep the title whole otherwise.
    _lead, separator, remainder = title.partition(": ")
    shown_title = remainder if separator else title

    chair_text = ""
    other_fields = []
    for field in _extract_fields(body):
        if field["label"] != CHAIR_POSITION_LABEL:
            other_fields.append(field)
            continue
        # If the label occurs more than once, the last occurrence wins.
        chair_text = _normalize_chair_position(field["content"])

    return {
        "id": f"{id_prefix}_{number}",
        "number": number,
        "title": shown_title,
        "raw_title": title,
        "fields": other_fields,
        "chair_position": chair_text,
    }
|
||||
|
||||
|
||||
def parse(file_path: Path) -> dict[str, Any]:
    """Read analysis-and-research.md and return its content as a dict.

    The result holds a "header" (case number, date, path, size, mtime), the
    prose sections as plain strings, "threshold_claims" and "issues" as lists
    of subsection dicts, "conclusions", and "other_sections" for every H2
    that no keyword recognises. Sections absent from the file keep their
    empty default. Subsection ordinals start at 1 within each H2.
    """
    raw = file_path.read_text(encoding="utf-8")

    number_hit = CASE_NUMBER_RE.search(raw)
    date_hit = DATE_RE.search(raw)
    file_stat = file_path.stat()

    parsed: dict[str, Any] = {
        "header": {
            "case_number": number_hit.group(1) if number_hit else "",
            "date": date_hit.group(1) if date_hit else "",
            "file_path": str(file_path),
            "file_size": file_stat.st_size,
            "modified_at": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
        },
        "represented_party": "",
        "procedural_background": "",
        "agreed_facts": "",
        "disputed_facts": "",
        "threshold_claims": [],
        "issues": [],
        "conclusions": "",
        "other_sections": [],
    }

    # (title keywords, result key, subsection id prefix or None for prose).
    # Order matters: the first rule whose keyword occurs in the title wins.
    routing = (
        (("צד מיוצג",), "represented_party", None),
        (("רקע דיוני",), "procedural_background", None),
        (("עובדות מוסכמות",), "agreed_facts", None),
        (("עובדות שנויות במחלוקת", "שנויות"), "disputed_facts", None),
        (("טענות סף", "טענות הסף"), "threshold_claims", "threshold"),
        (("סוגיות להכרעה", "סוגיות"), "issues", "issue"),
        (("מסקנות", "סיכום"), "conclusions", None),
    )

    for number, title, body in _split_main_sections(raw):
        heading = title.strip()
        rule = next(
            (r for r in routing if any(keyword in heading for keyword in r[0])),
            None,
        )

        if rule is None:
            # Unknown section — keep as-is for display
            parsed["other_sections"].append(
                {"number": number, "title": heading, "body": body}
            )
            continue

        _keywords, key, id_prefix = rule
        if id_prefix is None:
            parsed[key] = body
            continue

        parsed[key].extend(
            _build_subsection_dict(sub_title, sub_body, id_prefix, ordinal)
            for ordinal, (sub_title, sub_body) in enumerate(
                _split_subsections(body), start=1
            )
        )

    return parsed
|
||||
|
||||
|
||||
# ── Chair position in-place update ───────────────────────────────
|
||||
|
||||
|
||||
def _find_subsection_by_id(
|
||||
content: str, section_id: str
|
||||
) -> tuple[int, int, str] | None:
|
||||
"""Locate a subsection's body range in the raw content.
|
||||
|
||||
Given section_id like 'threshold_2' or 'issue_3', walks the file
|
||||
structure and returns (body_start, body_end, body_text) for that
|
||||
subsection. Returns None if not found.
|
||||
"""
|
||||
parts = section_id.split("_")
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
kind, idx_str = parts
|
||||
try:
|
||||
target_idx = int(idx_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
if kind == "threshold":
|
||||
main_keywords = ("טענות סף", "טענות הסף")
|
||||
elif kind == "issue":
|
||||
main_keywords = ("סוגיות להכרעה", "סוגיות")
|
||||
else:
|
||||
return None
|
||||
|
||||
# Find the main section that contains threshold claims or issues
|
||||
sections_iter = list(re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE))
|
||||
for i, m in enumerate(sections_iter):
|
||||
title = m.group(1).strip()
|
||||
if not any(kw in title for kw in main_keywords):
|
||||
continue
|
||||
|
||||
body_start = m.end()
|
||||
body_end = (
|
||||
sections_iter[i + 1].start() if i + 1 < len(sections_iter) else len(content)
|
||||
)
|
||||
section_body = content[body_start:body_end]
|
||||
|
||||
# Find H3 subsections within
|
||||
h3s = list(re.finditer(r"^###\s+.+?$", section_body, re.MULTILINE))
|
||||
if target_idx < 1 or target_idx > len(h3s):
|
||||
return None
|
||||
|
||||
sub_start_rel = h3s[target_idx - 1].end()
|
||||
sub_end_rel = (
|
||||
h3s[target_idx].start() if target_idx < len(h3s) else len(section_body)
|
||||
)
|
||||
|
||||
abs_start = body_start + sub_start_rel
|
||||
abs_end = body_start + sub_end_rel
|
||||
return abs_start, abs_end, content[abs_start:abs_end]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def update_chair_position(
    file_path: Path, section_id: str, new_text: str
) -> dict[str, Any]:
    """Atomically update the chair-position field of one subsection.

    The subsection is located with _find_subsection_by_id(). If it already
    has a chair-position label, the field's value is replaced; otherwise a
    new labelled field is added at the end of the subsection, before a
    trailing "---" rule if there is one. Empty or whitespace-only new_text
    stores the first placeholder string, so the field reads as "not yet
    filled". Text outside the field is left as it was.

    The new content is written to a sibling "<name>.tmp" file and moved into
    place with os.replace (an atomic rename on POSIX).

    Returns {"saved": True, "section_id": ..., "preview": first 120 chars of
    the stripped text, "timestamp": ISO local time}.

    Raises:
        FileNotFoundError: file_path does not exist.
        ValueError: section_id does not resolve to a subsection.
    """
    if not file_path.exists():
        raise FileNotFoundError(str(file_path))

    content = file_path.read_text(encoding="utf-8")
    found = _find_subsection_by_id(content, section_id)
    if found is None:
        raise ValueError(f"section {section_id} not found")
    abs_start, abs_end, subsection_body = found

    cleaned = new_text.strip()
    # NOTE(review): the text is stored verbatim. A line in it that starts
    # with "## ", "### " or "**label:**" will be read back as document
    # structure — confirm whether the caller should reject such input.
    stored_text = cleaned or CHAIR_POSITION_PLACEHOLDERS[0]
    label = f"**{CHAIR_POSITION_LABEL}:**"

    # Whitespace that separates this subsection from what follows it; kept
    # so the blank line before the next heading survives the rewrite.
    text_len = len(subsection_body.rstrip())
    trailing_ws = subsection_body[text_len:]
    if "\n" not in trailing_ws:
        trailing_ws = "\n"

    label_at = subsection_body.find(label)
    if label_at == -1:
        # Label not present — add the field at the end of the subsection,
        # but ahead of a closing horizontal rule so the rule keeps
        # separating this subsection from the next one.
        closing_rule = re.search(r"\n---\s*\Z", subsection_body)
        if closing_rule:
            head = subsection_body[: closing_rule.start()].rstrip()
            tail = subsection_body[closing_rule.start():]
            # The extra "\n" leaves a blank line before "---"; without it
            # Markdown would read the value as a setext heading.
            new_subsection = f"{head}\n\n{label}\n{stored_text}\n{tail}"
        else:
            new_subsection = (
                f"{subsection_body[:text_len]}\n\n{label}\n{stored_text}{trailing_ws}"
            )
    else:
        # The value runs from the label to the next field label, heading or
        # horizontal rule at the start of a line, or to the end of the
        # subsection. Searching for the boundary (instead of matching the
        # value with a character class) lets the old value contain "*".
        value_start = label_at + len(label)
        boundary = re.search(
            r"\n(?=\*\*[^\n*]+?:\*\*|##|---)", subsection_body[value_start:]
        )
        if boundary:
            tail = "\n" + subsection_body[value_start + boundary.start():]
        else:
            tail = trailing_ws
        new_subsection = f"{subsection_body[:label_at]}{label}\n{stored_text}{tail}"

    new_content = content[:abs_start] + new_subsection + content[abs_end:]

    # Atomic write: readers see either the old file or the complete new one.
    tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
    try:
        tmp_path.write_text(new_content, encoding="utf-8")
        os.replace(tmp_path, file_path)
    except BaseException:
        # Best-effort cleanup so a failed write leaves no stray temp file;
        # the original error is what the caller needs to see.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return {
        "saved": True,
        "section_id": section_id,
        "preview": cleaned[:120],
        "timestamp": datetime.now().isoformat(),
    }
|
||||
Reference in New Issue
Block a user