feat(halacha): lighter effort for BULK queue-drain extraction (speed at scale)
xhigh is the quality sweet-spot for a single precedent but very slow at scale (64-chunk case ≈ 20 min). Bulk queue-drains (process_pending over many precedents) now use a lighter effort to cut wall-clock; interactive single re-extraction keeps xhigh quality. - config.HALACHA_BULK_EXTRACT_EFFORT (env, default 'high'; set 'medium' for max speed, 'xhigh' to match single). - extract()/_extract_impl()/_extract_chunk() take an `effort` override threaded to claude_session.query_json; None falls back to HALACHA_EXTRACT_EFFORT (xhigh). - process_pending_extractions(kind='halacha') passes the bulk effort; single reextract_halachot keeps xhigh. Verified end-to-end (mocked LLM): _extract_chunk(effort='medium') → query_json effort='medium'; effort=None → 'xhigh' fallback. Closes the open item in #72. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -54,6 +54,12 @@ REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6380/0")
|
|||||||
# pinned.
|
# pinned.
|
||||||
HALACHA_EXTRACT_MODEL = os.environ.get("HALACHA_EXTRACT_MODEL", "claude-opus-4-8")
|
HALACHA_EXTRACT_MODEL = os.environ.get("HALACHA_EXTRACT_MODEL", "claude-opus-4-8")
|
||||||
HALACHA_EXTRACT_EFFORT = os.environ.get("HALACHA_EXTRACT_EFFORT", "xhigh")
|
HALACHA_EXTRACT_EFFORT = os.environ.get("HALACHA_EXTRACT_EFFORT", "xhigh")
|
||||||
|
# Effort for BULK queue-drain extraction (process_pending over many precedents).
|
||||||
|
# xhigh is the quality sweet-spot for a single precedent but very slow at scale
|
||||||
|
# (a 64-chunk case ≈ 20 min). Bulk drains use a lighter effort to cut wall-clock;
|
||||||
|
# interactive single re-extraction keeps HALACHA_EXTRACT_EFFORT (xhigh). Tune via
|
||||||
|
# env (set to 'xhigh' to make bulk match single, or 'medium' for max speed).
|
||||||
|
HALACHA_BULK_EXTRACT_EFFORT = os.environ.get("HALACHA_BULK_EXTRACT_EFFORT", "high")
|
||||||
# Concurrent chunks WITHIN a single extraction. Each `claude -p` @ xhigh holds
|
# Concurrent chunks WITHIN a single extraction. Each `claude -p` @ xhigh holds
|
||||||
# ~300MB RSS + heavy CPU; cross-process overlap (agent retries) on top of this
|
# ~300MB RSS + heavy CPU; cross-process overlap (agent retries) on top of this
|
||||||
# froze the box on 2026-05-31 (hard reboot). A global advisory lock now caps
|
# froze the box on 2026-05-31 (hard reboot). A global advisory lock now caps
|
||||||
|
|||||||
@@ -287,6 +287,7 @@ async def _extract_chunk(
|
|||||||
chunk_total: int,
|
chunk_total: int,
|
||||||
context: str,
|
context: str,
|
||||||
is_binding: bool,
|
is_binding: bool,
|
||||||
|
effort: str | None = None,
|
||||||
) -> tuple[list[dict], bool]:
|
) -> tuple[list[dict], bool]:
|
||||||
"""Run the halacha extractor on one chunk with retry.
|
"""Run the halacha extractor on one chunk with retry.
|
||||||
|
|
||||||
@@ -320,7 +321,7 @@ async def _extract_chunk(
|
|||||||
user_msg,
|
user_msg,
|
||||||
system=base_prompt,
|
system=base_prompt,
|
||||||
model=config.HALACHA_EXTRACT_MODEL or None,
|
model=config.HALACHA_EXTRACT_MODEL or None,
|
||||||
effort=config.HALACHA_EXTRACT_EFFORT or None,
|
effort=(effort or config.HALACHA_EXTRACT_EFFORT) or None,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
last_err = e
|
last_err = e
|
||||||
@@ -342,9 +343,14 @@ async def _extract_chunk(
|
|||||||
return [], False
|
return [], False
|
||||||
|
|
||||||
|
|
||||||
async def extract(case_law_id: UUID | str, force: bool = False) -> dict:
|
async def extract(case_law_id: UUID | str, force: bool = False,
|
||||||
|
effort: str | None = None) -> dict:
|
||||||
"""Extract halachot from an uploaded precedent — globally serialized.
|
"""Extract halachot from an uploaded precedent — globally serialized.
|
||||||
|
|
||||||
|
``effort`` overrides the per-chunk LLM effort (default
|
||||||
|
``config.HALACHA_EXTRACT_EFFORT`` = xhigh). Bulk queue-drains pass the
|
||||||
|
lighter ``config.HALACHA_BULK_EXTRACT_EFFORT`` to cut wall-clock at scale.
|
||||||
|
|
||||||
``force=False`` (default) RESUMES: chunks already extracted (checkpointed)
|
``force=False`` (default) RESUMES: chunks already extracted (checkpointed)
|
||||||
are skipped, so a crash/interrupt never loses completed work or re-pays for
|
are skipped, so a crash/interrupt never loses completed work or re-pays for
|
||||||
it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all
|
it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all
|
||||||
@@ -379,7 +385,7 @@ async def extract(case_law_id: UUID | str, force: bool = False) -> dict:
|
|||||||
"case_law_id": str(case_law_id),
|
"case_law_id": str(case_law_id),
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
return await _extract_impl(case_law_id, force=force)
|
return await _extract_impl(case_law_id, force=force, effort=effort)
|
||||||
finally:
|
finally:
|
||||||
await lock_conn.fetchval(
|
await lock_conn.fetchval(
|
||||||
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
"SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY,
|
||||||
@@ -388,7 +394,8 @@ async def extract(case_law_id: UUID | str, force: bool = False) -> dict:
|
|||||||
await pool.release(lock_conn)
|
await pool.release(lock_conn)
|
||||||
|
|
||||||
|
|
||||||
async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict:
|
async def _extract_impl(case_law_id: UUID, force: bool = False,
|
||||||
|
effort: str | None = None) -> dict:
|
||||||
"""Core extraction (caller holds the global advisory lock for the duration).
|
"""Core extraction (caller holds the global advisory lock for the duration).
|
||||||
|
|
||||||
Crash-safe + resumable: each chunk's halachot are stored AND the chunk is
|
Crash-safe + resumable: each chunk's halachot are stored AND the chunk is
|
||||||
@@ -476,6 +483,7 @@ async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict:
|
|||||||
items, ok = await _extract_chunk(
|
items, ok = await _extract_chunk(
|
||||||
chunk_row["content"], chunk_row["section_type"],
|
chunk_row["content"], chunk_row["section_type"],
|
||||||
idx_by_id[chunk_row["id"]], len(chunks), context, is_binding,
|
idx_by_id[chunk_row["id"]], len(chunks), context, is_binding,
|
||||||
|
effort,
|
||||||
)
|
)
|
||||||
if not ok:
|
if not ok:
|
||||||
failed_chunks += 1 # leave chunk un-checkpointed → retried on resume
|
failed_chunks += 1 # leave chunk un-checkpointed → retried on resume
|
||||||
|
|||||||
@@ -219,7 +219,12 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) -
|
|||||||
async def _run_once(cid: UUID) -> dict:
|
async def _run_once(cid: UUID) -> dict:
|
||||||
if kind == "metadata":
|
if kind == "metadata":
|
||||||
return await precedent_metadata_extractor.extract_and_apply(cid)
|
return await precedent_metadata_extractor.extract_and_apply(cid)
|
||||||
return await halacha_extractor.extract(cid)
|
# Bulk queue-drain → lighter effort (config.HALACHA_BULK_EXTRACT_EFFORT,
|
||||||
|
# default 'high') to cut wall-clock at scale. Resume (force=False) so an
|
||||||
|
# interrupted drain continues per-chunk. Single re-extract stays xhigh.
|
||||||
|
return await halacha_extractor.extract(
|
||||||
|
cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT,
|
||||||
|
)
|
||||||
|
|
||||||
results: list[dict] = []
|
results: list[dict] = []
|
||||||
processed = 0
|
processed = 0
|
||||||
|
|||||||
Reference in New Issue
Block a user