Files
legal-ai/web/paperclip_client.py
Chaim 1e4c5c1518
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 3m16s
Add Paperclip agent activity mirror to case detail page
New "Agents" tab in case detail shows all Paperclip agent comments,
issue status, and agent status for each case — eliminating the need
to switch between Legal-AI and Paperclip UIs.

Backend: 4 new DB query functions in paperclip_client.py (issues,
comments, agents, post_comment) + 2 new API endpoints (GET/POST
/api/cases/{case_number}/agents). Comment posting uses Board API
with DB+wakeup fallback to ensure CEO routing.

Frontend: agents.ts hooks (10s polling), AgentActivityFeed component
(markdown timeline + comment input), AgentStatusWidget (sidebar),
4th tab in case detail page.

Also includes new-company-setup-guide.md documenting the process
for setting up the betterment levy (CMPA) company.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 10:44:42 +00:00

488 lines
18 KiB
Python

"""Paperclip integration via direct DB access (embedded PostgreSQL).
Creates projects, issues, and plugin state entries to fully link
a legal-ai case into Paperclip's workflow.
"""
from __future__ import annotations
import json
import logging
import os
import uuid
import asyncpg
import httpx
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Connection string for the Paperclip database; overridable via environment.
PAPERCLIP_DB_URL = os.getenv(
    "PAPERCLIP_DB_URL",
    "postgresql://paperclip:paperclip@127.0.0.1:54329/paperclip",
)

PLUGIN_ID = "53461b5a-7f58-411a-9952-72f9c8d4a328"  # marcusgroup.legal-ai
CEO_AGENT_ID = "752cebdd-6748-4a04-aacd-c7ab0294ef33"

# Base URL and bearer token used for Paperclip HTTP API calls.
PAPERCLIP_API_URL = os.getenv("PAPERCLIP_API_URL", "http://localhost:3100")
PAPERCLIP_BOARD_API_KEY = os.getenv("PAPERCLIP_BOARD_API_KEY", "")

# Company IDs from Paperclip DB
COMPANIES = dict(
    licensing="42a7acd0-30c5-4cbd-ac97-7424f65df294",  # CMP — licensing and construction
    betterment="8639e837-4c9d-47fa-a76b-95788d651896",  # CMPA — betterment levies
)

# Fallback mapping — used only when DB lookup returns no results.
# Keys are appeal-type tags (Hebrew labels and English slugs).
_FALLBACK_APPEAL_TYPE_TO_COMPANY = {
    "רישוי": COMPANIES["licensing"],
    "היטל השבחה": COMPANIES["betterment"],
    "פיצויים": COMPANIES["betterment"],
    "building_permit": COMPANIES["licensing"],
    "betterment_levy": COMPANIES["betterment"],
    "compensation_197": COMPANIES["betterment"],
    "compensation": COMPANIES["betterment"],
    "licensing": COMPANIES["licensing"],
}

# Legal-AI DB URL for reading tag_company_mappings.
# POSTGRES_URL wins when set and non-empty; otherwise DATABASE_URL, then the default.
_LEGAL_DB_URL = os.getenv("POSTGRES_URL") or os.getenv(
    "DATABASE_URL",
    "postgresql://legal:legal@127.0.0.1:5432/legal_ai",
)
async def _get_company_id(appeal_type: str) -> str:
    """Resolve an appeal-type tag to a Paperclip company ID.

    The tag is first looked up in the ``tag_company_mappings`` table of the
    legal-ai database. If the lookup raises, returns no row, or returns an
    empty/NULL ``company_id``, the static ``_FALLBACK_APPEAL_TYPE_TO_COMPANY``
    table is consulted instead; unknown tags default to the licensing company.

    Args:
        appeal_type: Tag to resolve (Hebrew label or English slug).

    Returns:
        The company ID, always as a ``str``.
    """
    try:
        conn = await asyncpg.connect(_LEGAL_DB_URL)
        try:
            row = await conn.fetchrow(
                "SELECT company_id FROM tag_company_mappings WHERE tag = $1 LIMIT 1",
                appeal_type,
            )
        finally:
            await conn.close()
    except Exception:
        # Deliberately best-effort: the mapping table is optional, so any DB
        # problem degrades to the static table. Keep the traceback for debugging.
        logger.debug(
            "DB lookup for tag mapping failed, using fallback for '%s'",
            appeal_type,
            exc_info=True,
        )
    else:
        if row and row["company_id"]:
            # Normalise to str so the declared return type holds regardless of
            # how the driver decodes the column.
            return str(row["company_id"])
    return _FALLBACK_APPEAL_TYPE_TO_COMPANY.get(appeal_type, COMPANIES["licensing"])
