# gng/consumer.py
"""
producers/consumer.py β€” Aggregate consumer for all platform topics.
Reads from raw.twitter, raw.instagram, raw.youtube concurrently.
Deduplicates by content hash (xxhash), upserts into Qdrant + Memgraph.
Architecture:
Redpanda topics β†’ AIOKafkaConsumer β†’ dedup Set β†’ embed (BGE-M3)
β†’ Qdrant upsert
β†’ Memgraph MERGE
"""
from __future__ import annotations
import asyncio, json, os, time, xxhash
from typing import Any
from aiokafka import AIOKafkaConsumer
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
TOPICS = ["raw.twitter", "raw.instagram", "raw.youtube"]
CONSUMER_GROUP = "fact-engine-ingest"
# In-process dedup set (resets on restart β€” Redis Bloom filter for production)
_seen_hashes: set[str] = set()
async def upsert_to_qdrant(doc: dict[str, Any], vector: list[float]) -> None:
    """Upsert a claim document + its embedding into the Qdrant 'claims' collection.

    Best-effort: any failure (client missing, Qdrant unreachable, bad payload)
    is logged and swallowed so ingestion keeps flowing.

    Args:
        doc: Raw message payload; must contain "text", may carry source_url /
             domain / verified / platform.
        vector: Pre-computed embedding for doc["text"].
    """
    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct

        digest = xxhash.xxh64(doc["text"].encode())
        # intdigest() is a deterministic uint64 — stable across processes, so
        # re-ingesting the same text upserts the same point. (The previous
        # abs(hash(...)) id used Python's per-process salted str hash, which
        # produced a different id on every restart and broke idempotency.)
        point = PointStruct(
            id=digest.intdigest(),
            vector=vector,
            payload={
                "text": doc["text"][:1000],
                "source_url": doc.get("source_url", ""),
                "domain": doc.get("domain", "unknown"),
                "author_verified": doc.get("verified", False),
                "platform": doc.get("platform", "web"),
                "ingested_at": time.time(),
                "hash": digest.hexdigest(),
            },
        )
        client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"), timeout=5)
        try:
            client.upsert(collection_name="claims", points=[point])
        finally:
            client.close()  # don't leak the HTTP connection pool
    except Exception as exc:
        print(f"[consumer] Qdrant upsert error: {exc}")
async def upsert_to_memgraph(doc: dict[str, Any]) -> None:
    """MERGE an (Author)-[:REPORTED]->(Claim) edge into the Memgraph trust graph.

    Best-effort: any failure (driver missing, Memgraph down) is logged and
    swallowed so ingestion keeps flowing.

    Args:
        doc: Raw message payload; must contain "text", may carry author /
             verified / account_type.
    """
    try:
        import neo4j

        content_hash = xxhash.xxh64(doc["text"].encode()).hexdigest()
        cypher = """
        MERGE (a:Author {handle: $handle})
          SET a.verified = $verified, a.account_type = $account_type
        MERGE (c:Claim {hash: $hash})
          SET c.text = $text, c.ingested_at = $ts
        MERGE (a)-[:REPORTED {timestamp: $ts}]->(c)
        """
        uri = f"bolt://{os.getenv('MEMGRAPH_HOST','localhost')}:{os.getenv('MEMGRAPH_PORT','7687')}"
        # Context manager guarantees the driver closes even when session.run
        # raises — the previous explicit close() leaked it on any error.
        with neo4j.GraphDatabase.driver(uri, auth=("", "")) as driver:
            with driver.session() as session:
                session.run(cypher, {
                    "handle": doc.get("author", "unknown"),
                    "verified": doc.get("verified", False),
                    "account_type": doc.get("account_type", "user"),
                    "hash": content_hash,
                    "text": doc["text"][:500],
                    "ts": time.time(),
                })
    except Exception as exc:
        print(f"[consumer] Memgraph upsert error: {exc}")
_embedder = None  # lazily-loaded TextEmbedding instance, cached for the process lifetime


def _get_embedder():
    """Return the cached BGE-M3 embedder, loading it on first use.

    Model loading is expensive; the original code rebuilt the model for every
    single message, which dominated per-message latency.
    """
    global _embedder
    if _embedder is None:
        from fastembed import TextEmbedding
        _embedder = TextEmbedding(
            model_name="BAAI/bge-m3",
            max_length=512,
            cache_dir=os.getenv("EMBED_CACHE_DIR", "/tmp/fastembed_cache"),
        )
    return _embedder


async def process_message(msg_value: dict[str, Any]) -> None:
    """Deduplicate → embed → upsert one message to both stores.

    Messages with no text or fewer than 5 words are dropped; exact-duplicate
    texts (by xxhash64) seen this process lifetime are dropped.
    """
    text = msg_value.get("text", "").strip()
    if not text or len(text.split()) < 5:
        return  # too short to be a meaningful claim
    content_hash = xxhash.xxh64(text.encode()).hexdigest()
    if content_hash in _seen_hashes:
        return
    # Cap memory BEFORE adding, so the current hash survives the flush.
    # (Previously the set was cleared after adding, wiping the entry that
    # should guard against an immediate duplicate.)
    if len(_seen_hashes) >= 100_000:
        _seen_hashes.clear()  # crude bound; loses history — Redis Bloom filter for production
    _seen_hashes.add(content_hash)
    try:
        vector = list(next(_get_embedder().embed([text])))
    except Exception:
        vector = [0.0] * 1024  # BGE-M3 dim; zero vector keeps the pipeline moving
    await asyncio.gather(
        upsert_to_qdrant(msg_value, vector),
        upsert_to_memgraph(msg_value),
    )
    print(f"[consumer] Ingested: {text[:60]}... (hash={content_hash[:8]})")
async def run_consumer() -> None:
    """Consume all raw.* topics forever, routing each record through process_message.

    Robustness: a malformed payload (bad UTF-8 / invalid JSON) or an exception
    while processing a single record is logged and skipped — previously either
    one propagated and killed the whole consumer loop.
    """
    def _decode(raw: bytes) -> dict[str, Any] | None:
        # Tolerant deserializer: None means "skip this record".
        try:
            return json.loads(raw.decode())
        except (UnicodeDecodeError, ValueError):
            return None

    consumer = AIOKafkaConsumer(
        *TOPICS,
        bootstrap_servers=BROKERS,
        group_id=CONSUMER_GROUP,
        value_deserializer=_decode,
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )
    await consumer.start()
    print(f"[consumer] Subscribed to {TOPICS}")
    try:
        async for msg in consumer:
            if msg.value is None:
                continue  # undecodable payload — already noted by _decode returning None
            try:
                await process_message(msg.value)
            except Exception as exc:
                # One bad record must not stop consumption of the stream.
                print(f"[consumer] processing error: {exc}")
    finally:
        await consumer.stop()
if __name__ == "__main__":
    # Script entry point: run the consumer loop until interrupted (Ctrl-C).
    asyncio.run(run_consumer())