"""Database service - asyncpg connection pool and queries."""

from __future__ import annotations

import json
import logging
from datetime import date
from uuid import UUID, uuid4

import asyncpg
from pgvector.asyncpg import register_vector

from legal_mcp import config

logger = logging.getLogger(__name__)

# Process-wide connection pool; created lazily by get_pool().
_pool: asyncpg.Pool | None = None


async def get_pool() -> asyncpg.Pool:
    """Return the shared connection pool, creating it on first use.

    On the first call a short-lived bootstrap connection makes sure the
    ``vector`` and ``uuid-ossp`` extensions exist, then the pool is created
    with ``_init_connection`` as its per-connection initializer.

    Returns:
        The module-level ``asyncpg.Pool``.
    """
    global _pool
    if _pool is None:
        # The pgvector extension has to exist before the type codec is
        # registered on pooled connections, so create it up front.
        conn = await asyncpg.connect(config.POSTGRES_URL)
        try:
            await conn.execute('CREATE EXTENSION IF NOT EXISTS vector')
            await conn.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"')
        finally:
            # Close the bootstrap connection even if an extension statement
            # fails, otherwise the connection is leaked.
            await conn.close()
        _pool = await asyncpg.create_pool(
            config.POSTGRES_URL,
            min_size=2,
            max_size=10,
            init=_init_connection,
        )
    return _pool


async def _init_connection(conn: asyncpg.Connection) -> None:
    """Pool ``init`` hook: register the pgvector codec on a new connection."""
    await register_vector(conn)


async def close_pool() -> None:
    """Close the shared pool (if one was created) and forget it."""
    global _pool
    if _pool is not None:
        await _pool.close()
        _pool = None


# ── Schema ──────────────────────────────────────────────────────────

