Files
legal-ai/web/paperclip_client.py
Chaim 799b950961
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 8s
feat(curator): trigger Knowledge Curator from api_mark_final, drop CEO F2
The previous F2 stage in legal-ceo.md fired after the first DOCX export
— too early, since the user often iterates with עריכה-* uploads after
the first export. The true "this is Dafna's chosen final" signal is the
"סמן כסופי" button in the UI, which calls api_mark_final.

This commit moves the curator wakeup from CEO's instructions to a
direct hook in api_mark_final:

- web/paperclip_client.py: add CURATOR_AGENTS dict (CMP + CMPA UUIDs)
  and wake_curator_for_final() helper. Looks up main case issue,
  creates a child issue assigned to the curator, tags plugin_state for
  case visibility, and triggers wakeup via Paperclip API.
- web/app.py: api_mark_final now calls workflow_tools.ingest_final_version
  (so case_law table finally gets populated for search_decisions) and
  pc_wake_curator_for_final. Both are best-effort — failure does not
  block marking final.
- legal-ceo.md: remove F2 stage, leave only the agents-table reference
  noting the curator runs from api_mark_final.
- hermes-curator.md: update activation description to reflect the new
  flow.

Result: the curator runs only when Chaim deliberately clicks "סמן כסופי",
on the actual final file, with no risk of analyzing a draft that will
later change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:47:03 +00:00

