Files
legal-ai/mcp-server/src/legal_mcp/services/db.py
Chaim d4496b96f1
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m26s
fix(mcp): eliminate "No such tool available" race at agent wakeup
When Paperclip wakes the CEO and the model issues an mcp__legal-ai__*
call within ~10s of session init, Claude Code sometimes returns
"No such tool available" because the legal-ai MCP server hasn't
finished bringing up its tool catalog yet. Observed twice today on
CMPA precedent-extraction wakeups (sessions 9989fbaf and a9c61801);
the agent fell back to bash + .venv/bin/python and finished the work,
but the race needed fixing on the server side.

Three changes that close the window:

1. Lazy schema init (services/db.py + server.py)
   `init_schema()` was awaited inside the FastMCP lifespan, blocking
   the `initialize`/`tools/list` handshake until ~10 CREATE TABLE IF
   NOT EXISTS statements ran. Under contention (two CEOs waking at
   once for different companies) this stretched. Now the lifespan
   returns immediately and `get_pool()` runs the schema migrations
   exactly once on first DB access, guarded by an asyncio.Lock.
   tools/list is answered in milliseconds regardless of DB state.

2. Lazy heavy imports
   - services/embeddings.py: voyageai (~450ms) loaded only inside
     _get_client()
   - services/extractor.py: google.cloud.vision (~550ms) loaded only
     inside _get_vision_client() and _ocr_with_google_vision()
   These two were being imported at module top from
   legal_mcp.tools.documents -> services.processor -> services.{
   extractor,embeddings}, so the FastMCP server couldn't even start
   responding until both finished. Cold start dropped from 2.7s to
   1.17s end-to-end (init + tools/list response).

3. Agent-side warmup + retry guidance (.claude/agents/legal-ceo.md)
   Even with a fast server, the model can still race on the very
   first call. The precedent-extraction section now tells the CEO
   to call workflow_status as a warmup probe and to retry after a
   short sleep if it sees "No such tool available", before falling
   back to the python bypass.

Also expanded the precedent-tool whitelists on the sub-agents that
delegate halacha/library work (commits 4a9a6b7 + 7ee90dc added the
tools to the MCP server but only the CEO got them in its allowed
list). Added to: legal-researcher (full extraction set), legal-analyst
(library_get/list + halacha review), legal-writer (library lookups +
halacha_review), legal-qa (library_get + halacha_review), and the two
that the CEO was already missing (halacha_review, halachot_pending).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 20:23:14 +00:00