async def create_project(
    case_number: str,
    title: str,
    description: str = "",
    appeal_type: str = "רישוי",
    color: str = "#6366f1",
) -> dict:
    """Create a Paperclip project for a case, or return the existing one.

    The company is resolved from ``appeal_type``. If a project whose name
    contains ``case_number`` already exists for that company, it is returned
    with ``"existing": True`` and nothing is written. Otherwise the project,
    its initial issue, the plugin-state link and the verification step are
    written inside a single database transaction, so a failure part-way
    through leaves no half-created project behind.

    Args:
        case_number: Legal-ai case number; embedded in the project name.
        title: Case title; embedded in the project name.
        description: Optional project description (truncated to 500 chars).
        appeal_type: Tag used to pick the Paperclip company.
        color: Project colour stored on the project row.

    Returns:
        A dict with ``id``, ``company_id``, ``name``, ``url`` and ``existing``;
        for newly created projects also ``issue_id`` and ``issue_identifier``.
    """
    company_id = await _get_company_id(appeal_type)
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        # Resolve prefix from company issue_prefix in Paperclip DB
        comp_row = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid", company_id,
        )
        prefix = comp_row["issue_prefix"] if comp_row and comp_row["issue_prefix"] else "CMP"

        # Check for existing project with this case number.
        # NOTE(review): this is a substring match, so a case number that is a
        # substring of another one would match that project too — confirm
        # whether case numbers can overlap this way.
        existing = await conn.fetchrow(
            "SELECT id, name FROM projects WHERE name LIKE $1 AND company_id = $2::uuid",
            f"%{case_number}%", company_id,
        )
        if existing:
            return {
                "id": str(existing["id"]),
                "company_id": company_id,
                "name": existing["name"],
                "url": f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{existing['id']}/issues",
                "existing": True,
            }

        project_id = str(uuid.uuid4())
        # NOTE(review): there is no separator between the case number and the
        # title in this name — confirm that is intended.
        project_name = f"ערר {case_number}{title}"[:200]

        # All writes below succeed or fail together.
        async with conn.transaction():
            await conn.execute(
                "INSERT INTO projects (id, company_id, name, description, status, color)\n"
                "VALUES ($1, $2::uuid, $3, $4, 'backlog', $5)",
                project_id, company_id, project_name,
                description[:500] if description else "", color,
            )
            # Create initial issue linked to the project
            issue_id, identifier = await _create_issue(
                conn, company_id, project_id, case_number, title, prefix,
            )
            # Link issue to legal-ai case via plugin state
            await _link_case_to_issue(conn, issue_id, case_number)
            # Verify project creation and close the setup issue
            await _verify_and_close_setup_issue(
                conn, project_id, issue_id, identifier, case_number,
            )

        return {
            "id": project_id,
            "company_id": company_id,
            "name": project_name,
            "issue_id": issue_id,
            "issue_identifier": identifier,
            "url": f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project_id}/issues",
            "existing": False,
        }
    finally:
        await conn.close()
