diff --git a/mcp-server/src/legal_mcp/services/db.py b/mcp-server/src/legal_mcp/services/db.py index b8d8a2c..567026b 100644 --- a/mcp-server/src/legal_mcp/services/db.py +++ b/mcp-server/src/legal_mcp/services/db.py @@ -2581,61 +2581,54 @@ async def create_external_case_law( pool = await get_pool() tags_json = json.dumps(subject_tags or [], ensure_ascii=False) async with pool.acquire() as conn: - existing = await conn.fetchrow( - "SELECT id, source_kind FROM case_law WHERE case_number = $1", - case_number, - ) - if existing: - row = await conn.fetchrow( - """ - UPDATE case_law SET - case_name = $2, - court = COALESCE(NULLIF($3, ''), court), - date = COALESCE($4, date), - practice_area = $5, - appeal_subtype = $6, - subject_tags = $7, - summary = COALESCE(NULLIF($8, ''), summary), - headnote = $9, - key_quote = COALESCE(NULLIF($10, ''), key_quote), - full_text = $11, - source_url = COALESCE(NULLIF($12, ''), source_url), - source_type = $13, - precedent_level = $14, - is_binding = $15, - document_id = COALESCE($16, document_id), - source_kind = 'external_upload', - extraction_status = 'processing', - halacha_extraction_status = 'pending' - WHERE id = $1 - RETURNING * - """, - existing["id"], case_name, court, decision_date, - practice_area, appeal_subtype, tags_json, summary, headnote, - key_quote, full_text, source_url, source_type, - precedent_level, is_binding, document_id, - ) - else: - row = await conn.fetchrow( - """ - INSERT INTO case_law ( - case_number, case_name, court, date, subject_tags, - summary, key_quote, full_text, source_url, - source_kind, document_id, extraction_status, - halacha_extraction_status, practice_area, appeal_subtype, - headnote, source_type, precedent_level, is_binding - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, - 'external_upload', $10, 'processing', 'pending', - $11, $12, $13, $14, $15, $16 - ) - RETURNING * - """, - case_number, case_name, court, decision_date, tags_json, + # Atomic upsert on the V15 partial unique index + # uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'. + # The predicate is repeated in ON CONFLICT (required for partial indexes). + # This also subsumes the old cited_only→external_upload promotion: a + # cited_only row with the same case_number conflicts and is promoted by + # DO UPDATE. Scoped to the external partial index, so an internal row with + # the same number is NOT touched (the old SELECT-without-source_kind could + # wrongly promote it). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, subject_tags, summary, key_quote, full_text, source_url, - document_id, practice_area, appeal_subtype, headnote, - source_type, precedent_level, is_binding, + source_kind, document_id, extraction_status, + halacha_extraction_status, practice_area, appeal_subtype, + headnote, source_type, precedent_level, is_binding + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + 'external_upload', $10, 'processing', 'pending', + $11, $12, $13, $14, $15, $16 ) + ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + headnote = EXCLUDED.headnote, + key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote), + full_text = EXCLUDED.full_text, + source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url), + source_type = EXCLUDED.source_type, + precedent_level = EXCLUDED.precedent_level, + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + source_kind = 'external_upload', + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, tags_json, + summary, key_quote, full_text, source_url, + document_id, practice_area, appeal_subtype, headnote, + source_type, precedent_level, is_binding, + ) return _row_to_case_law(row) @@ -2665,62 +2658,51 @@ async def create_internal_committee_decision( case_number = _canonical_case_number(case_number) tags_json = json.dumps(subject_tags or [], ensure_ascii=False) async with pool.acquire() as conn: - existing = await conn.fetchrow( - "SELECT id FROM case_law " - "WHERE case_number = $1 AND proceeding_type = $2 " - " AND source_kind = 'internal_committee'", - case_number, proceeding_type, + # Atomic upsert on V15 partial unique index + # uq_case_law_internal_number_proc (case_number, proceeding_type) + # WHERE source_kind = 'internal_committee'. Predicate repeated for the + # partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone). + row = await conn.fetchrow( + """ + INSERT INTO case_law ( + case_number, case_name, court, date, chair_name, district, + subject_tags, summary, full_text, + source_kind, source_type, document_id, + extraction_status, halacha_extraction_status, + practice_area, appeal_subtype, is_binding, proceeding_type + ) VALUES ( + $1, $2, $3, $4, $5, $6, + $7, $8, $9, + 'internal_committee', 'appeals_committee', $10, + 'processing', 'pending', + $11, $12, $13, $14 + ) + ON CONFLICT (case_number, proceeding_type) + WHERE source_kind = 'internal_committee' + DO UPDATE SET + case_name = EXCLUDED.case_name, + court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court), + date = COALESCE(EXCLUDED.date, case_law.date), + chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name), + district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district), + practice_area = EXCLUDED.practice_area, + appeal_subtype = EXCLUDED.appeal_subtype, + subject_tags = EXCLUDED.subject_tags, + summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary), + full_text = EXCLUDED.full_text, + source_type = 'appeals_committee', + source_kind = 'internal_committee', + is_binding = EXCLUDED.is_binding, + document_id = COALESCE(EXCLUDED.document_id, case_law.document_id), + extraction_status = 'processing', + halacha_extraction_status = 'pending' + RETURNING * + """, + case_number, case_name, court, decision_date, chair_name, district, + tags_json, summary, full_text, + document_id, practice_area, appeal_subtype, is_binding, + proceeding_type, ) - if existing: - row = await conn.fetchrow( - """ - UPDATE case_law SET - case_name = $2, - court = COALESCE(NULLIF($3, ''), court), - date = COALESCE($4, date), - chair_name = COALESCE(NULLIF($5, ''), chair_name), - district = COALESCE(NULLIF($6, ''), district), - practice_area = $7, - appeal_subtype = $8, - subject_tags = $9, - summary = COALESCE(NULLIF($10, ''), summary), - full_text = $11, - source_type = 'appeals_committee', - source_kind = 'internal_committee', - is_binding = $12, - document_id = COALESCE($13, document_id), - extraction_status = 'processing', - halacha_extraction_status = 'pending' - WHERE id = $1 - RETURNING * - """, - existing["id"], case_name, court, decision_date, - chair_name, district, practice_area, appeal_subtype, - tags_json, summary, full_text, is_binding, document_id, - ) - else: - row = await conn.fetchrow( - """ - INSERT INTO case_law ( - case_number, case_name, court, date, chair_name, district, - subject_tags, summary, full_text, - source_kind, source_type, document_id, - extraction_status, halacha_extraction_status, - practice_area, appeal_subtype, is_binding, proceeding_type - ) VALUES ( - $1, $2, $3, $4, $5, $6, - $7, $8, $9, - 'internal_committee', 'appeals_committee', $10, - 'processing', 'pending', - $11, $12, $13, $14 - ) - RETURNING * - """, - case_number, case_name, court, decision_date, chair_name, district, - tags_json, summary, full_text, - document_id, practice_area, appeal_subtype, is_binding, - proceeding_type, - ) return _row_to_case_law(row)