feat(ingest): atomic ON CONFLICT upsert in create_*_case_law (GAP-03, FU-2a)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 20:44:31 +00:00
parent 2b91173f25
commit cd0f6cda0a

View File

@@ -2581,41 +2581,14 @@ async def create_external_case_law(
pool = await get_pool() pool = await get_pool()
tags_json = json.dumps(subject_tags or [], ensure_ascii=False) tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn: async with pool.acquire() as conn:
existing = await conn.fetchrow( # Atomic upsert on the V15 partial unique index
"SELECT id, source_kind FROM case_law WHERE case_number = $1", # uq_case_law_external_number (case_number) WHERE source_kind <> 'internal_committee'.
case_number, # The predicate is repeated in ON CONFLICT (required for partial indexes).
) # This also subsumes the old cited_only→external_upload promotion: a
if existing: # cited_only row with the same case_number conflicts and is promoted by
row = await conn.fetchrow( # DO UPDATE. Scoped to the external partial index, so an internal row with
""" # the same number is NOT touched (the old SELECT-without-source_kind could
UPDATE case_law SET # wrongly promote it).
case_name = $2,
court = COALESCE(NULLIF($3, ''), court),
date = COALESCE($4, date),
practice_area = $5,
appeal_subtype = $6,
subject_tags = $7,
summary = COALESCE(NULLIF($8, ''), summary),
headnote = $9,
key_quote = COALESCE(NULLIF($10, ''), key_quote),
full_text = $11,
source_url = COALESCE(NULLIF($12, ''), source_url),
source_type = $13,
precedent_level = $14,
is_binding = $15,
document_id = COALESCE($16, document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
WHERE id = $1
RETURNING *
""",
existing["id"], case_name, court, decision_date,
practice_area, appeal_subtype, tags_json, summary, headnote,
key_quote, full_text, source_url, source_type,
precedent_level, is_binding, document_id,
)
else:
row = await conn.fetchrow( row = await conn.fetchrow(
""" """
INSERT INTO case_law ( INSERT INTO case_law (
@@ -2629,6 +2602,26 @@ async def create_external_case_law(
'external_upload', $10, 'processing', 'pending', 'external_upload', $10, 'processing', 'pending',
$11, $12, $13, $14, $15, $16 $11, $12, $13, $14, $15, $16
) )
ON CONFLICT (case_number) WHERE source_kind <> 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
headnote = EXCLUDED.headnote,
key_quote = COALESCE(NULLIF(EXCLUDED.key_quote, ''), case_law.key_quote),
full_text = EXCLUDED.full_text,
source_url = COALESCE(NULLIF(EXCLUDED.source_url, ''), case_law.source_url),
source_type = EXCLUDED.source_type,
precedent_level = EXCLUDED.precedent_level,
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
source_kind = 'external_upload',
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING * RETURNING *
""", """,
case_number, case_name, court, decision_date, tags_json, case_number, case_name, court, decision_date, tags_json,
@@ -2665,40 +2658,10 @@ async def create_internal_committee_decision(
case_number = _canonical_case_number(case_number) case_number = _canonical_case_number(case_number)
tags_json = json.dumps(subject_tags or [], ensure_ascii=False) tags_json = json.dumps(subject_tags or [], ensure_ascii=False)
async with pool.acquire() as conn: async with pool.acquire() as conn:
existing = await conn.fetchrow( # Atomic upsert on the V15 partial unique index
"SELECT id FROM case_law " # uq_case_law_internal_number_proc (case_number, proceeding_type)
"WHERE case_number = $1 AND proceeding_type = $2 " # WHERE source_kind = 'internal_committee'. Predicate repeated for the
" AND source_kind = 'internal_committee'", # partial index. Replaces the old SELECT-then-INSERT/UPDATE (race-prone).
case_number, proceeding_type,
)
if existing:
row = await conn.fetchrow(
"""
UPDATE case_law SET
case_name = $2,
court = COALESCE(NULLIF($3, ''), court),
date = COALESCE($4, date),
chair_name = COALESCE(NULLIF($5, ''), chair_name),
district = COALESCE(NULLIF($6, ''), district),
practice_area = $7,
appeal_subtype = $8,
subject_tags = $9,
summary = COALESCE(NULLIF($10, ''), summary),
full_text = $11,
source_type = 'appeals_committee',
source_kind = 'internal_committee',
is_binding = $12,
document_id = COALESCE($13, document_id),
extraction_status = 'processing',
halacha_extraction_status = 'pending'
WHERE id = $1
RETURNING *
""",
existing["id"], case_name, court, decision_date,
chair_name, district, practice_area, appeal_subtype,
tags_json, summary, full_text, is_binding, document_id,
)
else:
row = await conn.fetchrow( row = await conn.fetchrow(
""" """
INSERT INTO case_law ( INSERT INTO case_law (
@@ -2714,6 +2677,25 @@ async def create_internal_committee_decision(
'processing', 'pending', 'processing', 'pending',
$11, $12, $13, $14 $11, $12, $13, $14
) )
ON CONFLICT (case_number, proceeding_type)
WHERE source_kind = 'internal_committee'
DO UPDATE SET
case_name = EXCLUDED.case_name,
court = COALESCE(NULLIF(EXCLUDED.court, ''), case_law.court),
date = COALESCE(EXCLUDED.date, case_law.date),
chair_name = COALESCE(NULLIF(EXCLUDED.chair_name, ''), case_law.chair_name),
district = COALESCE(NULLIF(EXCLUDED.district, ''), case_law.district),
practice_area = EXCLUDED.practice_area,
appeal_subtype = EXCLUDED.appeal_subtype,
subject_tags = EXCLUDED.subject_tags,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), case_law.summary),
full_text = EXCLUDED.full_text,
source_type = 'appeals_committee',
source_kind = 'internal_committee',
is_binding = EXCLUDED.is_binding,
document_id = COALESCE(EXCLUDED.document_id, case_law.document_id),
extraction_status = 'processing',
halacha_extraction_status = 'pending'
RETURNING * RETURNING *
""", """,
case_number, case_name, court, decision_date, chair_name, district, case_number, case_name, court, decision_date, chair_name, district,