# v1 tables: cases, documents, document_chunks, style_corpus, style_patterns.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS cases (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_number TEXT UNIQUE NOT NULL,
    title TEXT NOT NULL,
    appellants JSONB DEFAULT '[]',
    respondents JSONB DEFAULT '[]',
    subject TEXT DEFAULT '',
    property_address TEXT DEFAULT '',
    permit_number TEXT DEFAULT '',
    committee_type TEXT DEFAULT 'ועדה מקומית',
    status TEXT DEFAULT 'new',
    hearing_date DATE,
    decision_date DATE,
    tags JSONB DEFAULT '[]',
    notes TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS documents (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
    doc_type TEXT NOT NULL,
    title TEXT NOT NULL,
    file_path TEXT NOT NULL,
    extracted_text TEXT DEFAULT '',
    extraction_status TEXT DEFAULT 'pending',
    page_count INTEGER,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS document_chunks (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    section_type TEXT DEFAULT 'other',
    embedding vector(1024),
    page_number INTEGER,
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS style_corpus (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
    decision_number TEXT,
    decision_date DATE,
    subject_categories JSONB DEFAULT '[]',
    full_text TEXT NOT NULL,
    summary TEXT DEFAULT '',
    outcome TEXT DEFAULT '',
    key_principles JSONB DEFAULT '[]',
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS style_patterns (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    pattern_type TEXT NOT NULL,
    pattern_text TEXT NOT NULL,
    frequency INTEGER DEFAULT 1,
    context TEXT DEFAULT '',
    examples JSONB DEFAULT '[]',
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_chunks_embedding
    ON document_chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
CREATE INDEX IF NOT EXISTS idx_chunks_case ON document_chunks(case_id);
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON document_chunks(document_id);
CREATE INDEX IF NOT EXISTS idx_docs_case ON documents(case_id);
CREATE INDEX IF NOT EXISTS idx_cases_status ON cases(status);
CREATE INDEX IF NOT EXISTS idx_cases_number ON cases(case_number);
"""

# Additive migrations on top of v1: cases.expected_outcome and the audit log.
MIGRATIONS_SQL = """
ALTER TABLE cases ADD COLUMN IF NOT EXISTS expected_outcome TEXT DEFAULT '';

CREATE TABLE IF NOT EXISTS audit_log (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    action TEXT NOT NULL,
    case_id UUID REFERENCES cases(id) ON DELETE SET NULL,
    document_id UUID REFERENCES documents(id) ON DELETE SET NULL,
    details JSONB DEFAULT '{}',
    actor TEXT DEFAULT 'system',
    created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_audit_case ON audit_log(case_id);
CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_log(action);
CREATE INDEX IF NOT EXISTS idx_audit_created ON audit_log(created_at DESC);
"""

# ── Phase 3: Workflow expansion ────────────────────────────────────

# Alters tables (decisions, decision_blocks, claims) that the Phase 2 script
# creates, so it must be executed after that script.
SCHEMA_V3_SQL = """
-- הרחבת decisions עם שדות חדשים
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS direction_doc JSONB DEFAULT NULL;
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS outcome_reasoning TEXT DEFAULT '';

-- הרחבת cases עם appeal_type (אם לא קיים)
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_type TEXT DEFAULT '';

-- claims.claim_type is written by store_claims() but is not part of the
-- Phase 2 claims table definition; add it idempotently.
ALTER TABLE claims ADD COLUMN IF NOT EXISTS claim_type TEXT DEFAULT 'claim';

-- טבלת qa_results
CREATE TABLE IF NOT EXISTS qa_results (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
    case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
    check_name TEXT NOT NULL,
    passed BOOLEAN NOT NULL,
    severity TEXT DEFAULT 'warning',
    errors JSONB DEFAULT '[]',
    details TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_qa_results_decision ON qa_results(decision_id);
CREATE INDEX IF NOT EXISTS idx_qa_results_case ON qa_results(case_id);

-- טבלת decision_definitions (אם לא קיימת)
CREATE TABLE IF NOT EXISTS decision_definitions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
    term TEXT NOT NULL,
    definition TEXT NOT NULL,
    block_id TEXT DEFAULT 'block-he',
    created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_definitions_decision ON decision_definitions(decision_id);

-- טבלת appeal_type_rules (אם לא קיימת)
CREATE TABLE IF NOT EXISTS appeal_type_rules (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    appeal_type TEXT NOT NULL,
    rule_category TEXT NOT NULL,
    rule_key TEXT NOT NULL,
    rule_value JSONB NOT NULL,
    description TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(appeal_type, rule_category, rule_key)
);

-- image_placeholders על decision_blocks
ALTER TABLE decision_blocks ADD COLUMN IF NOT EXISTS image_placeholders JSONB DEFAULT '[]';
"""

# ── Phase 4: Practice
# area separation (multi-tenant axis) ──────────

# Adds practice_area / appeal_subtype to cases and the tables that mirror
# them, then backfills existing rows. Every statement only adds missing
# columns/indexes or fills NULLs, so the script can be re-run.
SCHEMA_V4_SQL = """
-- ═══════════════════════════════════════════════════════════════════
-- practice_area = top-level legal domain (multi-tenant axis):
--     appeals_committee | national_insurance | labor_law | ...
-- appeal_subtype = refines within practice_area:
--     building_permit | betterment_levy | compensation_197 | unknown
-- Both columns are denormalized to documents/chunks/decisions/style_corpus
-- so vector searches can filter without expensive JOINs.
-- ═══════════════════════════════════════════════════════════════════

ALTER TABLE cases ADD COLUMN IF NOT EXISTS practice_area TEXT;
ALTER TABLE cases ADD COLUMN IF NOT EXISTS appeal_subtype TEXT;

ALTER TABLE documents ADD COLUMN IF NOT EXISTS practice_area TEXT;
ALTER TABLE documents ADD COLUMN IF NOT EXISTS appeal_subtype TEXT;

ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS practice_area TEXT;
ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS appeal_subtype TEXT;

ALTER TABLE decisions ADD COLUMN IF NOT EXISTS practice_area TEXT;
ALTER TABLE decisions ADD COLUMN IF NOT EXISTS appeal_subtype TEXT;

ALTER TABLE style_corpus ADD COLUMN IF NOT EXISTS practice_area TEXT;
ALTER TABLE style_corpus ADD COLUMN IF NOT EXISTS appeal_subtype TEXT;

CREATE INDEX IF NOT EXISTS idx_cases_practice ON cases(practice_area, appeal_subtype);
CREATE INDEX IF NOT EXISTS idx_chunks_practice ON document_chunks(practice_area);
CREATE INDEX IF NOT EXISTS idx_corpus_practice ON style_corpus(practice_area, appeal_subtype);
CREATE INDEX IF NOT EXISTS idx_decisions_practice ON decisions(practice_area);

-- Backfill (idempotent — only fills NULLs)
UPDATE cases SET practice_area = 'appeals_committee' WHERE practice_area IS NULL;

UPDATE cases SET appeal_subtype = CASE
    WHEN case_number ~ '^1[0-9]{3}' THEN 'building_permit'
    WHEN case_number ~ '^8[0-9]{3}' THEN 'betterment_levy'
    WHEN case_number ~ '^9[0-9]{3}' THEN 'compensation_197'
    ELSE 'unknown'
END
WHERE appeal_subtype IS NULL;

UPDATE documents d
SET practice_area = c.practice_area, appeal_subtype = c.appeal_subtype
FROM cases c
WHERE d.case_id = c.id AND d.practice_area IS NULL;

UPDATE document_chunks dc
SET practice_area = c.practice_area, appeal_subtype = c.appeal_subtype
FROM cases c
WHERE dc.case_id = c.id AND dc.practice_area IS NULL;

UPDATE decisions de
SET practice_area = c.practice_area, appeal_subtype = c.appeal_subtype
FROM cases c
WHERE de.case_id = c.id AND de.practice_area IS NULL;

-- All existing style_corpus entries are דפנה's appeals-committee decisions
UPDATE style_corpus SET practice_area = 'appeals_committee' WHERE practice_area IS NULL;

-- Training corpus documents/chunks have case_id = NULL. All historical
-- training material is from דפנה's appeals committee, so default them.
UPDATE documents SET practice_area = 'appeals_committee'
WHERE case_id IS NULL AND practice_area IS NULL;

UPDATE document_chunks dc
SET practice_area = d.practice_area, appeal_subtype = d.appeal_subtype
FROM documents d
WHERE dc.document_id = d.id AND dc.practice_area IS NULL;
"""

# ── Phase 2: Decision + Knowledge + RAG layers ────────────────────

# Creates the decision, legal-knowledge and embedding tables plus their indexes.
SCHEMA_V2_SQL = """
-- ═══════════════════════════════════════════════════════════════════
-- Layer 2: Decision
-- ═══════════════════════════════════════════════════════════════════

-- decisions: מטאדטה של החלטה (גרסה אחת = רשומה אחת)
CREATE TABLE IF NOT EXISTS decisions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
    version INTEGER DEFAULT 1,
    status TEXT DEFAULT 'draft',  -- draft/review/final/published
    outcome TEXT DEFAULT '',  -- rejected/accepted/partial
    outcome_summary TEXT DEFAULT '',  -- תמצית תוצאה (שורה אחת)
    total_paragraphs INTEGER DEFAULT 0,
    total_words INTEGER DEFAULT 0,
    decision_date DATE,
    author TEXT DEFAULT 'דפנה תמיר',
    panel_members JSONB DEFAULT '[]',
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(case_id, version)
);

-- decision_blocks: 12 בלוקים לפי block-schema.md
CREATE TABLE IF NOT EXISTS decision_blocks (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
    block_id TEXT NOT NULL,  -- block-alef, block-bet, ... block-yod-bet
    block_index INTEGER NOT NULL,  -- 1-12
    title TEXT DEFAULT '',  -- כותרת הבלוק (ריק לבלוקים ללא כותרת)
    content TEXT DEFAULT '',  -- תוכן מלא (markdown)
    word_count INTEGER DEFAULT 0,
    weight_percent NUMERIC(5,2) DEFAULT 0,  -- משקל בפועל (%)
    generation_type TEXT DEFAULT '',  -- template-fill/reproduction/paraphrase/...
    model_used TEXT DEFAULT '',  -- sonnet/opus/script
    temperature NUMERIC(3,2) DEFAULT 0,
    status TEXT DEFAULT 'empty',  -- empty/draft/review/final
    notes TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(decision_id, block_id)
);

-- decision_paragraphs: סעיפים בודדים עם מעקב ציטוטים
CREATE TABLE IF NOT EXISTS decision_paragraphs (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    block_id UUID REFERENCES decision_blocks(id) ON DELETE CASCADE,
    paragraph_number INTEGER NOT NULL,  -- מספור רציף בתוך ההחלטה
    content TEXT NOT NULL,
    word_count INTEGER DEFAULT 0,
    citations JSONB DEFAULT '[]',  -- [{case_law_id, text, type}]
    cross_references JSONB DEFAULT '[]',  -- הפניות לסעיפים אחרים ["סעיף 5 לעיל"]
    created_at TIMESTAMPTZ DEFAULT now()
);

-- claims: טענות צדדים (בלוק ז)
CREATE TABLE IF NOT EXISTS claims (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_id UUID REFERENCES cases(id) ON DELETE CASCADE,
    party_role TEXT NOT NULL,  -- appellant/respondent/permit_applicant/committee
    party_name TEXT DEFAULT '',
    claim_text TEXT NOT NULL,
    claim_index INTEGER DEFAULT 0,  -- סדר הופעה
    source_document TEXT DEFAULT '',  -- מאיזה מסמך חולצה הטענה
    addressed_in_paragraph INTEGER,  -- באיזה סעיף בדיון נענתה
    created_at TIMESTAMPTZ DEFAULT now()
);

-- ═══════════════════════════════════════════════════════════════════
-- Layer 3: Legal Knowledge
-- ═══════════════════════════════════════════════════════════════════

-- case_law: פסיקה (תקדימים)
CREATE TABLE IF NOT EXISTS case_law (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_number TEXT UNIQUE NOT NULL,  -- עע"מ 3975/22 או ערר 1011-03-25
    case_name TEXT NOT NULL,  -- שם קצר: "ב. קרן-נכסים"
    court TEXT DEFAULT '',  -- בג"ץ / עליון / מנהלי / ועדת ערר
    date DATE,
    subject_tags JSONB DEFAULT '[]',  -- ["proprietary_claims", "parking"]
    summary TEXT DEFAULT '',  -- תמצית 2-3 משפטים
    key_quote TEXT DEFAULT '',  -- ציטוט מרכזי
    full_text TEXT DEFAULT '',  -- טקסט מלא אם זמין
    source_url TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT now()
);

-- case_law_citations: קשרים בין פסיקה להחלטות שלנו
CREATE TABLE IF NOT EXISTS case_law_citations (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
    decision_id UUID REFERENCES decisions(id) ON DELETE CASCADE,
    paragraph_id UUID REFERENCES decision_paragraphs(id) ON DELETE SET NULL,
    citation_type TEXT DEFAULT 'support',  -- support/distinguish/overrule/obiter
    context_text TEXT DEFAULT '',  -- ההקשר שבו צוטט
    created_at TIMESTAMPTZ DEFAULT now()
);

-- statutory_provisions: חקיקה נפוצה
CREATE TABLE IF NOT EXISTS statutory_provisions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    statute_name TEXT NOT NULL,  -- "חוק התכנון והבנייה"
    section_number TEXT NOT NULL,  -- "152(א)(2)"
    section_title TEXT DEFAULT '',  -- "זכות ערר"
    full_text TEXT DEFAULT '',  -- נוסח הסעיף
    common_usage TEXT DEFAULT '',  -- מתי משתמשים
    subject_tags JSONB DEFAULT '[]',
    created_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE(statute_name, section_number)
);

-- transition_phrases: ביטויי מעבר של דפנה
CREATE TABLE IF NOT EXISTS transition_phrases (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    phrase TEXT UNIQUE NOT NULL,  -- "ועל מנת לא לצאת בחסר"
    usage_context TEXT DEFAULT '',  -- מתי להשתמש
    block_types JSONB DEFAULT '[]',  -- באילו בלוקים: ["block-yod"]
    frequency INTEGER DEFAULT 1,  -- כמה פעמים ראינו
    source_decision TEXT DEFAULT '',  -- מאיזו החלטה
    created_at TIMESTAMPTZ DEFAULT now()
);

-- lessons_learned: לקחים מהשוואת טיוטות לגרסאות סופיות
CREATE TABLE IF NOT EXISTS lessons_learned (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    lesson_title TEXT NOT NULL,  -- "Discussion = continuous essay, no sub-headers"
    lesson_text TEXT NOT NULL,  -- תיאור מלא
    category TEXT DEFAULT '',  -- structure/style/content/process
    applies_to JSONB DEFAULT '[]',  -- ["block-yod", "all"]
    source_case TEXT DEFAULT '',  -- "הכט 1180-1181"
    severity TEXT DEFAULT 'important',  -- critical/important/nice-to-have
    created_at TIMESTAMPTZ DEFAULT now()
);

-- ═══════════════════════════════════════════════════════════════════
-- Layer 4: Extended RAG
-- ═══════════════════════════════════════════════════════════════════

-- paragraph_embeddings: embeddings של סעיפים בהחלטות
CREATE TABLE IF NOT EXISTS paragraph_embeddings (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    paragraph_id UUID REFERENCES decision_paragraphs(id) ON DELETE CASCADE,
    embedding vector(1024),
    created_at TIMESTAMPTZ DEFAULT now()
);

-- case_law_embeddings: embeddings של פסיקה
CREATE TABLE IF NOT EXISTS case_law_embeddings (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    case_law_id UUID REFERENCES case_law(id) ON DELETE CASCADE,
    chunk_text TEXT NOT NULL,
    embedding vector(1024),
    created_at TIMESTAMPTZ DEFAULT now()
);

-- ═══════════════════════════════════════════════════════════════════
-- Indexes
-- ═══════════════════════════════════════════════════════════════════

CREATE INDEX IF NOT EXISTS idx_decisions_case ON decisions(case_id);
CREATE INDEX IF NOT EXISTS idx_decisions_status ON decisions(status);
CREATE INDEX IF NOT EXISTS idx_decision_blocks_decision ON decision_blocks(decision_id);
CREATE INDEX IF NOT EXISTS idx_decision_blocks_block_id ON decision_blocks(block_id);
CREATE INDEX IF NOT EXISTS idx_decision_paragraphs_block ON decision_paragraphs(block_id);
CREATE INDEX IF NOT EXISTS idx_claims_case ON claims(case_id);
CREATE INDEX IF NOT EXISTS idx_claims_role ON claims(party_role);
CREATE INDEX IF NOT EXISTS idx_case_law_subject ON case_law USING gin(subject_tags);
CREATE INDEX IF NOT EXISTS idx_case_law_citations_decision ON case_law_citations(decision_id);
CREATE INDEX IF NOT EXISTS idx_statutory_provisions_statute ON statutory_provisions(statute_name);
CREATE INDEX IF NOT EXISTS idx_transition_phrases_block ON transition_phrases USING gin(block_types);
CREATE INDEX IF NOT EXISTS idx_lessons_category ON lessons_learned(category);
CREATE INDEX IF NOT EXISTS idx_paragraph_embeddings_vec
    ON paragraph_embeddings USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 50);
CREATE INDEX IF NOT EXISTS idx_case_law_embeddings_vec
    ON case_law_embeddings USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 50);
"""


async def init_schema() -> None:
    """Run every schema script on one pooled connection.

    Order matters: the v3 and v4 scripts ALTER tables (``decisions``,
    ``decision_blocks``) that the v2 script creates, so v2 runs before them.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(SCHEMA_SQL)
        await conn.execute(MIGRATIONS_SQL)
        await conn.execute(SCHEMA_V2_SQL)
        await conn.execute(SCHEMA_V3_SQL)
        await conn.execute(SCHEMA_V4_SQL)
    logger.info("Database schema initialized (v1 + v2 + v3 + v4)")


# ── Case CRUD ───────────────────────────────────────────────────────


async def create_case(
    case_number: str,
    title: str,
    appellants: list[str] | None = None,
    respondents: list[str] | None = None,
    subject: str = "",
    property_address: str = "",
    permit_number: str = "",
    committee_type: str = "ועדה מקומית",
    hearing_date: date | None = None,
    notes: str = "",
    expected_outcome: str = "",
    practice_area: str = "appeals_committee",
    appeal_subtype: str | None = None,
) -> dict:
    """Insert a new case and return it as a dict (see ``_row_to_case``).

    ``appellants`` / ``respondents`` are stored as JSON text; ``None`` is
    stored as an empty list. The id is generated client-side with ``uuid4``.
    """
    pool = await get_pool()
    case_id = uuid4()
    async with pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO cases (id, case_number, title, appellants, respondents,
                                  subject, property_address, permit_number, committee_type,
                                  hearing_date, notes, expected_outcome,
                                  practice_area, appeal_subtype)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)""",
            case_id,
            case_number,
            title,
            json.dumps(appellants or []),
            json.dumps(respondents or []),
            subject,
            property_address,
            permit_number,
            committee_type,
            hearing_date,
            notes,
            expected_outcome,
            practice_area,
            appeal_subtype,
        )
    return await get_case(case_id)


async def get_case_practice_area(case_id: UUID) -> tuple[str | None, str | None]:
    """Return (practice_area, appeal_subtype) for a case, or (None, None) if missing."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT practice_area, appeal_subtype FROM cases WHERE id = $1", case_id
        )
    if row is None:
        return None, None
    return row["practice_area"], row["appeal_subtype"]


