Merge pull request 'feat(X13): אחזור-פסיקה אוטומטי מנט המשפט → קורפוס (Tier 0 + scaffold)' (#110) from worktree-court-fetch into main
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m21s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 1m21s
This commit was merged in pull request #110.
This commit is contained in:
@@ -1352,6 +1352,36 @@ CREATE INDEX IF NOT EXISTS idx_digests_content_tsv ON digests USING gin(content_
|
||||
"""
|
||||
|
||||
|
||||
# ── X13 — Court Verdict Fetch queue ──────────────────────────────────────
# A lightweight, observable, idempotent job queue for the auto-fetch
# subsystem (docs/spec/X13-court-fetch.md). One row per court verdict we try
# to pull from a public source. Mirrors the extraction-queue pattern: status
# is always explicit (INV-CF2 — no silent drop), the canonical case number is
# the idempotency key (INV-CF5), and ``attempts`` drives the human-fallback
# gate (INV-CF3 — flip to 'manual' after N autonomous failures).
# V31 — digests (X12) took V30 when it merged first.
#
# Every statement below uses IF NOT EXISTS, so executing the script against a
# database that already has the table and both indexes changes nothing.
# The digest index is partial: rows with a NULL ``digest_id`` are not indexed.
# NOTE(review): ``digest_id`` has no FOREIGN KEY, unlike ``case_law_id``
# (which is nulled when the referenced case_law row is deleted) — presumably
# deliberate so a job can outlive its digest; confirm against the spec.
# NOTE(review): the allowed ``tier`` / ``status`` values are listed only in
# the inline SQL comments; no CHECK constraint enforces them here.
SCHEMA_V31_SQL = """
CREATE TABLE IF NOT EXISTS court_fetch_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    case_number_norm TEXT NOT NULL UNIQUE, -- idempotency key (INV-CF5)
    citation_raw TEXT NOT NULL DEFAULT '',
    tier TEXT NOT NULL DEFAULT '', -- supreme | admin | skip
    court TEXT NOT NULL DEFAULT '',
    status TEXT NOT NULL DEFAULT 'pending', -- pending|running|done|failed|manual
    attempts INT NOT NULL DEFAULT 0,
    error TEXT NOT NULL DEFAULT '',
    case_law_id UUID REFERENCES case_law(id) ON DELETE SET NULL,
    digest_id UUID, -- source digest (X12), nullable for ad-hoc
    source_url TEXT NOT NULL DEFAULT '', -- provenance (INV-CF7)
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_court_fetch_jobs_status ON court_fetch_jobs(status);
CREATE INDEX IF NOT EXISTS idx_court_fetch_jobs_digest ON court_fetch_jobs(digest_id)
    WHERE digest_id IS NOT NULL;
"""
|
||||
|
||||
|
||||
async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(SCHEMA_SQL)
|
||||
@@ -1385,7 +1415,8 @@ async def _run_schema_migrations(pool: asyncpg.Pool) -> None:
|
||||
await conn.execute(SCHEMA_V28_SQL)
|
||||
await conn.execute(SCHEMA_V29_SQL)
|
||||
await conn.execute(SCHEMA_V30_SQL)
|
||||
logger.info("Database schema initialized (v1-v30)")
|
||||
await conn.execute(SCHEMA_V31_SQL)
|
||||
logger.info("Database schema initialized (v1-v31)")
|
||||
|
||||
|
||||
async def init_schema() -> None:
|
||||
@@ -5942,3 +5973,110 @@ async def find_missing_precedent_by_citation(
|
||||
citation.strip(),
|
||||
)
|
||||
return _row_to_missing_precedent(row) if row else None
|
||||
|
||||
|
||||
# ── X13 — Court Verdict Fetch jobs ───────────────────────────────────────
|
||||
# CRUD for the auto-fetch queue (docs/spec/X13-court-fetch.md). Status is
|
||||
# always explicit; failures are recorded, never swallowed (INV-CF2). Upsert
|
||||
# is keyed on the canonical case number (INV-CF5).
|
||||
|
||||
def _row_to_court_fetch_job(row) -> dict:
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
async def court_fetch_job_upsert(
|
||||
case_number_norm: str,
|
||||
citation_raw: str = "",
|
||||
tier: str = "",
|
||||
court: str = "",
|
||||
digest_id: UUID | None = None,
|
||||
) -> dict:
|
||||
"""Idempotent create-or-get of a fetch job by canonical case number.
|
||||
|
||||
Re-requesting the same case number returns the existing row (with a
|
||||
``_existing`` flag) rather than creating a duplicate — the canonical
|
||||
number is a UNIQUE key. A job that already reached a terminal state is
|
||||
returned as-is so callers can decide whether to retry.
|
||||
"""
|
||||
if not (case_number_norm or "").strip():
|
||||
raise ValueError("case_number_norm is required")
|
||||
pool = await get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
existing = await conn.fetchrow(
|
||||
"SELECT * FROM court_fetch_jobs WHERE case_number_norm = $1",
|
||||
case_number_norm,
|
||||
)
|
||||
if existing:
|
||||
out = _row_to_court_fetch_job(existing)
|
||||
out["_existing"] = True
|
||||
return out
|
||||
row = await conn.fetchrow(
|
||||
"""INSERT INTO court_fetch_jobs
|
||||
(case_number_norm, citation_raw, tier, court, digest_id)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING *""",
|
||||
case_number_norm, citation_raw, tier, court, digest_id,
|
||||
)
|
||||
out = _row_to_court_fetch_job(row)
|
||||
out["_existing"] = False
|
||||
return out
|
||||
|
||||
|
||||
async def court_fetch_job_update(
    job_id: UUID,
    *,
    status: str | None = None,
    error: str | None = None,
    case_law_id: UUID | None = None,
    source_url: str | None = None,
    bump_attempts: bool = False,
) -> dict | None:
    """Patch a job row. Only provided fields change; ``updated_at`` always does.

    Args:
        job_id: Primary key of the job to patch.
        status: New status, or ``None`` to leave it untouched.
        error: New error text, or ``None`` to leave it untouched. Pass ``""``
            to clear a previous error.
        case_law_id: Linked case-law row, or ``None`` to leave it untouched
            (``None`` means "not provided", so this function cannot reset the
            column to NULL).
        source_url: New provenance URL, or ``None`` to leave it untouched.
        bump_attempts: When true, increment ``attempts`` by one in the same
            statement.

    Returns:
        The updated row as a ``dict``, or ``None`` when no job has ``job_id``.
    """
    provided = {
        "status": status,
        "error": error,
        "case_law_id": case_law_id,
        "source_url": source_url,
    }
    assignments = ["updated_at = now()"]
    args: list = []
    # Column names come from the fixed mapping above, never from caller input;
    # only the values travel as bind parameters ($1, $2, ...).
    for column, value in provided.items():
        if value is None:
            continue
        args.append(value)
        assignments.append(f"{column} = ${len(args)}")
    if bump_attempts:
        assignments.append("attempts = attempts + 1")
    # The job id is always the last bind parameter.
    args.append(job_id)
    pool = await get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"UPDATE court_fetch_jobs SET {', '.join(assignments)} "
            f"WHERE id = ${len(args)} RETURNING *",
            *args,
        )
    return _row_to_court_fetch_job(row)
|
||||
|
||||
|
||||
async def court_fetch_job_get(case_number_norm: str) -> dict | None:
    """Look up a single fetch job by its canonical case number.

    Args:
        case_number_norm: Canonical case number to match exactly.

    Returns:
        The job row as a ``dict``, or ``None`` when no job has that number.
    """
    query = "SELECT * FROM court_fetch_jobs WHERE case_number_norm = $1"
    pool = await get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, case_number_norm)
    if not record:
        return None
    return _row_to_court_fetch_job(record)
|
||||
|
||||
|
||||
async def court_fetch_job_list(status: str | None = None, limit: int = 100) -> list[dict]:
    """Return fetch jobs, most recently created first.

    Args:
        status: When truthy, only jobs with exactly this status are returned;
            ``None`` or ``""`` returns jobs of every status.
        limit: Maximum number of rows to return.

    Returns:
        A list of job rows as ``dict`` objects, ordered by ``created_at``
        descending.
    """
    # Pick the statement and its bind parameters up front so there is a
    # single fetch call below.
    if status:
        query = (
            "SELECT * FROM court_fetch_jobs WHERE status = $1 "
            "ORDER BY created_at DESC LIMIT $2"
        )
        params = (status, limit)
    else:
        query = "SELECT * FROM court_fetch_jobs ORDER BY created_at DESC LIMIT $1"
        params = (limit,)
    pool = await get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(query, *params)
    return [_row_to_court_fetch_job(record) for record in records]
|
||||
|
||||
Reference in New Issue
Block a user