async def _create_issue(
    conn: asyncpg.Connection,
    company_id: str,
    project_id: str,
    case_number: str,
    title: str,
    prefix: str,
) -> tuple[str, str]:
    """Create an issue in the project and return ``(issue_id, identifier)``.

    The company's ``issue_counter`` is incremented and the issue row is
    inserted inside one transaction (a savepoint when the caller already has a
    transaction open), so a failed insert does not consume an issue number.

    Args:
        conn: Open connection to the Paperclip database.
        company_id: Company that owns the issue and supplies the counter.
        project_id: Project the issue belongs to.
        case_number: Legal-ai case number, embedded in title and description.
        title: Issue title suffix (the full title is truncated to 200 chars).
        prefix: Identifier prefix; the identifier is ``"{prefix}-{number}"``.

    Returns:
        The new issue's UUID string and its human-readable identifier.

    Raises:
        ValueError: If no company row matches ``company_id``.
    """
    issue_id = str(uuid.uuid4())
    async with conn.transaction():
        # Get next issue number for this company
        row = await conn.fetchrow(
            "UPDATE companies SET issue_counter = issue_counter + 1 WHERE id = $1::uuid RETURNING issue_counter",
            company_id,
        )
        if row is None:
            # UPDATE matched nothing: fail with a clear message instead of a
            # TypeError from subscripting None.
            raise ValueError(
                f"Paperclip company {company_id} not found; cannot allocate an issue number"
            )
        issue_number = row["issue_counter"]
        identifier = f"{prefix}-{issue_number}"
        await conn.execute(
            "INSERT INTO issues (id, company_id, project_id, title, description, status, priority, issue_number, identifier)\n"
            "VALUES ($1, $2::uuid, $3::uuid, $4, $5, 'todo', 'medium', $6, $7)",
            issue_id, company_id, project_id,
            f"[ערר {case_number}] {title}"[:200],
            f"תיק ערר {case_number}\nנוצר אוטומטית מממשק העלאת מסמכים",
            issue_number, identifier,
        )
    logger.info("Created Paperclip issue %s: [ערר %s] %s", identifier, case_number, title)
    return issue_id, identifier
async def _link_case_to_issue(conn: asyncpg.Connection, issue_id: str, case_number: str) -> None:
    """Record the legal-ai case number for an issue in Paperclip plugin state.

    Upserts the ``legal-case-number`` key (scope kind ``issue``, namespace
    ``default``) for this plugin; an existing entry has its value overwritten.
    """
    upsert_sql = (
        "INSERT INTO plugin_state (plugin_id, scope_kind, scope_id, namespace, state_key, value_json)\n"
        "VALUES ($1::uuid, 'issue', $2, 'default', 'legal-case-number', $3::jsonb)\n"
        "ON CONFLICT (plugin_id, scope_kind, scope_id, namespace, state_key) DO UPDATE SET value_json = $3::jsonb"
    )
    # The value is stored as a JSON string literal.
    encoded_case = json.dumps(case_number)
    await conn.execute(upsert_sql, PLUGIN_ID, issue_id, encoded_case)
    logger.info("Linked issue %s to case %s via plugin state", issue_id, case_number)
async def _verify_and_close_setup_issue(
    conn: asyncpg.Connection,
    project_id: str,
    issue_id: str,
    identifier: str,
    case_number: str,
) -> None:
    """Verify the project setup, then close the setup issue if all checks pass.

    The issue is first moved to ``in_progress``. Three checks are run: the
    project row exists, the issue is attached to that project, and a
    ``legal-case-number`` plugin-state entry exists for the issue. A comment
    with a per-check pass/fail report is always added to the issue. When every
    check passes the issue is marked ``done``; otherwise it stays
    ``in_progress`` and the comment lists the failed checks.

    Args:
        conn: Open connection to the Paperclip database.
        project_id: Project that was just created.
        issue_id: Setup issue to transition.
        identifier: Human-readable issue identifier, used only for logging.
        case_number: Legal-ai case number, used in the comment heading.
    """
    # Move to in_progress while verifying
    await conn.execute(
        "UPDATE issues SET status = 'in_progress', started_at = now() WHERE id = $1",
        issue_id,
    )
    logger.info("%s: בביצוע — מאמת יצירת פרויקט", identifier)

    # Verify: project exists, issue is linked, plugin state exists
    checks = []
    project = await conn.fetchrow("SELECT id, name FROM projects WHERE id = $1::uuid", project_id)
    checks.append(("פרויקט נוצר", project is not None))
    issue = await conn.fetchrow(
        "SELECT id, project_id FROM issues WHERE id = $1 AND project_id = $2::uuid",
        issue_id, project_id,
    )
    checks.append(("משימה משויכת לפרויקט", issue is not None))
    plugin_link = await conn.fetchrow(
        "SELECT value_json FROM plugin_state WHERE scope_id = $1 AND state_key = 'legal-case-number'",
        issue_id,
    )
    checks.append(("קישור למערכת המשפטית", plugin_link is not None))

    # Prefix each line with a distinct marker so the report actually shows
    # which checks passed and which failed.
    report = "\n".join(f"{'✅' if ok else '❌'} {name}" for name, ok in checks)
    failed = [name for name, ok in checks if not ok]
    heading = f"## אימות יצירת פרויקט — ערר {case_number}\n\n{report}\n\n"

    if not failed:
        await conn.execute(
            "UPDATE issues SET status = 'done', completed_at = now() WHERE id = $1",
            issue_id,
        )
        comment_body = heading + "הפרויקט נוצר בהצלחה. משימה נסגרה אוטומטית."
    else:
        # Leave in_progress with a warning comment
        comment_body = heading + f"⚠️ בדיקות שנכשלו: {', '.join(failed)}"

    # Document the verification outcome in a comment on the issue
    await conn.execute(
        "INSERT INTO issue_comments (id, company_id, issue_id, body)\n"
        "VALUES ($1, (SELECT company_id FROM issues WHERE id = $2), $2,\n"
        "$3)",
        str(uuid.uuid4()), issue_id, comment_body,
    )

    if failed:
        logger.warning("%s: אימות נכשל — %s", identifier, ", ".join(failed))
    else:
        logger.info("%s: הושלם — פרויקט אומת ונסגר", identifier)