async def get_case_practice_area_by_number(case_number: str) -> tuple[str | None, str | None]:
    """Return (practice_area, appeal_subtype) looked up by case number, or (None, None)."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT practice_area, appeal_subtype FROM cases WHERE case_number = $1",
            case_number,
        )
    if row is None:
        return None, None
    return row["practice_area"], row["appeal_subtype"]


async def get_case(case_id: UUID) -> dict | None:
    """Fetch one case by id; ``None`` when no such row exists."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM cases WHERE id = $1", case_id)
    if row is None:
        return None
    return _row_to_case(row)


async def get_case_by_number(case_number: str) -> dict | None:
    """Fetch one case by its case number; ``None`` when no such row exists."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM cases WHERE case_number = $1", case_number
        )
    if row is None:
        return None
    return _row_to_case(row)


async def list_cases(status: str | None = None, limit: int = 50) -> list[dict]:
    """List cases, most recently updated first, optionally filtered by status."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        if status:
            rows = await conn.fetch(
                "SELECT * FROM cases WHERE status = $1 ORDER BY updated_at DESC LIMIT $2",
                status,
                limit,
            )
        else:
            rows = await conn.fetch(
                "SELECT * FROM cases ORDER BY updated_at DESC LIMIT $1", limit
            )
    return [_row_to_case(r) for r in rows]


