# gng/grok_sensor.py — uploaded by plexdx ("Upload 21 files", commit f589dab, verified)
"""
grok_sensor.py — X API v2 sensor with Community Notes and full mock fallback.
Queries:
1. X API v2 search/recent: 7-day tweet volume for claim keywords
2. Community Notes API: active notes matching claim hash
Returns:
XSensorResult: {velocity, community_note, note_text, query_used}
Design decisions:
- All I/O is async (httpx.AsyncClient)
- Full mock fallback when X_BEARER_TOKEN is absent (common in dev/CI)
- Rate-limit aware: exponential backoff via tenacity
- Keyword extraction: noun phrases + named entities via regex (no spaCy dependency)
"""
from __future__ import annotations

import asyncio
import hashlib
import os
import re
import time
from dataclasses import dataclass
from typing import Optional

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential
log = structlog.get_logger(__name__)
X_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
X_COMMUNITY_NOTES_URL = "https://api.twitter.com/2/notes/search"
MAX_QUERY_LEN = 512
@dataclass
class XSensorResult:
    """Result bundle produced by the X sensor (live API or mock fallback)."""

    velocity: int  # 7-day tweet volume for claim keywords
    community_note: bool  # Active Community Note found
    note_text: str | None  # Text of the most relevant note, if any
    query_used: str  # The search query constructed from the claim
    source: str = "api"  # "api" or "mock"
def _extract_keywords(text: str, max_keywords: int = 6) -> str:
    """
    Extract query keywords from claim text using patterns:
    - Quoted strings (already specific)
    - Named entities: capitalized sequences of 1-3 words
    - Numbers with unit context
    Strips stop words to improve query precision.

    Args:
        text: Raw claim text.
        max_keywords: Maximum number of keyword phrases kept in the query.

    Returns:
        A search query capped at MAX_QUERY_LEN; falls back to the first
        100 chars of the raw text if nothing usable is extracted.
    """
    stop_words = {
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "did", "will", "would", "could",
        "should", "may", "might", "shall", "can", "this", "that",
        "these", "those", "it", "its", "their", "they", "we", "you",
        "i", "he", "she", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "into", "about", "as",
    }
    # Quoted substrings get priority
    quoted = re.findall(r'"([^"]{3,40})"', text)
    if quoted:
        return " ".join(quoted[:2])[:MAX_QUERY_LEN]
    # Named entity sequences (capitalized consecutive words)
    named_entities = re.findall(r"(?:[A-Z][a-z]+)(?:\s+[A-Z][a-z]+){0,2}", text)
    # BUG FIX: sentence-initial capitalized stop words ("The", "This") used
    # to leak into the query even though this function promises stop-word
    # stripping. Strip stop-word tokens out of each entity phrase and drop
    # phrases that were nothing but stop words.
    entities: list[str] = []
    for phrase in named_entities:
        tokens = [t for t in phrase.split() if t.lower() not in stop_words]
        if tokens:
            entities.append(" ".join(tokens))
    # Numbers with unit context
    numeric = re.findall(
        r"\d+(?:\.\d+)?\s*(?:%|billion|million|thousand|people|cases|deaths|km|mph)",
        text,
    )
    candidates: list[str] = entities + numeric
    # Fall back to all non-stop words
    if not candidates:
        candidates = [w for w in text.split() if w.lower() not in stop_words and len(w) > 3]
    keywords = list(dict.fromkeys(candidates))[:max_keywords]  # deduplicate, preserve order
    query = " ".join(keywords)[:MAX_QUERY_LEN]
    return query or text[:100]
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _search_x_volume(bearer_token: str, query: str) -> int:
    """
    Query X API v2 recent search to estimate 7-day tweet volume.

    Args:
        bearer_token: OAuth2 bearer token for the X API.
        query: Search query string (already keyword-extracted).

    Returns:
        The response's meta.result_count (0 when absent).

    Raises:
        httpx.HTTPStatusError: on any non-2xx status, including 429.
            Raising (instead of returning a sentinel) is what allows the
            tenacity decorator to retry with exponential backoff; after
            the attempts are exhausted tenacity raises RetryError, which
            the caller catches and converts to the mock fallback.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        response = await client.get(
            X_SEARCH_URL,
            params={
                "query": query,
                "max_results": 10,
                "tweet.fields": "created_at,public_metrics",
                "start_time": _iso_7d_ago(),
            },
            headers={"Authorization": f"Bearer {bearer_token}"},
        )
        if response.status_code == 429:
            # BUG FIX: the old code returned -1 here "to signal a retry",
            # but tenacity only retries on exceptions — the sentinel never
            # triggered a retry and was clamped to 0 by the caller. Log,
            # then let raise_for_status() raise so backoff actually runs.
            log.warning("x_api.rate_limited")
        response.raise_for_status()
        data = response.json()
        return data.get("meta", {}).get("result_count", 0)
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _check_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]:
    """
    Check X Community Notes API for active notes matching the query.

    Best-effort by design: auth errors, rate limits, HTTP errors, and
    JSON/network failures all degrade to (False, None) so the velocity
    search can still succeed independently.

    Args:
        bearer_token: OAuth2 bearer token for the X API.
        query: Search query; truncated to 200 chars for this endpoint.

    Returns:
        (has_note, note_text): whether an active note was found and, if
        so, the top note's text capped at 400 chars.

    NOTE(review): the @retry decorator is effectively inert — the broad
    except inside swallows every exception before tenacity can see it,
    so only a failure constructing the AsyncClient itself (outside the
    try) could trigger a retry. Confirm whether retrying was intended.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        try:
            response = await client.get(
                X_COMMUNITY_NOTES_URL,
                params={"query": query[:200], "max_results": 5},
                headers={"Authorization": f"Bearer {bearer_token}"},
            )
            # Notes access may not be enabled for this token tier.
            if response.status_code in (401, 403):
                log.warning("x_api.notes_unauthorized")
                return False, None
            # Rate-limited: skip silently — notes are optional signal.
            if response.status_code == 429:
                return False, None
            response.raise_for_status()
            data = response.json()
            notes = data.get("data", [])
            if notes:
                top_note = notes[0]
                # Cap note text so downstream consumers stay bounded.
                return True, top_note.get("text", "Community Note found.")[:400]
            return False, None
        except Exception as exc:
            # Deliberate best-effort swallow; logged at debug level only.
            log.debug("x_api.notes_error", error=str(exc))
            return False, None