2723 lines
105 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Database service - asyncpg connection pool and queries."""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import date
from uuid import UUID, uuid4
import asyncpg
from pgvector.asyncpg import register_vector
from legal_mcp import config
logger = logging.getLogger(__name__)
_pool: asyncpg.Pool | None = None
_schema_ready: bool = False
_init_lock: asyncio.Lock = asyncio.Lock()
async def get_pool() -> asyncpg.Pool:
"""Return the connection pool, creating it (and running schema init) lazily.
The MCP server's `lifespan` no longer blocks on schema init — it's done
here on first DB access. This keeps the `initialize`/`tools/list` MCP
handshake immediate so Claude Code never sees a stale "No such tool".
"""
global _pool, _schema_ready
if _pool is not None and _schema_ready:
return _pool
async with _init_lock:
if _pool is None:
# First, ensure pgvector extension exists (before registering type codec)
conn = await asyncpg.connect(config.POSTGRES_URL)
try:
await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
finally:
await conn.close()
_pool = await asyncpg.create_pool(
config.POSTGRES_URL,
min_size=2,
max_size=10,
init=_init_connection,
)
if not _schema_ready:
await _run_schema_migrations(_pool)
_schema_ready = True
return _pool
async def _init_connection(conn: asyncpg.Connection) -> None:
await register_vector(conn)
async def close_pool() -> None:
global _pool
if _pool:
await _pool.close()
_pool = None
# ── Schema ──────────────────────────────────────────────────────────
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS cases (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_number TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
appellants JSONB DEFAULT '[]',
respondents JSONB DEFAULT '[]',
subject TEXT DEFAULT '',
property_address TEXT DEFAULT '',
permit_number TEXT DEFAULT '',
committee_type TEXT DEFAULT 'ועדה מקומית',
status TEXT DEFAULT 'new',
hearing_date DATE,
decision_date DATE,
tags JSONB DEFAULT '[]',
notes TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS documents (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
doc_type TEXT NOT NULL,
title TEXT NOT NULL,
file_path TEXT NOT NULL,
extracted_text TEXT DEFAULT '',
extraction_status TEXT DEFAULT 'pending',
page_count INTEGER,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS document_chunks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
section_type TEXT DEFAULT 'other',
embedding vector(1024),
page_number INTEGER,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS style_corpus (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
decision_number TEXT,
decision_date DATE,
subject_categories JSONB DEFAULT '[]',
full_text TEXT NOT NULL,
summary TEXT DEFAULT '',
outcome TEXT DEFAULT '',
key_principles JSONB DEFAULT '[]',
practice_area TEXT DEFAULT 'appeals_committee',
appeal_subtype TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS style_patterns (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
pattern_type TEXT NOT NULL,
pattern_text TEXT NOT NULL,
frequency INTEGER DEFAULT 1,
context TEXT DEFAULT '',
examples JSONB DEFAULT '[]',
appeal_subtype TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
ON document_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
CREATE INDEX IF NOT EXISTS idx_chunks_case ON document_chunks(case_id);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON document_chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_docs_case ON documents(case_id);
CREATE INDEX IF NOT EXISTS idx_cases_status ON cases(status);
CREATE INDEX IF NOT EXISTS idx_cases_number ON cases(case_number);
"""
MIGRATIONS_SQL = """
ALTER TABLE cases ADD COLUMN IF NOT EXISTS expected_outcome TEXT DEFAULT '';
CREATE TABLE IF NOT EXISTS audit_log (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
action TEXT NOT NULL,
case_id UUID REFERENCES cases(id) ON DELETE SET NULL,
document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
details JSONB DEFAULT '{}',
actor TEXT DEFAULT 'system',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_audit_case ON audit_log(case_id);
CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_log(action);
CREATE INDEX IF NOT EXISTS idx_audit_created ON audit_log(created_at DESC);
"""
# ── Phase 3: Workflow expansion ────────────────────────────────────
SCHEMA_V3_SQL = """
-- הרחבת decisions עם שדות חדשים
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS direction_doc JSONB DEFAULT NULL;
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS outcome_reasoning TEXT DEFAULT '';
-- הרחבת cases עם appeal_type (אם לא קיים)
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_type TEXT DEFAULT '';
ALTER TABLE cases ADD COLUMN IF NOT EXISTS practice_area TEXT DEFAULT 'appeals_committee';
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_subtype TEXT DEFAULT '';
-- active_draft_path = path to the DOCX that is the current source of truth
-- for this case's decision text. Set to the latest טיוטה-v*.docx after export,
-- or the latest עריכה-v*.docx after user upload. Used by revise_draft to know
-- what file to base Track Changes revisions on.
ALTER TABLE cases ADD COLUMN IF NOT EXISTS active_draft_path TEXT;
-- הרחבת style_corpus עם practice_area / appeal_subtype
ALTER TABLE style_corpus ADD COLUMN IF NOT EXISTS practice_area TEXT DEFAULT 'appeals_committee';
ALTER TABLE style_corpus ADD COLUMN IF NOT EXISTS appeal_subtype TEXT DEFAULT '';
-- הרחבת style_patterns עם appeal_subtype לניתוח סגנון נפרד לכל סוג ערר
ALTER TABLE style_patterns ADD COLUMN IF NOT EXISTS appeal_subtype TEXT DEFAULT '';
-- טבלת qa_results
CREATE TABLE IF NOT EXISTS qa_results (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
check_name TEXT NOT NULL,
passed BOOLEAN NOT NULL,
severity TEXT DEFAULT 'warning',
errors JSONB DEFAULT '[]',
details TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_qa_results_decision ON qa_results(decision_id);
CREATE INDEX IF NOT EXISTS idx_qa_results_case ON qa_results(case_id);
-- טבלת decision_definitions (אם לא קיימת)
CREATE TABLE IF NOT EXISTS decision_definitions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
term TEXT NOT NULL,
definition TEXT NOT NULL,
block_id TEXT DEFAULT 'block-he',
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_definitions_decision ON decision_definitions(decision_id);
-- טבלת appeal_type_rules (אם לא קיימת)
CREATE TABLE IF NOT EXISTS appeal_type_rules (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
appeal_type TEXT NOT NULL,
rule_category TEXT NOT NULL,
rule_key TEXT NOT NULL,
rule_value JSONB NOT NULL,
description TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(appeal_type, rule_category, rule_key)
);
-- image_placeholders על decision_blocks
ALTER TABLE decision_blocks ADD COLUMN IF NOT EXISTS image_placeholders JSONB DEFAULT '[]';
"""
# ── Phase 2: Decision + Knowledge + RAG layers ────────────────────
SCHEMA_V2_SQL = """
-- ═══════════════════════════════════════════════════════════════════
-- Layer 2: Decision
-- ═══════════════════════════════════════════════════════════════════
-- decisions: מטאדטה של החלטה (גרסה אחת = רשומה אחת)
CREATE TABLE IF NOT EXISTS decisions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
version INTEGER DEFAULT 1,
status TEXT DEFAULT 'draft', -- draft/review/final/published
outcome TEXT DEFAULT '', -- rejected/accepted/partial
outcome_summary TEXT DEFAULT '', -- תמצית תוצאה (שורה אחת)
total_paragraphs INTEGER DEFAULT 0,
total_words INTEGER DEFAULT 0,
decision_date DATE,
author TEXT DEFAULT 'דפנה תמיר',
panel_members JSONB DEFAULT '[]',
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(case_id, version)
);
-- decision_blocks: 12 בלוקים לפי block-schema.md
CREATE TABLE IF NOT EXISTS decision_blocks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
block_id TEXT NOT NULL, -- block-alef, block-bet, ... block-yod-bet
block_index INTEGER NOT NULL, -- 1-12
title TEXT DEFAULT '', -- כותרת הבלוק (ריק לבלוקים ללא כותרת)
content TEXT DEFAULT '', -- תוכן מלא (markdown)
word_count INTEGER DEFAULT 0,
weight_percent NUMERIC(5,2) DEFAULT 0, -- משקל בפועל (%)
generation_type TEXT DEFAULT '', -- template-fill/reproduction/paraphrase/...
model_used TEXT DEFAULT '', -- sonnet/opus/script
temperature NUMERIC(3,2) DEFAULT 0,
status TEXT DEFAULT 'empty', -- empty/draft/review/final
notes TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(decision_id, block_id)
);
-- decision_paragraphs: סעיפים בודדים עם מעקב ציטוטים
CREATE TABLE IF NOT EXISTS decision_paragraphs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
block_id UUID REFERENCES decision_blocks(id) ON DELETE CASCADE,
paragraph_number INTEGER NOT NULL, -- מספור רציף בתוך ההחלטה
content TEXT NOT NULL,
word_count INTEGER DEFAULT 0,
citations JSONB DEFAULT '[]', -- [{case_law_id, text, type}]
cross_references JSONB DEFAULT '[]', -- הפניות לסעיפים אחרים ["סעיף 5 לעיל"]
created_at TIMESTAMPTZ DEFAULT now()
);
-- claims: טענות צדדים (בלוק ז)
CREATE TABLE IF NOT EXISTS claims (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
party_role TEXT NOT NULL, -- appellant/respondent/permit_applicant/committee
party_name TEXT DEFAULT '',
claim_text TEXT NOT NULL,
claim_index INTEGER DEFAULT 0, -- סדר הופעה
source_document TEXT DEFAULT '', -- מאיזה מסמך חולצה הטענה
addressed_in_paragraph INTEGER, -- באיזה סעיף בדיון נענתה
created_at TIMESTAMPTZ DEFAULT now()
);
-- ═══════════════════════════════════════════════════════════════════
-- Layer 3: Legal Knowledge
-- ═══════════════════════════════════════════════════════════════════
-- case_law: פסיקה (תקדימים)
CREATE TABLE IF NOT EXISTS case_law (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_number TEXT UNIQUE NOT NULL, -- עע"מ 3975/22 או ערר 1011-03-25
case_name TEXT NOT NULL, -- שם קצר: "ב. קרן-נכסים"
court TEXT DEFAULT '', -- בג"ץ / עליון / מנהלי / ועדת ערר
date DATE,
subject_tags JSONB DEFAULT '[]', -- ["proprietary_claims", "parking"]
summary TEXT DEFAULT '', -- תמצית 2-3 משפטים
key_quote TEXT DEFAULT '', -- ציטוט מרכזי
full_text TEXT DEFAULT '', -- טקסט מלא אם זמין
source_url TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT now()
);
-- case_law_citations: קשרים בין פסיקה להחלטות שלנו
CREATE TABLE IF NOT EXISTS case_law_citations (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
paragraph_id UUID REFERENCES decision_paragraphs(id) ON DELETE SET NULL,
citation_type TEXT DEFAULT 'support', -- support/distinguish/overrule/obiter
context_text TEXT DEFAULT '', -- ההקשר שבו צוטט
created_at TIMESTAMPTZ DEFAULT now()
);
-- statutory_provisions: חקיקה נפוצה
CREATE TABLE IF NOT EXISTS statutory_provisions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
statute_name TEXT NOT NULL, -- "חוק התכנון והבנייה"
section_number TEXT NOT NULL, -- "152(א)(2)"
section_title TEXT DEFAULT '', -- "זכות ערר"
full_text TEXT DEFAULT '', -- נוסח הסעיף
common_usage TEXT DEFAULT '', -- מתי משתמשים
subject_tags JSONB DEFAULT '[]',
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(statute_name, section_number)
);
-- transition_phrases: ביטויי מעבר של דפנה
CREATE TABLE IF NOT EXISTS transition_phrases (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
phrase TEXT UNIQUE NOT NULL, -- "ועל מנת לא לצאת בחסר"
usage_context TEXT DEFAULT '', -- מתי להשתמש
block_types JSONB DEFAULT '[]', -- באילו בלוקים: ["block-yod"]
frequency INTEGER DEFAULT 1, -- כמה פעמים ראינו
source_decision TEXT DEFAULT '', -- מאיזו החלטה
created_at TIMESTAMPTZ DEFAULT now()
);
-- lessons_learned: לקחים מהשוואת טיוטות לגרסאות סופיות
CREATE TABLE IF NOT EXISTS lessons_learned (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
lesson_title TEXT NOT NULL, -- "Discussion = continuous essay, no sub-headers"
lesson_text TEXT NOT NULL, -- תיאור מלא
category TEXT DEFAULT '', -- structure/style/content/process
applies_to JSONB DEFAULT '[]', -- ["block-yod", "all"]
source_case TEXT DEFAULT '', -- "הכט 1180-1181"
severity TEXT DEFAULT 'important', -- critical/important/nice-to-have
created_at TIMESTAMPTZ DEFAULT now()
);
-- ═══════════════════════════════════════════════════════════════════
-- Layer 4: Extended RAG
-- ═══════════════════════════════════════════════════════════════════
-- paragraph_embeddings: embeddings של סעיפים בהחלטות
CREATE TABLE IF NOT EXISTS paragraph_embeddings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
paragraph_id UUID REFERENCES decision_paragraphs(id) ON DELETE CASCADE,
embedding vector(1024),
created_at TIMESTAMPTZ DEFAULT now()
);
-- case_law_embeddings: embeddings של פסיקה
CREATE TABLE IF NOT EXISTS case_law_embeddings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
chunk_text TEXT NOT NULL,
embedding vector(1024),
created_at TIMESTAMPTZ DEFAULT now()
);
-- ═══════════════════════════════════════════════════════════════════
-- Chair Feedback (הערות דפנה על טיוטות)
-- ═══════════════════════════════════════════════════════════════════
CREATE TABLE IF NOT EXISTS chair_feedback (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_id UUID REFERENCES cases(id) ON DELETE SET NULL,
block_id TEXT DEFAULT '', -- block-yod, block-vav, etc.
feedback_text TEXT NOT NULL, -- ההערה של דפנה
category TEXT DEFAULT 'other', -- missing_content/wrong_tone/wrong_structure/factual_error/style/other
lesson_extracted TEXT DEFAULT '', -- הלקח שהופק
applied_to TEXT[] DEFAULT '{}', -- לאילו קבצים/כללים הלקח יושם
resolved BOOLEAN DEFAULT FALSE, -- האם הלקח יושם
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS tag_company_mappings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
tag TEXT NOT NULL, -- appeal_subtype value (e.g. building_permit)
tag_label TEXT NOT NULL DEFAULT '', -- Hebrew display label
company_id TEXT NOT NULL, -- Paperclip company UUID
company_name TEXT NOT NULL DEFAULT '', -- cached company name for display
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(tag, company_id)
);
-- ═══════════════════════════════════════════════════════════════════
-- Indexes
-- ═══════════════════════════════════════════════════════════════════
CREATE INDEX IF NOT EXISTS idx_decisions_case ON decisions(case_id);
CREATE INDEX IF NOT EXISTS idx_decisions_status ON decisions(status);
CREATE INDEX IF NOT EXISTS idx_decision_blocks_decision ON decision_blocks(decision_id);
CREATE INDEX IF NOT EXISTS idx_decision_blocks_block_id ON decision_blocks(block_id);
CREATE INDEX IF NOT EXISTS idx_decision_paragraphs_block ON decision_paragraphs(block_id);
CREATE INDEX IF NOT EXISTS idx_claims_case ON claims(case_id);
CREATE INDEX IF NOT EXISTS idx_claims_role ON claims(party_role);
CREATE INDEX IF NOT EXISTS idx_case_law_subject ON case_law USING gin(subject_tags);
CREATE INDEX IF NOT EXISTS idx_case_law_citations_decision ON case_law_citations(decision_id);
CREATE INDEX IF NOT EXISTS idx_statutory_provisions_statute ON statutory_provisions(statute_name);
CREATE INDEX IF NOT EXISTS idx_transition_phrases_block ON transition_phrases USING gin(block_types);
CREATE INDEX IF NOT EXISTS idx_lessons_category ON lessons_learned(category);
CREATE INDEX IF NOT EXISTS idx_paragraph_embeddings_vec
ON paragraph_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
CREATE INDEX IF NOT EXISTS idx_case_law_embeddings_vec
ON case_law_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
"""
# ── Phase 4: Methodology alignment ──────────────────────────────
SCHEMA_V4_SQL = """
-- ═══════════════════════════════════════════════════════════════════
-- V4: Methodology alignment (decision-methodology.md)
-- ═══════════════════════════════════════════════════════════════════
-- claims: טיפול בטענות (bundle/skip) + סוג טענה
ALTER TABLE claims ADD COLUMN IF NOT EXISTS claim_type TEXT DEFAULT 'claim';
-- claim / response / reply
ALTER TABLE claims ADD COLUMN IF NOT EXISTS claim_handling TEXT DEFAULT 'address';
-- address (דיון מלא) / bundle (קיבוץ) / skip (דילוג)
ALTER TABLE claims ADD COLUMN IF NOT EXISTS bundle_group TEXT DEFAULT '';
-- שם הקבוצה לקיבוץ (למשל "פגמים פרוצדורליים")
ALTER TABLE claims ADD COLUMN IF NOT EXISTS handling_reason TEXT DEFAULT '';
-- נימוק לדילוג/קיבוץ (למשל "נבחנה ולא מצאנו ממש")
-- cases: תקן ביקורת + קטגוריות נושא
ALTER TABLE cases ADD COLUMN IF NOT EXISTS standard_of_review TEXT DEFAULT '';
-- "שיקול דעת תכנוני עצמאי" / "בחינת שומה מכרעת" / ...
ALTER TABLE cases ADD COLUMN IF NOT EXISTS subject_categories JSONB DEFAULT '[]';
-- ["חניה", "קווי בניין", "גובה", "שימוש חורג", ...]
-- case_law: רמת תקדים + מעמד
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS precedent_level TEXT DEFAULT '';
-- עליון / מנהלי / ועדת ערר ארצית / ועדת ערר מחוזית
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS is_binding BOOLEAN DEFAULT TRUE;
-- הלכה מחייבת (true) / אמרת אגב (false)
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS creac_role TEXT DEFAULT '';
-- rule (הנחה עליונה) / explanation (הרחבה) / analogy (אנלוגיה)
-- decisions: סדר סוגיות + תקן ביקורת
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS issue_order JSONB DEFAULT '[]';
-- סדר הסוגיות שנקבע ע"י המנצח: [{"title": "...", "type": "threshold/dispositive/secondary"}]
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS claim_handling JSONB DEFAULT '{}';
-- {"overrides": [{"claim_id": "...", "handling": "bundle", "group": "..."}]}
-- indexes
CREATE INDEX IF NOT EXISTS idx_claims_handling ON claims(claim_handling);
CREATE INDEX IF NOT EXISTS idx_claims_type ON claims(claim_type);
CREATE INDEX IF NOT EXISTS idx_case_law_level ON case_law(precedent_level);
"""
# ── Phase 5: Interim draft (appraiser facts + post-hearing flag) ───
SCHEMA_V5_SQL = """
-- appraiser_facts: תכניות והיתרים שצוינו ע"י כל שמאי בנפרד.
-- בשונה מ-claims (שהוא טענה משפטית), כאן מאוחסנת עובדה עניינית מתוך השומה.
-- שימוש ראשי: זיהוי סתירות בין שמאים על איזו תכנית או היתר חל בנכס.
CREATE TABLE IF NOT EXISTS appraiser_facts (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_id UUID NOT NULL REFERENCES cases(id) ON DELETE CASCADE,
document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
appraiser_name TEXT NOT NULL,
fact_type TEXT NOT NULL CHECK (fact_type IN ('plan', 'permit')),
identifier TEXT NOT NULL,
details JSONB NOT NULL DEFAULT '{}',
page_number INTEGER,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_appraiser_facts_case ON appraiser_facts(case_id, fact_type);
CREATE INDEX IF NOT EXISTS idx_appraiser_facts_identifier ON appraiser_facts(case_id, identifier);
-- V5.1: appraiser_side — which party this appraiser represents.
-- Values: 'committee' (הוועדה), 'appellant' (העורר), 'deciding' (מכריע).
-- Required by extract_appraiser_facts; the chair tags it via the UI before extraction.
-- Set via documents.metadata.appraiser_side at upload/edit time, then propagated here
-- so that conflict rendering in block-tet can label each entry with its side.
ALTER TABLE appraiser_facts ADD COLUMN IF NOT EXISTS appraiser_side TEXT DEFAULT '';
CREATE INDEX IF NOT EXISTS idx_appraiser_facts_side ON appraiser_facts(case_id, appraiser_side);
-- documents.metadata.is_post_hearing: flag for materials submitted after the hearing
-- (השלמות טיעון, הצעות פשרה). Used by block-chet to include them in the proceedings narrative.
-- documents.metadata.appraiser_side: which side the appraiser represents (see above).
-- No schema change needed — uses existing JSONB metadata column.
"""
# ── V6: Case archiving ────────────────────────────────────────────
SCHEMA_V6_SQL = """
-- archived_at: timestamp when the case was moved to the archive screen.
-- NULL = active (default). Set via POST /api/cases/{case_number}/archive.
-- Cleared via POST /api/cases/{case_number}/restore.
-- The /api/cases endpoint filters out archived cases by default;
-- pass ?include_archived=true (or use /api/cases/archived) to see them.
ALTER TABLE cases ADD COLUMN IF NOT EXISTS archived_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_cases_archived ON cases(archived_at) WHERE archived_at IS NOT NULL;
"""
# ── V7: External Precedent Library + halacha extraction ──────────
# Chair-uploaded external court rulings and other appeals committee decisions
# become an authoritative law corpus. Distinct from style_corpus (Daphna's
# style) and case_precedents (chair-attached quotes scoped to a single case).
SCHEMA_V7_SQL = """
-- case_law extensions: distinguish chair-uploaded full rulings from
-- auto-extracted citation stubs, and track ingestion progress.
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS source_kind TEXT DEFAULT 'cited_only';
-- 'external_upload' (chair uploaded full ruling) | 'cited_only' (stub from
-- references_extractor) | 'nevo_seed' (future: auto-fetched from Nevo).
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS document_id UUID REFERENCES documents(id) ON DELETE SET NULL;
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS extraction_status TEXT DEFAULT 'pending';
-- 'pending' | 'processing' | 'completed' | 'failed'
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS halacha_extraction_status TEXT DEFAULT 'pending';
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS practice_area TEXT DEFAULT '';
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS appeal_subtype TEXT DEFAULT '';
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS headnote TEXT DEFAULT '';
-- chair-editable abstract shown in search results.
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS source_type TEXT DEFAULT '';
-- 'court_ruling' | 'appeals_committee'
-- practice_area is closed to the three appeals committee domains.
DO $$ BEGIN
ALTER TABLE case_law ADD CONSTRAINT case_law_practice_area_check
CHECK (practice_area IN ('', 'rishuy_uvniya', 'betterment_levy', 'compensation_197'));
EXCEPTION WHEN duplicate_object THEN NULL; END $$;
CREATE INDEX IF NOT EXISTS idx_case_law_source_kind ON case_law(source_kind);
CREATE INDEX IF NOT EXISTS idx_case_law_practice ON case_law(practice_area, appeal_subtype);
-- precedent_chunks: full-text chunks of an uploaded ruling, with embeddings.
-- Analog of document_chunks for case_law rows where source_kind='external_upload'.
CREATE TABLE IF NOT EXISTS precedent_chunks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
section_type TEXT DEFAULT 'other',
-- intro | facts | legal_analysis | ruling | conclusion | other
page_number INTEGER,
embedding vector(1024),
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_precedent_chunks_case_law ON precedent_chunks(case_law_id);
CREATE INDEX IF NOT EXISTS idx_precedent_chunks_section ON precedent_chunks(case_law_id, section_type);
CREATE INDEX IF NOT EXISTS idx_precedent_chunks_vec
ON precedent_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
-- halachot: extracted binding rules. One halacha = one rule + verbatim quote.
-- Embedded separately for rule-precision semantic match (chunks centroid is
-- dominated by surrounding context). All halachot start as pending_review;
-- only approved/published rows are visible to search_precedent_library.
CREATE TABLE IF NOT EXISTS halachot (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
halacha_index INTEGER NOT NULL,
rule_statement TEXT NOT NULL,
rule_type TEXT DEFAULT 'binding',
-- binding | interpretive | procedural | obiter
reasoning_summary TEXT DEFAULT '',
supporting_quote TEXT NOT NULL,
page_reference TEXT DEFAULT '',
practice_areas TEXT[] DEFAULT '{}',
subject_tags TEXT[] DEFAULT '{}',
cites TEXT[] DEFAULT '{}',
confidence NUMERIC(3,2) DEFAULT 0.0,
quote_verified BOOLEAN DEFAULT FALSE,
review_status TEXT DEFAULT 'pending_review',
-- pending_review | approved | rejected | published
reviewer TEXT DEFAULT '',
reviewed_at TIMESTAMPTZ,
embedding vector(1024),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_halachot_case_law ON halachot(case_law_id);
CREATE INDEX IF NOT EXISTS idx_halachot_status ON halachot(review_status);
CREATE INDEX IF NOT EXISTS idx_halachot_practice ON halachot USING gin(practice_areas);
CREATE INDEX IF NOT EXISTS idx_halachot_tags ON halachot USING gin(subject_tags);
CREATE INDEX IF NOT EXISTS idx_halachot_vec
ON halachot USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
"""
# ── V8: Extraction request queue ─────────────────────────────────
# Web UI buttons ("Sparkles" = request metadata extraction; "Refresh" =
# request halacha extraction) run inside the FastAPI container, which has
# no `claude` CLI. They can't run the LLM extractor directly. Instead they
# stamp a request timestamp here, and the chair (or me) runs the MCP tool
# `precedent_process_pending_extractions` from local Claude Code, where the
# CLI is available, to drain the queue. See claude_session.py for the rule.
SCHEMA_V8_SQL = """
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS metadata_extraction_requested_at TIMESTAMPTZ;
ALTER TABLE case_law ADD COLUMN IF NOT EXISTS halacha_extraction_requested_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_case_law_metadata_requested
ON case_law(metadata_extraction_requested_at)
WHERE metadata_extraction_requested_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_case_law_halacha_requested
ON case_law(halacha_extraction_requested_at)
WHERE halacha_extraction_requested_at IS NOT NULL;
"""
# ── V9: Multimodal page-image embeddings ─────────────────────────
# voyage-multimodal-3 (1024-dim) embeds the whole page as an image:
# captures table layout, scanned content, signatures, plans — content
# that text-OCR loses. Ingestion is gated by config.MULTIMODAL_ENABLED;
# search_*_hybrid() merge text-cosine + image-cosine when present.
# image_thumbnail_path is a relative path under DATA_DIR/cases/{case}/
# thumbnails/ or DATA_DIR/precedent-library/thumbnails/ — a small JPEG
# rendered at config.MULTIMODAL_THUMB_DPI for UI preview, distinct from
# the higher-DPI render fed to the embedder (which is not persisted).
SCHEMA_V9_SQL = """
CREATE TABLE IF NOT EXISTS document_image_embeddings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
page_number INTEGER NOT NULL,
image_thumbnail_path TEXT,
embedding vector(1024),
model_name TEXT DEFAULT 'voyage-multimodal-3',
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(document_id, page_number)
);
CREATE INDEX IF NOT EXISTS idx_doc_img_emb_vec
ON document_image_embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 50);
CREATE INDEX IF NOT EXISTS idx_doc_img_emb_doc
ON document_image_embeddings(document_id);
CREATE INDEX IF NOT EXISTS idx_doc_img_emb_case
ON document_image_embeddings(case_id);
CREATE TABLE IF NOT EXISTS precedent_image_embeddings (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
page_number INTEGER NOT NULL,
image_thumbnail_path TEXT,
embedding vector(1024),
model_name TEXT DEFAULT 'voyage-multimodal-3',
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(case_law_id, page_number)
);
CREATE INDEX IF NOT EXISTS idx_prec_img_emb_vec
ON precedent_image_embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 50);
CREATE INDEX IF NOT EXISTS idx_prec_img_emb_case_law
ON precedent_image_embeddings(case_law_id);
"""
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
await conn.execute(MIGRATIONS_SQL)
await conn.execute(SCHEMA_V2_SQL)
await conn.execute(SCHEMA_V3_SQL)
await conn.execute(SCHEMA_V4_SQL)
await conn.execute(SCHEMA_V5_SQL)
await conn.execute(SCHEMA_V6_SQL)
await conn.execute(SCHEMA_V7_SQL)
await conn.execute(SCHEMA_V8_SQL)
await conn.execute(SCHEMA_V9_SQL)
logger.info("Database schema initialized (v1-v9)")
async def init_schema() -> None:
"""Backward-compatible wrapper. Schema init now runs lazily inside get_pool()."""
await get_pool()
# ── Case CRUD ───────────────────────────────────────────────────────
async def create_case(
case_number: str,
title: str,
appellants: list[str] | None = None,
respondents: list[str] | None = None,
subject: str = "",
property_address: str = "",
permit_number: str = "",
committee_type: str = "ועדה מקומית",
hearing_date: date | None = None,
notes: str = "",
expected_outcome: str = "",
practice_area: str = "appeals_committee",
appeal_subtype: str = "",
) -> dict:
pool = await get_pool()
case_id = uuid4()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO cases (id, case_number, title, appellants, respondents,
subject, property_address, permit_number, committee_type,
hearing_date, notes, expected_outcome,
practice_area, appeal_subtype)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)""",
case_id, case_number, title,
json.dumps(appellants or []),
json.dumps(respondents or []),
subject, property_address, permit_number, committee_type,
hearing_date, notes, expected_outcome,
practice_area, appeal_subtype,
)
return await get_case(case_id)
async def get_case(case_id: UUID) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM cases WHERE id = $1", case_id)
if row is None:
return None
return _row_to_case(row)
async def set_active_draft_path(case_id: UUID, path: str | None) -> None:
"""Update the case's active_draft_path (the DOCX that is source of truth)."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"UPDATE cases SET active_draft_path = $1, updated_at = now() WHERE id = $2",
path, case_id,
)
async def get_active_draft_path(case_id: UUID) -> str | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT active_draft_path FROM cases WHERE id = $1", case_id,
)
return row["active_draft_path"] if row else None
async def get_case_by_number(case_number: str) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM cases WHERE case_number = $1", case_number
)
if row is None:
return None
return _row_to_case(row)
async def list_cases(
status: str | None = None,
limit: int = 50,
include_archived: bool = False,
archived_only: bool = False,
) -> list[dict]:
pool = await get_pool()
where = []
args: list = []
if status:
where.append(f"status = ${len(args) + 1}")
args.append(status)
if archived_only:
where.append("archived_at IS NOT NULL")
elif not include_archived:
where.append("archived_at IS NULL")
where_clause = f"WHERE {' AND '.join(where)}" if where else ""
args.append(limit)
sql = f"SELECT * FROM cases {where_clause} ORDER BY updated_at DESC LIMIT ${len(args)}"
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *args)
return [_row_to_case(r) for r in rows]
async def update_case(case_id: UUID, **fields) -> dict | None:
if not fields:
return await get_case(case_id)
pool = await get_pool()
set_clauses = []
values = []
for i, (key, val) in enumerate(fields.items(), start=2):
if key in ("appellants", "respondents", "tags"):
val = json.dumps(val)
set_clauses.append(f"{key} = ${i}")
values.append(val)
set_clauses.append("updated_at = now()")
sql = f"UPDATE cases SET {', '.join(set_clauses)} WHERE id = $1"
async with pool.acquire() as conn:
await conn.execute(sql, case_id, *values)
return await get_case(case_id)
def _row_to_case(row: asyncpg.Record) -> dict:
d = dict(row)
for field in ("appellants", "respondents", "tags"):
if isinstance(d.get(field), str):
d[field] = json.loads(d[field])
d["id"] = str(d["id"])
return d
async def archive_case(case_id: UUID) -> dict | None:
"""Mark a case as archived. Returns updated row, or None if not found."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"UPDATE cases SET archived_at = now(), updated_at = now() "
"WHERE id = $1 RETURNING *",
case_id,
)
return _row_to_case(row) if row else None
async def restore_case(case_id: UUID) -> dict | None:
"""Clear the archived_at timestamp. Returns updated row, or None if not found."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"UPDATE cases SET archived_at = NULL, updated_at = now() "
"WHERE id = $1 RETURNING *",
case_id,
)
return _row_to_case(row) if row else None
async def delete_case(case_id: UUID) -> bool:
"""Delete a case row. Returns True if a row was actually removed.
All dependent rows are removed automatically by FK constraints:
• CASCADE: documents, document_chunks, claims, appraiser_facts,
decisions, qa_results, case_precedents
• SET NULL: audit_log.case_id, chair_feedback.case_id
NOTE: this only touches the legal-ai database. The Paperclip project
(issues, comments, runs) and Gitea repo for the case live in other
systems and are NOT cleaned up here — call sites that need a full
reset must handle those separately.
"""
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute("DELETE FROM cases WHERE id = $1", case_id)
# asyncpg execute returns "DELETE <n>" — extract count.
return int(result.split()[-1]) > 0
# ── Document CRUD ───────────────────────────────────────────────────
async def create_document(
case_id: UUID,
doc_type: str,
title: str,
file_path: str,
page_count: int | None = None,
) -> dict:
pool = await get_pool()
doc_id = uuid4()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO documents (id, case_id, doc_type, title, file_path, page_count)
VALUES ($1, $2, $3, $4, $5, $6)""",
doc_id, case_id, doc_type, title, file_path, page_count,
)
row = await conn.fetchrow("SELECT * FROM documents WHERE id = $1", doc_id)
return _row_to_doc(row)
async def update_document(doc_id: UUID, **fields) -> None:
if not fields:
return
pool = await get_pool()
set_clauses = []
values = []
for i, (key, val) in enumerate(fields.items(), start=2):
if key == "metadata":
val = json.dumps(val)
set_clauses.append(f"{key} = ${i}")
values.append(val)
sql = f"UPDATE documents SET {', '.join(set_clauses)} WHERE id = $1"
async with pool.acquire() as conn:
await conn.execute(sql, doc_id, *values)
async def get_document(doc_id: UUID) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM documents WHERE id = $1", doc_id)
return _row_to_doc(row) if row else None
async def list_documents(case_id: UUID) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM documents WHERE case_id = $1 ORDER BY created_at", case_id
)
return [_row_to_doc(r) for r in rows]
async def get_document_text(doc_id: UUID) -> str:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT extracted_text FROM documents WHERE id = $1", doc_id
)
return row["extracted_text"] if row else ""
def _row_to_doc(row: asyncpg.Record) -> dict:
d = dict(row)
d["id"] = str(d["id"])
d["case_id"] = str(d["case_id"])
if isinstance(d.get("metadata"), str):
d["metadata"] = json.loads(d["metadata"])
return d
# ── Claims ─────────────────────────────────────────────────────────
async def store_claims(case_id: UUID, claims: list[dict], source_document: str = "") -> int:
"""Store extracted claims. Replaces existing claims from same source.
Each claim dict: party_role, claim_text, claim_index, party_name (optional)
"""
pool = await get_pool()
async with pool.acquire() as conn:
if source_document:
await conn.execute(
"DELETE FROM claims WHERE case_id = $1 AND source_document = $2",
case_id, source_document,
)
for claim in claims:
await conn.execute(
"""INSERT INTO claims (case_id, party_role, party_name, claim_text, claim_index, source_document, claim_type)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
case_id,
claim["party_role"],
claim.get("party_name", ""),
claim["claim_text"],
claim.get("claim_index", 0),
source_document,
claim.get("claim_type", "claim"),
)
return len(claims)
async def get_claims(case_id: UUID, party_role: str | None = None) -> list[dict]:
"""Get claims for a case, optionally filtered by party role."""
pool = await get_pool()
async with pool.acquire() as conn:
if party_role:
rows = await conn.fetch(
"SELECT * FROM claims WHERE case_id = $1 AND party_role = $2 ORDER BY claim_index",
case_id, party_role,
)
else:
rows = await conn.fetch(
"SELECT * FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index",
case_id,
)
return [dict(r) for r in rows]
# ── Decisions ──────────────────────────────────────────────────────
async def create_decision(
case_id: UUID,
outcome: str = "",
outcome_summary: str = "",
outcome_reasoning: str = "",
direction_doc: dict | None = None,
) -> dict:
"""Create a decision record for a case."""
pool = await get_pool()
decision_id = uuid4()
async with pool.acquire() as conn:
# Check if a decision already exists for this case
existing = await conn.fetchrow(
"SELECT id, version FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
case_id,
)
version = (existing["version"] + 1) if existing else 1
await conn.execute(
"""INSERT INTO decisions (id, case_id, version, outcome, outcome_summary,
outcome_reasoning, direction_doc)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
decision_id, case_id, version, outcome, outcome_summary,
outcome_reasoning, json.dumps(direction_doc) if direction_doc else None,
)
return await get_decision(decision_id)
async def get_decision(decision_id: UUID) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM decisions WHERE id = $1", decision_id)
if not row:
return None
d = dict(row)
d["id"] = str(d["id"])
d["case_id"] = str(d["case_id"])
if isinstance(d.get("direction_doc"), str):
d["direction_doc"] = json.loads(d["direction_doc"])
if isinstance(d.get("panel_members"), str):
d["panel_members"] = json.loads(d["panel_members"])
return d
async def get_decision_by_case(case_id: UUID) -> dict | None:
"""Get the latest decision for a case."""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
case_id,
)
if not row:
return None
d = dict(row)
d["id"] = str(d["id"])
d["case_id"] = str(d["case_id"])
if isinstance(d.get("direction_doc"), str):
d["direction_doc"] = json.loads(d["direction_doc"])
if isinstance(d.get("panel_members"), str):
d["panel_members"] = json.loads(d["panel_members"])
return d
async def update_decision(decision_id: UUID, **fields) -> None:
if not fields:
return
pool = await get_pool()
set_clauses = []
values = []
for i, (key, val) in enumerate(fields.items(), start=2):
if key in ("direction_doc", "panel_members") and isinstance(val, (dict, list)):
val = json.dumps(val)
set_clauses.append(f"{key} = ${i}")
values.append(val)
set_clauses.append("updated_at = now()")
sql = f"UPDATE decisions SET {', '.join(set_clauses)} WHERE id = $1"
async with pool.acquire() as conn:
await conn.execute(sql, decision_id, *values)
# ── Document deletion ──────────────────────────────────────────────
async def delete_document(doc_id: UUID) -> bool:
"""Delete a document and all its chunks. Returns True if deleted."""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"DELETE FROM document_chunks WHERE document_id = $1", doc_id
)
result = await conn.execute(
"DELETE FROM documents WHERE id = $1", doc_id
)
return int(result.split()[-1]) > 0
# ── Chunks & Vectors ───────────────────────────────────────────────
async def delete_document_chunks(document_id: UUID) -> int:
"""Delete all chunks for a document (used before reprocessing)."""
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM document_chunks WHERE document_id = $1", document_id
)
return int(result.split()[-1]) # e.g. "DELETE 5" -> 5
async def store_chunks(
document_id: UUID,
case_id: UUID | None,
chunks: list[dict],
) -> int:
"""Store document chunks with embeddings. Each chunk dict has:
content, section_type, embedding (list[float]), page_number, chunk_index
"""
pool = await get_pool()
async with pool.acquire() as conn:
# Delete existing chunks for this document
await conn.execute(
"DELETE FROM document_chunks WHERE document_id = $1", document_id
)
for chunk in chunks:
await conn.execute(
"""INSERT INTO document_chunks
(document_id, case_id, chunk_index, content, section_type, embedding, page_number)
VALUES ($1, $2, $3, $4, $5, $6, $7)""",
document_id, case_id,
chunk["chunk_index"],
chunk["content"],
chunk.get("section_type", "other"),
chunk["embedding"],
chunk.get("page_number"),
)
return len(chunks)
async def search_similar(
query_embedding: list[float],
limit: int = 10,
case_id: UUID | None = None,
section_type: str | None = None,
practice_area: str | None = None,
appeal_subtype: str | None = None,
) -> list[dict]:
"""Cosine similarity search on document chunks."""
pool = await get_pool()
conditions = []
params: list = [query_embedding, limit]
param_idx = 3
if case_id:
conditions.append(f"dc.case_id = ${param_idx}")
params.append(case_id)
param_idx += 1
if section_type:
conditions.append(f"dc.section_type = ${param_idx}")
params.append(section_type)
param_idx += 1
if practice_area:
conditions.append(f"c.practice_area = ${param_idx}")
params.append(practice_area)
param_idx += 1
if appeal_subtype:
conditions.append(f"c.appeal_subtype = ${param_idx}")
params.append(appeal_subtype)
param_idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
sql = f"""
SELECT dc.content, dc.section_type, dc.page_number,
dc.document_id, dc.case_id,
d.title AS document_title,
c.case_number,
1 - (dc.embedding <=> $1) AS score
FROM document_chunks dc
JOIN documents d ON d.id = dc.document_id
JOIN cases c ON c.id = dc.case_id
{where}
ORDER BY dc.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [dict(r) for r in rows]
# ── Style corpus ────────────────────────────────────────────────────
async def add_to_style_corpus(
document_id: UUID | None,
decision_number: str,
decision_date: date | None,
subject_categories: list[str],
full_text: str,
summary: str = "",
outcome: str = "",
key_principles: list[str] | None = None,
practice_area: str = "appeals_committee",
appeal_subtype: str = "",
) -> UUID:
pool = await get_pool()
corpus_id = uuid4()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO style_corpus
(id, document_id, decision_number, decision_date,
subject_categories, full_text, summary, outcome, key_principles,
practice_area, appeal_subtype)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)""",
corpus_id, document_id, decision_number, decision_date,
json.dumps(subject_categories), full_text, summary, outcome,
json.dumps(key_principles or []),
practice_area, appeal_subtype,
)
return corpus_id
async def delete_from_style_corpus(corpus_id: UUID) -> dict:
"""Remove a decision from style_corpus + related documents (cascades chunks).
Also tries to delete the [קורפוס] document associated by title match,
since the current training pipeline inserts style_corpus with document_id=NULL.
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
row = await conn.fetchrow(
"DELETE FROM style_corpus WHERE id = $1 "
"RETURNING decision_number, document_id",
corpus_id,
)
if not row:
return {"deleted": False, "reason": "not found"}
docs_deleted = 0
if row["document_id"]:
await conn.execute(
"DELETE FROM documents WHERE id = $1", row["document_id"]
)
docs_deleted = 1
else:
# Best-effort: match a [קורפוס] document by the decision_number
# in its title. Only for single, unambiguous matches.
if row["decision_number"]:
docs = await conn.fetch(
"SELECT id FROM documents "
"WHERE case_id IS NULL AND title LIKE $1",
f"%{row['decision_number']}%",
)
if len(docs) == 1:
await conn.execute(
"DELETE FROM documents WHERE id = $1", docs[0]["id"]
)
docs_deleted = 1
return {
"deleted": True,
"decision_number": row["decision_number"],
"docs_deleted": docs_deleted,
}
async def get_style_patterns(pattern_type: str | None = None) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
if pattern_type:
rows = await conn.fetch(
"SELECT * FROM style_patterns WHERE pattern_type = $1 ORDER BY frequency DESC",
pattern_type,
)
else:
rows = await conn.fetch(
"SELECT * FROM style_patterns ORDER BY pattern_type, frequency DESC"
)
return [dict(r) for r in rows]
async def upsert_style_pattern(
pattern_type: str,
pattern_text: str,
context: str = "",
examples: list[str] | None = None,
appeal_subtype: str = "",
) -> None:
pool = await get_pool()
async with pool.acquire() as conn:
existing = await conn.fetchrow(
"SELECT id, frequency FROM style_patterns "
"WHERE pattern_type = $1 AND pattern_text = $2 AND appeal_subtype = $3",
pattern_type, pattern_text, appeal_subtype,
)
if existing:
await conn.execute(
"UPDATE style_patterns SET frequency = frequency + 1 WHERE id = $1",
existing["id"],
)
else:
await conn.execute(
"""INSERT INTO style_patterns (pattern_type, pattern_text, context, examples, appeal_subtype)
VALUES ($1, $2, $3, $4, $5)""",
pattern_type, pattern_text, context,
json.dumps(examples or []),
appeal_subtype,
)
async def clear_style_patterns(appeal_subtype: str = "") -> None:
"""Delete style patterns, optionally filtered by appeal_subtype.
Empty appeal_subtype = delete ALL patterns.
"""
pool = await get_pool()
async with pool.acquire() as conn:
if appeal_subtype:
await conn.execute(
"DELETE FROM style_patterns WHERE appeal_subtype = $1", appeal_subtype
)
else:
await conn.execute("DELETE FROM style_patterns")
# ── Semantic Search (V2 — decision blocks & case law) ─────────────
async def search_similar_paragraphs(
query_embedding: list[float],
limit: int = 10,
block_type: str | None = None,
) -> list[dict]:
"""Search decision paragraphs by semantic similarity."""
pool = await get_pool()
conditions = []
params: list = [query_embedding, limit]
param_idx = 3
if block_type:
conditions.append(f"db.block_id = ${param_idx}")
params.append(block_type)
param_idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
sql = f"""
SELECT dp.content, dp.word_count, dp.paragraph_number,
db.block_id AS block_type, db.title AS block_title,
c.case_number, c.title AS case_title,
d.outcome, d.author,
1 - (pe.embedding <=> $1) AS score
FROM paragraph_embeddings pe
JOIN decision_paragraphs dp ON dp.id = pe.paragraph_id
JOIN decision_blocks db ON db.id = dp.block_id
JOIN decisions d ON d.id = db.decision_id
JOIN cases c ON c.id = d.case_id
{where}
ORDER BY pe.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [dict(r) for r in rows]
async def search_similar_case_law(
query_embedding: list[float],
limit: int = 5,
) -> list[dict]:
"""Search case law by semantic similarity."""
pool = await get_pool()
sql = """
SELECT cl.case_number, cl.case_name, cl.court, cl.summary,
cl.key_quote, cl.subject_tags,
cle.chunk_text,
1 - (cle.embedding <=> $1) AS score
FROM case_law_embeddings cle
JOIN case_law cl ON cl.id = cle.case_law_id
ORDER BY cle.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, query_embedding, limit)
results = []
for r in rows:
d = dict(r)
if isinstance(d.get("subject_tags"), str):
d["subject_tags"] = json.loads(d["subject_tags"])
results.append(d)
return results
async def search_precedents(
query_embedding: list[float],
limit: int = 10,
) -> list[dict]:
"""Combined search: paragraphs + case law, ranked by score."""
paragraphs = await search_similar_paragraphs(query_embedding, limit=limit)
case_law = await search_similar_case_law(query_embedding, limit=limit)
# Combine and sort by score
results = []
for p in paragraphs:
results.append({
"type": "decision_paragraph",
"score": float(p["score"]),
"case_number": p["case_number"],
"case_title": p["case_title"],
"block_type": p["block_type"],
"content": p["content"][:500],
"author": p["author"],
})
for c in case_law:
results.append({
"type": "case_law",
"score": float(c["score"]),
"case_number": c["case_number"],
"case_name": c["case_name"],
"court": c["court"],
"content": c["summary"],
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
# ── Case precedents (CRUD) ────────────────────────────────────────
async def create_case_precedent(
case_id: UUID,
quote: str,
citation: str,
section_id: str | None = None,
chair_note: str = "",
pdf_document_id: UUID | None = None,
practice_area: str | None = None,
) -> dict:
"""Insert a new precedent attached to a case."""
pool = await get_pool()
row = await pool.fetchrow(
"""
INSERT INTO case_precedents
(case_id, section_id, quote, citation, chair_note, pdf_document_id, practice_area)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
""",
case_id, section_id, quote, citation, chair_note, pdf_document_id, practice_area,
)
return dict(row)
async def list_case_precedents(case_id: UUID) -> list[dict]:
"""List all precedents attached to a case, ordered by section then creation time."""
pool = await get_pool()
rows = await pool.fetch(
"""
SELECT id, case_id, section_id, quote, citation, chair_note,
pdf_document_id, practice_area, created_at, updated_at
FROM case_precedents
WHERE case_id = $1
ORDER BY section_id NULLS LAST, created_at
""",
case_id,
)
return [dict(r) for r in rows]
async def delete_case_precedent(precedent_id: UUID) -> bool:
"""Delete a precedent attachment by ID. Returns True if deleted."""
pool = await get_pool()
result = await pool.execute(
"DELETE FROM case_precedents WHERE id = $1", precedent_id
)
return result == "DELETE 1"
async def search_precedent_library(
query: str, practice_area: str = "", limit: int = 10,
) -> list[dict]:
"""Search all precedents across cases by citation or quote text."""
pool = await get_pool()
pattern = f"%{query}%"
if practice_area:
rows = await pool.fetch(
"""
SELECT id, case_id, section_id, quote, citation, chair_note,
practice_area, created_at
FROM case_precedents
WHERE (citation ILIKE $1 OR quote ILIKE $1)
AND practice_area = $2
ORDER BY created_at DESC
LIMIT $3
""",
pattern, practice_area, limit,
)
else:
rows = await pool.fetch(
"""
SELECT id, case_id, section_id, quote, citation, chair_note,
practice_area, created_at
FROM case_precedents
WHERE citation ILIKE $1 OR quote ILIKE $1
ORDER BY created_at DESC
LIMIT $2
""",
pattern, limit,
)
return [dict(r) for r in rows]
# ── Chair feedback ────────────────────────────────────────────────
async def record_chair_feedback(
case_id: UUID | None,
block_id: str,
feedback_text: str,
category: str = "other",
lesson_extracted: str = "",
) -> UUID:
"""Record feedback from the chair (Dafna) on a draft block."""
pool = await get_pool()
feedback_id = uuid4()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO chair_feedback
(id, case_id, block_id, feedback_text, category, lesson_extracted)
VALUES ($1, $2, $3, $4, $5, $6)""",
feedback_id, case_id, block_id, feedback_text, category,
lesson_extracted,
)
return feedback_id
async def list_chair_feedback(
case_id: UUID | None = None,
category: str | None = None,
unresolved_only: bool = False,
) -> list[dict]:
"""List chair feedback, optionally filtered."""
pool = await get_pool()
conditions = []
params: list = []
idx = 1
if case_id:
conditions.append(f"case_id = ${idx}")
params.append(case_id)
idx += 1
if category:
conditions.append(f"category = ${idx}")
params.append(category)
idx += 1
if unresolved_only:
conditions.append("resolved = FALSE")
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
async with pool.acquire() as conn:
rows = await conn.fetch(
f"SELECT * FROM chair_feedback {where} ORDER BY created_at DESC",
*params,
)
return [dict(r) for r in rows]
async def resolve_chair_feedback(
feedback_id: UUID,
applied_to: list[str],
) -> None:
"""Mark feedback as resolved and record where it was applied."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""UPDATE chair_feedback
SET resolved = TRUE, applied_to = $2
WHERE id = $1""",
feedback_id, applied_to,
)
# ── Appraiser facts (V5 — for interim drafts) ─────────────────────
async def replace_appraiser_facts(
case_id: UUID,
document_id: UUID,
facts: list[dict],
) -> int:
"""Replace all appraiser_facts for a given document.
Each fact dict: appraiser_name, appraiser_side, fact_type ('plan'|'permit'),
identifier, details (dict), page_number (optional).
"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"DELETE FROM appraiser_facts WHERE document_id = $1", document_id,
)
for f in facts:
await conn.execute(
"""INSERT INTO appraiser_facts
(case_id, document_id, appraiser_name, appraiser_side,
fact_type, identifier, details, page_number)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""",
case_id, document_id,
f["appraiser_name"],
f.get("appraiser_side", ""),
f["fact_type"],
f["identifier"],
json.dumps(f.get("details", {}), ensure_ascii=False),
f.get("page_number"),
)
return len(facts)
async def list_appraiser_facts(
case_id: UUID,
fact_type: str | None = None,
) -> list[dict]:
"""List appraiser_facts for a case, optionally filtered by fact_type."""
pool = await get_pool()
async with pool.acquire() as conn:
if fact_type:
rows = await conn.fetch(
"""SELECT * FROM appraiser_facts
WHERE case_id = $1 AND fact_type = $2
ORDER BY identifier, appraiser_name""",
case_id, fact_type,
)
else:
rows = await conn.fetch(
"""SELECT * FROM appraiser_facts
WHERE case_id = $1
ORDER BY fact_type, identifier, appraiser_name""",
case_id,
)
results = []
for r in rows:
d = dict(r)
d["id"] = str(d["id"])
d["case_id"] = str(d["case_id"])
d["document_id"] = str(d["document_id"])
if isinstance(d.get("details"), str):
d["details"] = json.loads(d["details"])
results.append(d)
return results
async def detect_appraiser_conflicts(case_id: UUID) -> list[dict]:
"""Detect conflicts: identifiers cited by 2+ different appraisers in this case.
A conflict exists when the SAME identifier (e.g., "תמ"א 38") was reported
differently by two appraisers — different details, or one cited it and the
other did not. Returns list of conflict groups. Each entry in a group
carries the appraiser's side so the caller can label it as committee /
appellant / deciding.
"""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT identifier, fact_type,
json_agg(jsonb_build_object(
'appraiser_name', appraiser_name,
'appraiser_side', appraiser_side,
'details', details,
'page_number', page_number,
'document_id', document_id
) ORDER BY
CASE appraiser_side
WHEN 'committee' THEN 1
WHEN 'appellant' THEN 2
WHEN 'deciding' THEN 3
ELSE 4
END,
appraiser_name
) AS entries,
COUNT(DISTINCT appraiser_name) AS n_appraisers
FROM appraiser_facts
WHERE case_id = $1
GROUP BY identifier, fact_type
HAVING COUNT(DISTINCT appraiser_name) > 1""",
case_id,
)
conflicts = []
for r in rows:
entries = r["entries"]
if isinstance(entries, str):
entries = json.loads(entries)
# Parse nested details if still strings
for e in entries:
if isinstance(e.get("details"), str):
e["details"] = json.loads(e["details"])
conflicts.append({
"identifier": r["identifier"],
"fact_type": r["fact_type"],
"n_appraisers": r["n_appraisers"],
"entries": entries,
})
return conflicts
# ── V7: External precedent library + halachot ─────────────────────
def _row_to_case_law(row: asyncpg.Record) -> dict:
"""Normalize a case_law row, parsing subject_tags JSONB to list."""
d = dict(row)
if isinstance(d.get("subject_tags"), str):
try:
d["subject_tags"] = json.loads(d["subject_tags"])
except (TypeError, ValueError):
d["subject_tags"] = []
if d.get("date") is not None:
d["date"] = d["date"].isoformat()
return d
async def get_case_law(case_law_id: UUID) -> dict | None:
pool = await get_pool()
row = await pool.fetchrow(
"SELECT * FROM case_law WHERE id = $1", case_law_id,
)
return _row_to_case_law(row) if row else None
async def get_case_law_by_citation(case_number: str) -> dict | None:
pool = await get_pool()
row = await pool.fetchrow(
"SELECT * FROM case_law WHERE case_number = $1", case_number,
)
return _row_to_case_law(row) if row else None
async def create_external_case_law(
case_number: str,
case_name: str,
full_text: str,
court: str = "",
decision_date: date | None = None,
practice_area: str = "",
appeal_subtype: str = "",
subject_tags: list[str] | None = None,
summary: str = "",
headnote: str = "",
key_quote: str = "",
source_url: str = "",
source_type: str = "",
precedent_level: str = "",
is_binding: bool = True,
document_id: UUID | None = None,
) -> dict:
"""Insert a chair-uploaded external precedent into case_law.
If a row with this ``case_number`` already exists with
source_kind='cited_only' (auto-discovered), promote it to
source_kind='external_upload' and fill in the missing fields.
"""
pool = await get_pool()
tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn:
existing = await conn.fetchrow(
"SELECT id, source_kind FROM case_law WHERE case_number = $1",
case_number,
)
if existing:
row = await conn.fetchrow(
"""
UPDATE case_law SET
case_name = $2,
court = COALESCE(NULLIF($3, ''), court),
date = COALESCE($4, date),
practice_area = $5,
appeal_subtype = $6,
subject_tags = $7,
summary = COALESCE(NULLIF($8, ''), summary),
headnote = $9,
key_quote = COALESCE(NULLIF($10, ''), key_quote),
full_text = $11,
source_url = COALESCE(NULLIF($12, ''), source_url),
source_type = $13,
precedent_level = $14,
is_binding = $15,
document_id = COALESCE($16, document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
WHERE id = $1
RETURNING *
""",
existing["id"], case_name, court, decision_date,
practice_area, appeal_subtype, tags_json, summary, headnote,
key_quote, full_text, source_url, source_type,
precedent_level, is_binding, document_id,
)
else:
row = await conn.fetchrow(
"""
INSERT INTO case_law (
case_number, case_name, court, date, subject_tags,
summary, key_quote, full_text, source_url,
source_kind, document_id, extraction_status,
halacha_extraction_status, practice_area, appeal_subtype,
headnote, source_type, precedent_level, is_binding
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9,
'external_upload', $10, 'processing', 'pending',
$11, $12, $13, $14, $15, $16
)
RETURNING *
""",
case_number, case_name, court, decision_date, tags_json,
summary, key_quote, full_text, source_url,
document_id, practice_area, appeal_subtype, headnote,
source_type, precedent_level, is_binding,
)
return _row_to_case_law(row)
async def update_case_law(case_law_id: UUID, **fields) -> dict | None:
"""Patch metadata fields on a case_law row.
Allowed fields: case_name, court, date, practice_area, appeal_subtype,
subject_tags, summary, headnote, key_quote, source_url, source_type,
precedent_level, is_binding.
"""
allowed = {
"case_name", "court", "date", "practice_area", "appeal_subtype",
"subject_tags", "summary", "headnote", "key_quote", "source_url",
"source_type", "precedent_level", "is_binding",
}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return await get_case_law(case_law_id)
pool = await get_pool()
set_parts = []
params: list = [case_law_id]
for i, (k, v) in enumerate(updates.items(), start=2):
if k == "subject_tags":
v = json.dumps(v or [], ensure_ascii=False)
set_parts.append(f"{k} = ${i}")
params.append(v)
sql = f"UPDATE case_law SET {', '.join(set_parts)} WHERE id = $1 RETURNING *"
row = await pool.fetchrow(sql, *params)
return _row_to_case_law(row) if row else None
async def set_case_law_extraction_status(case_law_id: UUID, status: str) -> None:
"""Set text-extraction status. When transitioning to a terminal state
('completed'/'failed') we also NULL ``metadata_extraction_requested_at``
so the local-MCP queue (`process_pending_extractions`, which scans by
``WHERE *_requested_at IS NOT NULL``) doesn't re-pick the row forever
and leave the row blocked in the UI's `isPrecedentActive` check."""
pool = await get_pool()
if status in ("completed", "failed"):
await pool.execute(
"UPDATE case_law SET extraction_status = $2, "
"metadata_extraction_requested_at = NULL WHERE id = $1",
case_law_id, status,
)
else:
await pool.execute(
"UPDATE case_law SET extraction_status = $2 WHERE id = $1",
case_law_id, status,
)
async def set_case_law_halacha_status(case_law_id: UUID, status: str) -> None:
"""Set halacha-extraction status. Mirrors ``set_case_law_extraction_status``:
on terminal states we also clear ``halacha_extraction_requested_at`` so the
queue and UI don't see a stale request flag."""
pool = await get_pool()
if status in ("completed", "failed"):
await pool.execute(
"UPDATE case_law SET halacha_extraction_status = $2, "
"halacha_extraction_requested_at = NULL WHERE id = $1",
case_law_id, status,
)
else:
await pool.execute(
"UPDATE case_law SET halacha_extraction_status = $2 WHERE id = $1",
case_law_id, status,
)
async def list_external_case_law(
practice_area: str = "",
court: str = "",
precedent_level: str = "",
source_type: str = "",
search: str = "",
limit: int = 100,
offset: int = 0,
) -> list[dict]:
"""List chair-uploaded precedents, with simple filters."""
pool = await get_pool()
conditions = ["source_kind = 'external_upload'"]
params: list = []
idx = 1
if practice_area:
conditions.append(f"practice_area = ${idx}")
params.append(practice_area)
idx += 1
if court:
conditions.append(f"court ILIKE ${idx}")
params.append(f"%{court}%")
idx += 1
if precedent_level:
conditions.append(f"precedent_level = ${idx}")
params.append(precedent_level)
idx += 1
if source_type:
conditions.append(f"source_type = ${idx}")
params.append(source_type)
idx += 1
if search:
conditions.append(
f"(case_number ILIKE ${idx} OR case_name ILIKE ${idx} "
f"OR summary ILIKE ${idx} OR headnote ILIKE ${idx})"
)
params.append(f"%{search}%")
idx += 1
where_sql = " AND ".join(conditions)
params.extend([limit, offset])
sql = f"""
SELECT id, case_number, case_name, court, date, practice_area,
appeal_subtype, source_type, precedent_level, is_binding,
summary, headnote, subject_tags, source_kind,
extraction_status, halacha_extraction_status,
metadata_extraction_requested_at,
halacha_extraction_requested_at,
created_at,
(SELECT COUNT(*) FROM halachot h WHERE h.case_law_id = case_law.id) AS halachot_count,
(SELECT COUNT(*) FROM halachot h WHERE h.case_law_id = case_law.id
AND h.review_status IN ('approved', 'published')) AS approved_count
FROM case_law
WHERE {where_sql}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx + 1}
"""
rows = await pool.fetch(sql, *params)
out = []
for r in rows:
d = _row_to_case_law(r)
# Render timestamps as ISO strings so the JSON layer stays simple
for k in ("metadata_extraction_requested_at", "halacha_extraction_requested_at"):
if d.get(k) is not None:
d[k] = d[k].isoformat()
out.append(d)
return out
async def delete_case_law(case_law_id: UUID) -> bool:
"""Delete a precedent and cascade chunks + halachot."""
pool = await get_pool()
result = await pool.execute(
"DELETE FROM case_law WHERE id = $1", case_law_id,
)
return result == "DELETE 1"
async def store_precedent_chunks(
case_law_id: UUID, chunks: list[dict],
) -> int:
"""Replace precedent chunks for a case_law row.
Each chunk dict has: chunk_index, content, section_type, page_number,
embedding (list[float] or None).
"""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM precedent_chunks WHERE case_law_id = $1",
case_law_id,
)
for c in chunks:
await conn.execute(
"""INSERT INTO precedent_chunks
(case_law_id, chunk_index, content, section_type,
page_number, embedding)
VALUES ($1, $2, $3, $4, $5, $6)""",
case_law_id,
c["chunk_index"],
c["content"],
c.get("section_type", "other"),
c.get("page_number"),
c.get("embedding"),
)
return len(chunks)
async def list_precedent_chunks(
case_law_id: UUID,
section_types: tuple[str, ...] | None = None,
) -> list[dict]:
pool = await get_pool()
if section_types:
rows = await pool.fetch(
"""SELECT id, chunk_index, content, section_type, page_number
FROM precedent_chunks
WHERE case_law_id = $1 AND section_type = ANY($2::text[])
ORDER BY chunk_index""",
case_law_id, list(section_types),
)
else:
rows = await pool.fetch(
"""SELECT id, chunk_index, content, section_type, page_number
FROM precedent_chunks
WHERE case_law_id = $1
ORDER BY chunk_index""",
case_law_id,
)
return [dict(r) for r in rows]
async def delete_halachot(case_law_id: UUID) -> int:
pool = await get_pool()
result = await pool.execute(
"DELETE FROM halachot WHERE case_law_id = $1", case_law_id,
)
# result is e.g. "DELETE 5" — extract the number.
try:
return int(result.split()[-1])
except (ValueError, IndexError):
return 0
async def store_halachot(case_law_id: UUID, halachot: list[dict]) -> int:
"""Bulk-insert extracted halachot.
Each halacha enters with review_status determined by extractor
confidence vs ``config.HALACHA_AUTO_APPROVE_THRESHOLD``:
- confidence >= threshold → 'approved' (visible to search immediately)
- else → 'pending_review' (chair must approve manually)
The auto-approval reviewer is recorded as 'auto' for traceability.
"""
if not halachot:
return 0
threshold = config.HALACHA_AUTO_APPROVE_THRESHOLD
pool = await get_pool()
async with pool.acquire() as conn:
for i, h in enumerate(halachot):
confidence = float(h.get("confidence", 0.0))
auto_approve = confidence >= threshold
review_status = "approved" if auto_approve else "pending_review"
reviewer = (
f"auto-approved (confidence ≥ {threshold:.2f})"
if auto_approve else None
)
reviewed_at_clause = "now()" if auto_approve else "NULL"
await conn.execute(
f"""INSERT INTO halachot
(case_law_id, halacha_index, rule_statement, rule_type,
reasoning_summary, supporting_quote, page_reference,
practice_areas, subject_tags, cites, confidence,
quote_verified, embedding, review_status,
reviewer, reviewed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
$12, $13, $14, $15, {reviewed_at_clause})""",
case_law_id,
i,
h["rule_statement"],
h.get("rule_type", "binding"),
h.get("reasoning_summary", ""),
h["supporting_quote"],
h.get("page_reference", ""),
h.get("practice_areas", []),
h.get("subject_tags", []),
h.get("cites", []),
confidence,
h.get("quote_verified", False),
h.get("embedding"),
review_status,
reviewer,
)
return len(halachot)
async def list_halachot(
case_law_id: UUID | None = None,
review_status: str | None = None,
practice_area: str | None = None,
limit: int = 200,
offset: int = 0,
) -> list[dict]:
pool = await get_pool()
conditions = []
params: list = []
idx = 1
if case_law_id is not None:
conditions.append(f"h.case_law_id = ${idx}")
params.append(case_law_id)
idx += 1
if review_status:
conditions.append(f"h.review_status = ${idx}")
params.append(review_status)
idx += 1
if practice_area:
conditions.append(f"${idx} = ANY(h.practice_areas)")
params.append(practice_area)
idx += 1
where_sql = f"WHERE {' AND '.join(conditions)}" if conditions else ""
params.extend([limit, offset])
sql = f"""
SELECT h.id, h.case_law_id, h.halacha_index, h.rule_statement,
h.rule_type, h.reasoning_summary, h.supporting_quote,
h.page_reference, h.practice_areas, h.subject_tags,
h.cites, h.confidence, h.quote_verified, h.review_status,
h.reviewer, h.reviewed_at, h.created_at, h.updated_at,
cl.case_number, cl.case_name, cl.court, cl.date AS decision_date,
cl.precedent_level
FROM halachot h
LEFT JOIN case_law cl ON cl.id = h.case_law_id
{where_sql}
ORDER BY h.case_law_id, h.halacha_index
LIMIT ${idx} OFFSET ${idx + 1}
"""
rows = await pool.fetch(sql, *params)
out = []
for r in rows:
d = dict(r)
if d.get("decision_date") is not None:
d["decision_date"] = d["decision_date"].isoformat()
out.append(d)
return out
async def update_halacha(
halacha_id: UUID,
review_status: str | None = None,
reviewer: str = "",
rule_statement: str | None = None,
reasoning_summary: str | None = None,
subject_tags: list[str] | None = None,
practice_areas: list[str] | None = None,
) -> dict | None:
"""Update a halacha — used by the chair to approve/reject/edit."""
pool = await get_pool()
set_parts: list[str] = []
params: list = [halacha_id]
idx = 2
if review_status is not None:
set_parts.append(f"review_status = ${idx}")
params.append(review_status)
idx += 1
if review_status in ("approved", "rejected", "published"):
set_parts.append(f"reviewed_at = now()")
set_parts.append(f"reviewer = ${idx}")
params.append(reviewer)
idx += 1
if rule_statement is not None:
set_parts.append(f"rule_statement = ${idx}")
params.append(rule_statement)
idx += 1
if reasoning_summary is not None:
set_parts.append(f"reasoning_summary = ${idx}")
params.append(reasoning_summary)
idx += 1
if subject_tags is not None:
set_parts.append(f"subject_tags = ${idx}")
params.append(subject_tags)
idx += 1
if practice_areas is not None:
set_parts.append(f"practice_areas = ${idx}")
params.append(practice_areas)
idx += 1
if not set_parts:
return None
set_parts.append("updated_at = now()")
# Exclude `embedding` — it's a numpy.ndarray of np.float32 that breaks
# FastAPI's jsonable_encoder downstream (PATCH /api/halachot/{id}).
# Callers that need it (none today) can re-fetch with get_halacha.
sql = f"""
UPDATE halachot SET {', '.join(set_parts)} WHERE id = $1
RETURNING id, case_law_id, halacha_index, rule_statement, rule_type,
reasoning_summary, supporting_quote, page_reference,
practice_areas, subject_tags, cites, confidence,
quote_verified, review_status, reviewer, reviewed_at,
created_at, updated_at
"""
row = await pool.fetchrow(sql, *params)
return dict(row) if row else None
async def search_precedent_library_semantic(
query_embedding: list[float],
practice_area: str = "",
court: str = "",
precedent_level: str = "",
appeal_subtype: str = "",
is_binding: bool | None = None,
subject_tag: str = "",
limit: int = 10,
include_halachot: bool = True,
) -> list[dict]:
"""Semantic search over chair-uploaded precedents.
Returns merged halachot + chunks. Halachot are pre-distilled rules, so
they get a small score boost. Only ``approved`` / ``published`` halachot
are visible (per chair-review policy). Chunks are visible regardless
of halacha review status.
"""
pool = await get_pool()
halacha_filters = ["h.review_status IN ('approved', 'published')"]
chunk_filters = ["cl.source_kind = 'external_upload'"]
h_params: list = [query_embedding, limit]
c_params: list = [query_embedding, limit]
h_idx = 3
c_idx = 3
if practice_area:
halacha_filters.append(f"${h_idx} = ANY(h.practice_areas)")
h_params.append(practice_area)
h_idx += 1
chunk_filters.append(f"cl.practice_area = ${c_idx}")
c_params.append(practice_area)
c_idx += 1
if court:
halacha_filters.append(f"cl.court ILIKE ${h_idx}")
h_params.append(f"%{court}%")
h_idx += 1
chunk_filters.append(f"cl.court ILIKE ${c_idx}")
c_params.append(f"%{court}%")
c_idx += 1
if precedent_level:
halacha_filters.append(f"cl.precedent_level = ${h_idx}")
h_params.append(precedent_level)
h_idx += 1
chunk_filters.append(f"cl.precedent_level = ${c_idx}")
c_params.append(precedent_level)
c_idx += 1
if appeal_subtype:
halacha_filters.append(f"cl.appeal_subtype = ${h_idx}")
h_params.append(appeal_subtype)
h_idx += 1
chunk_filters.append(f"cl.appeal_subtype = ${c_idx}")
c_params.append(appeal_subtype)
c_idx += 1
if is_binding is not None:
halacha_filters.append(f"cl.is_binding = ${h_idx}")
h_params.append(is_binding)
h_idx += 1
chunk_filters.append(f"cl.is_binding = ${c_idx}")
c_params.append(is_binding)
c_idx += 1
if subject_tag:
halacha_filters.append(f"${h_idx} = ANY(h.subject_tags)")
h_params.append(subject_tag)
h_idx += 1
halacha_sql = f"""
SELECT h.id AS halacha_id, h.case_law_id, h.rule_statement,
h.reasoning_summary, h.supporting_quote, h.page_reference,
h.practice_areas, h.subject_tags, h.confidence, h.rule_type,
cl.case_number, cl.case_name, cl.court, cl.date AS decision_date,
cl.precedent_level,
1 - (h.embedding <=> $1) AS score
FROM halachot h
JOIN case_law cl ON cl.id = h.case_law_id
WHERE {' AND '.join(halacha_filters)}
AND h.embedding IS NOT NULL
ORDER BY h.embedding <=> $1
LIMIT $2
"""
chunk_sql = f"""
SELECT pc.id AS chunk_id, pc.case_law_id, pc.content,
pc.section_type, pc.page_number,
cl.case_number, cl.case_name, cl.court, cl.date AS decision_date,
cl.precedent_level, cl.practice_area,
1 - (pc.embedding <=> $1) AS score
FROM precedent_chunks pc
JOIN case_law cl ON cl.id = pc.case_law_id
WHERE {' AND '.join(chunk_filters)}
AND pc.embedding IS NOT NULL
ORDER BY pc.embedding <=> $1
LIMIT $2
"""
results: list[dict] = []
if include_halachot:
rows = await pool.fetch(halacha_sql, *h_params)
for r in rows:
d = dict(r)
if d.get("decision_date") is not None:
d["decision_date"] = d["decision_date"].isoformat()
d["score"] = float(d["score"]) + 0.05 # rule-level boost
d["type"] = "halacha"
results.append(d)
rows = await pool.fetch(chunk_sql, *c_params)
for r in rows:
d = dict(r)
if d.get("decision_date") is not None:
d["decision_date"] = d["decision_date"].isoformat()
d["score"] = float(d["score"])
d["type"] = "passage"
results.append(d)
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
async def precedent_library_stats() -> dict:
"""Aggregate stats for the /precedents stats tab."""
pool = await get_pool()
async with pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM case_law WHERE source_kind = 'external_upload'"
)
by_practice = await conn.fetch(
"""SELECT practice_area, COUNT(*) AS n
FROM case_law
WHERE source_kind = 'external_upload'
GROUP BY practice_area
ORDER BY n DESC"""
)
by_level = await conn.fetch(
"""SELECT precedent_level, COUNT(*) AS n
FROM case_law
WHERE source_kind = 'external_upload'
GROUP BY precedent_level
ORDER BY n DESC"""
)
halachot_total = await conn.fetchval(
"SELECT COUNT(*) FROM halachot"
)
halachot_pending = await conn.fetchval(
"SELECT COUNT(*) FROM halachot WHERE review_status = 'pending_review'"
)
halachot_approved = await conn.fetchval(
"SELECT COUNT(*) FROM halachot WHERE review_status IN ('approved', 'published')"
)
return {
"precedents_total": int(total or 0),
"by_practice_area": [
{"practice_area": r["practice_area"], "count": int(r["n"])}
for r in by_practice
],
"by_precedent_level": [
{"precedent_level": r["precedent_level"], "count": int(r["n"])}
for r in by_level
],
"halachot_total": int(halachot_total or 0),
"halachot_pending": int(halachot_pending or 0),
"halachot_approved": int(halachot_approved or 0),
}
# ── V8: extraction request queue helpers ─────────────────────────
async def request_metadata_extraction(case_law_id: UUID) -> bool:
"""Stamp ``metadata_extraction_requested_at`` for the local MCP worker
to pick up. Returns False if the row is missing."""
pool = await get_pool()
result = await pool.execute(
"UPDATE case_law SET metadata_extraction_requested_at = now() "
"WHERE id = $1 AND source_kind = 'external_upload'",
case_law_id,
)
return result == "UPDATE 1"
async def request_halacha_extraction(case_law_id: UUID) -> bool:
"""Same but for halacha extraction."""
pool = await get_pool()
result = await pool.execute(
"UPDATE case_law SET halacha_extraction_requested_at = now() "
"WHERE id = $1 AND source_kind = 'external_upload'",
case_law_id,
)
return result == "UPDATE 1"
async def list_pending_extraction_requests(
kind: str = "metadata", # 'metadata' | 'halacha'
limit: int = 20,
) -> list[dict]:
"""Return rows requesting extraction, oldest request first.
The MCP worker drains the queue in order: process → clear timestamp.
"""
col = (
"metadata_extraction_requested_at"
if kind == "metadata"
else "halacha_extraction_requested_at"
)
pool = await get_pool()
rows = await pool.fetch(
f"""SELECT id, case_number, case_name, court, date,
practice_area, is_binding, {col} AS requested_at
FROM case_law
WHERE {col} IS NOT NULL
AND source_kind = 'external_upload'
ORDER BY {col} ASC
LIMIT $1""",
limit,
)
out = []
for r in rows:
d = dict(r)
if d.get("date") is not None:
d["date"] = d["date"].isoformat()
if d.get("requested_at") is not None:
d["requested_at"] = d["requested_at"].isoformat()
out.append(d)
return out
async def clear_extraction_request(
case_law_id: UUID, kind: str = "metadata",
) -> None:
col = (
"metadata_extraction_requested_at"
if kind == "metadata"
else "halacha_extraction_requested_at"
)
pool = await get_pool()
await pool.execute(
f"UPDATE case_law SET {col} = NULL WHERE id = $1",
case_law_id,
)
# ── V9: Multimodal page image embeddings ─────────────────────────
async def store_document_image_embeddings(
document_id: UUID,
case_id: UUID | None,
page_records: list[dict],
model_name: str = "voyage-multimodal-3",
) -> int:
"""Replace per-page image embeddings for a document.
Each ``page_records`` entry: ``{page_number, embedding, image_thumbnail_path}``.
Embeddings should already be 1024-dim lists (or None for skipped pages).
"""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM document_image_embeddings WHERE document_id = $1",
document_id,
)
for r in page_records:
await conn.execute(
"""INSERT INTO document_image_embeddings
(document_id, case_id, page_number, embedding,
image_thumbnail_path, model_name)
VALUES ($1, $2, $3, $4, $5, $6)""",
document_id, case_id,
r["page_number"],
r.get("embedding"),
r.get("image_thumbnail_path"),
model_name,
)
return len(page_records)
async def store_precedent_image_embeddings(
case_law_id: UUID,
page_records: list[dict],
model_name: str = "voyage-multimodal-3",
) -> int:
"""Same pattern as store_document_image_embeddings but for precedents."""
pool = await get_pool()
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM precedent_image_embeddings WHERE case_law_id = $1",
case_law_id,
)
for r in page_records:
await conn.execute(
"""INSERT INTO precedent_image_embeddings
(case_law_id, page_number, embedding,
image_thumbnail_path, model_name)
VALUES ($1, $2, $3, $4, $5)""",
case_law_id,
r["page_number"],
r.get("embedding"),
r.get("image_thumbnail_path"),
model_name,
)
return len(page_records)
async def search_document_images_similar(
query_embedding: list[float],
limit: int = 10,
case_id: UUID | None = None,
practice_area: str | None = None,
appeal_subtype: str | None = None,
) -> list[dict]:
"""Cosine search over per-page image embeddings of case documents."""
pool = await get_pool()
conditions: list[str] = []
params: list = [query_embedding, limit]
idx = 3
if case_id:
conditions.append(f"die.case_id = ${idx}")
params.append(case_id); idx += 1
if practice_area:
conditions.append(f"c.practice_area = ${idx}")
params.append(practice_area); idx += 1
if appeal_subtype:
conditions.append(f"c.appeal_subtype = ${idx}")
params.append(appeal_subtype); idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
sql = f"""
SELECT die.document_id, die.case_id, die.page_number,
die.image_thumbnail_path,
d.title AS document_title,
c.case_number,
1 - (die.embedding <=> $1) AS score
FROM document_image_embeddings die
JOIN documents d ON d.id = die.document_id
JOIN cases c ON c.id = die.case_id
{where}
ORDER BY die.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [dict(r) for r in rows]
async def search_precedent_images_similar(
query_embedding: list[float],
limit: int = 10,
practice_area: str = "",
court: str = "",
precedent_level: str = "",
appeal_subtype: str = "",
is_binding: bool | None = None,
) -> list[dict]:
"""Cosine search over per-page image embeddings of precedent rulings."""
pool = await get_pool()
conditions: list[str] = ["cl.source_kind = 'external_upload'"]
params: list = [query_embedding, limit]
idx = 3
if practice_area:
conditions.append(f"cl.practice_area = ${idx}")
params.append(practice_area); idx += 1
if court:
conditions.append(f"cl.court ILIKE ${idx}")
params.append(f"%{court}%"); idx += 1
if precedent_level:
conditions.append(f"cl.precedent_level = ${idx}")
params.append(precedent_level); idx += 1
if appeal_subtype:
conditions.append(f"cl.appeal_subtype = ${idx}")
params.append(appeal_subtype); idx += 1
if is_binding is not None:
conditions.append(f"cl.is_binding = ${idx}")
params.append(is_binding); idx += 1
where = " AND ".join(conditions)
sql = f"""
SELECT pie.case_law_id, pie.page_number, pie.image_thumbnail_path,
cl.case_number, cl.case_name, cl.court, cl.date AS decision_date,
cl.precedent_level, cl.practice_area,
1 - (pie.embedding <=> $1) AS score
FROM precedent_image_embeddings pie
JOIN case_law cl ON cl.id = pie.case_law_id
WHERE {where}
ORDER BY pie.embedding <=> $1
LIMIT $2
"""
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
out = []
for r in rows:
d = dict(r)
if d.get("decision_date") is not None:
d["decision_date"] = d["decision_date"].isoformat()
out.append(d)
return out
async def search_similar_hybrid(
query_text_embedding: list[float],
query_image_embedding: list[float],
limit: int = 10,
fetch_k: int = 30,
text_weight: float = 0.65,
case_id: UUID | None = None,
section_type: str | None = None,
practice_area: str | None = None,
appeal_subtype: str | None = None,
) -> list[dict]:
"""Weighted merge of text-chunk and per-page image search.
Same (document_id, page_number) → boost text chunk by image score
on that page. Image-only pages with no overlapping text chunk are
surfaced as ``match_type='image'`` so dense scanned content still
appears in results.
"""
img_weight = 1.0 - text_weight
text_rows = await search_similar(
query_text_embedding, limit=fetch_k, case_id=case_id,
section_type=section_type, practice_area=practice_area,
appeal_subtype=appeal_subtype,
)
img_rows = await search_document_images_similar(
query_image_embedding, limit=fetch_k, case_id=case_id,
practice_area=practice_area, appeal_subtype=appeal_subtype,
)
img_by_page: dict[tuple, dict] = {
(str(r["document_id"]), r["page_number"]): r for r in img_rows
}
seen: set = set()
merged: list[dict] = []
for r in text_rows:
page = r.get("page_number")
key = (str(r["document_id"]), page) if page is not None else None
img_hit = img_by_page.get(key) if key else None
text_score = float(r["score"])
image_score = float(img_hit["score"]) if img_hit else 0.0
d = dict(r)
d["text_score"] = text_score
d["image_score"] = image_score
d["score"] = text_score * text_weight + image_score * img_weight
d["match_type"] = "text+image" if img_hit else "text"
if img_hit:
d["image_thumbnail_path"] = img_hit.get("image_thumbnail_path")
merged.append(d)
if key:
seen.add(key)
for r in img_rows:
key = (str(r["document_id"]), r["page_number"])
if key in seen:
continue
d = dict(r)
d["text_score"] = 0.0
d["image_score"] = float(r["score"])
d["score"] = float(r["score"]) * img_weight
d["match_type"] = "image"
d["content"] = ""
d["section_type"] = "image"
merged.append(d)
merged.sort(key=lambda x: -x["score"])
return merged[:limit]
async def search_precedent_library_hybrid(
query_text_embedding: list[float],
query_image_embedding: list[float],
limit: int = 10,
fetch_k: int = 30,
text_weight: float = 0.65,
practice_area: str = "",
court: str = "",
precedent_level: str = "",
appeal_subtype: str = "",
is_binding: bool | None = None,
subject_tag: str = "",
include_halachot: bool = True,
) -> list[dict]:
"""Hybrid variant of search_precedent_library_semantic.
Halachot have no ``page_number`` — they're boosted by the max
image score from any page in the same case_law row.
"""
img_weight = 1.0 - text_weight
text_results = await search_precedent_library_semantic(
query_text_embedding,
practice_area=practice_area, court=court,
precedent_level=precedent_level, appeal_subtype=appeal_subtype,
is_binding=is_binding, subject_tag=subject_tag,
limit=fetch_k, include_halachot=include_halachot,
)
img_results = await search_precedent_images_similar(
query_image_embedding, limit=fetch_k,
practice_area=practice_area, court=court,
precedent_level=precedent_level, appeal_subtype=appeal_subtype,
is_binding=is_binding,
)
img_by_page: dict[tuple, dict] = {}
img_by_case: dict[str, float] = {}
for r in img_results:
cid = str(r["case_law_id"])
img_by_page[(cid, r["page_number"])] = r
img_by_case[cid] = max(img_by_case.get(cid, 0.0), float(r["score"]))
seen: set = set()
merged: list[dict] = []
for r in text_results:
cid = str(r["case_law_id"])
page = r.get("page_number")
key = (cid, page) if page is not None else None
img_hit = img_by_page.get(key) if key else None
if img_hit:
image_score = float(img_hit["score"])
elif r.get("type") == "halacha":
image_score = img_by_case.get(cid, 0.0)
else:
image_score = 0.0
text_score = float(r["score"])
d = dict(r)
d["text_score"] = text_score
d["image_score"] = image_score
d["score"] = text_score * text_weight + image_score * img_weight
if img_hit:
d["image_thumbnail_path"] = img_hit.get("image_thumbnail_path")
if key:
seen.add(key)
merged.append(d)
for r in img_results:
key = (str(r["case_law_id"]), r["page_number"])
if key in seen:
continue
d = dict(r)
d["text_score"] = 0.0
d["image_score"] = float(r["score"])
d["score"] = float(r["score"]) * img_weight
d["type"] = "image_page"
d["content"] = ""
d["section_type"] = "image"
merged.append(d)
merged.sort(key=lambda x: -x["score"])
return merged[:limit]