def _build_assignments(
    fields: dict, json_columns: tuple[str, ...]
) -> tuple[list[str], list]:
    """Turn ``column=value`` kwargs into ``col = $n`` clauses plus their values.

    Column names are interpolated into the SQL text (they cannot be bound as
    parameters), so anything that is not a plain identifier is rejected.

    Raises:
        ValueError: if a key is not a valid identifier.
    """
    set_clauses: list[str] = []
    values: list = []
    # $1 is reserved for the row id, so value placeholders start at $2.
    for position, (column, value) in enumerate(fields.items(), start=2):
        if not column.isidentifier():
            raise ValueError(f"Invalid column name: {column!r}")
        if column in json_columns:
            value = json.dumps(value)
        set_clauses.append(f"{column} = ${position}")
        values.append(value)
    return set_clauses, values


async def update_case(case_id: UUID, **fields) -> dict | None:
    """Update the given columns of a case and return the refreshed case.

    ``appellants``, ``respondents`` and ``tags`` are JSON-encoded before being
    written; ``updated_at`` is always bumped. With no fields this is a plain
    ``get_case``.

    Raises:
        ValueError: if a keyword is not a valid column identifier.
    """
    if not fields:
        return await get_case(case_id)
    # Validate before touching the database so bad input never reaches SQL.
    set_clauses, values = _build_assignments(
        fields, ("appellants", "respondents", "tags")
    )
    set_clauses.append("updated_at = now()")
    sql = f"UPDATE cases SET {', '.join(set_clauses)} WHERE id = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(sql, case_id, *values)
    return await get_case(case_id)