998 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Paperclip integration via direct DB access (embedded PostgreSQL).
Creates projects, issues, and plugin state entries to fully link
a legal-ai case into Paperclip's workflow.
"""
from __future__ import annotations
import json
import logging
import os
import uuid
import asyncpg
from web.paperclip_api import pc_request
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Connection URL for the Paperclip database; override via the
# PAPERCLIP_DB_URL environment variable.
PAPERCLIP_DB_URL = os.environ.get(
"PAPERCLIP_DB_URL", "postgresql://paperclip:paperclip@127.0.0.1:54329/paperclip"
)
# Plugin ID used for every plugin_state row this module writes or queries.
PLUGIN_ID = "53461b5a-7f58-411a-9952-72f9c8d4a328" # marcusgroup.legal-ai
# PAPERCLIP_API_URL — moved to web.paperclip_api (used only by pc_request now).
# Direct DB calls below use PAPERCLIP_DB_URL instead.
# Board API key. When empty, functions in this module either skip their
# API-backed step or refuse to run (see the individual functions).
PAPERCLIP_BOARD_API_KEY = os.environ.get("PAPERCLIP_BOARD_API_KEY", "")
# Default workspace attached to every new Paperclip project — points agents at
# the legal-ai source tree on the host. Override via env if the path differs.
LEGAL_AI_WORKSPACE_CWD = os.environ.get("LEGAL_AI_WORKSPACE_CWD", "/home/chaim/legal-ai")
LEGAL_AI_WORKSPACE_NAME = os.environ.get("LEGAL_AI_WORKSPACE_NAME", "legal-ai")
# Company IDs from Paperclip DB
COMPANIES = {
"licensing": "42a7acd0-30c5-4cbd-ac97-7424f65df294", # CMP — licensing and building
"betterment": "8639e837-4c9d-47fa-a76b-95788d651896", # CMPA — betterment levies
}
# CEO agent per company — used for wakeup routing
CEO_AGENTS = {
COMPANIES["licensing"]: "752cebdd-6748-4a04-aacd-c7ab0294ef33", # CMP CEO
COMPANIES["betterment"]: "cdbfa8bc-3d61-41a4-a2e7-677ec7d34562", # CMPA CEO
}
# Default CEO (the licensing company's), kept for backwards compatibility;
# used as the fallback when a company_id is not a key of CEO_AGENTS.
CEO_AGENT_ID = CEO_AGENTS[COMPANIES["licensing"]]
# Knowledge Curator (Hermes) agent per company — woken after a case is
# marked final, to analyze the signed decision and propose updates to
# the style guide / lessons. POC stage 1.
CURATOR_AGENTS = {
COMPANIES["licensing"]: "60dce831-5c5b-4bae-bda9-5282d506f0dc", # CMP curator
COMPANIES["betterment"]: "d6f7c55d-570a-46b8-8d72-1286d07da0d8", # CMPA curator
}
# Fallback mapping — used only when DB lookup returns no results.
# Keys are appeal-type tags (both Hebrew labels and English keys).
_FALLBACK_APPEAL_TYPE_TO_COMPANY = {
"רישוי": COMPANIES["licensing"],
"היטל השבחה": COMPANIES["betterment"],
"פיצויים": COMPANIES["betterment"],
"building_permit": COMPANIES["licensing"],
"betterment_levy": COMPANIES["betterment"],
"compensation_197": COMPANIES["betterment"],
"compensation": COMPANIES["betterment"],
"licensing": COMPANIES["licensing"],
}
# Legal-AI DB URL for reading tag_company_mappings.
# POSTGRES_URL takes precedence over DATABASE_URL when it is set and non-empty.
_LEGAL_DB_URL = os.environ.get("POSTGRES_URL") or os.environ.get(
"DATABASE_URL", "postgresql://legal:legal@127.0.0.1:5432/legal_ai"
)
async def _get_company_id(appeal_type: str) -> str:
    """Resolve an appeal-type tag to a Paperclip company ID.

    The tag is looked up in the legal-ai ``tag_company_mappings`` table.
    If the lookup raises, finds no row, or finds a NULL company, the static
    ``_FALLBACK_APPEAL_TYPE_TO_COMPANY`` map is used, and unknown tags
    default to the licensing company.

    Args:
        appeal_type: Tag to resolve (Hebrew label or English key).

    Returns:
        The company ID as a string.
    """
    try:
        conn = await asyncpg.connect(_LEGAL_DB_URL)
        try:
            row = await conn.fetchrow(
                "SELECT company_id FROM tag_company_mappings WHERE tag = $1 LIMIT 1",
                appeal_type,
            )
        finally:
            await conn.close()
        if row and row["company_id"]:
            # Normalise to str so the value can be used as a key into
            # CEO_AGENTS / CURATOR_AGENTS even if the driver hands back a
            # non-str value for this column.
            return str(row["company_id"])
    except Exception:
        # Best-effort lookup: keep going with the static map, but record why.
        logger.debug(
            "DB lookup for tag mapping failed, using fallback for '%s'",
            appeal_type,
            exc_info=True,
        )
    return _FALLBACK_APPEAL_TYPE_TO_COMPANY.get(appeal_type, COMPANIES["licensing"])
async def create_project(
    case_number: str,
    title: str,
    description: str = "",
    appeal_type: str = "רישוי",
    color: str = "#6366f1",
) -> dict:
    """Create a Paperclip project for a case, or return the existing one.

    A project counts as existing when a project of the resolved company has
    a name containing ``case_number``. Otherwise the project, its default
    workspace, the initial setup issue, the plugin-state link and the
    verification comment are all written inside a single transaction, so a
    failure part-way through leaves nothing behind (a half-created project
    would otherwise be returned as "existing" by every later call).

    Args:
        case_number: Legal-ai case number; embedded in the project name.
        title: Case title; appended to the project name.
        description: Optional project description (truncated to 500 chars).
        appeal_type: Tag used to pick the Paperclip company.
        color: Project colour stored on the project row.

    Returns:
        A dict with ``id``, ``company_id``, ``name``, ``url`` and
        ``existing``; newly created projects also carry ``issue_id`` and
        ``issue_identifier``.
    """
    company_id = await _get_company_id(appeal_type)
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        # Resolve prefix from company issue_prefix in Paperclip DB
        comp_row = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid", company_id,
        )
        prefix = comp_row["issue_prefix"] if comp_row and comp_row["issue_prefix"] else "CMP"

        # Check for existing project with this case number
        existing = await conn.fetchrow(
            "SELECT id, name FROM projects WHERE name LIKE $1 AND company_id = $2::uuid",
            f"%{case_number}%", company_id,
        )
        if existing:
            # Backfill: ensure legacy projects also have a default workspace.
            await _ensure_default_workspace(conn, str(existing["id"]), company_id)
            return {
                "id": str(existing["id"]),
                "company_id": company_id,
                "name": existing["name"],
                "url": f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{existing['id']}/issues",
                "existing": True,
            }

        project_id = str(uuid.uuid4())
        project_name = f"ערר {case_number}{title}"[:200]

        # All-or-nothing: roll back the project row if any later step fails.
        async with conn.transaction():
            await conn.execute(
                "INSERT INTO projects (id, company_id, name, description, status, color)\n"
                "VALUES ($1, $2::uuid, $3, $4, 'backlog', $5)",
                project_id, company_id, project_name,
                description[:500] if description else "", color,
            )
            # Default primary workspace — points agents at the legal-ai source tree.
            await _ensure_default_workspace(conn, project_id, company_id)
            # Create initial issue linked to the project
            issue_id, identifier = await _create_issue(
                conn, company_id, project_id, case_number, title, prefix,
            )
            # Link issue to legal-ai case via plugin state
            await _link_case_to_issue(conn, issue_id, case_number)
            # Verify project creation and close the setup issue
            await _verify_and_close_setup_issue(
                conn, project_id, issue_id, identifier, case_number,
            )

        return {
            "id": project_id,
            "company_id": company_id,
            "name": project_name,
            "issue_id": issue_id,
            "issue_identifier": identifier,
            "url": f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project_id}/issues",
            "existing": False,
        }
    finally:
        await conn.close()
async def archive_project(case_number: str) -> dict:
    """Archive the Paperclip project of a case and cancel its open issues.

    The project is matched with ``name LIKE '%<case_number>%'``. An already
    set ``archived_at`` is preserved (``COALESCE``), so repeated calls keep
    the first timestamp. Only issues in an open status are cancelled; done
    and cancelled issues are left alone.

    NOTE(review): the match is a substring match, so a short case number can
    also match longer ones — confirm this is acceptable for real case numbers.

    Returns:
        ``{"status": "not_found", ...}`` when no project matched, otherwise
        ``{"status": "archived", ...}`` with the project id/name, the
        archive timestamp (ISO string) and the cancelled issues.
    """
    # Every status other than done/cancelled counts as open.
    open_statuses = ["backlog", "todo", "in_progress", "blocked", "in_review"]
    archive_sql = (
        "UPDATE projects\n"
        "SET archived_at = COALESCE(archived_at, now()),\n"
        "updated_at = now()\n"
        "WHERE name LIKE $1 RETURNING id, name, archived_at"
    )
    cancel_sql = (
        "UPDATE issues\n"
        "SET status = 'cancelled',\n"
        "cancelled_at = now(),\n"
        "updated_at = now()\n"
        "WHERE project_id = $1 AND status = ANY($2::text[])\n"
        "RETURNING identifier, title"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        project = await conn.fetchrow(archive_sql, f"%{case_number}%")
        if not project:
            return {"status": "not_found", "case_number": case_number}

        cancelled_rows = await conn.fetch(cancel_sql, project["id"], open_statuses)

        archived_at = project["archived_at"]
        issues_cancelled = []
        for issue in cancelled_rows:
            issues_cancelled.append(
                {"identifier": issue["identifier"], "title": issue["title"]}
            )
        return {
            "status": "archived",
            "project_id": str(project["id"]),
            "name": project["name"],
            "archived_at": archived_at.isoformat() if archived_at else None,
            "issues_cancelled": issues_cancelled,
        }
    finally:
        await conn.close()
async def restore_project(case_number: str) -> dict:
    """Un-archive the Paperclip project whose name contains ``case_number``.

    Sets ``archived_at`` back to NULL; calling it on a project that is
    already active simply reports success again.

    Returns:
        ``{"status": "restored", "project_id": ..., "name": ...}`` or
        ``{"status": "not_found", "case_number": ...}`` when nothing matched.
    """
    restore_sql = (
        "UPDATE projects\n"
        "SET archived_at = NULL,\n"
        "updated_at = now()\n"
        "WHERE name LIKE $1 RETURNING id, name"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        project = await conn.fetchrow(restore_sql, f"%{case_number}%")
        if project:
            return {
                "status": "restored",
                "project_id": str(project["id"]),
                "name": project["name"],
            }
        return {"status": "not_found", "case_number": case_number}
    finally:
        await conn.close()
async def _ensure_default_workspace(
    conn: asyncpg.Connection,
    project_id: str,
    company_id: str,
) -> None:
    """Attach a primary workspace to the project unless it already has one.

    The workspace uses ``LEGAL_AI_WORKSPACE_NAME`` / ``LEGAL_AI_WORKSPACE_CWD``
    with ``source_type='local_path'``. If the project already has any
    workspace row the function does nothing, so it is safe to call repeatedly.
    Per the original note, this is what makes the workspaces tab show up in
    the Paperclip UI and gives agents their working directory.
    """
    has_workspace = await conn.fetchval(
        "SELECT id FROM project_workspaces WHERE project_id = $1::uuid LIMIT 1",
        project_id,
    )
    if not has_workspace:
        insert_sql = (
            "INSERT INTO project_workspaces\n"
            "(company_id, project_id, name, cwd, is_primary, source_type, visibility)\n"
            "VALUES ($1::uuid, $2::uuid, $3, $4, TRUE, 'local_path', 'default')"
        )
        await conn.execute(
            insert_sql,
            company_id,
            project_id,
            LEGAL_AI_WORKSPACE_NAME,
            LEGAL_AI_WORKSPACE_CWD,
        )
        logger.info(
            "Attached default workspace (cwd=%s) to project %s",
            LEGAL_AI_WORKSPACE_CWD, project_id,
        )
async def _create_issue(
    conn: asyncpg.Connection,
    company_id: str,
    project_id: str,
    case_number: str,
    title: str,
    prefix: str,
) -> tuple[str, str]:
    """Insert a new 'todo' issue into the project.

    The company's ``issue_counter`` is incremented and the new value is
    combined with ``prefix`` to form the identifier (``<prefix>-<n>``).

    Returns:
        ``(issue_id, identifier)`` of the created issue.
    """
    # Reserve the next issue number for this company.
    counter_row = await conn.fetchrow(
        "UPDATE companies SET issue_counter = issue_counter + 1 WHERE id = $1::uuid RETURNING issue_counter",
        company_id,
    )
    issue_number = counter_row["issue_counter"]
    identifier = f"{prefix}-{issue_number}"
    new_issue_id = str(uuid.uuid4())

    # The issue is assigned to the company's CEO up front. Per the original
    # note, Paperclip's wakeup gate otherwise cancels the queued run with
    # "issue assignee changed before the queued run could start".
    assignee = CEO_AGENTS.get(company_id, CEO_AGENT_ID)

    issue_title = f"[ערר {case_number}] {title}"[:200]
    issue_description = f"תיק ערר {case_number}\nנוצר אוטומטית מממשק העלאת מסמכים"
    insert_sql = (
        "INSERT INTO issues (id, company_id, project_id, title, description, status, priority, issue_number, identifier, assignee_agent_id)\n"
        "VALUES ($1, $2::uuid, $3::uuid, $4, $5, 'todo', 'medium', $6, $7, $8::uuid)"
    )
    await conn.execute(
        insert_sql,
        new_issue_id,
        company_id,
        project_id,
        issue_title,
        issue_description,
        issue_number,
        identifier,
        assignee,
    )
    logger.info("Created Paperclip issue %s: [ערר %s] %s", identifier, case_number, title)
    return new_issue_id, identifier
async def _link_case_to_issue(conn: asyncpg.Connection, issue_id: str, case_number: str) -> None:
    """Record the legal-ai case number for an issue in ``plugin_state``.

    The row is keyed by this plugin, scope ``issue``/``issue_id``, namespace
    ``default`` and state key ``legal-case-number``; an existing row is
    overwritten with the new value (upsert).
    """
    upsert_sql = (
        "INSERT INTO plugin_state (plugin_id, scope_kind, scope_id, namespace, state_key, value_json)\n"
        "VALUES ($1::uuid, 'issue', $2, 'default', 'legal-case-number', $3::jsonb)\n"
        "ON CONFLICT (plugin_id, scope_kind, scope_id, namespace, state_key) DO UPDATE SET value_json = $3::jsonb"
    )
    encoded_case = json.dumps(case_number)
    await conn.execute(upsert_sql, PLUGIN_ID, issue_id, encoded_case)
    logger.info("Linked issue %s to case %s via plugin state", issue_id, case_number)
async def _verify_and_close_setup_issue(
    conn: asyncpg.Connection,
    project_id: str,
    issue_id: str,
    identifier: str,
    case_number: str,
) -> None:
    """Verify a freshly created project and close its setup issue.

    The issue is moved to ``in_progress``, then three checks are run: the
    project row exists, the issue belongs to the project, and the
    ``legal-case-number`` plugin-state link exists. A comment with a
    per-check report is always added to the issue. If every check passes
    the issue is marked ``done``; otherwise it stays ``in_progress`` and the
    failed checks are listed in the comment and logged as a warning.

    Args:
        conn: Open connection to the Paperclip DB.
        project_id: ID of the project being verified.
        issue_id: ID of the setup issue.
        identifier: Human-readable issue identifier, used in log lines.
        case_number: Legal-ai case number, used in the comment heading.
    """
    # Move to in_progress while verifying
    await conn.execute(
        "UPDATE issues SET status = 'in_progress', started_at = now() WHERE id = $1",
        issue_id,
    )
    logger.info("%s: בביצוע — מאמת יצירת פרויקט", identifier)

    # Verify: project exists, issue is linked, plugin state exists
    project = await conn.fetchrow(
        "SELECT id, name FROM projects WHERE id = $1::uuid", project_id,
    )
    issue = await conn.fetchrow(
        "SELECT id, project_id FROM issues WHERE id = $1 AND project_id = $2::uuid",
        issue_id, project_id,
    )
    plugin_link = await conn.fetchrow(
        "SELECT value_json FROM plugin_state WHERE scope_id = $1 AND state_key = 'legal-case-number'",
        issue_id,
    )
    checks = [
        ("פרויקט נוצר", project is not None),
        ("משימה משויכת לפרויקט", issue is not None),
        ("קישור למערכת המשפטית", plugin_link is not None),
    ]
    failed = [name for name, ok in checks if not ok]

    # Each line carries a distinct pass/fail marker so the report is readable
    # on its own (previously both outcomes rendered the same empty marker).
    report = "\n".join(f"{'✅' if ok else '❌'} {name}" for name, ok in checks)
    heading = f"## אימות יצירת פרויקט — ערר {case_number}\n\n{report}\n\n"

    if failed:
        # Leave in_progress with a warning comment
        comment_body = heading + f"⚠️ בדיקות שנכשלו: {', '.join(failed)}"
    else:
        await conn.execute(
            "UPDATE issues SET status = 'done', completed_at = now() WHERE id = $1",
            issue_id,
        )
        comment_body = heading + "הפרויקט נוצר בהצלחה. משימה נסגרה אוטומטית."

    # Document the verification in a comment (same insert for both outcomes).
    await conn.execute(
        "INSERT INTO issue_comments (id, company_id, issue_id, body)\n"
        "VALUES ($1, (SELECT company_id FROM issues WHERE id = $2), $2,\n"
        "$3)",
        str(uuid.uuid4()), issue_id, comment_body,
    )

    if failed:
        logger.warning("%s: אימות נכשל — %s", identifier, ", ".join(failed))
    else:
        logger.info("%s: הושלם — פרויקט אומת ונסגר", identifier)
async def get_project_url(case_number: str) -> str | None:
    """Return the Paperclip issues URL of the project for a case, if any.

    The project is matched by ``name LIKE '%<case_number>%'``. The URL path
    uses the owning company's ``issue_prefix`` and falls back to ``CMP``
    when the company row or its prefix is missing.

    Returns:
        The project URL, or ``None`` when no project matched.
    """
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        project = await conn.fetchrow(
            "SELECT id, company_id FROM projects WHERE name LIKE $1",
            f"%{case_number}%",
        )
        if not project:
            return None

        company = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid",
            str(project["company_id"]),
        )
        if company and company["issue_prefix"]:
            prefix = company["issue_prefix"]
        else:
            prefix = "CMP"
        return f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project['id']}/issues"
    finally:
        await conn.close()
async def create_workflow_issue(case_number: str, title: str) -> dict:
    """Open a "start drafting" workflow issue in the case's existing project.

    The project is found by ``name LIKE '%<case_number>%'``; the new issue is
    created through ``_create_issue`` and linked to the case in plugin state.

    Returns:
        A dict with ``issue_id``, ``identifier``, ``company_id`` and
        ``project_url``.

    Raises:
        ValueError: If no Paperclip project matches the case number.
    """
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        # Locate the project this case belongs to.
        project = await conn.fetchrow(
            "SELECT id, company_id FROM projects WHERE name LIKE $1",
            f"%{case_number}%",
        )
        if not project:
            raise ValueError(f"No Paperclip project found for case {case_number}")
        project_id, company_id = str(project["id"]), str(project["company_id"])

        # Company prefix drives both the identifier and the URL path.
        company = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid", company_id,
        )
        if company and company["issue_prefix"]:
            prefix = company["issue_prefix"]
        else:
            prefix = "CMP"

        workflow_title = f"התחל תהליך ניסוח — {title}"[:200]
        issue_id, identifier = await _create_issue(
            conn, company_id, project_id, case_number, workflow_title, prefix,
        )
        # Make the issue discoverable from the legal-ai case.
        await _link_case_to_issue(conn, issue_id, case_number)

        logger.info("Created workflow issue %s for case %s", identifier, case_number)
        return {
            "issue_id": issue_id,
            "identifier": identifier,
            "company_id": company_id,
            "project_url": f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project_id}/issues",
        }
    finally:
        await conn.close()
async def get_case_issues(case_number: str) -> list[dict]:
    """List the Paperclip issues that belong to a legal-ai case.

    An issue is included when either of these holds:
      (a) a ``legal-case-number`` plugin-state row of this plugin links it
          to ``case_number``; or
      (b) its title matches one of ``title_patterns`` (the Hebrew case tag,
          with or without square brackets) — per the original note, this is
          how sub-agents conventionally tag follow-up issues.

    Rows are de-duplicated by issue id in SQL and then sorted by creation
    time in Python.

    Returns:
        One dict per issue with id, title, status, identifier, priority,
        assignee name, ISO timestamps and company id.
    """
    def _iso(moment):
        # Timestamps may be NULL; keep them as None in that case.
        return moment.isoformat() if moment else None

    title_patterns = [f"%[ערר {case_number}]%", f"%ערר {case_number}%"]
    query = (
        "SELECT DISTINCT ON (i.id)\n"
        "i.id, i.title, i.status, i.identifier, i.priority,\n"
        "i.assignee_agent_id, a.name AS assignee_name,\n"
        "i.started_at, i.completed_at, i.created_at, i.company_id\n"
        "FROM issues i\n"
        "LEFT JOIN agents a ON i.assignee_agent_id = a.id\n"
        "LEFT JOIN plugin_state ps ON ps.scope_id = i.id::text\n"
        "AND ps.plugin_id = $1::uuid\n"
        "AND ps.state_key = 'legal-case-number'\n"
        "AND ps.value_json = $2::jsonb\n"
        "WHERE ps.scope_id IS NOT NULL\n"
        "OR i.title LIKE ANY($3::text[])\n"
        "ORDER BY i.id, i.created_at"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(
            query, PLUGIN_ID, json.dumps(case_number), title_patterns,
        )
        # DISTINCT ON forces ordering by id first, so re-order by creation time.
        by_creation = list(records)
        by_creation.sort(key=lambda rec: rec["created_at"])

        issues = []
        for rec in by_creation:
            issues.append({
                "id": str(rec["id"]),
                "title": rec["title"],
                "status": rec["status"],
                "identifier": rec["identifier"],
                "priority": rec["priority"],
                "assignee_name": rec["assignee_name"],
                "started_at": _iso(rec["started_at"]),
                "completed_at": _iso(rec["completed_at"]),
                "created_at": _iso(rec["created_at"]),
                "company_id": str(rec["company_id"]),
            })
        return issues
    finally:
        await conn.close()
async def get_issue_comments(issue_ids: list[str]) -> list[dict]:
    """Fetch every comment on the given issues, oldest first.

    Each comment is joined with its authoring agent (when there is one) so
    the result carries the agent's name, role and icon. An empty
    ``issue_ids`` returns ``[]`` without opening a DB connection.
    """
    if not issue_ids:
        return []

    query = (
        "SELECT ic.id, ic.issue_id, ic.body, ic.created_at,\n"
        "ic.author_agent_id, ic.author_user_id,\n"
        "a.name AS agent_name, a.role AS agent_role, a.icon AS agent_icon\n"
        "FROM issue_comments ic\n"
        "LEFT JOIN agents a ON ic.author_agent_id = a.id\n"
        "WHERE ic.issue_id = ANY($1::uuid[])\n"
        "ORDER BY ic.created_at"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(query, issue_ids)
        comments = []
        for rec in records:
            created = rec["created_at"]
            agent_id = rec["author_agent_id"]
            comments.append({
                "id": str(rec["id"]),
                "issue_id": str(rec["issue_id"]),
                "body": rec["body"],
                "created_at": created.isoformat() if created else None,
                "author_agent_id": str(agent_id) if agent_id else None,
                "author_user_id": rec["author_user_id"],
                "agent_name": rec["agent_name"],
                "agent_role": rec["agent_role"],
                "agent_icon": rec["agent_icon"],
            })
        return comments
    finally:
        await conn.close()
async def get_agents_for_company(company_id: str) -> list[dict]:
    """Return every agent of a Paperclip company, ordered by role then name.

    Each entry has ``id``, ``name``, ``role``, ``title``, ``status``,
    ``icon`` and ``last_heartbeat_at`` (ISO string or ``None``).
    """
    query = (
        "SELECT id, name, role, title, status, icon, last_heartbeat_at\n"
        "FROM agents\n"
        "WHERE company_id = $1::uuid\n"
        "ORDER BY role, name"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(query, company_id)
        agents = []
        for rec in records:
            heartbeat = rec["last_heartbeat_at"]
            agents.append({
                "id": str(rec["id"]),
                "name": rec["name"],
                "role": rec["role"],
                "title": rec["title"],
                "status": rec["status"],
                "icon": rec["icon"],
                "last_heartbeat_at": heartbeat.isoformat() if heartbeat else None,
            })
        return agents
    finally:
        await conn.close()
async def get_agents_for_case(company_id: str, issue_ids: list[str]) -> list[dict]:
    """Return the company's agents with a status scoped to one case.

    An agent is reported as ``running`` only if it has a running heartbeat
    run whose wakeup payload ``issueId`` is one of ``issue_ids``. An agent
    that is running globally but not on this case is reported as ``idle``;
    any other global status is passed through unchanged.
    """
    def _case_status(active_here, global_status):
        # Busy elsewhere must not look like activity on this case.
        if active_here:
            return "running"
        if global_status == "running":
            return "idle"
        return global_status

    query = (
        "SELECT a.id, a.name, a.role, a.title, a.icon,\n"
        "a.status AS global_status, a.last_heartbeat_at,\n"
        "EXISTS(\n"
        "SELECT 1 FROM heartbeat_runs hr\n"
        "JOIN agent_wakeup_requests wr ON hr.wakeup_request_id = wr.id\n"
        "WHERE hr.agent_id = a.id\n"
        "AND hr.status = 'running'\n"
        "AND wr.payload->>'issueId' = ANY($2::text[])\n"
        ") AS active_on_case\n"
        "FROM agents a\n"
        "WHERE a.company_id = $1::uuid\n"
        "ORDER BY a.role, a.name"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(query, company_id, issue_ids)
        agents = []
        for rec in records:
            heartbeat = rec["last_heartbeat_at"]
            agents.append({
                "id": str(rec["id"]),
                "name": rec["name"],
                "role": rec["role"],
                "title": rec["title"],
                "status": _case_status(rec["active_on_case"], rec["global_status"]),
                "icon": rec["icon"],
                "last_heartbeat_at": heartbeat.isoformat() if heartbeat else None,
            })
        return agents
    finally:
        await conn.close()
async def post_comment(issue_id: str, company_id: str, body: str) -> dict:
    """Post a comment on a Paperclip issue.

    When a Board API key is configured the comment is posted through the
    Board API first (per the original note, this is what triggers plugin
    events for CEO routing). If that call raises or answers with an HTTP
    status >= 400, the reason is logged and the comment is inserted directly
    into the DB instead, followed by an explicit wakeup of the company's CEO
    agent. A failed wakeup is logged but does not fail the call.

    Args:
        issue_id: Issue to comment on.
        company_id: Company owning the issue; selects the CEO to wake.
        body: Comment text.

    Returns:
        ``{"comment_id", "issue_id", "method"}`` where ``method`` is
        ``"api"`` or ``"db_fallback"``.
    """
    # Try Board API first — this triggers the event bus
    if PAPERCLIP_BOARD_API_KEY:
        try:
            resp = await pc_request(
                "POST",
                f"/api/board/issues/{issue_id}/comments",
                json={"body": body},
            )
            if resp.status_code < 400:
                result = resp.json()
                logger.info("Posted comment via Board API on issue %s", issue_id)
                return {"comment_id": result.get("id", ""), "issue_id": issue_id, "method": "api"}
            # An error response used to fall through without any trace.
            logger.warning(
                "Board API comment on issue %s returned HTTP %s, falling back to DB",
                issue_id, resp.status_code,
            )
        except Exception:
            logger.warning(
                "Board API comment failed for issue %s, falling back to DB",
                issue_id,
                exc_info=True,
            )

    # Fallback: direct DB insert + explicit CEO wakeup
    comment_id = str(uuid.uuid4())
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        await conn.execute(
            "INSERT INTO issue_comments (id, company_id, issue_id, author_user_id, body)\n"
            "VALUES ($1, $2::uuid, $3::uuid, 'chaim', $4)",
            comment_id, company_id, issue_id, body,
        )
        logger.info("Posted comment via DB fallback on issue %s", issue_id)
    finally:
        await conn.close()

    # Wake the correct CEO for this company
    ceo_id = CEO_AGENTS.get(company_id, CEO_AGENT_ID)
    try:
        await pc_request(
            "POST",
            f"/api/agents/{ceo_id}/wakeup",
            json={
                "source": "on_demand",
                "triggerDetail": "manual",
                "reason": f"user_comment_{issue_id}",
                "payload": {"issueId": issue_id, "mutation": "comment"},
            },
            raise_on_error=True,
        )
    except Exception:
        # The comment itself is already stored; only the notification failed.
        logger.warning(
            "Failed to wake CEO after DB comment on issue %s",
            issue_id,
            exc_info=True,
        )
    return {"comment_id": comment_id, "issue_id": issue_id, "method": "db_fallback"}
async def get_issue_interactions(issue_ids: list[str]) -> list[dict]:
    """Fetch issue-thread interactions (agent → user prompts) for issues.

    Includes every ``pending`` interaction plus those resolved within the
    last 24 hours, ordered by ``created_at`` so callers can interleave them
    with comments. ``payload`` / ``result`` values that arrive as strings are
    decoded as JSON; an undecodable payload becomes ``{}`` and an
    undecodable result becomes ``None``. An empty ``issue_ids`` returns
    ``[]`` without opening a DB connection.
    """
    if not issue_ids:
        return []

    def _decode(raw, on_error):
        # Only string values need decoding; anything else is passed through.
        if not isinstance(raw, str):
            return raw
        try:
            return json.loads(raw)
        except Exception:
            return on_error

    def _iso(moment):
        return moment.isoformat() if moment else None

    query = (
        "SELECT id, issue_id, kind, status, title, summary,\n"
        "payload, result, created_at, resolved_at\n"
        "FROM issue_thread_interactions\n"
        "WHERE issue_id = ANY($1::uuid[])\n"
        "AND (status = 'pending'\n"
        "OR resolved_at > now() - interval '24 hours')\n"
        "ORDER BY created_at"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(query, issue_ids)
        return [
            {
                "id": str(rec["id"]),
                "issue_id": str(rec["issue_id"]),
                "kind": rec["kind"],
                "status": rec["status"],
                "title": rec["title"],
                "summary": rec["summary"],
                # A missing/empty payload is always surfaced as a dict.
                "payload": _decode(rec["payload"], {}) or {},
                "result": _decode(rec["result"], None),
                "created_at": _iso(rec["created_at"]),
                "resolved_at": _iso(rec["resolved_at"]),
            }
            for rec in records
        ]
    finally:
        await conn.close()
async def respond_to_interaction(
    issue_id: str, interaction_id: str, payload: dict,
) -> dict:
    """Send the user's answer to an `ask_user_questions` interaction.

    POSTs ``payload`` to the interaction's ``respond`` endpoint and returns
    the decoded JSON response. Per the original author's note, Paperclip
    then auto-wakes the issue assignee
    (`queueResolvedInteractionContinuationWakeup`); that happens server-side
    and is not visible in this module.
    """
    endpoint = f"/api/issues/{issue_id}/interactions/{interaction_id}/respond"
    response = await pc_request("POST", endpoint, json=payload, raise_on_error=True)
    return response.json()
async def accept_interaction(
    issue_id: str, interaction_id: str, payload: dict,
) -> dict:
    """Accept a `request_confirmation` or `suggest_tasks` interaction.

    POSTs ``payload`` to the interaction's ``accept`` endpoint and returns
    the decoded JSON response.
    """
    endpoint = f"/api/issues/{issue_id}/interactions/{interaction_id}/accept"
    response = await pc_request("POST", endpoint, json=payload, raise_on_error=True)
    return response.json()
async def reject_interaction(
    issue_id: str, interaction_id: str, payload: dict,
) -> dict:
    """Reject a `request_confirmation` or `suggest_tasks` interaction.

    POSTs ``payload`` to the interaction's ``reject`` endpoint and returns
    the decoded JSON response.
    """
    endpoint = f"/api/issues/{issue_id}/interactions/{interaction_id}/reject"
    response = await pc_request("POST", endpoint, json=payload, raise_on_error=True)
    return response.json()
# Name of the singleton project (looked up per company) that holds the
# precedent-library extraction queue. One issue is created per uploaded
# precedent — assigned to the CEO who runs the local-MCP extractor.
_LIBRARY_PROJECT_NAME = "ספריית פסיקה — תור חילוץ"
async def _get_or_create_library_project(
    conn: asyncpg.Connection, company_id: str,
) -> str:
    """Return the id of the company's library-extraction project.

    The project is looked up by company and exact name
    (``_LIBRARY_PROJECT_NAME``). When it does not exist yet it is created in
    ``backlog`` status and given the default legal-ai workspace.
    """
    found = await conn.fetchrow(
        "SELECT id FROM projects WHERE company_id = $1::uuid AND name = $2 LIMIT 1",
        company_id, _LIBRARY_PROJECT_NAME,
    )
    if found:
        return str(found["id"])

    new_project_id = str(uuid.uuid4())
    project_description = (
        "תור אוטומטי לחילוץ הלכות ומטא-דאטה מפסיקה שהועלתה לספריה. "
        "כל issue כאן מייצג פסק דין שצריך לעבד — להריץ "
        "mcp__legal-ai__precedent_process_pending."
    )
    project_color = "#a17a3a"  # gold-ish
    insert_sql = (
        "INSERT INTO projects (id, company_id, name, description, status, color)\n"
        "VALUES ($1, $2::uuid, $3, $4, 'backlog', $5)"
    )
    await conn.execute(
        insert_sql,
        new_project_id,
        company_id,
        _LIBRARY_PROJECT_NAME,
        project_description,
        project_color,
    )
    await _ensure_default_workspace(conn, new_project_id, company_id)
    logger.info(
        "Created library extraction project %s for company %s",
        new_project_id, company_id,
    )
    return new_project_id
async def wake_for_precedent_extraction(
    case_law_id: str,
    citation: str,
    practice_area: str = "",
) -> dict:
    """Queue extraction of a freshly uploaded precedent and wake the CEO agent.

    Creates an issue in the company's library-extraction project, assigns it
    to the company CEO, stores ``case_law_id`` in plugin state under
    ``precedent-case-law-id``, and then requests a wakeup through the API.
    The issue description instructs the agent to run
    `mcp__legal-ai__precedent_process_pending` and close the issue.

    The DB writes (project lookup/creation, counter bump, issue, plugin
    state) run in one transaction so a failure leaves no orphaned issue or
    skipped issue number behind.

    Best-effort: failures are logged and reported in the return value rather
    than raised, so a Paperclip outage does not block the upload itself.

    Args:
        case_law_id: ID of the uploaded precedent.
        citation: Citation text; shortened to 120 chars plus an ellipsis
            for the issue title.
        practice_area: Tag used to pick the Paperclip company.

    Returns:
        ``{"ok": True, "issue_id", "identifier", "wakeup"}`` on success;
        ``{"ok": False, "skipped": "no_api_key"}`` when no API key is set;
        ``{"ok": False, "error": ...}`` on failure (with ``issue_id`` and
        ``identifier`` included when only the wakeup failed).
    """
    if not PAPERCLIP_BOARD_API_KEY:
        logger.warning(
            "PAPERCLIP_BOARD_API_KEY not set — skipping precedent-extraction wakeup"
        )
        return {"ok": False, "skipped": "no_api_key"}

    company_id = await _get_company_id(practice_area)
    ceo_id = CEO_AGENTS.get(company_id, CEO_AGENT_ID)

    try:
        conn = await asyncpg.connect(PAPERCLIP_DB_URL)
        try:
            async with conn.transaction():
                project_id = await _get_or_create_library_project(conn, company_id)

                # Bump issue counter & build identifier (matches _create_issue
                # style). The prefix is read from the company row, like the
                # other project/issue helpers in this module do.
                row = await conn.fetchrow(
                    "UPDATE companies SET issue_counter = issue_counter + 1 "
                    "WHERE id = $1::uuid RETURNING issue_counter, issue_prefix",
                    company_id,
                )
                issue_number = row["issue_counter"]
                # Fall back to the previous hard-coded mapping if the company
                # has no prefix stored.
                prefix = row["issue_prefix"] or (
                    "CMP" if company_id == COMPANIES["licensing"] else "CMPA"
                )
                identifier = f"{prefix}-{issue_number}"
                issue_id = str(uuid.uuid4())

                # Mark truncated citations with an ellipsis.
                short_citation = (
                    citation[:120] + "…" if len(citation) > 120 else citation
                )
                description = (
                    f"פסק דין חדש הועלה לספריית הפסיקה.\n\n"
                    f"**case_law_id:** `{case_law_id}`\n"
                    f"**citation:** {citation}\n\n"
                    f"---\n\n"
                    f"**משימה:** הרץ את הכלי `mcp__legal-ai__precedent_process_pending` "
                    f"פעמיים — פעם עם `kind='metadata'` ופעם עם `kind='halacha'`. "
                    f"הכלי יעבד את כל הפסיקות בתור (כולל זו), כך שגם אם הופעל מאוחר "
                    f"יותר עבור פסיקות אחרות — אין בעיה.\n\n"
                    f"לאחר ריצה: סמן את ה-issue כ-done ופתח comment קצר עם מספר ההלכות "
                    f"שחולצו ושדות המטא-דאטה שהושלמו."
                )
                await conn.execute(
                    "INSERT INTO issues\n"
                    "(id, company_id, project_id, title, description,\n"
                    "status, priority, issue_number, identifier, assignee_agent_id)\n"
                    "VALUES ($1, $2::uuid, $3::uuid, $4, $5, 'todo', 'medium',\n"
                    "$6, $7, $8::uuid)",
                    issue_id, company_id, project_id,
                    f"[ספרייה] חלץ הלכות: {short_citation}"[:200],
                    description,
                    issue_number, identifier, ceo_id,
                )
                # Link case_law_id via plugin_state so the agent can find it.
                await conn.execute(
                    "INSERT INTO plugin_state\n"
                    "(plugin_id, scope_kind, scope_id, namespace, state_key, value_json)\n"
                    "VALUES ($1::uuid, 'issue', $2, 'default', 'precedent-case-law-id', $3::jsonb)\n"
                    "ON CONFLICT (plugin_id, scope_kind, scope_id, namespace, state_key)\n"
                    "DO UPDATE SET value_json = $3::jsonb",
                    PLUGIN_ID, issue_id, json.dumps(case_law_id),
                )
        finally:
            await conn.close()
    except Exception as e:
        logger.exception("wake_for_precedent_extraction: DB step failed: %s", e)
        return {"ok": False, "error": f"db: {e}"}

    # Wake the CEO. Per Paperclip rules: must use API + carry issueId in payload.
    payload = {
        "source": "automation",
        "triggerDetail": "precedent_library_upload",
        "reason": f"precedent_extraction_{identifier}",
        "payload": {
            "issueId": issue_id,
            "mutation": "precedent_extraction",
            "caseLawId": case_law_id,
        },
    }
    try:
        resp = await pc_request(
            "POST",
            f"/api/agents/{ceo_id}/wakeup",
            json=payload,
            raise_on_error=True,
        )
        result = resp.json()
        logger.info(
            "Precedent-extraction wakeup queued: issue=%s case_law_id=%s",
            identifier, case_law_id,
        )
        return {"ok": True, "issue_id": issue_id, "identifier": identifier, "wakeup": result}
    except Exception as e:
        logger.exception("wake_for_precedent_extraction: wakeup API failed: %s", e)
        return {"ok": False, "error": f"wakeup: {e}", "issue_id": issue_id, "identifier": identifier}
async def wake_ceo_agent(issue_id: str, case_number: str, company_id: str = "") -> dict:
    """Ask Paperclip to wake the CEO agent for a workflow start.

    The CEO is chosen from ``CEO_AGENTS`` by ``company_id``, falling back to
    ``CEO_AGENT_ID``. The wakeup goes through the API; per the original
    note, an agent does not wake from a direct DB insert.

    Returns:
        The decoded JSON body of the wakeup response.

    Raises:
        RuntimeError: If ``PAPERCLIP_BOARD_API_KEY`` is not set.
    """
    if not PAPERCLIP_BOARD_API_KEY:
        raise RuntimeError("PAPERCLIP_BOARD_API_KEY not set — cannot wake CEO agent")

    target_agent = CEO_AGENTS.get(company_id, CEO_AGENT_ID)
    wakeup_body = {
        "source": "on_demand",
        "triggerDetail": "manual",
        "reason": f"start_workflow_{case_number}",
        "payload": {"issueId": issue_id, "mutation": "workflow_start"},
    }
    response = await pc_request(
        "POST",
        f"/api/agents/{target_agent}/wakeup",
        json=wakeup_body,
        raise_on_error=True,
    )
    wakeup_result = response.json()
    logger.info("CEO agent wakeup for case %s: %s", case_number, wakeup_result)
    return wakeup_result
async def wake_curator_for_final(
    case_number: str,
    final_filename: str,
    company_id: str = "",
) -> dict:
    """Wake the Knowledge Curator (Hermes) when a case is marked final.

    Creates a child issue under the case's main issue, assigns it to the
    company's curator, links it to the case in plugin state (best-effort)
    and triggers a wakeup through the API.

    The main issue is the first ``in_progress`` issue of the case, or the
    oldest issue otherwise. Earlier curator review issues of the same case
    are ignored when picking it, so marking a case final a second time does
    not nest the new review under a previous review that is still open.

    Args:
        case_number: Legal-ai case number.
        final_filename: Name of the file marked final; quoted in the issue.
        company_id: Company whose curator should be woken.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when there is no API key,
        no curator for the company, or no issue for the case; otherwise
        ``{"status": "ok", "sub_issue_id", "curator_id", "main_issue_id"}``.

    Raises:
        Exception: Both API calls pass ``raise_on_error=True``, so API
            failures are expected to propagate to the caller.
    """
    if not PAPERCLIP_BOARD_API_KEY:
        logger.warning("PAPERCLIP_BOARD_API_KEY not set — skipping curator wakeup")
        return {"status": "skipped", "reason": "no_api_key"}

    curator_id = CURATOR_AGENTS.get(company_id)
    if not curator_id:
        logger.info("No curator configured for company %s — skipping", company_id)
        return {"status": "skipped", "reason": "no_curator", "company_id": company_id}

    issues = await get_case_issues(case_number)
    if not issues:
        logger.warning("No Paperclip issues found for case %s — skipping curator", case_number)
        return {"status": "skipped", "reason": "no_issue"}

    review_title = f"[ערר {case_number}] סקירת ידע — Knowledge Curator"
    # get_case_issues also matches on title, so previous curator reviews of
    # this case come back too. Prefer real case issues as the parent; only
    # fall back to the full list if nothing else exists.
    candidates = [i for i in issues if i.get("title") != review_title] or issues
    main_issue = next(
        (i for i in candidates if i.get("status") == "in_progress"), None,
    ) or candidates[0]
    main_issue_id = main_issue["id"]

    description = (
        f"דפנה סימנה את ההחלטה הסופית של תיק {case_number} כסופית.\n"
        f"קובץ סופי: `{final_filename}`\n\n"
        f"סקור את ההחלטה מול skills/decision/SKILL.md ו-docs/legal-decision-lessons.md.\n"
        f"חפש 3-5 דפוסי סגנון/דיון שלא תועדו. כתוב comment בעברית, ניטרלי, "
        f"ממוספר. עדכן את MEMORY.md שלך. סגור את ה-issue (status=done)."
    )
    child_resp = await pc_request(
        "POST",
        f"/api/issues/{main_issue_id}/children",
        json={
            "title": review_title,
            "description": description,
            "status": "in_progress",
            "priority": "low",
            "assigneeAgentId": curator_id,
        },
        raise_on_error=True,
    )
    sub_issue = child_resp.json()
    sub_issue_id = sub_issue["id"]

    # Tag plugin_state for case-number visibility on the case page.
    # A failure here is logged only; the wakeup still goes ahead.
    try:
        conn = await asyncpg.connect(PAPERCLIP_DB_URL)
        try:
            await _link_case_to_issue(conn, sub_issue_id, case_number)
        finally:
            await conn.close()
    except Exception as e:
        logger.warning("plugin_state link failed for sub_issue=%s: %s", sub_issue_id, e)

    # Trigger wakeup (use API per Paperclip rule — never DB insert)
    wake_resp = await pc_request(
        "POST",
        f"/api/agents/{curator_id}/wakeup",
        json={
            "source": "on_demand",
            "triggerDetail": "manual",
            "reason": f"final_marked_{case_number}",
            "payload": {
                "issueId": sub_issue_id,
                "mutation": "assignment",
            },
        },
        raise_on_error=True,
    )
    logger.info(
        "Curator wakeup for case %s: sub_issue=%s curator=%s wake=%s",
        case_number, sub_issue_id, curator_id, wake_resp.status_code,
    )
    return {
        "status": "ok",
        "sub_issue_id": sub_issue_id,
        "curator_id": curator_id,
        "main_issue_id": main_issue_id,
    }