async def query_x_sensor(claim_text: str, claim_hash: str) -> XSensorResult:
    """
    Main entry point for the X sensor.

    Runs the tweet-volume search and the Community Notes lookup
    concurrently, and falls back to deterministic mock data when the
    bearer token is absent or a live API call fails.

    Args:
        claim_text: Free-text claim; keywords are extracted to build the query.
        claim_hash: Stable hash of the claim; seeds the deterministic mock.

    Returns:
        XSensorResult populated from the API (source="api") or from the
        mock generator (source="mock").
    """
    bearer_token = os.getenv("X_BEARER_TOKEN", "")
    query = _extract_keywords(claim_text)
    if not bearer_token:
        log.debug("x_sensor.mock_mode", reason="X_BEARER_TOKEN not set")
        return _mock_result(claim_hash, query)
    try:
        # Run both queries concurrently; gather propagates the first failure
        # into the except below. (asyncio is now a module-level import —
        # the old function-local `import asyncio` was redundant.)
        velocity, (has_note, note_text) = await asyncio.gather(
            _search_x_volume(bearer_token, query),
            _check_community_notes(bearer_token, query),
        )
        return XSensorResult(
            velocity=max(0, velocity),  # defensive clamp against sentinel/negative values
            community_note=has_note,
            note_text=note_text,
            query_used=query,
            source="api",
        )
    except Exception as exc:
        # Any API failure (including tenacity RetryError) degrades to mock.
        log.warning("x_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_result(claim_hash, query)
def _mock_result(claim_hash: str, query: str) -> XSensorResult:
    """
    Build a deterministic mock result keyed off the claim hash.

    The same claim hash always maps to the same mock outcome, keeping
    dev/CI runs reproducible while covering a realistic spread of
    velocities and occasional Community Notes.
    """
    # Derive a small deterministic seed from the first two hash bytes.
    seed = int(claim_hash[:4], 16) if len(claim_hash) >= 4 else 0
    buckets = (12, 87, 340, 1250, 5800, 23000, 91000)
    noted = seed % 7 == 0  # roughly 1-in-7 claims carry a note (~14%)
    return XSensorResult(
        velocity=buckets[seed % len(buckets)],
        community_note=noted,
        note_text=(
            "This claim contains misleading context. Several independent "
            "fact-checkers have rated it as partly false."
        )
        if noted
        else None,
        query_used=query,
        source="mock",
    )
def _iso_7d_ago() -> str:
"""ISO 8601 timestamp for 7 days ago (X API format)."""
seven_days_ago = time.time() - (7 * 24 * 3600)
from datetime import datetime, timezone
return datetime.fromtimestamp(seven_days_ago, tz=timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)