def _row_to_case(row: asyncpg.Record) -> dict:
    """Convert a ``cases`` row to a dict: JSON text columns decoded, id as str."""
    d = dict(row)
    for field in ("appellants", "respondents", "tags"):
        if isinstance(d.get(field), str):
            d[field] = json.loads(d[field])
    d["id"] = str(d["id"])
    return d


# ── Document CRUD ───────────────────────────────────────────────────


async def create_document(
    case_id: UUID | None,
    doc_type: str,
    title: str,
    file_path: str,
    page_count: int | None = None,
    practice_area: str | None = None,
    appeal_subtype: str | None = None,
) -> dict:
    """Insert a document row and return it as a dict (see ``_row_to_doc``).

    When ``practice_area`` is not given and the document belongs to a case,
    both ``practice_area`` and ``appeal_subtype`` are copied from that case.
    """
    pool = await get_pool()
    doc_id = uuid4()
    async with pool.acquire() as conn:
        # If practice_area not explicitly given, inherit from the parent case
        # (for case-bound documents). Training corpus passes case_id=None and
        # provides the practice_area directly.
        if practice_area is None and case_id is not None:
            case_row = await conn.fetchrow(
                "SELECT practice_area, appeal_subtype FROM cases WHERE id = $1",
                case_id,
            )
            if case_row:
                practice_area = case_row["practice_area"]
                appeal_subtype = case_row["appeal_subtype"]
        await conn.execute(
            """INSERT INTO documents (id, case_id, doc_type, title, file_path, page_count,
                                      practice_area, appeal_subtype)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""",
            doc_id,
            case_id,
            doc_type,
            title,
            file_path,
            page_count,
            practice_area,
            appeal_subtype,
        )
        row = await conn.fetchrow("SELECT * FROM documents WHERE id = $1", doc_id)
    return _row_to_doc(row)


async def update_document(doc_id: UUID, **fields) -> None:
    """Update the given columns of a document; ``metadata`` is JSON-encoded.

    Does nothing when no fields are passed.

    Raises:
        ValueError: if a keyword is not a valid column identifier.
    """
    if not fields:
        return
    # Validate before touching the database so bad input never reaches SQL.
    set_clauses, values = _build_assignments(fields, ("metadata",))
    sql = f"UPDATE documents SET {', '.join(set_clauses)} WHERE id = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(sql, doc_id, *values)


async def get_document(doc_id: UUID) -> dict | None:
    """Fetch one document by id; ``None`` when no such row exists."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM documents WHERE id = $1", doc_id)
    return _row_to_doc(row) if row else None


async def list_documents(case_id: UUID) -> list[dict]:
    """List a case's documents in creation order."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT * FROM documents WHERE case_id = $1 ORDER BY created_at", case_id
        )
    return [_row_to_doc(r) for r in rows]


async def get_document_text(doc_id: UUID) -> str:
    """Return a document's extracted text, or ``""`` if missing or NULL."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT extracted_text FROM documents WHERE id = $1", doc_id
        )
    if row is None:
        return ""
    # The column is nullable; honour the declared ``str`` return type.
    return row["extracted_text"] or ""


def _row_to_doc(row: asyncpg.Record) -> dict:
    """Convert a ``documents`` row to a dict: ids as str, metadata decoded.

    ``case_id`` stays ``None`` for documents without a case (training corpus)
    instead of becoming the string ``"None"``.
    """
    d = dict(row)
    d["id"] = str(d["id"])
    if d["case_id"] is not None:
        d["case_id"] = str(d["case_id"])
    if isinstance(d.get("metadata"), str):
        d["metadata"] = json.loads(d["metadata"])
    return d


# ── Claims
# ─────────────────────────────────────────────────────────


async def store_claims(case_id: UUID, claims: list[dict], source_document: str = "") -> int:
    """Store extracted claims. Replaces existing claims from same source.

    Each claim dict: party_role, claim_text, claim_index, party_name (optional),
    claim_type (optional, defaults to "claim").

    The delete of the previous claims and the inserts run in one transaction,
    so a failing insert cannot leave the case with its old claims removed and
    only part of the new ones written.

    Returns:
        The number of claims passed in.

    Raises:
        KeyError: if a claim lacks ``party_role`` or ``claim_text`` (raised
            before anything is written).
    """
    # Build all parameter tuples first so malformed input fails before any
    # row is deleted.
    rows = [
        (
            case_id,
            claim["party_role"],
            claim.get("party_name", ""),
            claim["claim_text"],
            claim.get("claim_index", 0),
            source_document,
            claim.get("claim_type", "claim"),
        )
        for claim in claims
    ]
    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            if source_document:
                await conn.execute(
                    "DELETE FROM claims WHERE case_id = $1 AND source_document = $2",
                    case_id,
                    source_document,
                )
            if rows:
                await conn.executemany(
                    """INSERT INTO claims
                       (case_id, party_role, party_name, claim_text, claim_index,
                        source_document, claim_type)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)""",
                    rows,
                )
    return len(claims)


async def get_claims(case_id: UUID, party_role: str | None = None) -> list[dict]:
    """Get claims for a case, optionally filtered by party role."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        if party_role:
            rows = await conn.fetch(
                "SELECT * FROM claims WHERE case_id = $1 AND party_role = $2 ORDER BY claim_index",
                case_id,
                party_role,
            )
        else:
            rows = await conn.fetch(
                "SELECT * FROM claims WHERE case_id = $1 ORDER BY party_role, claim_index",
                case_id,
            )
    return [dict(r) for r in rows]


