diff --git a/mcp-server/src/legal_mcp/config.py b/mcp-server/src/legal_mcp/config.py index 27afaa7..95d5dd3 100644 --- a/mcp-server/src/legal_mcp/config.py +++ b/mcp-server/src/legal_mcp/config.py @@ -54,6 +54,12 @@ REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6380/0") # pinned. HALACHA_EXTRACT_MODEL = os.environ.get("HALACHA_EXTRACT_MODEL", "claude-opus-4-8") HALACHA_EXTRACT_EFFORT = os.environ.get("HALACHA_EXTRACT_EFFORT", "xhigh") +# Effort for BULK queue-drain extraction (process_pending_extractions over many precedents). +# xhigh is the quality sweet-spot for a single precedent but very slow at scale +# (a 64-chunk case ≈ 20 min). Bulk drains use a lighter effort to cut wall-clock; +# interactive single re-extraction keeps HALACHA_EXTRACT_EFFORT (xhigh). Tune via +# env (set to 'xhigh' to make bulk match single, or 'medium' for max speed). +HALACHA_BULK_EXTRACT_EFFORT = os.environ.get("HALACHA_BULK_EXTRACT_EFFORT", "high") # Concurrent chunks WITHIN a single extraction. Each `claude -p` @ xhigh holds # ~300MB RSS + heavy CPU; cross-process overlap (agent retries) on top of this # froze the box on 2026-05-31 (hard reboot). A global advisory lock now caps diff --git a/mcp-server/src/legal_mcp/services/halacha_extractor.py b/mcp-server/src/legal_mcp/services/halacha_extractor.py index c7c3797..b7f8244 100644 --- a/mcp-server/src/legal_mcp/services/halacha_extractor.py +++ b/mcp-server/src/legal_mcp/services/halacha_extractor.py @@ -287,6 +287,7 @@ async def _extract_chunk( chunk_total: int, context: str, is_binding: bool, + effort: str | None = None, ) -> tuple[list[dict], bool]: """Run the halacha extractor on one chunk with retry. 
@@ -320,7 +321,7 @@ async def _extract_chunk( user_msg, system=base_prompt, model=config.HALACHA_EXTRACT_MODEL or None, - effort=config.HALACHA_EXTRACT_EFFORT or None, + effort=(effort or config.HALACHA_EXTRACT_EFFORT) or None, ) except Exception as e: last_err = e @@ -342,9 +343,14 @@ async def _extract_chunk( return [], False -async def extract(case_law_id: UUID | str, force: bool = False) -> dict: +async def extract(case_law_id: UUID | str, force: bool = False, + effort: str | None = None) -> dict: """Extract halachot from an uploaded precedent — globally serialized. + ``effort`` overrides the per-chunk LLM effort (default + ``config.HALACHA_EXTRACT_EFFORT`` = xhigh). Bulk queue-drains pass the + lighter ``config.HALACHA_BULK_EXTRACT_EFFORT`` to cut wall-clock at scale. + ``force=False`` (default) RESUMES: chunks already extracted (checkpointed) are skipped, so a crash/interrupt never loses completed work or re-pays for it. ``force=True`` wipes prior halachot + checkpoints and re-extracts all @@ -379,7 +385,7 @@ async def extract(case_law_id: UUID | str, force: bool = False) -> dict: "case_law_id": str(case_law_id), } try: - return await _extract_impl(case_law_id, force=force) + return await _extract_impl(case_law_id, force=force, effort=effort) finally: await lock_conn.fetchval( "SELECT pg_advisory_unlock($1)", _HALACHA_EXTRACT_LOCK_KEY, @@ -388,7 +394,8 @@ async def extract(case_law_id: UUID | str, force: bool = False) -> dict: await pool.release(lock_conn) -async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict: +async def _extract_impl(case_law_id: UUID, force: bool = False, + effort: str | None = None) -> dict: """Core extraction (caller holds the global advisory lock for the duration). 
Crash-safe + resumable: each chunk's halachot are stored AND the chunk is @@ -476,6 +483,7 @@ async def _extract_impl(case_law_id: UUID, force: bool = False) -> dict: items, ok = await _extract_chunk( chunk_row["content"], chunk_row["section_type"], idx_by_id[chunk_row["id"]], len(chunks), context, is_binding, + effort, ) if not ok: failed_chunks += 1 # leave chunk un-checkpointed → retried on resume diff --git a/mcp-server/src/legal_mcp/services/precedent_library.py b/mcp-server/src/legal_mcp/services/precedent_library.py index eae3ea8..d15e100 100644 --- a/mcp-server/src/legal_mcp/services/precedent_library.py +++ b/mcp-server/src/legal_mcp/services/precedent_library.py @@ -219,7 +219,12 @@ async def process_pending_extractions(kind: str = "metadata", limit: int = 20) - async def _run_once(cid: UUID) -> dict: if kind == "metadata": return await precedent_metadata_extractor.extract_and_apply(cid) - return await halacha_extractor.extract(cid) + # Bulk queue-drain → lighter effort (config.HALACHA_BULK_EXTRACT_EFFORT, + # default 'high') to cut wall-clock at scale. Resume (force=False) so an + # interrupted drain continues per-chunk. Single re-extract stays xhigh. + return await halacha_extractor.extract( + cid, effort=config.HALACHA_BULK_EXTRACT_EFFORT, + ) results: list[dict] = [] processed = 0