| | """Smart OCR deduplication — stabilization-first approach. |
| | |
| | Core principle: **don't read text until it STOPS CHANGING**. |
| | Then check against read history to avoid repeats. |
| | |
| | Architecture: |
| | |
| | Phase 1 — **Snapshot Stabilization** |
| | Each tick compares the full OCR output (all regions merged) with the |
| | previous tick. If text is growing (typewriter effect), we wait. |
| | Only when the snapshot is identical for ``stabilize_ticks`` consecutive |
| | ticks do we consider it "stable" and proceed. |
| | |
| | Phase 2 — **Line History Dedup** |
| | Once stable, each line is fuzzy-compared against a history of previously |
| | emitted lines. Only genuinely new lines pass through. History entries |
| | expire via TTL so the same text can be re-read after a cooldown. |
| | |
| | Phase 3 — **Significance Check** |
| | Rejects composed output that is too short, has too few real words, |
| | or is mostly non-alphanumeric (OCR garbage / UI artifacts). |
| | |
| | This naturally handles: |
| | - **Typewriter effects**: text grows → wait → stabilize → read complete sentence |
| | - **Static UI** (HP bars, names): stabilizes → read once → in history → skip |
| | - **OCR noise**: fuzzy matching tolerates minor variations |
| | - **Dialog changes**: snapshot changes → re-stabilize → emit new parts only |
| | - **Repeated dialog**: TTL expiry allows re-reading after cooldown |
| | |
| | Usage:: |
| | |
| | from src.services.ocr.dedup import SmartDedup |
| | |
| | dedup = SmartDedup() |
| | text = dedup.process(region_labels, ocr_results) |
| | if text is not None: |
| | translate_and_speak(text) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import time |
| | from collections import deque |
| | from dataclasses import dataclass |
| | from difflib import SequenceMatcher |
| |
|
| | from src.services.ocr.models import OcrResult |
| | from src.utils.logger import logger |
| |
|
| | |
| |
|
# --- Default tuning knobs (see DedupConfig for per-field semantics) ---

# Phase 1: consecutive fuzzily-identical ticks before a snapshot is "stable".
DEFAULT_STABILIZE_TICKS: int = 3
# Similarity (0-1) at or above which two snapshots count as "the same".
DEFAULT_SNAPSHOT_SIMILARITY: float = 0.92
# Similarity (0-1) at or above which two lines count as "the same".
DEFAULT_LINE_SIMILARITY: float = 0.80
# Phase 2: seconds before an emitted line expires and may be re-read.
DEFAULT_LINE_TTL: float = 120.0
# Seconds before a global history entry expires.
DEFAULT_HISTORY_TTL: float = 90.0
# Max entries kept in the global history ring buffer.
DEFAULT_HISTORY_SIZE: int = 30
# Phase 3: significance thresholds for composed output.
DEFAULT_MIN_NEW_CHARS: int = 8
DEFAULT_MIN_NEW_WORDS: int = 2
DEFAULT_MIN_ALNUM_RATIO: float = 0.35
| |
|
| |
|
| | |
| |
|
| |
|
@dataclass
class HistoryEntry:
    """An entry in the global text history ring buffer."""

    # Normalized (whitespace-collapsed, lowercased) text used for matching.
    norm_text: str
    # Text exactly as it was emitted, kept for diagnostics.
    original_text: str
    # time.monotonic() timestamp when the entry was first recorded.
    first_seen: float
    # time.monotonic() timestamp of the most recent match (refreshed on hit).
    last_seen: float
    # Number of times incoming text matched this entry.
    hit_count: int = 1
| |
|
| |
|
@dataclass
class DedupConfig:
    """All tunable knobs for the dedup system.

    Attributes:
        stabilize_ticks: Consecutive identical ticks before text is considered "stable".
        snapshot_similarity: Fuzzy threshold for treating two snapshots as identical (0-1).
        line_similarity: Fuzzy threshold for line-level history matching (0-1).
        line_ttl: Seconds before a known line in history expires.
        history_ttl: Seconds before a global history entry expires.
        history_size: Max entries in the global history ring buffer.
        history_similarity: Alias for line_similarity (backward compat with bridge.py).
        min_new_chars: Minimum characters for a change to be significant.
        min_new_words: Minimum word count for significance.
        min_alnum_ratio: Minimum alphanumeric ratio for significance.
        debounce_time: Legacy field — not used internally, kept for bridge compat.
        instant_mode: When True, a single stable tick suffices for emission
            (stabilize_ticks is effectively forced to 1).
    """

    stabilize_ticks: int = DEFAULT_STABILIZE_TICKS
    snapshot_similarity: float = DEFAULT_SNAPSHOT_SIMILARITY
    line_similarity: float = DEFAULT_LINE_SIMILARITY
    line_ttl: float = DEFAULT_LINE_TTL
    history_ttl: float = DEFAULT_HISTORY_TTL
    history_size: int = DEFAULT_HISTORY_SIZE
    history_similarity: float = DEFAULT_LINE_SIMILARITY
    min_new_chars: int = DEFAULT_MIN_NEW_CHARS
    min_new_words: int = DEFAULT_MIN_NEW_WORDS
    min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO
    debounce_time: float = 0.0
    instant_mode: bool = False
| |
|
| |
|
| | |
| |
|
| |
|
| | def _normalize(text: str) -> str: |
| | """Collapse whitespace, strip, lowercase — for comparison only.""" |
| | return " ".join(text.split()).strip().lower() |
| |
|
| |
|
| | |
| |
|
| |
|
class LineHistory:
    """Remembers recently emitted lines so they are not re-read.

    Maps normalized line text to the monotonic timestamp of its last
    emission.  Entries older than ``ttl`` seconds are garbage-collected,
    allowing the same text to be spoken again after a cooldown.  Short
    lines additionally go through fuzzy matching to absorb OCR jitter.
    """

    def __init__(
        self,
        ttl: float = DEFAULT_LINE_TTL,
        similarity: float = DEFAULT_LINE_SIMILARITY,
    ) -> None:
        self._ttl = ttl
        self._similarity = similarity
        # normalized line -> time.monotonic() of last emission
        self._entries: dict[str, float] = {}

    def is_known(self, line: str) -> bool:
        """Return True when ``line`` was emitted within the TTL window.

        An exact (normalized) lookup runs first; lines shorter than 60
        characters also get fuzzy comparison so minor OCR noise cannot
        defeat the dedup.

        Args:
            line: Raw (non-normalized) line text.

        Returns:
            True if the line should be suppressed as a recent repeat.
        """
        norm = _normalize(line)
        if len(norm) < 2:
            # Degenerate fragments are always treated as known (skipped).
            return True

        self._gc(time.monotonic())

        if norm in self._entries:
            return True

        # Fuzzy matching only for short lines, where noise matters most.
        if len(norm) < 60:
            for known in self._entries:
                # Cheap length pre-filter before the expensive ratio().
                if abs(len(norm) - len(known)) > max(5, len(known) * 0.25):
                    continue
                if SequenceMatcher(None, norm, known).ratio() >= self._similarity:
                    return True

        return False

    def mark_emitted(self, line: str) -> None:
        """Record ``line`` as just emitted."""
        norm = _normalize(line)
        if norm:
            self._entries[norm] = time.monotonic()

    def reset(self) -> None:
        """Forget every recorded line."""
        self._entries.clear()

    @property
    def size(self) -> int:
        """Number of entries currently stored (pre-GC count)."""
        return len(self._entries)

    def _gc(self, now: float) -> None:
        """Drop entries whose age exceeds the TTL."""
        stale = [key for key, stamp in self._entries.items() if now - stamp > self._ttl]
        for key in stale:
            del self._entries[key]
| |
|
| |
|
| | |
| |
|
| |
|
class GlobalTextHistory:
    """TTL-bounded ring buffer of recently emitted text blocks.

    Stops the same composed block from being emitted twice inside the
    TTL window; fuzzy comparison absorbs OCR noise between captures.
    """

    def __init__(
        self,
        max_size: int = DEFAULT_HISTORY_SIZE,
        ttl: float = DEFAULT_HISTORY_TTL,
        similarity: float = DEFAULT_LINE_SIMILARITY,
    ) -> None:
        self._ttl = ttl
        self._similarity = similarity
        # Oldest entries fall off automatically thanks to maxlen.
        self._entries: deque[HistoryEntry] = deque(maxlen=max_size)

    def is_duplicate(self, text: str) -> tuple[bool, float]:
        """Check whether ``text`` repeats something emitted recently.

        A match refreshes the entry's ``last_seen`` timestamp and bumps
        its hit counter (sliding TTL).

        Args:
            text: Composed text block.

        Returns:
            ``(is_dup, best_similarity)``
        """
        now = time.monotonic()
        norm = _normalize(text)
        if not norm:
            # Nothing worth emitting — report it as a duplicate.
            return (True, 1.0)

        best = 0.0
        for entry in self._entries:
            # Expired entries are ignored; the deque evicts them over time.
            if now - entry.last_seen > self._ttl:
                continue

            if entry.norm_text == norm:
                entry.last_seen = now
                entry.hit_count += 1
                return (True, 1.0)

            ratio = SequenceMatcher(None, norm, entry.norm_text).ratio()
            best = max(best, ratio)
            if ratio >= self._similarity:
                entry.last_seen = now
                entry.hit_count += 1
                return (True, ratio)

        return (False, best)

    def add(self, text: str) -> None:
        """Append ``text`` as a fresh history entry."""
        stamp = time.monotonic()
        entry = HistoryEntry(
            norm_text=_normalize(text),
            original_text=text,
            first_seen=stamp,
            last_seen=stamp,
        )
        self._entries.append(entry)

    def reset(self) -> None:
        """Drop every entry."""
        self._entries.clear()

    @property
    def size(self) -> int:
        """Current number of buffered entries."""
        return len(self._entries)
| |
|
| |
|
| | |
| |
|
| |
|
class ChangeDetector:
    """Judges whether freshly detected lines are worth emitting.

    Filters out tiny fragments, single-word blips, and blocks that are
    mostly punctuation/symbols (typical OCR garbage or UI chrome).
    """

    def __init__(
        self,
        min_chars: int = DEFAULT_MIN_NEW_CHARS,
        min_words: int = DEFAULT_MIN_NEW_WORDS,
        min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO,
    ) -> None:
        self._min_chars = min_chars
        self._min_words = min_words
        self._min_alnum_ratio = min_alnum_ratio

    def is_significant(self, new_lines: list[str]) -> bool:
        """Return True if the joined lines look like real content."""
        joined = " ".join(part.strip() for part in new_lines).strip()

        # Too short overall?
        if len(joined) < self._min_chars:
            return False

        # Too few words?
        if len(joined.split()) < self._min_words:
            return False

        # Mostly non-alphanumeric?  The empty-string guard avoids a
        # division by zero when min_chars is configured to 0.
        density = (sum(ch.isalnum() for ch in joined) / len(joined)) if joined else 0
        return density >= self._min_alnum_ratio
| |
|
| |
|
| | |
| |
|
| |
|
class SmartDedup:
    """Stabilization-first OCR deduplication.

    Core algorithm:

    1. Each tick: merge all OCR results into a single text snapshot
    2. Compare snapshot with previous tick — growing? same? different?
    3. When snapshot is identical for ``stabilize_ticks`` consecutive ticks → STABLE
    4. Extract lines, filter against read history → emit only NEW lines
    5. Significance check → reject OCR garbage
    6. Add emitted lines to history, record in global ring buffer

    This replaces the old per-line-tracker approach which caused:
    - Sentence fragments (read partial text too early)
    - Infinite silence (partial lines marked "known" too aggressively)

    Example::

        dedup = SmartDedup()

        # On each pipeline tick:
        text = dedup.process(region_labels, ocr_results)
        if text is not None:
            await translate_and_speak(text)

        # On pipeline stop or config change:
        dedup.reset()
    """

    def __init__(self, config: DedupConfig | None = None) -> None:
        """Create a dedup engine.

        Args:
            config: Tunable thresholds; defaults are used when None.
        """
        self._cfg = config or DedupConfig()

        # --- Phase 1 stabilization state ---
        # Normalized snapshot from the previous tick (None until first tick).
        self._last_snapshot: str | None = None
        # Raw (non-normalized) text of the previous tick, kept for force_flush().
        self._last_raw: str | None = None
        # Consecutive ticks the snapshot stayed fuzzily identical.
        self._stable_count: int = 0
        # Normalized snapshot already processed/emitted — guards against
        # re-emitting the same stable screen on every subsequent tick.
        self._processed_snapshot: str | None = None

        # Snapshot at the moment of the last emission; used to detect
        # "typewriter" growth that continues after we already spoke.
        self._last_emitted_norm: str | None = None

        # Phase 2/3 collaborators (see module docstring).
        self._line_history = LineHistory(
            ttl=self._cfg.line_ttl,
            similarity=self._cfg.line_similarity,
        )
        self._global_history = GlobalTextHistory(
            max_size=self._cfg.history_size,
            ttl=self._cfg.history_ttl,
            similarity=self._cfg.history_similarity,
        )
        self._change_detector = ChangeDetector(
            min_chars=self._cfg.min_new_chars,
            min_words=self._cfg.min_new_words,
            min_alnum_ratio=self._cfg.min_alnum_ratio,
        )

    def process(
        self,
        region_labels: list[str],
        ocr_results: list[OcrResult],
        *,
        force: bool = False,
    ) -> str | None:
        """Run stabilization-based dedup on multi-region OCR results.

        Args:
            region_labels: Label/ID for each region (for diagnostics;
                currently unused by this implementation).
            ocr_results: OCR result per region (same order as labels).
            force: If True, skip all dedup and return all text immediately.

        Returns:
            Text to translate + speak, or None if suppressed by dedup.
        """
        # Merge all successful, non-empty region results into one snapshot.
        raw_parts: list[str] = []
        for result in ocr_results:
            if result.error or result.is_empty:
                continue
            text = result.text.strip()
            if text:
                raw_parts.append(text)

        if not raw_parts:
            return None

        full_raw = "\n".join(raw_parts)
        full_norm = _normalize(full_raw)

        if not full_norm or len(full_norm) < 2:
            return None

        # Force mode: bypass stabilization and history, but still record
        # everything so subsequent normal ticks do not re-emit this text.
        if force:
            self._global_history.add(full_raw)
            self._mark_all_lines_known(full_raw)
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = full_norm
            self._stable_count = 0
            logger.info("Dedup: force read — emitting %d chars", len(full_raw))
            return full_raw

        # First tick ever: seed the stabilization state.
        if self._last_snapshot is None:
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._stable_count = 0
            self._processed_snapshot = None
            # In instant mode we deliberately fall through: the snapshot
            # now compares equal to itself, counts as one stable tick, and
            # may be emitted immediately.  Otherwise wait for the next tick.
            if not self._cfg.instant_mode:
                return None

        # Phase 1: compare this tick's snapshot with the previous one.
        snapshot_sim = self._snapshot_similarity(self._last_snapshot, full_norm)

        if snapshot_sim >= self._cfg.snapshot_similarity:
            # Unchanged (within fuzz) — one more tick towards stability.
            self._stable_count += 1
        elif self._is_text_growing(self._last_snapshot, full_norm):
            # Typewriter effect: text is still being revealed — keep waiting.
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: text growing, waiting for stabilization")
            return None
        elif (
            self._last_emitted_norm is not None
            and self._is_text_growing(self._last_emitted_norm, full_norm)
        ):
            # The snapshot differs from the previous tick but is an
            # expansion of what we already emitted — the dialog kept
            # growing after we spoke.  Restart stabilization and wait for
            # the continuation to finish.
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: post-emit growth detected, waiting for continuation")
            return None
        else:
            # Entirely different content (dialog changed) — re-stabilize.
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: snapshot changed, waiting for stabilization")
            return None

        # Keep the latest snapshot/raw text (also feeds force_flush()).
        self._last_snapshot = full_norm
        self._last_raw = full_raw

        # Not stable long enough yet?
        required_ticks = 1 if self._cfg.instant_mode else self._cfg.stabilize_ticks
        if self._stable_count < required_ticks:
            return None

        # Already handled this (fuzzily identical) snapshot? Stay silent.
        if self._processed_snapshot is not None:
            sim = self._snapshot_similarity(full_norm, self._processed_snapshot)
            if sim >= self._cfg.snapshot_similarity:
                return None

        # Phase 2: keep only lines not seen recently.
        all_lines = self._extract_lines(full_raw, ocr_results)
        new_lines: list[str] = []

        for line in all_lines:
            if not self._line_history.is_known(line):
                new_lines.append(line)

        # Composed-block dedup against the global ring buffer.
        if new_lines:
            composed = "\n".join(new_lines)
            is_dup, sim = self._global_history.is_duplicate(composed)
            if is_dup:
                logger.debug("Dedup: global history match (sim=%.3f)", sim)
                new_lines = []

        if not new_lines:
            # Everything was known — remember this snapshot is handled.
            self._processed_snapshot = full_norm
            return None

        # Phase 3: reject short/garbage output.
        if not self._change_detector.is_significant(new_lines):
            logger.debug(
                "Dedup: new lines not significant (%d lines, %d chars)",
                len(new_lines),
                sum(len(line) for line in new_lines),
            )
            self._processed_snapshot = full_norm
            return None

        # Emit: record in every history so nothing repeats.
        composed = "\n".join(new_lines)
        self._mark_all_lines_known(composed)
        self._global_history.add(composed)
        self._processed_snapshot = full_norm
        # Remember the screen state at emission time (post-emit growth check).
        self._last_emitted_norm = full_norm
        # Require fresh stabilization before the next emission.
        self._stable_count = 0

        logger.info(
            "Dedup: emitting %d new lines (%d chars, %d known lines in history)",
            len(new_lines),
            len(composed),
            self._line_history.size,
        )
        return composed

    def force_flush(self) -> str | None:
        """Force-emit whatever raw text is pending (for force-read button).

        Returns:
            The last raw snapshot (also recorded in the histories so
            normal ticks will not repeat it), or None if nothing pending.
        """
        if self._last_raw:
            raw = self._last_raw
            self._global_history.add(raw)
            self._mark_all_lines_known(raw)
            return raw
        return None

    def update_config(self, config: DedupConfig) -> None:
        """Apply new configuration. Rebuilds internal components.

        NOTE: rebuilding the histories discards their contents, so
        previously emitted text may be re-read after a config change.

        Args:
            config: New tunables to apply immediately.
        """
        self._cfg = config
        self._line_history = LineHistory(
            ttl=config.line_ttl,
            similarity=config.line_similarity,
        )
        self._global_history = GlobalTextHistory(
            max_size=config.history_size,
            ttl=config.history_ttl,
            similarity=config.history_similarity,
        )
        self._change_detector = ChangeDetector(
            min_chars=config.min_new_chars,
            min_words=config.min_new_words,
            min_alnum_ratio=config.min_alnum_ratio,
        )
        logger.info("SmartDedup: config updated")

    def reset(self) -> None:
        """Clear all state (e.g. on scene change or pipeline restart)."""
        self._last_snapshot = None
        self._last_raw = None
        self._stable_count = 0
        self._processed_snapshot = None
        self._last_emitted_norm = None
        self._line_history.reset()
        self._global_history.reset()
        logger.info("SmartDedup: all state reset")

    def reset_region(self, label: str) -> None:
        """No-op in snapshot-based approach — kept for backward compat.

        Args:
            label: Region label (ignored).
        """
        pass

    @property
    def stats(self) -> dict[str, int]:
        """Return diagnostic stats.

        ``tracked_regions`` is always 0: the snapshot-based approach has
        no per-region trackers (key kept for backward compatibility).
        """
        return {
            "tracked_regions": 0,
            "total_known_lines": self._line_history.size,
            "history_size": self._global_history.size,
            "stable_count": self._stable_count,
        }

    @staticmethod
    def _snapshot_similarity(a: str, b: str) -> float:
        """Fast similarity between two normalized snapshots.

        Short-circuits the equality and empty cases before paying for
        SequenceMatcher.

        Args:
            a: First normalized snapshot.
            b: Second normalized snapshot.

        Returns:
            Similarity ratio in [0, 1].
        """
        if a == b:
            return 1.0
        if not a or not b:
            return 0.0
        return SequenceMatcher(None, a, b).ratio()

    @staticmethod
    def _is_text_growing(old_norm: str, new_norm: str) -> bool:
        """Check if new text is an expansion of old text (typewriter effect).

        Returns True if new_norm is longer AND contains most of old_norm's
        words at the beginning (prefix-like growth).

        Args:
            old_norm: Earlier normalized snapshot.
            new_norm: Later normalized snapshot.

        Returns:
            True when ``new_norm`` looks like ``old_norm`` plus more text.
        """
        if len(new_norm) <= len(old_norm):
            return False

        # Fast path: literal prefix growth.
        if new_norm.startswith(old_norm):
            return True

        # Word-level comparison tolerates OCR noise in the part already seen.
        old_words = old_norm.split()
        new_words = new_norm.split()

        if len(new_words) <= len(old_words):
            return False

        # Count positionally matching words (exact, or fuzzy above 0.8 to
        # absorb single-character OCR variations).
        matching = 0
        for old_w, new_w in zip(old_words, new_words):
            if old_w == new_w:
                matching += 1
            elif SequenceMatcher(None, old_w, new_w).ratio() > 0.8:
                # Same word with minor OCR variation.
                matching += 1

        # Growth if at least 60% of the old words survive at their positions.
        return matching >= len(old_words) * 0.6

    def _extract_lines(
        self, raw_text: str, ocr_results: list[OcrResult]
    ) -> list[str]:
        """Extract individual lines from OCR results.

        Prefers structured ``OcrResult.lines`` when available.
        Deduplicates across regions (overlapping capture areas).

        Args:
            raw_text: Fallback raw text (used if no structured lines).
            ocr_results: OCR results with structured lines.

        Returns:
            List of unique raw line texts.
        """
        lines: list[str] = []
        seen_norms: set[str] = set()

        for result in ocr_results:
            if result.error or result.is_empty:
                continue
            for ocr_line in result.lines:
                raw = ocr_line.text.strip()
                if not raw:
                    continue
                norm = _normalize(raw)
                if len(norm) < 2:
                    continue

                # Exact duplicate across regions.
                if norm in seen_norms:
                    continue

                # Near-duplicate across overlapping regions: short lines
                # within 3 chars of length and >= 0.95 similar are treated
                # as the same line captured twice.
                is_cross_dup = False
                if len(norm) < 60:
                    for seen in seen_norms:
                        if abs(len(norm) - len(seen)) > 3:
                            continue
                        if SequenceMatcher(None, norm, seen).ratio() >= 0.95:
                            is_cross_dup = True
                            break
                if is_cross_dup:
                    continue

                seen_norms.add(norm)
                lines.append(raw)

        # Fallback: no structured lines at all — split the raw text.
        if not lines:
            for line in raw_text.split("\n"):
                stripped = line.strip()
                if stripped and len(_normalize(stripped)) >= 2:
                    norm = _normalize(stripped)
                    if norm not in seen_norms:
                        seen_norms.add(norm)
                        lines.append(stripped)

        return lines

    def _mark_all_lines_known(self, text: str) -> None:
        """Add all lines in text to line history.

        Args:
            text: Newline-separated block whose lines become "known".
        """
        for line in text.split("\n"):
            stripped = line.strip()
            if stripped and len(_normalize(stripped)) >= 2:
                self._line_history.mark_emitted(stripped)
| |
|