# ── Decisions ──────────────────────────────────────────────────────


def _row_to_decision(row) -> dict:
    """Convert a ``decisions`` row to a dict: ids as str, JSON text decoded."""
    d = dict(row)
    d["id"] = str(d["id"])
    # decisions.case_id is nullable; do not turn NULL into the string "None".
    if d.get("case_id") is not None:
        d["case_id"] = str(d["case_id"])
    for field in ("direction_doc", "panel_members"):
        if isinstance(d.get(field), str):
            d[field] = json.loads(d[field])
    return d


async def create_decision(
    case_id: UUID,
    outcome: str = "",
    outcome_summary: str = "",
    outcome_reasoning: str = "",
    direction_doc: dict | None = None,
) -> dict:
    """Create a decision record for a case.

    The new record's version is one more than the case's highest existing
    version (1 if there is none), and ``practice_area`` / ``appeal_subtype``
    are copied from the case row. An empty or missing ``direction_doc`` is
    stored as NULL.
    """
    pool = await get_pool()
    decision_id = uuid4()
    async with pool.acquire() as conn:
        # Check if a decision already exists for this case
        existing = await conn.fetchrow(
            "SELECT id, version FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
            case_id,
        )
        version = (existing["version"] + 1) if existing else 1

        case_row = await conn.fetchrow(
            "SELECT practice_area, appeal_subtype FROM cases WHERE id = $1", case_id
        )
        practice_area = case_row["practice_area"] if case_row else None
        appeal_subtype = case_row["appeal_subtype"] if case_row else None

        await conn.execute(
            """INSERT INTO decisions (id, case_id, version, outcome, outcome_summary,
                                      outcome_reasoning, direction_doc,
                                      practice_area, appeal_subtype)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)""",
            decision_id,
            case_id,
            version,
            outcome,
            outcome_summary,
            outcome_reasoning,
            json.dumps(direction_doc) if direction_doc else None,
            practice_area,
            appeal_subtype,
        )
    return await get_decision(decision_id)


async def get_decision(decision_id: UUID) -> dict | None:
    """Fetch one decision by id; ``None`` when no such row exists."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM decisions WHERE id = $1", decision_id)
    if not row:
        return None
    return _row_to_decision(row)


async def get_decision_by_case(case_id: UUID) -> dict | None:
    """Get the latest decision for a case."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM decisions WHERE case_id = $1 ORDER BY version DESC LIMIT 1",
            case_id,
        )
    if not row:
        return None
    return _row_to_decision(row)


async def update_decision(decision_id: UUID, **fields) -> None:
    """Update the given columns of a decision and bump ``updated_at``.

    ``direction_doc`` / ``panel_members`` are JSON-encoded when passed as a
    dict or list. Does nothing when no fields are passed.

    Raises:
        ValueError: if a keyword is not a valid column identifier.
    """
    if not fields:
        return
    set_clauses = []
    values = []
    # $1 is the decision id, so value placeholders start at $2.
    for i, (key, val) in enumerate(fields.items(), start=2):
        # Column names are interpolated into the SQL text, so reject anything
        # that is not a plain identifier before touching the database.
        if not key.isidentifier():
            raise ValueError(f"Invalid column name: {key!r}")
        if key in ("direction_doc", "panel_members") and isinstance(val, (dict, list)):
            val = json.dumps(val)
        set_clauses.append(f"{key} = ${i}")
        values.append(val)
    set_clauses.append("updated_at = now()")
    sql = f"UPDATE decisions SET {', '.join(set_clauses)} WHERE id = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute(sql, decision_id, *values)


# ── Chunks & Vectors ───────────────────────────────────────────────


