| """ |
| grok_sensor.py — X API v2 sensor with Community Notes and full mock fallback. |
| |
| Queries: |
| 1. X API v2 search/recent: 7-day tweet volume for claim keywords |
| 2. Community Notes API: active notes matching claim hash |
| |
| Returns: |
| XSensorResult: {velocity, community_note, note_text, query_used} |
| |
| Design decisions: |
| - All I/O is async (httpx.AsyncClient) |
| - Full mock fallback when X_BEARER_TOKEN is absent (common in dev/CI) |
| - Rate-limit aware: exponential backoff via tenacity |
| - Keyword extraction: noun phrases + named entities via regex (no spaCy dependency) |
| """ |
|
|
from __future__ import annotations

import asyncio
import hashlib
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
| log = structlog.get_logger(__name__) |
|
|
| X_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent" |
| X_COMMUNITY_NOTES_URL = "https://api.twitter.com/2/notes/search" |
|
|
| MAX_QUERY_LEN = 512 |
|
|
|
|
@dataclass
class XSensorResult:
    """Result of one X sensor query (live API or deterministic mock fallback)."""

    # Estimated 7-day tweet volume (API result_count, or a mocked value).
    velocity: int
    # True when at least one Community Note matched the query.
    community_note: bool
    # Text of the top matching note (truncated by the fetcher), or None.
    note_text: str | None
    # The keyword query actually used (or that would have been used).
    query_used: str
    # "api" when data came from X, "mock" for the deterministic fallback.
    source: str = "api"
|
|
|
|
def _extract_keywords(text: str, max_keywords: int = 6) -> str:
    """
    Build a search query from claim text.

    Priority order:
      1. Quoted strings (already specific) — up to two, joined.
      2. Capitalized 1-3 word runs (named-entity heuristic) plus numbers
         followed by a unit/context word.
      3. Fallback: words longer than 3 chars that are not stop words.

    The candidates are deduplicated (order-preserving), capped at
    *max_keywords* terms and MAX_QUERY_LEN chars. If everything gets
    filtered out, the first 100 chars of the raw text are returned.
    """
    stop_words = frozenset({
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "did", "will", "would", "could",
        "should", "may", "might", "shall", "can", "this", "that",
        "these", "those", "it", "its", "their", "they", "we", "you",
        "i", "he", "she", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "into", "about", "as",
    })

    # Quoted spans are treated as ready-made queries and short-circuit.
    quoted_phrases = re.findall(r'"([^"]{3,40})"', text)
    if quoted_phrases:
        return " ".join(quoted_phrases[:2])[:MAX_QUERY_LEN]

    entity_pattern = r"(?:[A-Z][a-z]+)(?:\s+[A-Z][a-z]+){0,2}"
    numeric_pattern = r"\d+(?:\.\d+)?\s*(?:%|billion|million|thousand|people|cases|deaths|km|mph)"
    candidates = re.findall(entity_pattern, text) + re.findall(numeric_pattern, text)

    if not candidates:
        # No entities or figures found — fall back to plain content words.
        candidates = [
            word for word in text.split()
            if len(word) > 3 and word.lower() not in stop_words
        ]

    # dict.fromkeys dedupes while keeping first-seen order.
    deduped = list(dict.fromkeys(candidates))
    query = " ".join(deduped[:max_keywords])[:MAX_QUERY_LEN]
    return query or text[:100]
|
|
|
|
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _search_x_volume(bearer_token: str, query: str) -> int:
    """
    Estimate 7-day tweet volume for *query* via X API v2 recent search.

    Returns the API-reported result count, or -1 when rate-limited (429).
    Other HTTP errors raise (and are retried once by the decorator).
    """
    request_params = {
        "query": query,
        "max_results": 10,
        "tweet.fields": "created_at,public_metrics",
        "start_time": _iso_7d_ago(),
    }
    auth_header = {"Authorization": f"Bearer {bearer_token}"}

    async with httpx.AsyncClient(timeout=8.0) as client:
        resp = await client.get(X_SEARCH_URL, params=request_params, headers=auth_header)

    # 429 is reported as a sentinel instead of raising, so the caller can
    # clamp it to zero rather than falling back to mock data.
    if resp.status_code == 429:
        log.warning("x_api.rate_limited")
        return -1
    resp.raise_for_status()
    return resp.json().get("meta", {}).get("result_count", 0)
