feat(training): Style Studio — upload, rich corpus, lessons, curator portrait, chat
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 2m7s

Six-phase upgrade of /training from a read-only dashboard into a full
Style Studio for managing Daphna's style corpus.

- Upload Sheet on /training: file → proofread preview → commit (no more
  CLI-only `upload-training` skill).
- Rich corpus metadata: GET /api/training/corpus returns summary, outcome,
  key_principles, page_count, parties (regex), legal_citation, lessons_count.
  PATCH endpoint for chair edits. CorpusDetailDrawer with 4 tabs (details
  /content/lessons/patterns) replaces the bare table row.
- LLM metadata enrichment: style_metadata_extractor + MCP tools
  (style_corpus_enrich, style_corpus_pending_enrichment) fill summary
  /outcome/key_principles via claude_session (free, host-side).
- Per-decision lessons: new decision_lessons table + 4 REST endpoints +
  LessonsTab in drawer; hermes-curator now auto-posts findings as
  decision_lessons(source=curator).
- Curator Portrait tab: prompt rendered with link to Gitea, recent
  curator findings, style_analyzer training prompts, propose-change
  form that writes proposals to data/curator-proposals/ for manual
  chair review (no auto-mutation of the agent file).
- Style chat tab: SSE-streamed conversations with the style agent.
  New host-side pm2 service (legal-chat-service, port 8770) wraps
  claude CLI with stream-json + --resume continuation; FastAPI proxies
  via host.docker.internal. Zero API cost — uses chaim's claude.ai
  subscription. chat_conversations + chat_messages persist history.

Architecture: keeps the existing rule that claude_session only runs
on the host (not the container). The new legal-chat-service is the
canonical bridge between the container and the local CLI for the chat
feature; everything else (upload, metadata, lessons) stays within the
container's existing capabilities.

Audit script (scripts/audit_training_corpus.py) included for verifying
which corpus rows still need enrichment.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 10:06:22 +00:00
parent 0629f19d5f
commit bb0cd7c6a2
23 changed files with 4568 additions and 75 deletions

View File