async def delete_document_chunks(document_id: UUID) -> int:
    """Delete all chunks for a document (used before reprocessing).

    Returns:
        The number of rows deleted, parsed from the command status tag.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        result = await conn.execute(
            "DELETE FROM document_chunks WHERE document_id = $1", document_id
        )
    return int(result.split()[-1])  # e.g. "DELETE 5" -> 5


async def store_chunks(
    document_id: UUID,
    case_id: UUID | None,
    chunks: list[dict],
    practice_area: str | None = None,
    appeal_subtype: str | None = None,
) -> int:
    """Store document chunks with embeddings.

    Each chunk dict has: content, section_type, embedding (list[float]),
    page_number, chunk_index.

    practice_area defaults to the parent case's value, or — when case_id is
    None (training corpus) — falls back to the parent document's value so
    vector search can still filter cleanly.

    Existing chunks of the document are replaced. The delete and the inserts
    run in one transaction so a failure cannot leave the document with no
    chunks or only some of them.

    Returns:
        The number of chunks passed in.

    Raises:
        KeyError: if a chunk lacks ``chunk_index``, ``content`` or
            ``embedding`` (raised before anything is written).
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        # Resolve practice_area in priority order: explicit > case > document.
        if practice_area is None:
            if case_id is not None:
                case_row = await conn.fetchrow(
                    "SELECT practice_area, appeal_subtype FROM cases WHERE id = $1",
                    case_id,
                )
                if case_row:
                    practice_area = case_row["practice_area"]
                    appeal_subtype = case_row["appeal_subtype"]
            if practice_area is None:
                doc_row = await conn.fetchrow(
                    "SELECT practice_area, appeal_subtype FROM documents WHERE id = $1",
                    document_id,
                )
                if doc_row:
                    practice_area = doc_row["practice_area"]
                    appeal_subtype = doc_row["appeal_subtype"]

        # Build the parameter tuples up front so malformed chunks fail before
        # the existing rows are deleted.
        rows = [
            (
                document_id,
                case_id,
                chunk["chunk_index"],
                chunk["content"],
                chunk.get("section_type", "other"),
                chunk["embedding"],
                chunk.get("page_number"),
                practice_area,
                appeal_subtype,
            )
            for chunk in chunks
        ]

        async with conn.transaction():
            # Delete existing chunks for this document
            await conn.execute(
                "DELETE FROM document_chunks WHERE document_id = $1", document_id
            )
            if rows:
                await conn.executemany(
                    """INSERT INTO document_chunks
                       (document_id, case_id, chunk_index, content, section_type,
                        embedding, page_number, practice_area, appeal_subtype)
                       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)""",
                    rows,
                )
    return len(chunks)


async def search_similar(
    query_embedding: list[float],
    limit: int = 10,
    case_id: UUID | None = None,
    section_type: str | None = None,
    practice_area: str | None = None,
    appeal_subtype: str | None = None,
) -> list[dict]:
    """Cosine similarity search on document chunks.

    Filter by practice_area to keep precedents from the same legal domain
    (e.g. don't surface betterment-levy chunks when working on building
    permits). The filters use the denormalized columns on document_chunks.

    ``cases`` is LEFT-joined only to fetch ``case_number``, so chunks without
    a case (training corpus, ``case_id`` NULL) are still returned; for those
    rows ``case_number`` is ``None``.

    Returns:
        Up to ``limit`` dicts with content, section_type, page_number,
        document_id, case_id, document_title, case_number and score, ordered
        by increasing cosine distance.
    """
    pool = await get_pool()
    conditions = []
    # $1 = query embedding, $2 = limit; optional filters start at $3.
    params: list = [query_embedding, limit]
    param_idx = 3

    if case_id:
        conditions.append(f"dc.case_id = ${param_idx}")
        params.append(case_id)
        param_idx += 1
    if section_type:
        conditions.append(f"dc.section_type = ${param_idx}")
        params.append(section_type)
        param_idx += 1
    if practice_area:
        conditions.append(f"dc.practice_area = ${param_idx}")
        params.append(practice_area)
        param_idx += 1
    if appeal_subtype:
        conditions.append(f"dc.appeal_subtype = ${param_idx}")
        params.append(appeal_subtype)
        param_idx += 1

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    sql = f"""
        SELECT dc.content, dc.section_type, dc.page_number,
               dc.document_id, dc.case_id,
               d.title AS document_title,
               c.case_number,
               1 - (dc.embedding <=> $1) AS score
        FROM document_chunks dc
        JOIN documents d ON d.id = dc.document_id
        LEFT JOIN cases c ON c.id = dc.case_id
        {where}
        ORDER BY dc.embedding <=> $1
        LIMIT $2
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(sql, *params)
    return [dict(r) for r in rows]


# ── Style corpus ────────────────────────────────────────────────────


async def add_to_style_corpus(
    document_id: UUID | None,
    decision_number: str,
    decision_date: date | None,
    subject_categories: list[str],
    full_text: str,
    summary: str = "",
    outcome: str = "",
    key_principles: list[str] | None = None,
    practice_area: str = "appeals_committee",
    appeal_subtype: str | None = None,
) -> UUID:
    """Insert a decision into ``style_corpus`` and return the new row's id.

    ``subject_categories`` and ``key_principles`` are stored as JSON text;
    a missing ``key_principles`` is stored as an empty list.
    """
    pool = await get_pool()
    corpus_id = uuid4()
    async with pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO style_corpus
               (id, document_id, decision_number, decision_date,
                subject_categories, full_text, summary, outcome, key_principles,
                practice_area, appeal_subtype)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)""",
            corpus_id,
            document_id,
            decision_number,
            decision_date,
            json.dumps(subject_categories),
            full_text,
            summary,
            outcome,
            json.dumps(key_principles or []),
            practice_area,
            appeal_subtype,
        )
    return corpus_id


async def delete_from_style_corpus(corpus_id: UUID) -> dict:
    """Remove a decision from style_corpus + related documents (cascades chunks).

    Also tries to delete the [קורפוס] document associated by title match,
    since the current training pipeline inserts style_corpus with
    document_id=NULL.

    Returns:
        ``{"deleted": False, "reason": "not found"}`` when the id is unknown,
        otherwise ``{"deleted": True, "decision_number": ..., "docs_deleted": 0|1}``.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow(
                "DELETE FROM style_corpus WHERE id = $1 "
                "RETURNING decision_number, document_id",
                corpus_id,
            )
            if not row:
                return {"deleted": False, "reason": "not found"}

            docs_deleted = 0
            if row["document_id"]:
                await conn.execute(
                    "DELETE FROM documents WHERE id = $1", row["document_id"]
                )
                docs_deleted = 1
            elif row["decision_number"]:
                # Best-effort: match a [קורפוס] document by the decision_number
                # in its title. Only for single, unambiguous matches.
                # Escape LIKE metacharacters (backslash is the default escape
                # character) so the number is matched literally.
                literal = (
                    row["decision_number"]
                    .replace("\\", "\\\\")
                    .replace("%", "\\%")
                    .replace("_", "\\_")
                )
                docs = await conn.fetch(
                    "SELECT id FROM documents "
                    "WHERE case_id IS NULL AND title LIKE $1",
                    f"%{literal}%",
                )
                if len(docs) == 1:
                    await conn.execute(
                        "DELETE FROM documents WHERE id = $1", docs[0]["id"]
                    )
                    docs_deleted = 1

    return {
        "deleted": True,
        "decision_number": row["decision_number"],
        "docs_deleted": docs_deleted,
    }


async def get_style_patterns(pattern_type: str | None = None) -> list[dict]:
    """List style patterns, most frequent first, optionally for one type."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        if pattern_type:
            rows = await conn.fetch(
                "SELECT * FROM style_patterns WHERE pattern_type = $1 ORDER BY frequency DESC",
                pattern_type,
            )
        else:
            rows = await conn.fetch(
                "SELECT * FROM style_patterns ORDER BY pattern_type, frequency DESC"
            )
    return [dict(r) for r in rows]


