Clean up scripts/: archive 17, delete 5, add SCRIPTS.md registry
Active scripts (5): auto-sync-cases.sh, backup-db.sh, restore-db.sh, notify.py, bidi_table.py Archived (17): one-time migration/seeding scripts whose functionality is now in MCP server or web API. Moved to scripts/.archive/ Deleted (5): zero-value scripts (duplicates, hardcoded single-case, debug scripts) Added scripts/SCRIPTS.md — registry of all scripts with purpose, status, and what superseded them. CLAUDE.md updated with rule: any script change requires SCRIPTS.md update. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
163
scripts/.archive/backfill_pattern_frequency.py
Normal file
163
scripts/.archive/backfill_pattern_frequency.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""Backfill style_patterns.frequency with real occurrence counts.
|
||||
|
||||
The analyzer currently stores frequency=1 for every pattern (it only extracts
|
||||
unique patterns, doesn't count occurrences). This script scans the full_text
|
||||
of every decision in style_corpus and updates each pattern's frequency to
|
||||
the true count of decisions containing the pattern_text as a substring.
|
||||
|
||||
Run once after analysis, and again whenever new decisions are added.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
|
||||
# Load env
def _load_env_file(path: Path) -> None:
    """Export ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Variables that are already set in the process environment are left
    untouched (``setdefault``), so the real environment always wins over
    the file.

    Parsing rules:
      * blank lines, comment lines (``#``, optionally indented) and lines
        without ``=`` are ignored;
      * the line is split on the first ``=`` only, so values may contain ``=``;
      * surrounding whitespace and one layer of ``"`` / ``'`` quoting are
        removed from the value;
      * entries with an empty key are skipped, because the OS rejects them.

    A missing file is not an error: the required variables may already be
    exported by the caller's shell.

    Args:
        path: Location of the dotenv-style file to read (UTF-8).
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return

    for raw_line in content.splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        key = key.strip()
        if not key:
            continue
        os.environ.setdefault(key, value.strip().strip('"').strip("'"))


_load_env_file(Path.home() / ".env")
|
||||
|
||||
sys.path.insert(0, "/home/chaim/legal-ai/mcp-server/src")
|
||||
|
||||
from legal_mcp.services import db as db_mod # noqa: E402
|
||||
|
||||
|
||||
def _strip_nikud(text: str) -> str:
    """Return *text* with every Unicode combining mark dropped.

    The text is first decomposed (NFD) so that marks attached to a base
    letter — Hebrew nikud, but equally any other combining character —
    become separate code points that can be filtered out. The result is
    left in decomposed form; it is not recomposed afterwards.
    """
    decomposed = unicodedata.normalize("NFD", text)
    # combining() returns the canonical combining class; 0 means "base char".
    kept = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(kept)
|
||||
|
||||
|
||||
def _extract_searchable_variants(pattern_text: str) -> list[str]:
    """Turn a pattern template into concrete substrings worth searching for.

    Templates may contain:
      - placeholders in [brackets],
      - alternatives separated by "/" or by the Hebrew word "או",
      - "..." (two or more dots) or "…" standing for a variable part.

    For each alternative the variable parts are cut out, the remaining
    fixed pieces are trimmed of surrounding punctuation, and the longest
    piece of at least 4 characters is kept. When no piece is that long,
    the whole alternative (variable parts replaced by a space) is used
    instead, provided it is at least 4 characters long.

    Returns:
        The selected substrings, without duplicates, in first-seen order.
        May be empty.
    """
    found: list[str] = []

    for candidate in re.split(r"\s*/\s*|\s+או\s+", pattern_text):
        candidate = candidate.strip()
        if not candidate:
            continue

        # Mark every variable part (placeholder, dot run, unicode ellipsis)
        # with "|" so the fixed text around it can be split apart.
        marked = re.sub(r"\[[^\]]*\]", "|", candidate)
        marked = re.sub(r"\.{2,}", "|", marked)
        marked = marked.replace("…", "|")

        pieces = (piece.strip(" ,.:;\"'") for piece in marked.split("|"))
        usable = [piece for piece in pieces if len(piece) >= 4]
        if usable:
            # max() keeps the first piece on ties.
            found.append(max(usable, key=len))
            continue

        # Fallback: the cleaned alternative as a whole. An all-whitespace
        # alternative collapses to "" here and fails the length check.
        joined = marked.replace("|", " ").strip()
        if len(joined) >= 4:
            found.append(joined)

    # dict keys keep insertion order, which gives an order-preserving dedup.
    return list(dict.fromkeys(found))
|
||||
|
||||
|
||||
def _count_decisions_containing(variants: list[str], normalized_decisions: list) -> int:
    """Return the number of decisions whose text contains at least one variant.

    Args:
        variants: Substrings to look for.
        normalized_decisions: 3-item records; only the third item (the
            text) is inspected. A decision is counted once no matter how
            many variants it contains.
    """
    return sum(
        1
        for _ident, _number, body in normalized_decisions
        if any(needle in body for needle in variants)
    )
|
||||
|
||||
|
||||
async def main() -> int:
    """Recompute ``style_patterns.frequency`` from the corpus and print a summary.

    Loads every ``style_corpus`` row with non-empty ``full_text`` and every
    ``style_patterns`` row, then counts for each pattern how many decisions
    contain at least one of its searchable variants. Both sides are compared
    with combining marks stripped. The counts are written back in a single
    batch, after which the 15 highest-frequency patterns and the frequency
    distribution are printed.

    A pattern gets frequency 0 when its text is empty, shorter than
    3 characters, or yields no searchable variant.

    Returns:
        Always 0.
    """
    pool = await db_mod.get_pool()

    async with pool.acquire() as conn:
        decisions = await conn.fetch(
            "SELECT id, decision_number, full_text FROM style_corpus "
            "WHERE full_text IS NOT NULL AND length(full_text) > 0"
        )
        patterns = await conn.fetch(
            "SELECT id, pattern_text, pattern_type FROM style_patterns"
        )

        print(f"Scanning {len(patterns)} patterns across {len(decisions)} decisions...")

        # Strip marks from each decision once, not once per pattern.
        corpus = [
            (row["id"], row["decision_number"], _strip_nikud(row["full_text"]))
            for row in decisions
        ]

        updates = []
        for pattern in patterns:
            raw_text = pattern["pattern_text"]
            frequency = 0
            if raw_text and len(raw_text) >= 3:
                variants = _extract_searchable_variants(_strip_nikud(raw_text))
                if variants:
                    frequency = _count_decisions_containing(variants, corpus)
            # Argument order matches the $1 / $2 placeholders below.
            updates.append((frequency, pattern["id"]))

        await conn.executemany(
            "UPDATE style_patterns SET frequency = $1 WHERE id = $2",
            updates,
        )

        # Show distribution
        top_rows = await conn.fetch(
            "SELECT pattern_type, pattern_text, frequency "
            "FROM style_patterns "
            "ORDER BY frequency DESC "
            "LIMIT 15"
        )
        print(f"\nTop 15 patterns by real frequency:")
        for r in top_rows:
            print(f" {r['frequency']:>3} [{r['pattern_type']:<22}] {r['pattern_text'][:90]}")

        histogram = await conn.fetch(
            "SELECT frequency, count(*) FROM style_patterns "
            "GROUP BY frequency ORDER BY frequency DESC"
        )
        print(f"\nFrequency distribution:")
        for r in histogram:
            print(f" frequency={r['frequency']:>3} → {r['count']} patterns")

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(asyncio.run(main()))
|
||||
Reference in New Issue
Block a user