| """ |
| producers/consumer.py β Aggregate consumer for all platform topics. |
| |
| Reads from raw.twitter, raw.instagram, raw.youtube concurrently. |
| Deduplicates by content hash (xxhash), upserts into Qdrant + Memgraph. |
| |
| Architecture: |
| Redpanda topics β AIOKafkaConsumer β dedup Set β embed (BGE-M3) |
| β Qdrant upsert |
| β Memgraph MERGE |
| """ |
| from __future__ import annotations |
| import asyncio, json, os, time, xxhash |
| from typing import Any |
| from aiokafka import AIOKafkaConsumer |
|
|
# Redpanda/Kafka connection settings; overridable via environment for deploys.
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
# One raw-content topic per ingestion platform.
TOPICS = ["raw.twitter", "raw.instagram", "raw.youtube"]
CONSUMER_GROUP = "fact-engine-ingest"



# Process-local dedup cache of xxhash64 hex digests of message text.
# NOTE(review): lives only in this process's memory — a restart or a second
# consumer instance in the group will not share it, so duplicates can slip
# through across processes.
_seen_hashes: set[str] = set()
|
|
|
|
async def upsert_to_qdrant(doc: dict[str, Any], vector: list[float]) -> None:
    """Upsert one claim document plus its embedding into the Qdrant 'claims' collection.

    The point id is derived deterministically from the xxhash64 of the text,
    so re-ingesting identical content overwrites the existing point instead
    of accumulating duplicates.

    Args:
        doc: Raw message dict; must contain "text"; other keys are optional.
        vector: Dense embedding for doc["text"].

    Errors are logged and swallowed — this sink is best-effort and must not
    crash the consumer loop.
    """
    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct

        digest = xxhash.xxh64(doc["text"].encode())
        content_hash = digest.hexdigest()
        client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"), timeout=5)
        try:
            point = PointStruct(
                # BUG FIX: the id was previously abs(hash(content_hash)),
                # but builtin hash() of a str is randomized per process
                # (PYTHONHASHSEED), so the same content got a fresh id after
                # every restart and upsert never deduplicated.  xxh64's
                # integer digest is stable across runs.
                id=digest.intdigest() % (2**63),
                vector=vector,
                payload={
                    "text": doc["text"][:1000],
                    "source_url": doc.get("source_url", ""),
                    "domain": doc.get("domain", "unknown"),
                    "author_verified": doc.get("verified", False),
                    "platform": doc.get("platform", "web"),
                    "ingested_at": time.time(),
                    "hash": content_hash,
                },
            )
            client.upsert(collection_name="claims", points=[point])
        finally:
            # BUG FIX: the client (and its connection pool) was never closed.
            client.close()
    except Exception as exc:
        print(f"[consumer] Qdrant upsert error: {exc}")
|
|
|
|
async def upsert_to_memgraph(doc: dict[str, Any]) -> None:
    """MERGE the Author-[:REPORTED]->Claim relationship into the Memgraph trust graph.

    Args:
        doc: Raw message dict; must contain "text"; "author", "verified" and
            "account_type" are read with defaults when absent.

    Errors are logged and swallowed — this sink is best-effort and must not
    crash the consumer loop.
    """
    try:
        import neo4j

        content_hash = xxhash.xxh64(doc["text"].encode()).hexdigest()
        uri = f"bolt://{os.getenv('MEMGRAPH_HOST','localhost')}:{os.getenv('MEMGRAPH_PORT','7687')}"
        cypher = """
        MERGE (a:Author {handle: $handle})
        SET a.verified = $verified, a.account_type = $account_type
        MERGE (c:Claim {hash: $hash})
        SET c.text = $text, c.ingested_at = $ts
        MERGE (a)-[:REPORTED {timestamp: $ts}]->(c)
        """
        # BUG FIX: driver.close() used to run only on the success path; when
        # session.run raised, the except branch skipped it and leaked a bolt
        # connection per failure.  The context manager closes on all paths.
        with neo4j.GraphDatabase.driver(uri, auth=("", "")) as driver:
            with driver.session() as session:
                session.run(cypher, {
                    "handle": doc.get("author", "unknown"),
                    "verified": doc.get("verified", False),
                    "account_type": doc.get("account_type", "user"),
                    "hash": content_hash,
                    "text": doc["text"][:500],
                    "ts": time.time(),
                })
    except Exception as exc:
        print(f"[consumer] Memgraph upsert error: {exc}")
|
|
|
|
# Lazily-created embedding model, shared across all messages in this process.
_embedder = None


def _get_embedder():
    """Return the cached BGE-M3 fastembed model, creating it on first use.

    BUG FIX: the model was previously constructed inside process_message,
    i.e. re-loaded for every single message — by far the most expensive
    step of the pipeline.  It is now built once and reused.
    """
    global _embedder
    if _embedder is None:
        from fastembed import TextEmbedding
        _embedder = TextEmbedding(
            model_name="BAAI/bge-m3",
            max_length=512,
            cache_dir=os.getenv("EMBED_CACHE_DIR", "/tmp/fastembed_cache"),
        )
    return _embedder


async def process_message(msg_value: dict[str, Any]) -> None:
    """Deduplicate → embed → upsert one raw message into both stores.

    Drops messages with no text or fewer than 5 words, and exact duplicates
    (by xxhash64 of the text) already seen by this process.
    """
    text = msg_value.get("text", "").strip()
    if not text or len(text.split()) < 5:
        return  # too short to be a meaningful claim

    content_hash = xxhash.xxh64(text.encode()).hexdigest()
    if content_hash in _seen_hashes:
        return
    _seen_hashes.add(content_hash)
    if len(_seen_hashes) > 100_000:
        # Crude memory bound: drop the whole cache.  BUG FIX: re-add the
        # current hash so the message being processed right now cannot be
        # ingested a second time immediately after the reset.
        _seen_hashes.clear()
        _seen_hashes.add(content_hash)

    try:
        vector = list(next(_get_embedder().embed([text])))
    except Exception:
        # Embedding is best-effort; fall back to a zero vector so the claim
        # still reaches both stores (1024 = BGE-M3 output dimension).
        vector = [0.0] * 1024

    await asyncio.gather(
        upsert_to_qdrant(msg_value, vector),
        upsert_to_memgraph(msg_value),
    )
    print(f"[consumer] Ingested: {text[:60]}... (hash={content_hash[:8]})")
|
|
|
|
async def run_consumer() -> None:
    """Run the aggregate ingest loop over every platform topic.

    Joins the 'fact-engine-ingest' group on all raw.* topics, decodes each
    record as UTF-8 JSON, and hands the payload to process_message.  The
    consumer is always stopped on exit so partitions and group offsets are
    released cleanly.
    """
    def _decode_json(raw: bytes) -> dict[str, Any]:
        # Producers write UTF-8 JSON; decode at the consumer boundary.
        return json.loads(raw.decode())

    kafka_consumer = AIOKafkaConsumer(
        *TOPICS,
        bootstrap_servers=BROKERS,
        group_id=CONSUMER_GROUP,
        value_deserializer=_decode_json,
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )
    await kafka_consumer.start()
    print(f"[consumer] Subscribed to {TOPICS}")
    try:
        async for record in kafka_consumer:
            await process_message(record.value)
    finally:
        await kafka_consumer.stop()
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(run_consumer()) |
|
|