"""Aggregate raw propositions into distinct legal arguments (de-duplication).

Workflow:

1. ``claims_extractor`` extracts ~20-30 raw propositions per litigation brief
   into the ``claims`` table.
2. This module groups those raw propositions, per party, into 6-12 distinct
   legal arguments via Claude headless (``claude_session``).
3. The result is stored in ``legal_arguments`` plus
   ``legal_argument_propositions`` (M:M join) so we keep traceability back to
   the source claims.

Manually de-duping 184 propositions in 3 cases yielded 82 arguments
(~24/case) — see
``data/cases/{1017,1018,1019}-03-26/documents/research/legal-arguments.md``
for the gold standard.

**Architectural constraint**: ``claude_session`` only works from the local MCP
server (Claude CLI is not installed in the FastAPI container). Calls from
``web/`` must go through MCP tools; calls from MCP tools land here directly.
"""

from __future__ import annotations

import json
import logging
from uuid import UUID

from legal_mcp.services import claude_session, db

logger = logging.getLogger(__name__)

# Allowed enum values mirror the DB CHECK constraints.
ALLOWED_PARTIES = {"appellant", "respondent", "committee", "permit_applicant", "unknown"}
ALLOWED_PRIORITIES = {"threshold", "substantive", "procedural", "relief"}

# Hebrew labels for the prompt (Claude needs context in the same
# language as the source material).
PARTY_LABELS_HE = {
    "appellant": "עוררים",
    "respondent": "משיבים",
    "committee": "ועדה מקומית",
    "permit_applicant": "מבקשי היתר",
    "unknown": "צד לא מזוהה",
}

# Bounds on how many arguments we ask the model to produce per party.
_MIN_TARGET_ARGUMENTS = 4
_MAX_TARGET_ARGUMENTS = 12

AGGREGATE_PROMPT_TEMPLATE = """אתה מנתח כתבי טענות בתחום תכנון ובנייה (ועדת ערר).

לפניך {n} פרופוזיציות גולמיות שחולצו ממסמכי {party_he} בתיק ערר. מטרתך: לקבץ אותן ל-{target_min}-{target_max} **טיעונים משפטיים מובחנים** (ארגומנטים אמיתיים, לא חזרה מילולית של הפרופוזיציות).

## כללי איגוד:

1. **טיעון אמיתי = רעיון משפטי אחד** — לא רשימה של פרופוזיציות, אלא טענה משפטית עצמאית.
2. **מקבצים פרופוזיציות שתומכות באותו רעיון משפטי** — גם אם הניסוח שלהן שונה.
3. **מפרידים בין סוגי טענות**:
   - **threshold** = טענות סף (זכות עמידה, סמכות, מועדים, שיהוי)
   - **substantive** = טענות מהותיות (תחולת חוק, פרשנות, חישוב)
   - **procedural** = פגמי הליך (פרסום, פרוטוקול, ניגוד עניינים)
   - **relief** = סעדים מבוקשים / סיכומים
4. **כותרת קצרה ובהירה** — תיאורית, לא משפטית מפורטת. 5-15 מילים.
5. **גוף הטיעון בפסקה אחת** — 3-7 שורות עברית, נאמן למקור.
6. **שמירת ה-claim_ids המקוריים** — לכל טיעון, רשום אילו פרופוזיציות תומכות בו.

## פלט:

החזר JSON בלבד (ללא markdown, ללא הסברים), array של אובייקטים:

```
[
  {{
    "title": "כותרת קצרה של הטיעון",
    "body": "גוף הטיעון בפסקה אחת",
    "topic": "סוגיה משפטית קצרה (לדוגמה: 'זכות עמידה', 'תחולת תמ\\"א 38')",
    "priority": "threshold|substantive|procedural|relief",
    "claim_ids": ["uuid-1", "uuid-2"]
  }}
]
```

## הפרופוזיציות:

{propositions_json}
"""


