"""Gemini structured-output helper — a drop-in for ``claude_session.query_json`` for BOUNDED extraction tasks (text → JSON). Why a second LLM path: metadata extraction is a single structured call (fill case_name/summary/headnote/tags from a verdict's text), not an agentic loop. The ``claude -p`` CLI behind ``claude_session`` is agentic — it reaches for tools and hits ``error_max_turns`` on a task that should be one shot — so it was slow and flaky for the precedent metadata queue. Gemini Flash with JSON mode (``responseMimeType: application/json``) is the right tool: one call, schema- clean JSON, fast, and ~$0.10/1M tokens (negligible for this volume). Scope: **bounded extraction only** (precedent metadata). The agentic, voice- sensitive work — decision writing, analysis, halacha extraction — stays on ``claude_session`` (Daphna's subscription, zero API cost). This is a deliberate per-task provider choice, not a wholesale move off Claude. Key: ``GEMINI_API_KEY`` (host ~/.env; SoT Infisical nautilus:/external-apis/gemini as ``GOOGLE_GEMINI_API_KEY``). Model: ``GEMINI_MODEL`` (default gemini-2.5-flash). Direct REST via httpx — no extra SDK dependency. """ from __future__ import annotations import json import logging import os import httpx logger = logging.getLogger(__name__) _BASE = "https://generativelanguage.googleapis.com/v1beta" _DEFAULT_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") _DEFAULT_TIMEOUT = float(os.environ.get("GEMINI_TIMEOUT_S", "120")) class GeminiError(RuntimeError): """Gemini API call failed or returned an unexpected shape.""" def _api_key() -> str: key = os.environ.get("GEMINI_API_KEY", "").strip() if not key: raise GeminiError( "GEMINI_API_KEY אינו מוגדר (host ~/.env / Infisical " "nautilus:/external-apis/gemini)." ) return key async def query_json( prompt: str, timeout: float | int = _DEFAULT_TIMEOUT, *, system: str | None = None, model: str | None = None, # Accepted for drop-in parity with claude_session.query_json; ignored here. effort: str | None = None, tools: str | None = None, ) -> dict | list | None: """Single structured-output call → parsed JSON. Drop-in for ``claude_session.query_json``. Raises ``GeminiError`` on failure (the caller treats that like any extraction failure — recorded, never silently wrong). """ model = model or _DEFAULT_MODEL body: dict = { "contents": [{"role": "user", "parts": [{"text": prompt}]}], "generationConfig": { "responseMimeType": "application/json", "temperature": 0, }, } if system: body["system_instruction"] = {"parts": [{"text": system}]} url = f"{_BASE}/models/{model}:generateContent" try: async with httpx.AsyncClient(timeout=float(timeout)) as client: resp = await client.post(url, params={"key": _api_key()}, json=body) except httpx.HTTPError as e: raise GeminiError(f"Gemini request failed: {e}") from e if resp.status_code != 200: raise GeminiError(f"Gemini HTTP {resp.status_code}: {resp.text[:200]}") data = resp.json() # Surface an explicit safety/finish block rather than returning empty. cand = (data.get("candidates") or [{}])[0] if cand.get("finishReason") in ("SAFETY", "RECITATION", "PROHIBITED_CONTENT"): raise GeminiError(f"Gemini blocked output: finishReason={cand['finishReason']}") try: text = cand["content"]["parts"][0]["text"] except (KeyError, IndexError, TypeError) as e: raise GeminiError(f"Gemini unexpected response: {str(data)[:200]}") from e try: return json.loads(text) except json.JSONDecodeError as e: raise GeminiError(f"Gemini returned non-JSON: {text[:200]}") from e