Files
legal-ai/mcp-server/src/legal_mcp/services/research_md.py

646 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Parser for analysis-and-research.md produced by the legal-analyst agent.
Extracts the structured content (threshold claims, issues, sections) into
a JSON-serializable dict for UI rendering, and supports atomic in-place
updates of the "עמדת ועדת הערר" (chair position) field in each subsection.
The parser is intentionally tolerant: the file format is under active
development, so we extract what we find rather than enforcing a strict
schema. Missing sections return empty/None values.
"""
from __future__ import annotations
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
# Exact placeholder strings. _is_placeholder treats a chair-position value
# that CONTAINS any of these (substring match) as "not yet filled".
CHAIR_POSITION_PLACEHOLDERS = (
"[ימולא ע\"י יו\"ר הוועדה]",
"[ימולא ע'י יו'ר הוועדה]",
"[ימולא על ידי יו\"ר הוועדה]",
"[לא מולא]",
"[טרם מולא]",
)
# A value that STARTS with one of these prefixes is also a placeholder
# (the analyst sometimes adds explanatory text after the bracket).
CHAIR_POSITION_PLACEHOLDER_PREFIXES = (
"[ימולא",
"ימולא ע",
)
# Bold field label (without the surrounding "**" and ":") that marks the
# chair-position field inside a subsection.
CHAIR_POSITION_LABEL = "עמדת ועדת הערר"
# Matches NUMBERED H2 headings only: "## N. title" or "## N title"
# (group 1 = N, group 2 = title). The digits are mandatory, so an unnumbered
# "## title" does not match.
# NOTE(review): not referenced in the visible part of this file —
# _split_main_sections uses its own inline pattern.
MAIN_SECTION_RE = re.compile(r"^##\s+(\d+)\.?\s+(.+?)$", re.MULTILINE)
# Matches "### title" H3 headings (group 1 = title).
# NOTE(review): not referenced in the visible part of this file —
# _split_subsections uses its own inline pattern.
SUBSECTION_RE = re.compile(r"^###\s+(.+?)$", re.MULTILINE)
# Matches a "**LABEL:**" field marker at the start of a line, plus any spaces
# or tabs that follow it. Handles both inline and block variants:
# "**עמדת המבקשת:** Some text on same line"
# "**שאלות משפטיות:**\n1. First question"
# Group 1 is the label; it must not contain "*" or a newline.
FIELD_LABEL_RE = re.compile(r"^\*\*([^\n*]+?):\*\*[ \t]*", re.MULTILINE)
# Captures the case number (a run of digits, "/" and "-") that follows the
# word "ערר" on a "# ניתוח ..." heading line. The pattern has no ^/$ anchors,
# so re.MULTILINE has no effect on it.
CASE_NUMBER_RE = re.compile(r"#\s*ניתוח.*?ערר\s+([\d/\-]+)", re.MULTILINE)
# Captures the rest of a line that starts with "תאריך:" (the date line),
# without surrounding whitespace.
DATE_RE = re.compile(r"^תאריך:\s*(.+?)\s*$", re.MULTILINE)
# NOTE(review): not referenced in the visible part of this file.
RESEARCH_FINDINGS_FILENAME = "research-findings.md"
def _is_placeholder(text: str) -> bool:
    """Tell whether a field value should be treated as "not yet filled".

    A value counts as a placeholder when, after trimming whitespace, it is
    empty, contains one of CHAIR_POSITION_PLACEHOLDERS anywhere inside it, or
    begins with one of CHAIR_POSITION_PLACEHOLDER_PREFIXES.
    """
    value = text.strip()
    if value == "":
        return True
    # Substring match: a known placeholder anywhere in the value is enough.
    if any(marker in value for marker in CHAIR_POSITION_PLACEHOLDERS):
        return True
    # str.startswith accepts a tuple and checks every prefix in one call.
    return value.startswith(tuple(CHAIR_POSITION_PLACEHOLDER_PREFIXES))
def _normalize_chair_position(text: str) -> str:
    """Collapse placeholder values to "" and trim everything else."""
    return "" if _is_placeholder(text) else text.strip()
def _split_main_sections(content: str) -> list[tuple[str, str, str]]:
"""Split content into (number, title, body) tuples for each H2 section.
Handles both numbered (## 1. title) and unnumbered (## title) H2s.
Body is everything up to the next H2.
"""
# Find all H2 positions
h2_positions = []
for m in re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE):
title = m.group(1).strip()
num_match = re.match(r"^(\d+)\.?\s+(.+)", title)
if num_match:
number = num_match.group(1)
title = num_match.group(2).strip()
else:
number = ""
h2_positions.append((m.start(), m.end(), number, title))
sections = []
for i, (_start, end, number, title) in enumerate(h2_positions):
next_start = h2_positions[i + 1][0] if i + 1 < len(h2_positions) else len(content)
body = content[end:next_start].strip()
sections.append((number, title, body))
return sections
def _split_subsections(body: str) -> list[tuple[str, str]]:
"""Split a section body by H3 subsections.
Returns list of (title, content) — content is everything until next H3.
Leading text before first H3 is discarded at this level.
"""
h3_positions = []
for m in re.finditer(r"^###\s+(.+?)$", body, re.MULTILINE):
h3_positions.append((m.start(), m.end(), m.group(1).strip()))
if not h3_positions:
return []
subs = []
for i, (_start, end, title) in enumerate(h3_positions):
next_start = h3_positions[i + 1][0] if i + 1 < len(h3_positions) else len(body)
content = body[end:next_start].strip()
# Strip trailing horizontal rule "---"
content = re.sub(r"\s*---\s*$", "", content).strip()
subs.append((title, content))
return subs
def _extract_fields(text: str) -> list[dict]:
    """Collect the "**LABEL:**" fields of a subsection body.

    Returns a list of {"label": str, "content": str} dicts in document order.
    Each field's content runs from the end of its marker to the start of the
    next marker (or the end of the text), trimmed, with a trailing "---"
    horizontal rule removed. Text before the first marker is ignored.
    """
    markers = list(FIELD_LABEL_RE.finditer(text))
    stops = [m.start() for m in markers[1:]] + [len(text)]

    extracted = []
    for marker, stop in zip(markers, stops):
        value = text[marker.end():stop].strip()
        # Drop the "---" separator that may follow the last field.
        value = re.sub(r"\s*---\s*$", "", value).strip()
        extracted.append({"label": marker.group(1).strip(), "content": value})
    return extracted
def _build_subsection_dict(
    title: str, body: str, id_prefix: str, number: int
) -> dict:
    """Assemble the dict describing one threshold claim or issue.

    Keys of the returned dict:
    - id: "<id_prefix>_<number>" (e.g. 'threshold_1'), used by the update path
    - number: the 1-based ordinal passed in
    - title: the H3 title, reduced to the part after the first ": " if any
    - raw_title: the H3 title as given
    - fields: ordered {label, content} pairs, excluding the chair position
    - chair_position: the chair-position field, "" when it is a placeholder
    """
    # "Prefix: actual title" -> keep only the part after the first ": ".
    _prefix, separator, remainder = title.partition(": ")
    display_title = remainder if separator else title

    chair_position = ""
    other_fields = []
    for field in _extract_fields(body):
        if field["label"] != CHAIR_POSITION_LABEL:
            other_fields.append(field)
            continue
        # When the label appears more than once, the last occurrence wins.
        chair_position = _normalize_chair_position(field["content"])

    return {
        "id": f"{id_prefix}_{number}",
        "number": number,
        "title": display_title,
        "raw_title": title,
        "fields": other_fields,
        "chair_position": chair_position,
    }
def parse(file_path: Path) -> dict[str, Any]:
    """Read analysis-and-research.md and return its content as a dict.

    The result holds a "header" (case number, date, path, size, mtime), the
    plain-text sections, the "threshold_claims" and "issues" lists, the
    conclusions, and "other_sections" for H2 sections that match no known
    keyword. Sections that are absent keep their empty default.

    H2 titles are classified by keyword in a fixed order; the first matching
    rule wins.
    """
    text = file_path.read_text(encoding="utf-8")

    # Header details come from the H1 line and the date line.
    case_hit = CASE_NUMBER_RE.search(text)
    date_hit = DATE_RE.search(text)
    file_stat = file_path.stat()

    parsed: dict[str, Any] = {
        "header": {
            "case_number": case_hit.group(1) if case_hit else "",
            "date": date_hit.group(1) if date_hit else "",
            "file_path": str(file_path),
            "file_size": file_stat.st_size,
            "modified_at": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
        },
        "represented_party": "",
        "procedural_background": "",
        "agreed_facts": "",
        "disputed_facts": "",
        "threshold_claims": [],
        "issues": [],
        "conclusions": "",
        "other_sections": [],
    }

    def mentions(heading: str, *keywords: str) -> bool:
        """True when the heading contains at least one of the keywords."""
        return any(keyword in heading for keyword in keywords)

    def numbered_items(section_body: str, id_prefix: str) -> list[dict]:
        """Build one dict per H3 subsection, numbered from 1."""
        return [
            _build_subsection_dict(sub_title, sub_body, id_prefix, ordinal)
            for ordinal, (sub_title, sub_body) in enumerate(
                _split_subsections(section_body), start=1
            )
        ]

    for number, title, body in _split_main_sections(text):
        heading = title.strip()
        if mentions(heading, "צד מיוצג"):
            parsed["represented_party"] = body
        elif mentions(heading, "רקע דיוני"):
            parsed["procedural_background"] = body
        elif mentions(heading, "עובדות מוסכמות"):
            parsed["agreed_facts"] = body
        elif mentions(heading, "עובדות שנויות במחלוקת", "שנויות"):
            parsed["disputed_facts"] = body
        elif mentions(heading, "טענות סף", "טענות הסף"):
            parsed["threshold_claims"].extend(numbered_items(body, "threshold"))
        elif mentions(heading, "סוגיות להכרעה", "סוגיות"):
            parsed["issues"].extend(numbered_items(body, "issue"))
        elif mentions(heading, "מסקנות", "סיכום"):
            parsed["conclusions"] = body
        else:
            # Unrecognised section: keep it verbatim for display.
            parsed["other_sections"].append(
                {"number": number, "title": heading, "body": body}
            )
    return parsed
# ── Chair position in-place update ───────────────────────────────
def _find_subsection_by_id(
content: str, section_id: str
) -> tuple[int, int, str] | None:
"""Locate a subsection's body range in the raw content.
Given section_id like 'threshold_2' or 'issue_3', walks the file
structure and returns (body_start, body_end, body_text) for that
subsection. Returns None if not found.
"""
parts = section_id.split("_")
if len(parts) != 2:
return None
kind, idx_str = parts
try:
target_idx = int(idx_str)
except ValueError:
return None
if kind == "threshold":
main_keywords = ("טענות סף", "טענות הסף")
elif kind == "issue":
main_keywords = ("סוגיות להכרעה", "סוגיות")
else:
return None
# Find the main section that contains threshold claims or issues
sections_iter = list(re.finditer(r"^##\s+(.+?)$", content, re.MULTILINE))
for i, m in enumerate(sections_iter):
title = m.group(1).strip()
if not any(kw in title for kw in main_keywords):
continue
body_start = m.end()
body_end = (
sections_iter[i + 1].start() if i + 1 < len(sections_iter) else len(content)
)
section_body = content[body_start:body_end]
# Find H3 subsections within
h3s = list(re.finditer(r"^###\s+.+?$", section_body, re.MULTILINE))
if target_idx < 1 or target_idx > len(h3s):
return None
sub_start_rel = h3s[target_idx - 1].end()
sub_end_rel = (
h3s[target_idx].start() if target_idx < len(h3s) else len(section_body)
)
abs_start = body_start + sub_start_rel
abs_end = body_start + sub_end_rel
return abs_start, abs_end, content[abs_start:abs_end]
return None
def update_chair_position(
    file_path: Path, section_id: str, new_text: str
) -> dict[str, Any]:
    """Rewrite the chair-position field of one subsection in the file.

    The field is located the same way the parser reads it (_extract_fields /
    _build_subsection_dict): a line-start "**LABEL:**" marker whose label is
    CHAIR_POSITION_LABEL, extending to the next line-start field marker or the
    end of the subsection. If the label occurs more than once, the LAST
    occurrence is rewritten, because that is the one the parser reports.
    Existing content may contain "*" (emphasis, bullet lists) and is replaced
    as a whole. A trailing "---" rule and the whitespace after the field are
    preserved.

    If the subsection has no chair-position field, one is inserted at the end
    of the subsection, before its trailing "---" rule if there is one.

    An empty/blank `new_text` writes the first CHAIR_POSITION_PLACEHOLDERS
    entry, which the parser reads back as "not filled".

    The new content is written to "<name>.tmp" next to the file and then
    moved over it with os.replace; the temporary file is removed if either
    step fails.

    Returns {"saved": True, "section_id": ..., "preview": ..., "timestamp": ...}
    where preview is the first 120 characters of the trimmed `new_text`.

    Raises:
        FileNotFoundError: `file_path` does not exist.
        ValueError: `section_id` does not identify a subsection.
    """
    if not file_path.exists():
        raise FileNotFoundError(str(file_path))
    content = file_path.read_text(encoding="utf-8")

    found = _find_subsection_by_id(content, section_id)
    if found is None:
        raise ValueError(f"section {section_id} not found")
    abs_start, abs_end, subsection_body = found

    def split_tail(text: str) -> tuple[str, str]:
        """Split text into (content, trailing separator).

        The separator is a closing "---" rule with its surrounding whitespace
        if present (the parser strips the same pattern), otherwise just the
        trailing whitespace. It always starts with whitespace so it can be
        appended directly after new content.
        """
        rule = re.search(r"\s*---\s*$", text)
        cut = rule.start() if rule else len(text.rstrip())
        head, tail = text[:cut], text[cut:]
        if not tail[:1].isspace():
            tail = "\n" + tail
        return head, tail

    # NOTE(review): new_text is written verbatim. A line in it that starts
    # with "## ", "### " or "**label:**" would be read back as document
    # structure by the parser — confirm whether callers need to guard this.
    position_text = new_text.strip() or CHAIR_POSITION_PLACEHOLDERS[0]

    # Same field markers, label comparison and "last one wins" rule as the
    # parser, so the field edited here is the one the UI displayed.
    markers = list(FIELD_LABEL_RE.finditer(subsection_body))
    chair_idx = None
    for idx, marker in enumerate(markers):
        if marker.group(1).strip() == CHAIR_POSITION_LABEL:
            chair_idx = idx

    if chair_idx is None:
        head, tail = split_tail(subsection_body)
        new_subsection = (
            f"{head}\n\n**{CHAIR_POSITION_LABEL}:**\n{position_text}{tail}"
        )
    else:
        marker = markers[chair_idx]
        field_end = (
            markers[chair_idx + 1].start()
            if chair_idx + 1 < len(markers)
            else len(subsection_body)
        )
        _old_text, tail = split_tail(subsection_body[marker.end():field_end])
        # Keep the existing marker text, minus the spaces that followed it.
        label_markup = marker.group(0).rstrip()
        new_subsection = (
            subsection_body[: marker.start()]
            + f"{label_markup}\n{position_text}{tail}"
            + subsection_body[field_end:]
        )

    new_content = content[:abs_start] + new_subsection + content[abs_end:]

    # Write beside the target, then swap into place in one step.
    tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
    try:
        tmp_path.write_text(new_content, encoding="utf-8")
        os.replace(tmp_path, file_path)
    except OSError:
        # Do not leave a stale temporary file behind on failure.
        tmp_path.unlink(missing_ok=True)
        raise

    return {
        "saved": True,
        "section_id": section_id,
        "preview": new_text.strip()[:120],
        "timestamp": datetime.now().isoformat(),
    }
# ── Chair directions extraction (for downstream agents) ─────────
def extract_chair_directions(file_path: Path) -> dict[str, Any]:
    """Extract only the chair positions from analysis-and-research.md.

    Returns a compact dict that the legal-writer agent can use as direction:
    {
        "case_number": "1033-25",
        "file_path": "...",
        "file_exists": True,
        "total_items": 9,
        "filled_count": 3,
        "empty_count": 6,
        "status": "partial",
        "threshold_claims": [
            {"id": "threshold_1", "number": 1, "title": "...", "direction": "..."},
            ...
        ],
        "issues": [
            {"id": "issue_1", "number": 1, "title": "...", "direction": "..."},
            ...
        ]
    }

    "status" is one of:
    - "missing": the file does not exist, or it contains no threshold claims
      and no issues
    - "empty": items exist but none has a chair position
    - "partial": some items have a chair position
    - "complete": every item has a chair position

    When the file does not exist the same keys are returned with empty
    values ("file_exists" False, "case_number" ""), plus an "error" message,
    so callers can read the envelope without checking for key presence.
    """
    if not file_path.exists():
        return {
            "file_exists": False,
            "file_path": str(file_path),
            "case_number": "",
            "status": "missing",
            "error": "analysis-and-research.md not found",
            "threshold_claims": [],
            "issues": [],
            "total_items": 0,
            "filled_count": 0,
            "empty_count": 0,
        }

    parsed = parse(file_path)

    def reduce_item(item: dict) -> dict:
        """Keep only the identifying keys and the chair position."""
        return {
            "id": item["id"],
            "number": item["number"],
            "title": item["title"],
            "direction": item.get("chair_position", "") or "",
        }

    threshold = [reduce_item(t) for t in parsed.get("threshold_claims", [])]
    issues = [reduce_item(i) for i in parsed.get("issues", [])]

    all_items = threshold + issues
    total = len(all_items)
    filled = sum(1 for x in all_items if x["direction"].strip())
    empty = total - filled

    if total == 0:
        status = "missing"
    elif filled == 0:
        status = "empty"
    elif filled == total:
        status = "complete"
    else:
        status = "partial"

    return {
        "file_exists": True,
        "file_path": str(file_path),
        "case_number": parsed.get("header", {}).get("case_number", ""),
        "status": status,
        "total_items": total,
        "filled_count": filled,
        "empty_count": empty,
        "threshold_claims": threshold,
        "issues": issues,
    }
# ── Full analysis extraction (for legal-writer) ──────────────────
# Map Hebrew field labels → stable English keys for JSON output
_FIELD_KEY_MAP = {
"טענה": "claims",
"טענה (claim)": "claims",
"טענות": "claims",
"תשובה": "responses",
"תשובה (response)": "responses",
"תשובות": "responses",
"תגובה": "replies",
"תגובה (reply)": "replies",
"תגובות": "replies",
# Analyst sometimes appends party name to the label
# e.g. "תגובה (reply — קובר)" — catch the pattern dynamically below
"ניתוח אסטרטגי": "strategic_analysis",
"חוזקות": "strengths",
"חולשות": "weaknesses",
"הזדמנויות": "opportunities",
"שאלות משפטיות": "legal_questions",
"חיפוש תקדימים": "precedent_search",
"חקיקה רלוונטית": "relevant_legislation",
"תקדימים מהקורפוס הפנימי": "internal_precedents",
}
def _fields_to_dict(fields: list[dict]) -> dict[str, str]:
"""Convert ordered field list to a dict with stable English keys.
Unknown labels are kept as-is (Hebrew) so no data is lost.
Handles dynamic labels like "תגובה (reply — קובר)" by matching prefix.
"""
result: dict[str, str] = {}
for f in fields:
label = f["label"]
key = _FIELD_KEY_MAP.get(label)
if key is None:
# Try prefix matching for dynamic labels (e.g. "תגובה (reply — name)")
if label.startswith("תגובה"):
key = "replies"
elif label.startswith("טענה"):
key = "claims"
elif label.startswith("תשובה"):
key = "responses"
else:
key = label
result[key] = f["content"]
return result
def extract_full_analysis(file_path: Path) -> dict[str, Any]:
    """Extract the complete strategic analysis from analysis-and-research.md.

    Unlike extract_chair_directions (which returns only chair positions),
    this returns ALL fields per item, keyed as produced by _fields_to_dict,
    together with the plain-text sections (procedural background, agreed and
    disputed facts, conclusions).

    The envelope carries "file_exists", "file_path", "case_number",
    "modified_at", "status" and the item counts. "status" is "missing" when
    the file does not exist or contains no threshold claims and no issues,
    otherwise "empty", "partial" or "complete" according to how many items
    have a chair position ("direction").

    When the file does not exist the same keys are returned with empty
    values, plus an "error" message, so callers can read the envelope
    without checking for key presence.
    """
    if not file_path.exists():
        return {
            "file_exists": False,
            "file_path": str(file_path),
            "case_number": "",
            "modified_at": "",
            "status": "missing",
            "error": "analysis-and-research.md not found",
            "procedural_background": "",
            "agreed_facts": "",
            "disputed_facts": "",
            "conclusions": "",
            "threshold_claims": [],
            "issues": [],
            "total_items": 0,
            "filled_count": 0,
            "empty_count": 0,
        }

    parsed = parse(file_path)

    def enrich_item(item: dict) -> dict:
        """Return the item as a flat dict: core keys plus every field."""
        enriched = {
            "id": item["id"],
            "number": item["number"],
            "title": item["title"],
            "direction": item.get("chair_position", "") or "",
        }
        # Add all extracted fields under their stable keys.
        enriched.update(_fields_to_dict(item.get("fields", [])))
        return enriched

    threshold = [enrich_item(t) for t in parsed.get("threshold_claims", [])]
    issues = [enrich_item(i) for i in parsed.get("issues", [])]

    all_items = threshold + issues
    total = len(all_items)
    filled = sum(1 for x in all_items if x["direction"].strip())
    empty = total - filled

    if total == 0:
        status = "missing"
    elif filled == 0:
        status = "empty"
    elif filled == total:
        status = "complete"
    else:
        status = "partial"

    header = parsed.get("header", {})
    return {
        "file_exists": True,
        "file_path": str(file_path),
        "case_number": header.get("case_number", ""),
        "modified_at": header.get("modified_at", ""),
        "status": status,
        "total_items": total,
        "filled_count": filled,
        "empty_count": empty,
        "procedural_background": parsed.get("procedural_background", ""),
        "agreed_facts": parsed.get("agreed_facts", ""),
        "disputed_facts": parsed.get("disputed_facts", ""),
        "conclusions": parsed.get("conclusions", ""),
        "threshold_claims": threshold,
        "issues": issues,
    }
# ── Research findings extraction ──────────────────────────────────
def extract_research_findings(file_path: Path) -> dict[str, Any]:
    """Read research-findings.md and return its sections as a dict.

    H2 sections are routed by keyword in their title: precedent summaries and
    plan mappings are split into one entry per H3 subsection, the timeline
    and recommendations sections are kept as plain text, and any other
    section is collected under "other_sections".

    If the file does not exist, only {"file_exists": False,
    "status": "missing", "error": ...} is returned.
    """
    if not file_path.exists():
        return {
            "file_exists": False,
            "status": "missing",
            "error": "research-findings.md not found",
        }

    text = file_path.read_text(encoding="utf-8")
    file_stat = file_path.stat()

    findings: dict[str, Any] = {
        "file_exists": True,
        "file_path": str(file_path),
        "modified_at": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
        "file_size": file_stat.st_size,
        "precedent_summaries": [],
        "plan_mappings": [],
        "timeline": "",
        "recommendations": "",
        "other_sections": [],
    }

    def subsection_entries(section_body: str) -> list[dict]:
        """One entry per H3 subsection: its labelled fields, or raw text."""
        entries = []
        for sub_title, sub_body in _split_subsections(section_body):
            labelled = _extract_fields(sub_body)
            entries.append({
                "title": sub_title,
                "fields": {f["label"]: f["content"] for f in labelled},
                # The raw body is kept only when no labelled field was found.
                "raw": "" if labelled else sub_body,
            })
        return entries

    for _number, title, body in _split_main_sections(text):
        heading = title.strip()
        if "סיכום פסיקה" in heading or "פסיקה" in heading:
            findings["precedent_summaries"].extend(subsection_entries(body))
        elif "מיפוי תכנית" in heading or "תכנית" in heading:
            findings["plan_mappings"].extend(subsection_entries(body))
        elif "ציר זמן" in heading:
            findings["timeline"] = body
        elif "המלצות" in heading:
            findings["recommendations"] = body
        else:
            findings["other_sections"].append({
                "title": heading,
                "body": body,
            })
    return findings