@@ -12,6 +12,7 @@ import subprocess
import sys
import time
from contextlib import asynccontextmanager
from datetime import date as date_type
from pathlib import Path
from uuid import UUID, uuid4
@@ -945,32 +946,648 @@ async def training_corpus_delete(corpus_id: str):
return result
def _format_legal_citation(decision_number: str, decision_date: str) -> str:
"""Compose the Israeli ועדת ערר citation string from corpus metadata.
Mirrors how decisions are referenced in Daphna's own writing — e.g.
"ערר 1130-25 ועדת ערר ירושלים (26.4.2026)". Empty parts are dropped
gracefully so partially populated rows still produce a readable label.
"""
if not decision_number:
return ""
parts = [f"ערר {decision_number}", "ועדת ערר ירושלים"]
if decision_date:
try:
d = date_type.fromisoformat(decision_date)
parts.append(f"({d.day}.{d.month}.{d.year})")
except ValueError:
pass
return " ".join(parts)
_PARTIES_PATTERNS = (
# "העורר: X" or "העוררים: X". Captures up to a newline / end of stanza.
re.compile(r"העורר(?:ים|ת)?[:\s]+([^\n]{3,120})"),
re.compile(r"המבקש(?:ים|ת)?[:\s]+([^\n]{3,120})"),
re.compile(r"בעניין[:\s]+([^\n]{3,120})"),
)
_RESPONDENT_PATTERNS = (
re.compile(r"המשיב(?:ים|ה|ות)?[:\s]+([^\n]{3,120})"),
re.compile(r"נגד\s*\n+\s*([^\n]{3,120})"),
)
def _extract_parties(text: str) -> dict[str, str]:
"""Best-effort regex extraction of עורר/משיב from the first 5K of full_text.
We only scan the head of the document because the parties are always
declared at the top in Israeli legal decisions. The result is a hint
for display — never authoritative — so a miss returns an empty string
rather than raising.
"""
head = (text or "")[:5000]
appellant = respondent = ""
for pat in _PARTIES_PATTERNS:
m = pat.search(head)
if m:
appellant = m.group(1).strip(" .,-—")
break
for pat in _RESPONDENT_PATTERNS:
m = pat.search(head)
if m:
respondent = m.group(1).strip(" .,-—")
break
return {"appellant": appellant, "respondent": respondent}
@app.get("/api/training/corpus")
async def training_corpus_list():
"""List all decisions currently in the style corpus."""
"""List all decisions currently in the style corpus, with enriched metadata.
Joins to ``documents`` via FK when available, falling back to the
title-token match used in the chunking pipeline so legacy rows with
``style_corpus.document_id IS NULL`` still resolve to their page_count
and chunk counts.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, decision_number, decision_date, subject_categories, "
" length(full_text) as chars, created_at "
"FROM style_corpus "
"ORDER BY created_at DESC"
"""
SELECT sc.id,
sc.decision_number,
sc.decision_date,
sc.subject_categories,
length(sc.full_text) AS chars,
substring(sc.full_text from 1 for 5000) AS head_text,
sc.summary,
sc.outcome,
sc.key_principles,
sc.appeal_subtype,
sc.practice_area,
sc.document_id,
sc.created_at,
d.page_count AS page_count,
d.title AS doc_title
FROM style_corpus sc
LEFT JOIN documents d ON d.id = sc.document_id
ORDER BY sc.created_at DESC
"""
)
return [
{
lessons_counts = await db.count_decision_lessons_per_corpus()
out = []
for r in rows:
cats = r["subject_categories"]
if isinstance(cats, str):
try:
cats = json.loads(cats)
except json.JSONDecodeError:
cats = []
kp = r["key_principles"]
if isinstance(kp, str):
try:
kp = json.loads(kp)
except json.JSONDecodeError:
kp = []
decision_date = str(r["decision_date"]) if r["decision_date"] else ""
parties = _extract_parties(r["head_text"] or "")
out.append({
"id": str(r["id"]),
"decision_number": r["decision_number"] or "",
"decision_date": str(r["decision_date"]) if r["decision_date"] else "",
"subject_categories": (
json.loads(r["subject_categories"])
if isinstance(r["subject_categories"], str)
else r["subject_categories"] or []
),
"decision_date": decision_date,
"subject_categories": cats or [],
"chars": r["chars"],
"created_at": r["created_at"].isoformat() if r["created_at"] else "",
# ── enriched fields ──
"summary": r["summary"] or "",
"outcome": r["outcome"] or "",
"key_principles": kp or [],
"appeal_subtype": r["appeal_subtype"] or "",
"practice_area": r["practice_area"] or "",
"page_count": r["page_count"] or 0,
"document_id": str(r["document_id"]) if r["document_id"] else None,
"doc_title": r["doc_title"] or "",
"parties": parties,
"legal_citation": _format_legal_citation(r["decision_number"] or "", decision_date),
"lessons_count": lessons_counts.get(str(r["id"]), 0),
})
return out
# ── Style-agent chat (delegated to legal-chat-service on host) ─────
class ChatConversationCreate(BaseModel):
title: str = "שיחה חדשה"
style_corpus_id: str | None = None # optional — scope chat to a decision
class ChatMessageRequest(BaseModel):
content: str
def _conv_to_json(row: dict) -> dict:
"""Serialize a chat_conversations row for the API."""
return {
"id": str(row["id"]),
"title": row.get("title") or "",
"style_corpus_id": str(row["style_corpus_id"]) if row.get("style_corpus_id") else None,
"decision_number": row.get("decision_number") or "",
"claude_session_id": row.get("claude_session_id"),
"message_count": row.get("message_count", 0),
"created_at": row["created_at"].isoformat() if row.get("created_at") else "",
"last_message_at": row["last_message_at"].isoformat() if row.get("last_message_at") else "",
}
def _msg_to_json(row: dict) -> dict:
return {
"id": str(row["id"]),
"role": row["role"],
"content": row["content"],
"created_at": row["created_at"].isoformat() if row.get("created_at") else "",
}
@app.post("/api/training/chat/conversations")
async def chat_create_conversation(body: ChatConversationCreate):
"""Create a new style-agent chat conversation."""
corpus_uuid: UUID | None = None
if body.style_corpus_id:
try:
corpus_uuid = UUID(body.style_corpus_id)
except ValueError:
raise HTTPException(400, "invalid style_corpus_id")
row = await db.create_chat_conversation(
title=body.title.strip() or "שיחה חדשה",
style_corpus_id=corpus_uuid,
)
if not row:
raise HTTPException(500, "failed to create conversation")
return _conv_to_json(row)
@app.get("/api/training/chat/conversations")
async def chat_list_conversations(limit: int = 50):
rows = await db.list_chat_conversations(limit=limit)
return [_conv_to_json(r) for r in rows]
@app.get("/api/training/chat/conversations/{conv_id}")
async def chat_get_conversation(conv_id: str):
try:
cid = UUID(conv_id)
except ValueError:
raise HTTPException(400, "invalid conv_id")
conv = await db.get_chat_conversation(cid)
if not conv:
raise HTTPException(404, "conversation not found")
messages = await db.list_chat_messages(cid)
return {
"conversation": _conv_to_json(conv),
"messages": [_msg_to_json(m) for m in messages],
}
@app.delete("/api/training/chat/conversations/{conv_id}")
async def chat_delete_conversation(conv_id: str):
try:
cid = UUID(conv_id)
except ValueError:
raise HTTPException(400, "invalid conv_id")
result = await db.delete_chat_conversation(cid)
if not result.get("deleted"):
raise HTTPException(404, "conversation not found")
return result
@app.post("/api/training/chat/conversations/{conv_id}/messages")
async def chat_send_message(conv_id: str, body: ChatMessageRequest):
"""Send a user message; stream the assistant response as SSE.
Proxies through ``web.chat_proxy.stream_chat_message`` to the
legal-chat-service running on the host.
"""
try:
cid = UUID(conv_id)
except ValueError:
raise HTTPException(400, "invalid conv_id")
text = (body.content or "").strip()
if not text:
raise HTTPException(400, "content is required")
from web import chat_proxy
return await chat_proxy.stream_chat_message(cid, text)
@app.get("/api/training/chat/health")
async def chat_health():
"""Probe legal-chat-service liveness from inside the container.
Useful when the UI wants to gracefully degrade ("שירות הצ'אט אינו
זמין") instead of letting messages fail mid-stream.
"""
from web import chat_proxy
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(5.0)) as client:
r = await client.get(f"{chat_proxy.CHAT_SERVICE_URL}/health")
return {"reachable": r.status_code == 200, "status": r.status_code,
"url": chat_proxy.CHAT_SERVICE_URL}
except Exception as e:
return {"reachable": False, "error": str(e),
"url": chat_proxy.CHAT_SERVICE_URL}
# ── Curator portrait — read prompt + stats + accept proposals ──────
# The curator agent's prompt is symlinked into Paperclip, but the source
# lives in the legal-ai repo. Resolve via env so the container (where the
# agent file is mounted from a different path) and the host both work.
_AGENTS_DIR = Path(os.environ.get(
"AGENTS_DIR",
str(Path(__file__).resolve().parent.parent / ".claude" / "agents"),
))
_CURATOR_PROPOSALS_DIR = Path(os.environ.get(
"CURATOR_PROPOSALS_DIR",
str(Path(__file__).resolve().parent.parent / "data" / "curator-proposals"),
))
_GITEA_REPO_BASE = os.environ.get(
"GITEA_REPO_BASE",
"https://gitea.nautilus.marcusgroup.org/ezer-mishpati/legal-ai",
)
@app.get("/api/training/curator/prompt")
async def get_curator_prompt():
"""Return the hermes-curator agent's prompt (read-only) + Gitea source URL.
The file is the canonical source of how the curator analyzes Daphna's
final decisions. Changes go through git/Gitea, not the UI — the UI just
surfaces it for transparency.
"""
path = _AGENTS_DIR / "hermes-curator.md"
if not path.exists():
raise HTTPException(404, f"curator prompt not found at {path}")
try:
content = path.read_text(encoding="utf-8")
stat = path.stat()
except OSError as e:
raise HTTPException(500, f"failed to read curator prompt: {e}")
gitea_url = (
f"{_GITEA_REPO_BASE}/src/branch/main/.claude/agents/hermes-curator.md"
)
return {
"content": content,
"filename": path.name,
"bytes": stat.st_size,
"last_modified": stat.st_mtime,
"gitea_url": gitea_url,
}
@app.get("/api/training/curator/style-analyzer-prompt")
async def get_style_analyzer_prompt():
"""Return the system prompt that style_analyzer.py uses to extract patterns.
Surfaces the *training-time* prompt (Claude Opus 1M context) so the
chair can compare it against the curator's post-export prompt. Both
are shown side-by-side in the curator-portrait tab.
"""
# Embedded as a string so we don't need to import the service module
# here (which would pull in claude_session + db). The prompt is the
# one defined in mcp-server/src/legal_mcp/services/style_analyzer.py.
try:
from legal_mcp.services import style_analyzer
return {
"analysis_prompt": style_analyzer.ANALYSIS_PROMPT,
"single_decision_prompt": style_analyzer.SINGLE_DECISION_PROMPT,
"synthesis_prompt": style_analyzer.SYNTHESIS_PROMPT,
"max_input_tokens": style_analyzer.MAX_INPUT_TOKENS,
}
for r in rows
]
except Exception as e:
raise HTTPException(500, f"failed to load style_analyzer prompt: {e}")
@app.get("/api/training/curator/stats")
async def get_curator_stats():
"""Cheap aggregate stats over decision_lessons + style_corpus.
Used by the Curator-Portrait tab to show "10 curator findings across 24
decisions". We deliberately keep this server-side and aggregate so the
UI can render a single card without fanning out N queries.
"""
pool = await db.get_pool()
async with pool.acquire() as conn:
total_lessons = await conn.fetchval(
"SELECT count(*) FROM decision_lessons WHERE source = 'curator'"
)
decisions_with_findings = await conn.fetchval(
"SELECT count(DISTINCT style_corpus_id) FROM decision_lessons "
"WHERE source = 'curator'"
)
total_corpus = await conn.fetchval("SELECT count(*) FROM style_corpus")
applied = await conn.fetchval(
"SELECT count(*) FROM decision_lessons "
"WHERE source = 'curator' AND applied_to_skill = true"
)
# Last 10 curator findings — newest first
recent_rows = await conn.fetch(
"""
SELECT dl.id, dl.lesson_text, dl.category, dl.applied_to_skill,
dl.created_at,
sc.decision_number, sc.decision_date
FROM decision_lessons dl
JOIN style_corpus sc ON sc.id = dl.style_corpus_id
WHERE dl.source = 'curator'
ORDER BY dl.created_at DESC
LIMIT 10
"""
)
return {
"total_findings": total_lessons or 0,
"decisions_with_findings": decisions_with_findings or 0,
"decisions_total": total_corpus or 0,
"findings_applied": applied or 0,
"recent_findings": [
{
"id": str(r["id"]),
"lesson_text": r["lesson_text"],
"category": r["category"],
"applied_to_skill": bool(r["applied_to_skill"]),
"decision_number": r["decision_number"] or "",
"decision_date": str(r["decision_date"]) if r["decision_date"] else "",
"created_at": r["created_at"].isoformat() if r["created_at"] else "",
}
for r in recent_rows
],
}
class CuratorProposal(BaseModel):
title: str
proposed_change: str # markdown — what to change in the prompt
rationale: str # markdown — why
@app.post("/api/training/curator/proposals")
async def create_curator_proposal(body: CuratorProposal):
"""Save a proposed change to the curator prompt as a file on disk.
No automatic commit, no overwrite — the chair (chaim) reviews the
file manually and applies it through git. This is intentional: the
prompt is too load-bearing to mutate from a web UI.
"""
title = (body.title or "").strip()
if not title:
raise HTTPException(400, "title is required")
if not body.proposed_change.strip():
raise HTTPException(400, "proposed_change is required")
_CURATOR_PROPOSALS_DIR.mkdir(parents=True, exist_ok=True)
# Slug-ish filename — strip anything that isn't a Hebrew letter, ASCII
# letter, digit, hyphen, or underscore. Hebrew letters are explicitly
# allowed because most proposals will be in Hebrew.
slug = re.sub(r"[^\w֐-׿\-]+", "-", title)[:60].strip("-_") or "proposal"
today = date_type.today().isoformat()
fname = f"{today}-{slug}.md"
path = _CURATOR_PROPOSALS_DIR / fname
# If a proposal with the same slug already exists today, append a
# numeric suffix so we don't silently overwrite.
idx = 2
while path.exists():
path = _CURATOR_PROPOSALS_DIR / f"{today}-{slug}-{idx}.md"
idx += 1
md = (
f"# הצעת שינוי לפרומפט hermes-curator\n\n"
f"- **תאריך:** {today}\n"
f"- **כותרת:** {title}\n\n"
f"## שינוי מוצע\n\n{body.proposed_change.strip()}\n\n"
f"## נימוק\n\n{body.rationale.strip() or '(לא ניתן)'}\n"
)
try:
path.write_text(md, encoding="utf-8")
except OSError as e:
raise HTTPException(500, f"failed to write proposal: {e}")
return {
"saved": True,
"filename": path.name,
"path": str(path),
"bytes": len(md.encode("utf-8")),
}
@app.get("/api/training/curator/proposals")
async def list_curator_proposals():
"""List proposed-change files in data/curator-proposals/, newest first."""
if not _CURATOR_PROPOSALS_DIR.exists():
return []
items = []
for p in sorted(_CURATOR_PROPOSALS_DIR.iterdir(),
key=lambda f: f.stat().st_mtime, reverse=True):
if not p.is_file() or p.suffix.lower() != ".md":
continue
stat = p.stat()
items.append({
"filename": p.name,
"bytes": stat.st_size,
"modified_at": stat.st_mtime,
})
return items
# ── Per-decision lessons (decision_lessons table) ──────────────────
class LessonCreate(BaseModel):
lesson_text: str
category: str = "general"
source: str = "manual"
class LessonPatch(BaseModel):
lesson_text: str | None = None
category: str | None = None
applied_to_skill: bool | None = None
_LESSON_CATEGORIES = {"style", "structure", "lexicon", "tabular", "general"}
_LESSON_SOURCES = {"manual", "curator", "chair", "style_analyzer"}
def _lesson_to_json(row: dict) -> dict:
return {
"id": str(row["id"]),
"style_corpus_id": str(row["style_corpus_id"]),
"lesson_text": row["lesson_text"],
"category": row["category"],
"source": row["source"],
"applied_to_skill": bool(row["applied_to_skill"]),
"created_by": row.get("created_by", ""),
"created_at": row["created_at"].isoformat() if row.get("created_at") else "",
"updated_at": row["updated_at"].isoformat() if row.get("updated_at") else "",
}
@app.get("/api/training/corpus/{corpus_id}/lessons")
async def list_corpus_lessons(corpus_id: str):
try:
cid = UUID(corpus_id)
except ValueError:
raise HTTPException(400, "invalid corpus_id")
rows = await db.list_decision_lessons(cid)
return [_lesson_to_json(r) for r in rows]
@app.post("/api/training/corpus/{corpus_id}/lessons")
async def add_corpus_lesson(corpus_id: str, body: LessonCreate):
try:
cid = UUID(corpus_id)
except ValueError:
raise HTTPException(400, "invalid corpus_id")
text = (body.lesson_text or "").strip()
if not text:
raise HTTPException(400, "lesson_text is required")
if body.category not in _LESSON_CATEGORIES:
raise HTTPException(400, f"invalid category; allowed: {sorted(_LESSON_CATEGORIES)}")
if body.source not in _LESSON_SOURCES:
raise HTTPException(400, f"invalid source; allowed: {sorted(_LESSON_SOURCES)}")
row = await db.add_decision_lesson(
cid, lesson_text=text, category=body.category, source=body.source,
)
if not row:
raise HTTPException(500, "failed to insert lesson")
return _lesson_to_json(row)
@app.patch("/api/training/lessons/{lesson_id}")
async def patch_corpus_lesson(lesson_id: str, body: LessonPatch):
try:
lid = UUID(lesson_id)
except ValueError:
raise HTTPException(400, "invalid lesson_id")
if body.category is not None and body.category not in _LESSON_CATEGORIES:
raise HTTPException(400, f"invalid category; allowed: {sorted(_LESSON_CATEGORIES)}")
result = await db.update_decision_lesson(
lid,
lesson_text=body.lesson_text,
category=body.category,
applied_to_skill=body.applied_to_skill,
)
if not result.get("updated"):
if result.get("reason") == "not found":
raise HTTPException(404, "lesson not found")
return result # "nothing to update" — 200 with reason
return result
@app.delete("/api/training/lessons/{lesson_id}")
async def delete_corpus_lesson(lesson_id: str):
try:
lid = UUID(lesson_id)
except ValueError:
raise HTTPException(400, "invalid lesson_id")
result = await db.delete_decision_lesson(lid)
if not result.get("deleted"):
raise HTTPException(404, "lesson not found")
return result
@app.get("/api/training/corpus/{corpus_id}/full-text")
async def training_corpus_full_text(corpus_id: str):
"""Return the proofread full_text for a single corpus row.
Kept out of the list endpoint because full_text is large (50K-650K chars
per decision) and the table view only needs counts. The drawer fetches
it on demand when the chair opens the "content" tab.
"""
try:
cid = UUID(corpus_id)
except ValueError:
raise HTTPException(400, "invalid corpus_id")
pool = await db.get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT decision_number, full_text FROM style_corpus WHERE id = $1",
cid,
)
if not row:
raise HTTPException(404, "corpus row not found")
return {
"id": corpus_id,
"decision_number": row["decision_number"] or "",
"full_text": row["full_text"] or "",
}
class TrainingCorpusPatch(BaseModel):
"""Editable metadata fields on a style_corpus row.
full_text is intentionally NOT editable — the corpus is write-once.
For corrections, re-upload the decision via /api/training/upload.
"""
decision_number: str | None = None
decision_date: str | None = None # ISO YYYY-MM-DD, or "" to clear
subject_categories: list[str] | None = None
summary: str | None = None
outcome: str | None = None
key_principles: list[str] | None = None
appeal_subtype: str | None = None
practice_area: str | None = None
@app.patch("/api/training/corpus/{corpus_id}")
async def training_corpus_patch(corpus_id: str, patch: TrainingCorpusPatch):
"""Update metadata fields on a corpus row. Only provided fields are touched."""
try:
cid = UUID(corpus_id)
except ValueError:
raise HTTPException(400, "invalid corpus_id")
fields = patch.model_dump(exclude_none=True)
if not fields:
return {"updated": False, "reason": "no fields to update"}
# Coerce decision_date "" → SQL NULL, otherwise parse as DATE.
if "decision_date" in fields:
v = fields["decision_date"]
if v == "":
fields["decision_date"] = None
else:
try:
fields["decision_date"] = date_type.fromisoformat(v)
except ValueError as e:
raise HTTPException(400, f"invalid decision_date: {e}")
# subject_categories + key_principles are JSONB columns.
if "subject_categories" in fields:
fields["subject_categories"] = json.dumps(fields["subject_categories"])
if "key_principles" in fields:
fields["key_principles"] = json.dumps(fields["key_principles"])
# Build a positional UPDATE — asyncpg doesn't support named parameters.
cols = list(fields.keys())
set_clause = ", ".join(f"{c} = ${i + 2}" for i, c in enumerate(cols))
values = [fields[c] for c in cols]
pool = await db.get_pool()
async with pool.acquire() as conn:
result = await conn.fetchrow(
f"UPDATE style_corpus SET {set_clause} "
f"WHERE id = $1 "
f"RETURNING id, decision_number, decision_date, summary, outcome",
cid, *values,
)
if not result:
raise HTTPException(404, "corpus row not found")
return {
"updated": True,
"id": str(result["id"]),
"decision_number": result["decision_number"] or "",
"decision_date": str(result["decision_date"]) if result["decision_date"] else "",
"summary_len": len(result["summary"] or ""),
"outcome_len": len(result["outcome"] or ""),
}
# Headers that defeat proxy buffering for SSE streams. `X-Accel-Buffering: no`

176
web/chat_proxy.py Normal file
View File

@@ -0,0 +1,176 @@
"""FastAPI ↔ legal-chat-service streaming bridge.
The browser hits ``/api/training/chat/conversations/{id}/messages`` on
the legal-ai container. The container is sealed off from the host's
``claude`` CLI (intentional — see ``claude_session.py`` docstring), so
we forward each request to the pm2-managed ``legal-chat-service`` over
loopback (``host.docker.internal:8770``).
Responsibilities:
- Save the user message to ``chat_messages`` before streaming starts.
- Open an HTTP streaming connection to the host service.
- Forward each SSE event to the browser as-is, accumulating the
assistant text and any ``session_id`` so we can persist them once
the stream closes.
- Persist the assistant turn + the CLI's session_id at end-of-stream.
"""
from __future__ import annotations
import json
import logging
import os
from typing import AsyncIterator
from uuid import UUID
import httpx
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from legal_mcp.services import db
from web import chat_system_prompt
logger = logging.getLogger(__name__)
# legal-chat-service lives on the host. In the container we reach it via
# host.docker.internal — which requires ``extra_hosts: host.docker.internal:host-gateway``
# in the Coolify service definition. Set ``CHAT_SERVICE_URL`` to override
# (handy for local dev outside Docker).
CHAT_SERVICE_URL = os.environ.get(
"CHAT_SERVICE_URL",
"http://host.docker.internal:8770",
)
CHAT_SERVICE_TIMEOUT_S = float(os.environ.get("CHAT_SERVICE_TIMEOUT_S", "3600"))
_SSE_HEADERS = {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
}
async def stream_chat_message(
conversation_id: UUID,
user_message: str,
) -> StreamingResponse:
"""Open SSE stream, forward events, persist when done.
Returns a FastAPI StreamingResponse the route can return directly.
"""
conv = await db.get_chat_conversation(conversation_id)
if not conv:
raise HTTPException(404, "conversation not found")
# Persist the user turn immediately so a network drop doesn't lose it.
await db.add_chat_message(
conversation_id, role="user", content=user_message,
)
is_first_turn = not conv.get("claude_session_id")
system_block: str | None = None
if is_first_turn:
try:
system_block = await chat_system_prompt.build_system_prompt(
corpus_id=conv.get("style_corpus_id"),
)
except Exception as e:
logger.exception("system prompt build failed")
raise HTTPException(500, f"system prompt failed: {e}")
payload = {
"prompt": user_message,
"system": system_block,
"resume_session_id": conv.get("claude_session_id"),
}
async def proxy_stream() -> AsyncIterator[bytes]:
accumulated_text: list[str] = []
events_log: list[dict] = []
new_session_id: str | None = None
try:
timeout_cfg = httpx.Timeout(
CHAT_SERVICE_TIMEOUT_S,
connect=10.0,
read=CHAT_SERVICE_TIMEOUT_S,
)
async with httpx.AsyncClient(timeout=timeout_cfg) as client:
async with client.stream(
"POST",
f"{CHAT_SERVICE_URL}/chat/start",
json=payload,
) as upstream:
if upstream.status_code != 200:
body = await upstream.aread()
msg = body.decode("utf-8", errors="replace")[:300]
err = {"type": "error",
"message": f"chat-service {upstream.status_code}: {msg}"}
yield f"data: {json.dumps(err, ensure_ascii=False)}\n\n".encode("utf-8")
return
async for line in upstream.aiter_lines():
if not line:
yield b"\n"
continue
# Forward verbatim so the browser sees the same
# SSE framing the host emits.
out = line + "\n"
yield out.encode("utf-8")
# Mirror events: capture text + session_id for
# persistence. The line starts with "data: <json>"
# so we strip the prefix before parsing.
if line.startswith("data: "):
try:
event = json.loads(line[len("data: "):])
except json.JSONDecodeError:
continue
events_log.append(event)
t = event.get("type")
if t == "session_id" and event.get("value"):
new_session_id = event["value"]
elif t == "text_delta" and event.get("text"):
accumulated_text.append(event["text"])
elif t == "done" and event.get("text"):
if not accumulated_text:
accumulated_text.append(event["text"])
except httpx.ConnectError:
err = {
"type": "error",
"message": (
f"לא ניתן להגיע ל-legal-chat-service בכתובת {CHAT_SERVICE_URL}. "
"ודא ש-pm2 מריץ אותו: `pm2 status legal-chat-service`."
),
}
yield f"data: {json.dumps(err, ensure_ascii=False)}\n\n".encode("utf-8")
return
except Exception as e:
logger.exception("chat proxy failed")
err = {"type": "error", "message": str(e)}
yield f"data: {json.dumps(err, ensure_ascii=False)}\n\n".encode("utf-8")
return
# End of stream — persist the assistant turn.
try:
full_text = "".join(accumulated_text).strip()
if full_text:
await db.add_chat_message(
conversation_id,
role="assistant",
content=full_text,
raw_events=events_log,
)
if new_session_id:
await db.update_chat_conversation_session_id(
conversation_id, new_session_id,
)
except Exception:
logger.exception("failed to persist assistant turn for conv=%s", conversation_id)
return StreamingResponse(
proxy_stream(),
media_type="text/event-stream",
headers=_SSE_HEADERS,
)

205
web/chat_system_prompt.py Normal file
View File

@@ -0,0 +1,205 @@
"""Compose the system prompt the style-chat agent receives.
The chat runs against the local ``claude`` CLI on the host (via
legal-chat-service). We assemble a once-per-conversation system block
that gives the agent everything it needs to discuss decisions in
Daphna's voice:
- The style guide (``skills/decision/SKILL.md``) — how she writes
- The lessons file (``docs/legal-decision-lessons.md``) — what we've
learned across the corpus
- The corpus-analysis report (``docs/corpus-analysis.md``) — the
structural map of 24+ decisions
- A summary of every style_corpus row (number, date, subjects,
chars + summary if extracted) so the agent can reason about the
whole corpus without us shipping all of it inline
- Optional: when the conversation is scoped to a specific decision
(``style_corpus_id``), append its full_text so the chat can dive
into the text directly
Sent **once**, when the conversation is first created. On subsequent
messages the legal-chat-service uses ``claude --resume <session_id>``
and the on-disk CLI session keeps the system context intact — no need
to re-ship the 100K+ chars of skills + lessons every turn.
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from uuid import UUID
from legal_mcp.services import db
logger = logging.getLogger(__name__)
# The reference files live in the repo at known paths. In the
# container they're mounted alongside the code, so resolve relative
# to web/app.py's parent.
_REPO_ROOT = Path(os.environ.get(
"LEGAL_AI_REPO_ROOT",
str(Path(__file__).resolve().parent.parent),
))
_SKILLS_PATH = _REPO_ROOT / "skills" / "decision" / "SKILL.md"
_LESSONS_PATH = _REPO_ROOT / "docs" / "legal-decision-lessons.md"
_CORPUS_ANALYSIS_PATH = _REPO_ROOT / "docs" / "corpus-analysis.md"
def _safe_read(path: Path, cap_chars: int = 50_000) -> str:
"""Read a file (UTF-8) or return a marker that it's missing.
The cap protects against accidentally injecting an enormous file —
even at 50K, a single source file is the lion's share of the
system prompt budget.
"""
try:
text = path.read_text(encoding="utf-8")
except FileNotFoundError:
return f"(קובץ {path.name} לא נמצא בנתיב {path})"
except OSError as e:
logger.warning("could not read %s: %s", path, e)
return f"(שגיאה בקריאת {path.name}: {e})"
if len(text) > cap_chars:
return text[:cap_chars] + f"\n\n[... חתך ב-{cap_chars:,} תווים מתוך {len(text):,}]"
return text
async def _corpus_summary_block() -> str:
"""Compact one-row-per-decision summary the agent can scan."""
rows = await db.get_pool()
async with rows.acquire() as conn:
records = await conn.fetch(
"""
SELECT decision_number, decision_date, appeal_subtype,
subject_categories, length(full_text) AS chars,
coalesce(summary, '') AS summary,
coalesce(outcome, '') AS outcome
FROM style_corpus
ORDER BY decision_date NULLS LAST
"""
)
if not records:
return "(הקורפוס ריק)"
lines = []
for r in records:
cats = r["subject_categories"]
if isinstance(cats, str):
import json as _json
try:
cats = _json.loads(cats)
except _json.JSONDecodeError:
cats = []
cats_str = ", ".join(cats or []) if cats else ""
date_str = str(r["decision_date"]) if r["decision_date"] else ""
summary = (r["summary"] or "").strip()
outcome = (r["outcome"] or "").strip()
head = f"- **{r['decision_number'] or ''}** ({date_str}) [{r['appeal_subtype'] or ''}] · {r['chars']:,} תווים"
meta = f" נושאים: {cats_str}"
body = ""
if summary:
body = f"\n תקציר: {summary}"
if outcome:
body += f" — תוצאה: {outcome}"
elif outcome:
body = f"\n תוצאה: {outcome}"
lines.append(head + "\n" + meta + body)
return "\n".join(lines)
async def _decision_full_text(corpus_id: UUID) -> str:
pool = await db.get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT decision_number, decision_date, full_text "
"FROM style_corpus WHERE id = $1",
corpus_id,
)
if not row:
return ""
header = f"# החלטה {row['decision_number']} ({row['decision_date']})\n\n"
return header + (row["full_text"] or "")
SYSTEM_PROMPT_HEADER = """\
אתה סוכן הסגנון של עו"ד דפנה תמיר, יו"ר ועדת הערר לתכנון ובניה — מחוז ירושלים.
תפקידך: לעזור לחיים (העוזר המקצועי של דפנה) להבין, לנתח ולחדד את הסגנון
של דפנה. אתה לא כותב החלטות חדשות; אתה דן בסגנון של החלטות קיימות,
מזהה דפוסים, מקפיד שהכותבים העתידיים (ה-writer agent) יישארו נאמנים
לקולה.
יש לך גישה ל:
1. **מדריך הסגנון** של דפנה (skills/decision/SKILL.md) — איך היא כותבת.
2. **הלקחים הגנריים** מהקורפוס (docs/legal-decision-lessons.md) — מה
למדנו לאורך 24+ החלטות. **חובה** להישען על הקבצים האלה כשאתה דן
בסגנון, ולא להמציא תובנות חדשות מהאוויר.
3. **ניתוח הקורפוס** המבני (docs/corpus-analysis.md) — מפת תוכן ופערים.
4. **רשימת ההחלטות בקורפוס** (למטה) — סקירה תמציתית של כל החלטה
שעלתה ל-style_corpus.
5. **טקסט מלא של החלטה ספציפית** (אם השיחה הוצמדה ל-style_corpus_id).
כללי תקשורת:
- כל התשובות בעברית.
- חיים יושב מולך, לא דפנה — אבל המטרה היא לחדד את הסגנון *של דפנה*.
- אם חיים שואל "האם פסקה X מתאימה לסגנון של דפנה?" — תן ניתוח מנומק
שמסתמך על SKILL.md ועל החלטות הקורפוס. אל תמציא ראיות.
- אם אתה צריך החלטה ספציפית שאין בקורפוס — הודע לחיים שיצרף אותה.
- אם חיים אומר לך משהו חדש על דפנה ("דפנה אומרת לעולם אל תפתח החלטה
במילה X") — שמור את זה בזיכרון השיחה; אם זה מצדיק תיעוד קבוע, הצע
לחיים להוסיף את זה כ-decision_lesson (POST /api/training/lessons)
או כתוספת ל-SKILL.md.
- אל תיתן לעצמך אישיות מומצאת — אתה כלי-עזר מקצועי, לא חבר.
"""
async def build_system_prompt(
*,
corpus_id: UUID | None = None,
include_corpus_summary: bool = True,
) -> str:
"""Assemble the full system prompt for a new chat conversation.
Args:
corpus_id: When set, the full_text of that decision is appended
so the chat can dive into the text.
include_corpus_summary: Set False for low-context chats (e.g.
quick "what does Daphna do at the end of a betterment-levy
decision?" — no need to ship 24 summaries).
"""
parts: list[str] = [SYSTEM_PROMPT_HEADER]
parts.append("\n## מדריך הסגנון (skills/decision/SKILL.md)\n")
parts.append(_safe_read(_SKILLS_PATH, cap_chars=40_000))
parts.append("\n\n## לקחים מהקורפוס (docs/legal-decision-lessons.md)\n")
parts.append(_safe_read(_LESSONS_PATH, cap_chars=30_000))
parts.append("\n\n## ניתוח קורפוס מבני (docs/corpus-analysis.md)\n")
parts.append(_safe_read(_CORPUS_ANALYSIS_PATH, cap_chars=15_000))
if include_corpus_summary:
parts.append("\n\n## רשימת ההחלטות בקורפוס הסגנון\n")
try:
parts.append(await _corpus_summary_block())
except Exception as e:
logger.warning("corpus summary failed: %s", e)
parts.append("(שגיאה בטעינת רשימת הקורפוס)")
if corpus_id is not None:
parts.append("\n\n## ההחלטה הספציפית בדיון (full_text)\n")
try:
txt = await _decision_full_text(corpus_id)
if txt:
parts.append(txt[:200_000]) # hard cap
else:
parts.append("(לא נמצאה החלטה — בדוק את ה-corpus_id)")
except Exception as e:
logger.warning("decision full_text failed: %s", e)
parts.append("(שגיאה בטעינת ההחלטה)")
return "\n".join(parts)