async def upsert_style_pattern(
    pattern_type: str,
    pattern_text: str,
    context: str = "",
    examples: list[str] | None = None,
) -> None:
    """Record one sighting of a style pattern.

    If a row with the same type and text exists its ``frequency`` is
    incremented (``context`` / ``examples`` are left untouched); otherwise a
    new row is inserted.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        existing = await conn.fetchrow(
            "SELECT id, frequency FROM style_patterns WHERE pattern_type = $1 AND pattern_text = $2",
            pattern_type,
            pattern_text,
        )
        if existing:
            await conn.execute(
                "UPDATE style_patterns SET frequency = frequency + 1 WHERE id = $1",
                existing["id"],
            )
        else:
            await conn.execute(
                """INSERT INTO style_patterns (pattern_type, pattern_text, context, examples)
                   VALUES ($1, $2, $3, $4)""",
                pattern_type,
                pattern_text,
                context,
                json.dumps(examples or []),
            )


async def clear_style_patterns() -> None:
    """Delete all existing style patterns (used before re-analysis)."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        await conn.execute("DELETE FROM style_patterns")


# ── Semantic Search (V2 — decision blocks & case law) ─────────────


async def search_similar_paragraphs(
    query_embedding: list[float],
    limit: int = 10,
    block_type: str | None = None,
    practice_area: str | None = None,
    appeal_subtype: str | None = None,
) -> list[dict]:
    """Search decision paragraphs by semantic similarity.

    Filtering by practice_area uses the denormalized column on `decisions`
    so we don't pull, e.g., betterment-levy paragraphs when writing a
    building-permit decision.
    """
    pool = await get_pool()
    conditions = []
    # $1 = query embedding, $2 = limit; optional filters start at $3.
    params: list = [query_embedding, limit]
    param_idx = 3

    if block_type:
        conditions.append(f"db.block_id = ${param_idx}")
        params.append(block_type)
        param_idx += 1
    if practice_area:
        conditions.append(f"d.practice_area = ${param_idx}")
        params.append(practice_area)
        param_idx += 1
    if appeal_subtype:
        conditions.append(f"d.appeal_subtype = ${param_idx}")
        params.append(appeal_subtype)
        param_idx += 1

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    sql = f"""
        SELECT dp.content, dp.word_count, dp.paragraph_number,
               db.block_id AS block_type, db.title AS block_title,
               c.case_number, c.title AS case_title,
               d.outcome, d.author,
               1 - (pe.embedding <=> $1) AS score
        FROM paragraph_embeddings pe
        JOIN decision_paragraphs dp ON dp.id = pe.paragraph_id
        JOIN decision_blocks db ON db.id = dp.block_id
        JOIN decisions d ON d.id = db.decision_id
        JOIN cases c ON c.id = d.case_id
        {where}
        ORDER BY pe.embedding <=> $1
        LIMIT $2
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(sql, *params)
    return [dict(r) for r in rows]


async def search_similar_case_law(
    query_embedding: list[float],
    limit: int = 5,
) -> list[dict]:
    """Search case law by semantic similarity."""
    pool = await get_pool()
    sql = """
        SELECT cl.case_number, cl.case_name, cl.court, cl.summary,
               cl.key_quote, cl.subject_tags,
               cle.chunk_text,
               1 - (cle.embedding <=> $1) AS score
        FROM case_law_embeddings cle
        JOIN case_law cl ON cl.id = cle.case_law_id
        ORDER BY cle.embedding <=> $1
        LIMIT $2
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(sql, query_embedding, limit)

    results = []
    for r in rows:
        d = dict(r)
        # subject_tags may come back as JSON text; decode it if so.
        if isinstance(d.get("subject_tags"), str):
            d["subject_tags"] = json.loads(d["subject_tags"])
        results.append(d)
    return results


async def search_precedents(
    query_embedding: list[float],
    limit: int = 10,
) -> list[dict]:
    """Combined search: paragraphs + case law, ranked by score.

    Fetches up to ``limit`` hits from each source, merges them into a common
    shape tagged with ``type``, and returns the ``limit`` highest-scoring.
    Paragraph content is truncated to 500 characters.
    """
    paragraphs = await search_similar_paragraphs(query_embedding, limit=limit)
    case_law = await search_similar_case_law(query_embedding, limit=limit)

    # Combine and sort by score
    results = []
    for p in paragraphs:
        results.append({
            "type": "decision_paragraph",
            "score": float(p["score"]),
            "case_number": p["case_number"],
            "case_title": p["case_title"],
            "block_type": p["block_type"],
            "content": p["content"][:500],
            "author": p["author"],
        })
    for c in case_law:
        results.append({
            "type": "case_law",
            "score": float(c["score"]),
            "case_number": c["case_number"],
            "case_name": c["case_name"],
            "court": c["court"],
            "content": c["summary"],
        })

    results.sort(key=lambda x: x["score"], reverse=True)
    return results[:limit]