|
|
|
|
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _check_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]:
    """
    Look up active Community Notes matching *query*.

    Returns (has_note, note_text). Best-effort by design: auth failures,
    rate limits, and any request/parse error all yield (False, None)
    rather than propagating.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        try:
            resp = await client.get(
                X_COMMUNITY_NOTES_URL,
                params={"query": query[:200], "max_results": 5},
                headers={"Authorization": f"Bearer {bearer_token}"},
            )
            if resp.status_code in (401, 403):
                log.warning("x_api.notes_unauthorized")
                return False, None
            if resp.status_code == 429:
                return False, None
            resp.raise_for_status()

            notes = resp.json().get("data", [])
            if not notes:
                return False, None
            # Only the top note is surfaced, truncated to 400 chars.
            return True, notes[0].get("text", "Community Note found.")[:400]
        except Exception as exc:
            # Swallowing here is deliberate: notes are a nice-to-have signal.
            log.debug("x_api.notes_error", error=str(exc))
            return False, None
|
|
|
|
async def query_x_sensor(claim_text: str, claim_hash: str) -> XSensorResult:
    """
    Main entry point for the X sensor.

    Runs the tweet-volume search and the Community Notes lookup
    concurrently. Falls back to deterministic mock data when
    X_BEARER_TOKEN is absent (common in dev/CI) or when the live API
    calls fail.

    Args:
        claim_text: Raw claim text; keywords are extracted for the query.
        claim_hash: Stable claim hash; seeds the deterministic mock path.

    Returns:
        XSensorResult with source="api" on success, source="mock" otherwise.
    """
    bearer_token = os.getenv("X_BEARER_TOKEN", "")
    query = _extract_keywords(claim_text)

    if not bearer_token:
        log.debug("x_sensor.mock_mode", reason="X_BEARER_TOKEN not set")
        return _mock_result(claim_hash, query)

    try:
        # gather() accepts coroutines directly — no need to wrap each in
        # create_task first (and asyncio is now imported at module level
        # instead of on every call).
        velocity, (has_note, note_text) = await asyncio.gather(
            _search_x_volume(bearer_token, query),
            _check_community_notes(bearer_token, query),
        )
        return XSensorResult(
            velocity=max(0, velocity),  # -1 (rate-limited sentinel) clamps to 0
            community_note=has_note,
            note_text=note_text,
            query_used=query,
            source="api",
        )
    except Exception as exc:
        # Any failure in either call (incl. tenacity RetryError) degrades
        # to the deterministic mock rather than propagating.
        log.warning("x_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_result(claim_hash, query)
|
|
|
|
def _mock_result(claim_hash: str, query: str) -> XSensorResult:
    """
    Deterministic mock based on the claim hash — the same claim always
    gets the same mock result, simulating a realistic distribution of
    X sensor outcomes.

    Args:
        claim_hash: Stable claim hash; its first 4 chars seed the mock.
            Fix: a non-hex prefix previously raised ValueError from
            int(..., 16) despite the length guard; it now degrades to
            seed 0 like a too-short hash does.
        query: The keyword query, echoed back in the result.

    Returns:
        XSensorResult with source="mock".
    """
    try:
        seed = int(claim_hash[:4], 16) if len(claim_hash) >= 4 else 0
    except ValueError:
        # claim_hash is not hex (e.g. an arbitrary external ID).
        seed = 0

    velocity_options = [12, 87, 340, 1250, 5800, 23000, 91000]
    velocity = velocity_options[seed % len(velocity_options)]
    # ~1 in 7 mock claims carries a Community Note.
    has_note = (seed % 7) == 0
    note_text = (
        "This claim contains misleading context. Several independent "
        "fact-checkers have rated it as partly false."
        if has_note else None
    )
    return XSensorResult(
        velocity=velocity,
        community_note=has_note,
        note_text=note_text,
        query_used=query,
        source="mock",
    )
|
|
|
|
| def _iso_7d_ago() -> str: |
| """ISO 8601 timestamp for 7 days ago (X API format).""" |
| seven_days_ago = time.time() - (7 * 24 * 3600) |
| from datetime import datetime, timezone |
| return datetime.fromtimestamp(seven_days_ago, tz=timezone.utc).strftime( |
| "%Y-%m-%dT%H:%M:%SZ" |
| ) |
|
|