async def get_project_url(case_number: str) -> str | None:
    """Return the Paperclip issues URL of the project for a case, if any.

    A project matches when its name contains ``case_number``. The URL is built
    from the owning company's ``issue_prefix`` (``"CMP"`` when the company row
    or prefix is missing). Returns ``None`` when no project matches.
    """
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        name_pattern = f"%{case_number}%"
        project = await conn.fetchrow(
            "SELECT id, company_id FROM projects WHERE name LIKE $1",
            name_pattern,
        )
        if not project:
            return None

        company = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid",
            str(project["company_id"]),
        )
        prefix = "CMP"
        if company and company["issue_prefix"]:
            prefix = company["issue_prefix"]
        return f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project['id']}/issues"
    finally:
        await conn.close()
async def create_workflow_issue(case_number: str, title: str) -> dict:
    """Open a workflow issue in the existing Paperclip project of a case.

    The project is found by a name match on ``case_number``. The new issue is
    created under the project's company and linked back to the case through
    plugin state.

    Returns:
        A dict with ``issue_id``, ``identifier`` and ``project_url``.

    Raises:
        ValueError: If no project is found for ``case_number``.
    """
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        # Locate the project this case belongs to.
        project = await conn.fetchrow(
            "SELECT id, company_id FROM projects WHERE name LIKE $1",
            f"%{case_number}%",
        )
        if not project:
            raise ValueError(f"No Paperclip project found for case {case_number}")
        project_id, company_id = str(project["id"]), str(project["company_id"])

        # Identifier prefix comes from the company, defaulting to "CMP".
        company = await conn.fetchrow(
            "SELECT issue_prefix FROM companies WHERE id = $1::uuid", company_id,
        )
        prefix = "CMP"
        if company and company["issue_prefix"]:
            prefix = company["issue_prefix"]

        # Create the issue, then tie it to the legal-ai case.
        workflow_title = f"התחל תהליך ניסוח — {title}"[:200]
        issue_id, identifier = await _create_issue(
            conn, company_id, project_id, case_number, workflow_title, prefix,
        )
        await _link_case_to_issue(conn, issue_id, case_number)

        project_url = f"https://pc.nautilus.marcusgroup.org/{prefix}/projects/{project_id}/issues"
        logger.info("Created workflow issue %s for case %s", identifier, case_number)
        return {
            "issue_id": issue_id,
            "identifier": identifier,
            "project_url": project_url,
        }
    finally:
        await conn.close()