def _target_range(n: int) -> tuple[int, int]:
    """Return the ``(min, max)`` number of arguments to request for *n* propositions.

    Heuristic: roughly one argument per 2-3 propositions, at least 4 and at
    most 12. The range is additionally capped at *n* so the model is never
    asked for more arguments than there are propositions.
    """
    target_min = max(_MIN_TARGET_ARGUMENTS, n // 4)
    target_max = max(target_min + 1, min(_MAX_TARGET_ARGUMENTS, n // 2 + 1))

    # Clamp to what is actually achievable / desired.
    cap = max(1, min(_MAX_TARGET_ARGUMENTS, n))
    target_max = min(target_max, cap)
    target_min = min(target_min, target_max)

    # Keep a real range ("11-12", not "12-12") whenever there is room for one.
    if target_min == target_max and target_min > 1:
        target_min -= 1
    return target_min, target_max


def _build_prompt(party: str, propositions: list[dict]) -> str:
    """Compose the per-party aggregation prompt.

    Args:
        party: Party key; mapped to a Hebrew label via ``PARTY_LABELS_HE``
            (falls back to the raw key when unmapped).
        propositions: Claim rows; each must provide ``id`` and ``claim_text``.

    Returns:
        The fully formatted prompt string.
    """
    n = len(propositions)
    target_min, target_max = _target_range(n)
    party_he = PARTY_LABELS_HE.get(party, party)

    # Strip noise from propositions for the prompt — Claude only needs
    # the id and the text to do the grouping.
    compact = [{"id": str(p["id"]), "text": p["claim_text"]} for p in propositions]
    propositions_json = json.dumps(compact, ensure_ascii=False, indent=2)

    return AGGREGATE_PROMPT_TEMPLATE.format(
        n=n,
        party_he=party_he,
        target_min=target_min,
        target_max=target_max,
        propositions_json=propositions_json,
    )


def _clean_text(value: object) -> str:
    """Return *value* stripped if it is a string, else an empty string."""
    return value.strip() if isinstance(value, str) else ""


def _normalize_argument(raw: dict, fallback_topic: str = "") -> dict | None:
    """Validate & normalize a single argument dict from Claude.

    Args:
        raw: One element of the JSON array returned by the model.
        fallback_topic: Topic to use when the model supplied none.

    Returns:
        A dict with ``title``, ``body``, ``topic`` (``None`` when empty),
        ``priority`` (always a member of ``ALLOWED_PRIORITIES``) and
        ``claim_ids`` (de-duplicated ``UUID`` list, model order preserved),
        or ``None`` if the row is unusable (not a dict, or missing a
        non-empty string title/body).
    """
    if not isinstance(raw, dict):
        return None

    # Model output is untrusted: non-string fields are treated as missing
    # instead of crashing on ``.strip()``.
    title = _clean_text(raw.get("title"))
    body = _clean_text(raw.get("body"))
    if not title or not body:
        return None

    priority = raw.get("priority", "substantive")
    if isinstance(priority, str):
        priority = priority.strip().lower()
    # The isinstance guard also protects the set lookup from unhashable values.
    if not isinstance(priority, str) or priority not in ALLOWED_PRIORITIES:
        priority = "substantive"

    topic = _clean_text(raw.get("topic")) or _clean_text(fallback_topic) or None

    claim_ids: list[UUID] = []
    seen: set[UUID] = set()
    claim_ids_raw = raw.get("claim_ids") or []
    if isinstance(claim_ids_raw, list):
        for cid in claim_ids_raw:
            try:
                parsed = UUID(str(cid))
            except (ValueError, TypeError):
                # Not a UUID at all — silently drop, as before.
                continue
            if parsed not in seen:
                seen.add(parsed)
                claim_ids.append(parsed)

    return {
        "title": title,
        "body": body,
        "topic": topic,
        "priority": priority,
        "claim_ids": claim_ids,
    }


async def _aggregate_party(
    party: str,
    propositions: list[dict],
) -> list[dict]:
    """Ask Claude to group one party's propositions; return normalized rows.

    Claim ids that were not part of *propositions* are dropped: the model only
    saw this party's propositions, so any other id is a hallucination and must
    not reach the join table.

    Raises:
        RuntimeError: If ``claude_session.query_json`` raises ``RuntimeError``;
            re-raised with the party name prepended and the original message
            preserved (callers match on it).
    """
    if not propositions:
        return []

    prompt = _build_prompt(party, propositions)
    try:
        raw_result = await claude_session.query_json(prompt)
    except RuntimeError as e:
        # Surface CLI-unavailable specifically so the caller can report
        # cleanly instead of crashing the whole job.
        raise RuntimeError(
            f"argument_aggregator: claude_session.query_json failed for party "
            f"'{party}': {e}"
        ) from e

    if not isinstance(raw_result, list):
        logger.warning(
            "argument_aggregator: Claude returned non-list (%s) for party '%s'",
            type(raw_result).__name__,
            party,
        )
        return []

    # Compare as lowercase strings so the check works whatever UUID flavour
    # the DB driver hands back for ``id``.
    known_ids = {str(p["id"]).lower() for p in propositions}

    out: list[dict] = []
    for entry in raw_result:
        norm = _normalize_argument(entry)
        if not norm:
            continue
        valid_ids = [cid for cid in norm["claim_ids"] if str(cid) in known_ids]
        dropped = len(norm["claim_ids"]) - len(valid_ids)
        if dropped:
            logger.warning(
                "argument_aggregator: dropped %d unknown claim_id(s) for party '%s'",
                dropped,
                party,
            )
        norm["claim_ids"] = valid_ids
        out.append(norm)
    return out


async def _link_claims(conn, argument_id, claim_ids: list[UUID]) -> None:
    """Insert argument -> claim links, skipping (and logging) rows that fail.

    Must be called inside an open transaction on *conn*. Each insert runs in
    its own nested transaction (savepoint): in PostgreSQL a failed statement
    aborts the enclosing transaction, so merely catching the exception would
    make every subsequent statement fail as well.
    """
    for cid in claim_ids:
        try:
            async with conn.transaction():
                await conn.execute(
                    """INSERT INTO legal_argument_propositions
                           (argument_id, claim_id)
                       VALUES ($1, $2)
                       ON CONFLICT DO NOTHING""",
                    argument_id,
                    cid,
                )
        except Exception as e:  # noqa: BLE001
            # Likely FK violation (e.g. the claim was deleted meanwhile).
            # Log and continue; the savepoint has already been rolled back.
            logger.warning(
                "argument_aggregator: skipped bad claim_id %s for arg %s: %s",
                cid,
                argument_id,
                e,
            )


async def aggregate_claims_to_arguments(
    case_id: UUID,
    force: bool = False,
) -> dict:
    """For a given case, group existing claims into distinct legal arguments.

    All LLM calls are made first, without holding a DB connection; the results
    are then written in a single transaction. With ``force=True`` the existing
    rows are replaced inside that same transaction, and only when there is at
    least one new argument to write — a failed or empty run never wipes
    previously stored arguments.

    Args:
        case_id: The case UUID.
        force: If True, replace existing ``legal_arguments`` for the case.
            Otherwise short-circuit if any rows exist.

    Returns:
        A summary dict. ``status`` is one of ``"completed"``,
        ``"completed_with_errors"``, ``"skipped"``, ``"no_claims"`` or
        ``"llm_unavailable"``. ``total`` is always present. The two
        ``completed*`` statuses also carry ``by_party`` (``{party: count}``)
        and ``propositions_processed``; ``completed_with_errors`` adds
        ``errors``. The other statuses carry a ``message``.
    """
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        existing = await conn.fetchval(
            "SELECT COUNT(*) FROM legal_arguments WHERE case_id = $1",
            case_id,
        )
        if existing and not force:
            return {
                "status": "skipped",
                "message": f"Found {existing} existing arguments. Use force=True to re-run.",
                "total": existing,
            }

        # Pull all claims for this case, grouped by party.
        rows = await conn.fetch(
            """SELECT id, party_role, claim_text, claim_index, source_document
               FROM claims
               WHERE case_id = $1
               ORDER BY party_role, claim_index""",
            case_id,
        )

    if not rows:
        return {
            "status": "no_claims",
            "message": "No claims found for this case. Run extract_claims first.",
            "total": 0,
        }

    # Group propositions by party.
    by_party: dict[str, list[dict]] = {}
    for r in rows:
        party = r["party_role"]
        # Map deprecated 'appraiser' or unknown labels to 'unknown'.
        if party not in ALLOWED_PARTIES:
            party = "unknown"
        by_party.setdefault(party, []).append(dict(r))

    # Phase 1: LLM aggregation. Nothing is written (or deleted) yet.
    aggregated: dict[str, list[dict]] = {}
    errors: list[str] = []
    for party, props in by_party.items():
        try:
            aggregated[party] = await _aggregate_party(party, props)
        except RuntimeError as e:
            # Most likely cause: Claude CLI not installed (running from
            # the container). Don't crash — record the gap and continue.
            msg = str(e)
            if "Claude CLI not found" in msg:
                return {
                    "status": "llm_unavailable",
                    "message": (
                        "Claude CLI not available. This service must run from "
                        "the local MCP server (not the FastAPI container)."
                    ),
                    "total": 0,
                }
            errors.append(f"{party}: {msg}")

    party_counts: dict[str, int] = {
        party: len(arguments) for party, arguments in aggregated.items()
    }

    # Phase 2: persist everything atomically.
    inserted = 0
    if any(aggregated.values()):
        async with pool.acquire() as conn:
            async with conn.transaction():
                if force:
                    await conn.execute(
                        "DELETE FROM legal_arguments WHERE case_id = $1",
                        case_id,
                    )
                for party, arguments in aggregated.items():
                    for idx, arg in enumerate(arguments, start=1):
                        arg_id = await conn.fetchval(
                            """INSERT INTO legal_arguments
                                   (case_id, party, argument_index, argument_title,
                                    argument_body, legal_topic, priority)
                               VALUES ($1, $2, $3, $4, $5, $6, $7)
                               RETURNING id""",
                            case_id,
                            party,
                            idx,
                            arg["title"],
                            arg["body"],
                            arg["topic"],
                            arg["priority"],
                        )
                        await _link_claims(conn, arg_id, arg["claim_ids"])
                        inserted += 1

    result: dict = {
        "status": "completed",
        "total": inserted,
        "by_party": party_counts,
        "propositions_processed": len(rows),
    }
    if errors:
        result["errors"] = errors
        result["status"] = "completed_with_errors"
    return result


async def get_legal_arguments(
    case_id: UUID,
    party: str = "",
) -> list[dict]:
    """Return aggregated legal arguments for a case, optionally filtered by party.

    Args:
        case_id: The case UUID.
        party: Optional party key. The filter is applied only when the value
            is a member of ``ALLOWED_PARTIES``; an empty or unrecognised value
            returns the arguments of all parties.

    Returns:
        One dict per argument with ``id`` and ``case_id`` as strings, plus
        ``supporting_claims`` (list of source claim ids as strings) and
        ``supporting_propositions`` (list of ``{"id", "text",
        "source_document"}`` dicts), both ordered by ``claim_index``.
    """
    # NOTE(review): rows are sorted by the raw ``priority`` value; if that
    # column is plain text the order is alphabetical (procedural, relief,
    # substantive, threshold), not threshold-first — confirm this is intended.
    pool = await db.get_pool()
    async with pool.acquire() as conn:
        if party and party in ALLOWED_PARTIES:
            rows = await conn.fetch(
                """SELECT id, case_id, party, argument_index, argument_title,
                          argument_body, legal_topic, priority,
                          cited_precedents, created_at, updated_at
                   FROM legal_arguments
                   WHERE case_id = $1 AND party = $2
                   ORDER BY priority, argument_index""",
                case_id,
                party,
            )
        else:
            rows = await conn.fetch(
                """SELECT id, case_id, party, argument_index, argument_title,
                          argument_body, legal_topic, priority,
                          cited_precedents, created_at, updated_at
                   FROM legal_arguments
                   WHERE case_id = $1
                   ORDER BY party, priority, argument_index""",
                case_id,
            )

        # Pull supporting claims (id + full text) for each argument in one
        # round-trip. ``supporting_claims`` stays id-only for backwards compat
        # (counts, MCP consumers); ``supporting_propositions`` carries the text
        # so the UI can show the raw propositions without an extra fetch.
        arg_ids = [r["id"] for r in rows]
        supporting: dict[UUID, list[str]] = {}
        propositions: dict[UUID, list[dict]] = {}
        if arg_ids:
            joins = await conn.fetch(
                """SELECT lap.argument_id, lap.claim_id,
                          c.claim_text, c.source_document, c.claim_index
                   FROM legal_argument_propositions lap
                   JOIN claims c ON c.id = lap.claim_id
                   WHERE lap.argument_id = ANY($1::uuid[])
                   ORDER BY c.claim_index""",
                arg_ids,
            )
            for j in joins:
                claim_id = str(j["claim_id"])
                supporting.setdefault(j["argument_id"], []).append(claim_id)
                propositions.setdefault(j["argument_id"], []).append({
                    "id": claim_id,
                    "text": j["claim_text"],
                    "source_document": j["source_document"],
                })

    out: list[dict] = []
    for r in rows:
        d = dict(r)
        d["id"] = str(d["id"])
        d["case_id"] = str(d["case_id"])
        d["supporting_claims"] = supporting.get(r["id"], [])
        d["supporting_propositions"] = propositions.get(r["id"], [])
        out.append(d)
    return out