Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
"""Persistence helper for tracking recently processed Gmail message IDs."""
from __future__ import annotations
import json
import threading
from collections import deque
from pathlib import Path
from typing import Deque, Iterable, List, Optional, Set
from ...logging_config import logger
class GmailSeenStore:
"""Maintain a bounded set of Gmail message IDs backed by a JSON file."""
def __init__(self, path: Path, max_entries: int = 300) -> None:
self._path = path
self._max_entries = max_entries
self._lock = threading.Lock()
self._entries: Deque[str] = deque()
self._index: Set[str] = set()
self._load()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def has_entries(self) -> bool:
with self._lock:
return bool(self._entries)
def is_seen(self, message_id: str) -> bool:
normalized = self._normalize(message_id)
if not normalized:
return False
with self._lock:
return normalized in self._index
def mark_seen(self, message_ids: Iterable[str]) -> None:
normalized_ids = [mid for mid in (self._normalize(mid) for mid in message_ids) if mid]
if not normalized_ids:
return
with self._lock:
for message_id in normalized_ids:
if message_id in self._index:
# Refresh recency by removing and re-appending
try:
self._entries.remove(message_id)
except ValueError: # pragma: no cover - defensive
pass
else:
self._index.add(message_id)
self._entries.append(message_id)
self._prune_locked()
self._persist_locked()
def snapshot(self) -> List[str]:
with self._lock:
return list(self._entries)
def clear(self) -> None:
with self._lock:
self._entries.clear()
self._index.clear()
self._persist_locked()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _normalize(self, message_id: Optional[str]) -> str:
if not message_id:
return ""
return str(message_id).strip()
def _load(self) -> None:
try:
data = json.loads(self._path.read_text(encoding="utf-8"))
except FileNotFoundError:
return
except Exception as exc: # pragma: no cover - defensive
logger.warning(
"Failed to load Gmail seen-store; starting empty",
extra={"path": str(self._path), "error": str(exc)},
)
return
if not isinstance(data, list):
logger.warning(
"Gmail seen-store payload invalid; expected list",
extra={"path": str(self._path)},
)
return
for raw_id in data[-self._max_entries :]:
normalized = self._normalize(raw_id)
if normalized and normalized not in self._index:
self._entries.append(normalized)
self._index.add(normalized)
def _prune_locked(self) -> None:
while len(self._entries) > self._max_entries:
oldest = self._entries.popleft()
self._index.discard(oldest)
def _persist_locked(self) -> None:
try:
self._path.parent.mkdir(parents=True, exist_ok=True)
payload = list(self._entries)
self._path.write_text(json.dumps(payload), encoding="utf-8")
except Exception as exc: # pragma: no cover - defensive
logger.warning(
"Failed to persist Gmail seen-store",
extra={"path": str(self._path), "error": str(exc)},
)
__all__ = ["GmailSeenStore"]