async def get_case_issues(case_number: str) -> list[dict]:
    """List the Paperclip issues linked to a legal-ai case number.

    Issues are matched through this plugin's ``legal-case-number`` plugin-state
    entries whose value equals the JSON-encoded case number, and are returned
    oldest first. Timestamps are ISO-8601 strings, or ``None`` when unset.
    """

    def _iso(moment):
        # Timestamps may be NULL in the database.
        return moment.isoformat() if moment else None

    query = (
        "SELECT i.id, i.title, i.status, i.identifier, i.priority,\n"
        "i.assignee_agent_id, a.name AS assignee_name,\n"
        "i.started_at, i.completed_at, i.created_at, i.company_id\n"
        "FROM issues i\n"
        "JOIN plugin_state ps ON ps.scope_id = i.id::text\n"
        "LEFT JOIN agents a ON i.assignee_agent_id = a.id\n"
        "WHERE ps.plugin_id = $1::uuid\n"
        "AND ps.state_key = 'legal-case-number'\n"
        "AND ps.value_json = $2::jsonb\n"
        "ORDER BY i.created_at"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        records = await conn.fetch(query, PLUGIN_ID, json.dumps(case_number))
        issues = []
        for rec in records:
            issues.append(
                {
                    "id": str(rec["id"]),
                    "title": rec["title"],
                    "status": rec["status"],
                    "identifier": rec["identifier"],
                    "priority": rec["priority"],
                    "assignee_name": rec["assignee_name"],
                    "started_at": _iso(rec["started_at"]),
                    "completed_at": _iso(rec["completed_at"]),
                    "created_at": _iso(rec["created_at"]),
                    "company_id": str(rec["company_id"]),
                }
            )
        return issues
    finally:
        await conn.close()
async def get_issue_comments(issue_ids: list[str]) -> list[dict]:
    """Fetch the comments of the given issues, oldest first.

    Each comment carries the authoring agent's name, role and icon when it was
    written by an agent. An empty ``issue_ids`` returns ``[]`` without opening
    a database connection.
    """
    if not issue_ids:
        return []

    query = (
        "SELECT ic.id, ic.issue_id, ic.body, ic.created_at,\n"
        "ic.author_agent_id, ic.author_user_id,\n"
        "a.name AS agent_name, a.role AS agent_role, a.icon AS agent_icon\n"
        "FROM issue_comments ic\n"
        "LEFT JOIN agents a ON ic.author_agent_id = a.id\n"
        "WHERE ic.issue_id = ANY($1::uuid[])\n"
        "ORDER BY ic.created_at"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        comments = []
        for rec in await conn.fetch(query, issue_ids):
            created = rec["created_at"]
            agent_id = rec["author_agent_id"]
            comments.append(
                {
                    "id": str(rec["id"]),
                    "issue_id": str(rec["issue_id"]),
                    "body": rec["body"],
                    "created_at": created.isoformat() if created else None,
                    "author_agent_id": str(agent_id) if agent_id else None,
                    "author_user_id": rec["author_user_id"],
                    "agent_name": rec["agent_name"],
                    "agent_role": rec["agent_role"],
                    "agent_icon": rec["agent_icon"],
                }
            )
        return comments
    finally:
        await conn.close()
async def get_agents_for_company(company_id: str) -> list[dict]:
    """Return the agents of a Paperclip company, ordered by role then name.

    ``last_heartbeat_at`` is an ISO-8601 string, or ``None`` when unset.
    """
    query = (
        "SELECT id, name, role, title, status, icon, last_heartbeat_at\n"
        "FROM agents\n"
        "WHERE company_id = $1::uuid\n"
        "ORDER BY role, name"
    )
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        agents = []
        for rec in await conn.fetch(query, company_id):
            heartbeat = rec["last_heartbeat_at"]
            agents.append(
                {
                    "id": str(rec["id"]),
                    "name": rec["name"],
                    "role": rec["role"],
                    "title": rec["title"],
                    "status": rec["status"],
                    "icon": rec["icon"],
                    "last_heartbeat_at": heartbeat.isoformat() if heartbeat else None,
                }
            )
        return agents
    finally:
        await conn.close()
async def post_comment(issue_id: str, company_id: str, body: str) -> dict:
    """Post a comment on a Paperclip issue.

    Tries the Board API first when ``PAPERCLIP_BOARD_API_KEY`` is set. If the
    request raises or returns an HTTP status of 400 or above, falls back to
    inserting the comment directly into the database and then asks the CEO
    agent to wake up so it processes the comment.

    Once the Board API has accepted the comment (status < 400) this function
    never falls back, even if the response body cannot be parsed — falling
    back at that point would store the same comment twice.

    Args:
        issue_id: Issue to comment on.
        company_id: Company owning the issue (used only by the DB fallback).
        body: Comment text.

    Returns:
        A dict with ``comment_id``, ``issue_id`` and ``method`` (``"api"`` or
        ``"db_fallback"``). ``comment_id`` is ``""`` when the API response did
        not include a usable id.
    """
    # Try Board API first — this triggers the event bus
    if PAPERCLIP_BOARD_API_KEY:
        resp = None
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.post(
                    f"{PAPERCLIP_API_URL}/api/board/issues/{issue_id}/comments",
                    json={"body": body},
                    headers={"Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}"},
                )
        except Exception:
            # Best-effort: any transport problem degrades to the DB fallback.
            logger.debug(
                "Board API comment failed for issue %s, falling back to DB",
                issue_id,
                exc_info=True,
            )

        if resp is not None:
            if resp.status_code < 400:
                # The comment exists server-side from here on; parsing
                # problems must not trigger a second insert.
                try:
                    result = resp.json()
                except ValueError:
                    result = None
                api_comment_id = result.get("id", "") if isinstance(result, dict) else ""
                logger.info("Posted comment via Board API on issue %s", issue_id)
                return {"comment_id": api_comment_id, "issue_id": issue_id, "method": "api"}
            logger.debug(
                "Board API returned HTTP %s for issue %s, falling back to DB",
                resp.status_code,
                issue_id,
            )

    # Fallback: direct DB insert + explicit CEO wakeup
    comment_id = str(uuid.uuid4())
    conn = await asyncpg.connect(PAPERCLIP_DB_URL)
    try:
        await conn.execute(
            "INSERT INTO issue_comments (id, company_id, issue_id, author_user_id, body)\n"
            "VALUES ($1, $2::uuid, $3::uuid, 'chaim', $4)",
            comment_id, company_id, issue_id, body,
        )
        logger.info("Posted comment via DB fallback on issue %s", issue_id)
    finally:
        await conn.close()

    # Wake CEO so it processes the comment; failure here is logged, not raised,
    # because the comment itself has already been stored.
    try:
        url = f"{PAPERCLIP_API_URL}/api/agents/{CEO_AGENT_ID}/wakeup"
        payload = {
            "source": "on_demand",
            "triggerDetail": "manual",
            "reason": f"user_comment_{issue_id}",
            "payload": {"issueId": issue_id, "mutation": "comment"},
        }
        async with httpx.AsyncClient(timeout=15) as client:
            wake_resp = await client.post(
                url, json=payload,
                headers={"Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}"},
            )
            wake_resp.raise_for_status()
    except Exception:
        logger.warning(
            "Failed to wake CEO after DB comment on issue %s", issue_id, exc_info=True,
        )
    return {"comment_id": comment_id, "issue_id": issue_id, "method": "db_fallback"}
async def wake_ceo_agent(issue_id: str, case_number: str) -> dict:
    """Ask Paperclip to wake the CEO agent for a workflow start.

    The wakeup must go through the HTTP API: a direct DB insert does not wake
    the agent.

    Returns:
        The decoded JSON body of the wakeup response.

    Raises:
        RuntimeError: If ``PAPERCLIP_BOARD_API_KEY`` is not set.
    """
    if not PAPERCLIP_BOARD_API_KEY:
        raise RuntimeError("PAPERCLIP_BOARD_API_KEY not set — cannot wake CEO agent")

    endpoint = f"{PAPERCLIP_API_URL}/api/agents/{CEO_AGENT_ID}/wakeup"
    auth_headers = {"Authorization": f"Bearer {PAPERCLIP_BOARD_API_KEY}"}
    wakeup_request = {
        "source": "on_demand",
        "triggerDetail": "manual",
        "reason": f"start_workflow_{case_number}",
        "payload": {
            "issueId": issue_id,
            "mutation": "workflow_start",
        },
    }

    async with httpx.AsyncClient(timeout=15) as client:
        response = await client.post(endpoint, json=wakeup_request, headers=auth_headers)
        # Non-2xx responses propagate to the caller as httpx errors.
        response.raise_for_status()
        result = response.json()
    logger.info("CEO agent wakeup for case %s: %s", case_number, result)
    return result