"""
src/nodes/socialAgentNode.py
MODULAR - Social Agent Node with Subgraph Architecture
Monitors trending topics, events, people, and social intelligence across geographic scopes.
Updated: Uses Tool Factory pattern for parallel execution safety.
Each agent instance gets its own private set of tools.
Updated: Now loads user-defined keywords and profiles from intel config.
"""
import json
import uuid
import os
from typing import Dict, Any, List
from datetime import datetime
from src.states.socialAgentState import SocialAgentState
from src.utils.tool_factory import create_tool_set
from src.llms.groqllm import GroqLLM
def load_intel_config() -> dict:
"""Load intel config from JSON file (same as main.py)."""
config_path = os.path.join(
os.path.dirname(__file__), "..", "..", "data", "intel_config.json"
)
default_config = {
"user_profiles": {"twitter": [], "facebook": [], "linkedin": []},
"user_keywords": [],
"user_products": [],
}
try:
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return default_config
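
# Example layout of data/intel_config.json (illustrative values only; the keys
# mirror default_config above and the fields read in SocialAgentNode.__init__):
# {
#     "user_profiles": {"twitter": ["@example_handle"], "facebook": [], "linkedin": []},
#     "user_keywords": ["tourism", "fuel prices"],
#     "user_products": ["Example Product"]
# }
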
class SocialAgentNode:
"""
Modular Social Agent - Geographic social intelligence collection.
Module 1: Trending Topics (Sri Lanka specific trends)
Module 2: Social Media (Sri Lanka, Asia, World scopes)
Module 3: Feed Generation (Categorize, Summarize, Format)
Module 4: User-Defined Keywords & Profiles (from frontend config)
Thread Safety:
Each SocialAgentNode instance creates its own private ToolSet,
enabling safe parallel execution with other agents.
"""
def __init__(self, llm=None):
"""Initialize with Groq LLM and private tool set"""
# Create PRIVATE tool instances for this agent
# This enables parallel execution without shared state conflicts
self.tools = create_tool_set()
if llm is None:
groq = GroqLLM()
self.llm = groq.get_llm()
else:
self.llm = llm
# Load user-defined intel config (keywords, profiles, products)
self.intel_config = load_intel_config()
self.user_keywords = self.intel_config.get("user_keywords", [])
self.user_profiles = self.intel_config.get("user_profiles", {})
self.user_products = self.intel_config.get("user_products", [])
print(
f"[SocialAgent] Loaded {len(self.user_keywords)} user keywords, "
f"{sum(len(v) for v in self.user_profiles.values())} profiles"
)
# Geographic scopes
self.geographic_scopes = {
"sri_lanka": ["sri lanka", "colombo", "srilanka"],
"asia": [
"india",
"pakistan",
"bangladesh",
"maldives",
"singapore",
"malaysia",
"thailand",
],
"world": ["global", "international", "breaking news", "world events"],
}
# Trending categories
self.trending_categories = [
"events",
"people",
"viral",
"breaking",
"technology",
"culture",
]
# ============================================
# MODULE 1: TRENDING TOPICS COLLECTION
# ============================================
def collect_sri_lanka_trends(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 1: Collect Sri Lankan trending topics
"""
print("[MODULE 1] Collecting Sri Lankan Trending Topics")
trending_results = []
# Twitter - Sri Lanka Trends
try:
twitter_tool = self.tools.get("scrape_twitter")
if twitter_tool:
twitter_data = twitter_tool.invoke(
{"query": "sri lanka trending viral", "max_items": 20}
)
trending_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "trending",
"scope": "sri_lanka",
"platform": "twitter",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Twitter Sri Lanka Trends")
except Exception as e:
print(f" ⚠️ Twitter error: {e}")
# Reddit - Sri Lanka
try:
reddit_tool = self.tools.get("scrape_reddit")
if reddit_tool:
reddit_data = reddit_tool.invoke(
{
"keywords": [
"sri lanka trending",
"sri lanka viral",
"sri lanka news",
],
"limit": 20,
"subreddit": "srilanka",
}
)
trending_results.append(
{
"source_tool": "scrape_reddit",
"raw_content": str(reddit_data),
"category": "trending",
"scope": "sri_lanka",
"platform": "reddit",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Reddit Sri Lanka Trends")
except Exception as e:
print(f" ⚠️ Reddit error: {e}")
return {
"worker_results": trending_results,
"latest_worker_results": trending_results,
}
# ============================================
# MODULE 2: SOCIAL MEDIA COLLECTION
# ============================================
def collect_sri_lanka_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 2A: Collect Sri Lankan social media across all platforms
"""
print("[MODULE 2A] Collecting Sri Lankan Social Media")
social_results = []
# Twitter - Sri Lanka Events & People
try:
twitter_tool = self.tools.get("scrape_twitter")
if twitter_tool:
twitter_data = twitter_tool.invoke(
{"query": "sri lanka events people celebrities", "max_items": 15}
)
social_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "social",
"scope": "sri_lanka",
"platform": "twitter",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Twitter Sri Lanka Social")
except Exception as e:
print(f" ⚠️ Twitter error: {e}")
# Facebook - Sri Lanka
try:
facebook_tool = self.tools.get("scrape_facebook")
if facebook_tool:
facebook_data = facebook_tool.invoke(
{
"keywords": ["sri lanka events", "sri lanka trending"],
"max_items": 10,
}
)
social_results.append(
{
"source_tool": "scrape_facebook",
"raw_content": str(facebook_data),
"category": "social",
"scope": "sri_lanka",
"platform": "facebook",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Facebook Sri Lanka Social")
except Exception as e:
print(f" ⚠️ Facebook error: {e}")
# LinkedIn - Sri Lanka Professional
try:
linkedin_tool = self.tools.get("scrape_linkedin")
if linkedin_tool:
linkedin_data = linkedin_tool.invoke(
{
"keywords": ["sri lanka events", "sri lanka people"],
"max_items": 5,
}
)
social_results.append(
{
"source_tool": "scrape_linkedin",
"raw_content": str(linkedin_data),
"category": "social",
"scope": "sri_lanka",
"platform": "linkedin",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ LinkedIn Sri Lanka Professional")
except Exception as e:
print(f" ⚠️ LinkedIn error: {e}")
# Instagram - Sri Lanka
try:
instagram_tool = self.tools.get("scrape_instagram")
if instagram_tool:
instagram_data = instagram_tool.invoke(
{"keywords": ["srilankaevents", "srilankatrending"], "max_items": 5}
)
social_results.append(
{
"source_tool": "scrape_instagram",
"raw_content": str(instagram_data),
"category": "social",
"scope": "sri_lanka",
"platform": "instagram",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Instagram Sri Lanka")
except Exception as e:
print(f" ⚠️ Instagram error: {e}")
return {
"worker_results": social_results,
"social_media_results": social_results,
}
def collect_asia_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 2B: Collect Asian regional social media
"""
print("[MODULE 2B] Collecting Asian Regional Social Media")
asia_results = []
# Twitter - Asian Events
try:
twitter_tool = self.tools.get("scrape_twitter")
if twitter_tool:
twitter_data = twitter_tool.invoke(
{
"query": "asia trending india pakistan bangladesh",
"max_items": 15,
}
)
asia_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "social",
"scope": "asia",
"platform": "twitter",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Twitter Asia Trends")
except Exception as e:
print(f" ⚠️ Twitter error: {e}")
# Facebook - Asia
try:
facebook_tool = self.tools.get("scrape_facebook")
if facebook_tool:
facebook_data = facebook_tool.invoke(
{"keywords": ["asia trending", "india events"], "max_items": 10}
)
asia_results.append(
{
"source_tool": "scrape_facebook",
"raw_content": str(facebook_data),
"category": "social",
"scope": "asia",
"platform": "facebook",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Facebook Asia")
except Exception as e:
print(f" ⚠️ Facebook error: {e}")
# Reddit - Asian subreddits
try:
reddit_tool = self.tools.get("scrape_reddit")
if reddit_tool:
reddit_data = reddit_tool.invoke(
{
"keywords": ["asia trending", "india", "pakistan"],
"limit": 10,
"subreddit": "asia",
}
)
asia_results.append(
{
"source_tool": "scrape_reddit",
"raw_content": str(reddit_data),
"category": "social",
"scope": "asia",
"platform": "reddit",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Reddit Asia")
except Exception as e:
print(f" ⚠️ Reddit error: {e}")
return {"worker_results": asia_results, "social_media_results": asia_results}
def collect_world_social_media(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 2C: Collect world/global trending topics
"""
print("[MODULE 2C] Collecting World Trending Topics")
world_results = []
# Twitter - World Trends
try:
twitter_tool = self.tools.get("scrape_twitter")
if twitter_tool:
twitter_data = twitter_tool.invoke(
{"query": "world trending global breaking news", "max_items": 15}
)
world_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "social",
"scope": "world",
"platform": "twitter",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Twitter World Trends")
except Exception as e:
print(f" ⚠️ Twitter error: {e}")
# Reddit - World News
try:
reddit_tool = self.tools.get("scrape_reddit")
if reddit_tool:
reddit_data = reddit_tool.invoke(
{
"keywords": ["breaking", "trending", "viral"],
"limit": 15,
"subreddit": "worldnews",
}
)
world_results.append(
{
"source_tool": "scrape_reddit",
"raw_content": str(reddit_data),
"category": "social",
"scope": "world",
"platform": "reddit",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(" ✓ Reddit World News")
except Exception as e:
print(f" ⚠️ Reddit error: {e}")
return {"worker_results": world_results, "social_media_results": world_results}
def collect_user_defined_targets(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 2D: Collect data for USER-DEFINED keywords and profiles.
These are configured via the frontend Intelligence Settings UI.
"""
print("[MODULE 2D] Collecting User-Defined Targets")
user_results = []
# Reload config to get latest user settings
self.intel_config = load_intel_config()
self.user_keywords = self.intel_config.get("user_keywords", [])
self.user_profiles = self.intel_config.get("user_profiles", {})
self.user_products = self.intel_config.get("user_products", [])
# Skip if no user config
if not self.user_keywords and not any(self.user_profiles.values()):
print(" ⏭️ No user-defined targets configured")
return {"worker_results": [], "user_target_results": []}
# ============================================
# Scrape USER KEYWORDS across Twitter
# ============================================
if self.user_keywords:
print(f" 📝 Scraping {len(self.user_keywords)} user keywords...")
twitter_tool = self.tools.get("scrape_twitter")
for keyword in self.user_keywords[:10]: # Limit to 10 keywords
try:
if twitter_tool:
twitter_data = twitter_tool.invoke(
{"query": keyword, "max_items": 5}
)
user_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "user_keyword",
"scope": "sri_lanka",
"platform": "twitter",
"keyword": keyword,
"timestamp": datetime.utcnow().isoformat(),
}
)
print(f" ✓ Keyword: '{keyword}'")
except Exception as e:
print(f" ⚠️ Keyword '{keyword}' error: {e}")
# ============================================
# Scrape USER PRODUCTS
# ============================================
if self.user_products:
print(f" 📦 Scraping {len(self.user_products)} user products...")
twitter_tool = self.tools.get("scrape_twitter")
for product in self.user_products[:5]: # Limit to 5 products
try:
if twitter_tool:
twitter_data = twitter_tool.invoke(
{
"query": f"{product} review OR {product} Sri Lanka",
"max_items": 3,
}
)
user_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "user_product",
"scope": "sri_lanka",
"platform": "twitter",
"product": product,
"timestamp": datetime.utcnow().isoformat(),
}
)
print(f" ✓ Product: '{product}'")
except Exception as e:
print(f" ⚠️ Product '{product}' error: {e}")
# ============================================
# Scrape USER TWITTER PROFILES
# ============================================
twitter_profiles = self.user_profiles.get("twitter", [])
if twitter_profiles:
print(f" 👤 Scraping {len(twitter_profiles)} Twitter profiles...")
twitter_tool = self.tools.get("scrape_twitter")
for profile in twitter_profiles[:10]: # Limit to 10 profiles
try:
# Clean profile handle
handle = profile.replace("@", "").strip()
if twitter_tool:
                        # Tweets posted by this profile or mentioning it
twitter_data = twitter_tool.invoke(
{"query": f"from:{handle} OR @{handle}", "max_items": 5}
)
user_results.append(
{
"source_tool": "scrape_twitter",
"raw_content": str(twitter_data),
"category": "user_profile",
"scope": "sri_lanka",
"platform": "twitter",
"profile": f"@{handle}",
"timestamp": datetime.utcnow().isoformat(),
}
)
print(f" ✓ Profile: @{handle}")
except Exception as e:
print(f" ⚠️ Profile @{profile} error: {e}")
print(f" ✅ User targets: {len(user_results)} results collected")
return {"worker_results": user_results, "user_target_results": user_results}
# ============================================
# MODULE 3: FEED GENERATION
# ============================================
def categorize_by_geography(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 3A: Categorize all collected results by geographic scope
"""
print("[MODULE 3A] Categorizing Results by Geography")
all_results = state.get("worker_results", []) or []
# Initialize categories
sri_lanka_data = []
asia_data = []
world_data = []
geographic_data = {"sri_lanka": [], "asia": [], "world": []}
for r in all_results:
scope = r.get("scope", "unknown")
content = r.get("raw_content", "")
# Parse content
try:
data = json.loads(content)
if isinstance(data, dict) and "error" in data:
continue
if isinstance(data, str):
data = json.loads(data)
posts = []
if isinstance(data, list):
posts = data
elif isinstance(data, dict):
posts = data.get("results", []) or data.get("data", [])
if not posts:
posts = [data]
# Categorize
if scope == "sri_lanka":
sri_lanka_data.extend(posts[:10])
geographic_data["sri_lanka"].extend(posts[:10])
elif scope == "asia":
asia_data.extend(posts[:10])
geographic_data["asia"].extend(posts[:10])
elif scope == "world":
world_data.extend(posts[:10])
geographic_data["world"].extend(posts[:10])
except Exception:
continue
# Create structured feeds
structured_feeds = {
"sri lanka": sri_lanka_data,
"asia": asia_data,
"world": world_data,
}
print(
f" ✓ Categorized: {len(sri_lanka_data)} Sri Lanka, {len(asia_data)} Asia, {len(world_data)} World"
)
return {
"structured_output": structured_feeds,
"geographic_feeds": geographic_data,
"sri_lanka_feed": sri_lanka_data,
"asia_feed": asia_data,
"world_feed": world_data,
}
def generate_llm_summary(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 3B: Use Groq LLM to generate executive summary AND structured insights
"""
print("[MODULE 3B] Generating LLM Summary + Structured Insights")
structured_feeds = state.get("structured_output", {})
llm_summary = "AI summary currently unavailable."
llm_insights = []
try:
# Collect sample posts for analysis
all_posts = []
for region, posts in structured_feeds.items():
for p in posts[:5]: # Top 5 per region
text = p.get("text", "") or p.get("title", "")
if text and len(text) > 20:
all_posts.append(f"[{region.upper()}] {text[:200]}")
if not all_posts:
return {"llm_summary": llm_summary, "llm_insights": []}
posts_text = "\n".join(all_posts[:15])
# Generate summary AND structured insights
analysis_prompt = f"""Analyze these social media posts from Sri Lanka and the region. Generate:
1. A 3-sentence executive summary of key trends
2. Up to 5 unique intelligence insights
Posts:
{posts_text}
Respond in this exact JSON format:
{{
"executive_summary": "Brief 3-sentence summary of key social trends and developments",
"insights": [
{{"summary": "Unique insight #1 (not copying post text)", "severity": "low/medium/high", "impact_type": "risk/opportunity"}},
{{"summary": "Unique insight #2", "severity": "low/medium/high", "impact_type": "risk/opportunity"}}
]
}}
Rules:
- Generate NEW insights, don't just copy post text
- Identify patterns and emerging trends
- Classify severity based on potential impact
- Mark positive developments as "opportunity", concerning ones as "risk"
JSON only, no explanation:"""
llm_response = self.llm.invoke(analysis_prompt)
content = (
llm_response.content
if hasattr(llm_response, "content")
else str(llm_response)
)
# Parse JSON response
import re
content = content.strip()
if content.startswith("```"):
content = re.sub(r"^```\w*\n?", "", content)
content = re.sub(r"\n?```$", "", content)
result = json.loads(content)
llm_summary = result.get("executive_summary", llm_summary)
llm_insights = result.get("insights", [])
print(f" ✓ LLM generated {len(llm_insights)} unique insights")
except json.JSONDecodeError as e:
print(f" ⚠️ JSON parse error: {e}")
# Fallback to simple summary
try:
fallback_prompt = f"Summarize these social media trends in 3 sentences:\n{posts_text[:1500]}"
response = self.llm.invoke(fallback_prompt)
llm_summary = (
response.content if hasattr(response, "content") else str(response)
)
except Exception as fallback_error:
print(f" ⚠️ LLM fallback also failed: {fallback_error}")
except Exception as e:
print(f" ⚠️ LLM Error: {e}")
return {"llm_summary": llm_summary, "llm_insights": llm_insights}
def format_final_output(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 3C: Format final feed output with LLM-enhanced insights
"""
print("[MODULE 3C] Formatting Final Output")
llm_summary = state.get("llm_summary", "No summary available")
llm_insights = state.get("llm_insights", []) # NEW: Get LLM-generated insights
structured_feeds = state.get("structured_output", {})
trending_count = len(
[
r
for r in state.get("worker_results", [])
if r.get("category") == "trending"
]
)
social_count = len(
[
r
for r in state.get("worker_results", [])
if r.get("category") == "social"
]
)
sri_lanka_items = len(structured_feeds.get("sri lanka", []))
asia_items = len(structured_feeds.get("asia", []))
world_items = len(structured_feeds.get("world", []))
bulletin = f"""🌏 COMPREHENSIVE SOCIAL INTELLIGENCE FEED
{datetime.utcnow().strftime("%d %b %Y • %H:%M UTC")}
📊 EXECUTIVE SUMMARY (AI-Generated)
{llm_summary}
📈 DATA COLLECTION STATS
• Trending Topics: {trending_count} items
• Social Media Posts: {social_count} items
• Geographic Coverage: Sri Lanka, Asia, World
🔍 GEOGRAPHIC BREAKDOWN
• Sri Lanka: {sri_lanka_items} trending items
• Asia: {asia_items} regional items
• World: {world_items} global items
🌐 COVERAGE CATEGORIES
• Events: Public gatherings, launches, announcements
• People: Influencers, celebrities, public figures
• Viral Content: Trending posts, hashtags, memes
• Breaking: Real-time developments
🎯 INTELLIGENCE FOCUS
Monitoring social sentiment, trending topics, events, and people across:
- Sri Lanka (local intelligence)
- Asia (regional context: India, Pakistan, Bangladesh, ASEAN)
- World (global trends affecting local sentiment)
Source: Multi-platform aggregation (Twitter, Facebook, LinkedIn, Instagram, Reddit)
"""
# Create list for domain_insights (FRONTEND COMPATIBLE)
domain_insights = []
timestamp = datetime.utcnow().isoformat()
# PRIORITY 1: Add LLM-generated unique insights (these are curated and unique)
for insight in llm_insights:
if isinstance(insight, dict) and insight.get("summary"):
domain_insights.append(
{
"source_event_id": str(uuid.uuid4()),
"domain": "social",
"summary": f"🔍 {insight.get('summary', '')}", # Mark as AI-analyzed
"severity": insight.get("severity", "medium"),
"impact_type": insight.get("impact_type", "risk"),
"timestamp": timestamp,
"is_llm_generated": True, # Flag for frontend
}
)
print(f" ✓ Added {len(llm_insights)} LLM-generated insights")
# PRIORITY 2: Add top raw posts only if we need more (fallback)
# Only add raw posts if LLM didn't generate enough insights
if len(domain_insights) < 5:
# Sri Lankan districts for geographic tagging
districts = [
"colombo",
"gampaha",
"kalutara",
"kandy",
"matale",
"nuwara eliya",
"galle",
"matara",
"hambantota",
"jaffna",
"kilinochchi",
"mannar",
"mullaitivu",
"vavuniya",
"puttalam",
"kurunegala",
"anuradhapura",
"polonnaruwa",
"badulla",
"monaragala",
"ratnapura",
"kegalle",
"ampara",
"batticaloa",
"trincomalee",
]
# Add Sri Lanka posts as fallback
sri_lanka_data = structured_feeds.get("sri lanka", [])
for post in sri_lanka_data[:5]:
post_text = post.get("text", "") or post.get("title", "")
if not post_text or len(post_text) < 20:
continue
# Detect district
detected_district = "Sri Lanka"
for district in districts:
if district.lower() in post_text.lower():
detected_district = district.title()
break
# Determine severity
severity = "low"
if any(
kw in post_text.lower()
for kw in ["protest", "riot", "emergency", "violence", "crisis"]
):
severity = "high"
elif any(
kw in post_text.lower()
for kw in ["trending", "viral", "breaking", "update"]
):
severity = "medium"
domain_insights.append(
{
"source_event_id": str(uuid.uuid4()),
"domain": "social",
"summary": f"{detected_district}: {post_text[:200]}",
"severity": severity,
"impact_type": (
"risk" if severity in ["high", "medium"] else "opportunity"
),
"timestamp": timestamp,
"is_llm_generated": False,
}
)
# Add executive summary insight
domain_insights.append(
{
"source_event_id": str(uuid.uuid4()),
"structured_data": structured_feeds,
"domain": "social",
"summary": f"📊 Social Intelligence Summary: {llm_summary[:300]}",
"severity": "medium",
"impact_type": "risk",
"is_llm_generated": True,
}
)
print(f" ✓ Created {len(domain_insights)} total social intelligence insights")
return {
"final_feed": bulletin,
"feed_history": [bulletin],
"domain_insights": domain_insights,
}
# ============================================
# MODULE 4: FEED AGGREGATOR & STORAGE
# ============================================
def aggregate_and_store_feeds(self, state: SocialAgentState) -> Dict[str, Any]:
"""
Module 4: Aggregate, deduplicate, and store feeds
- Check uniqueness using Neo4j (URL + content hash)
- Store unique posts in Neo4j
- Store unique posts in ChromaDB for RAG
- Append to CSV dataset for ML training
"""
print("[MODULE 4] Aggregating and Storing Feeds")
from src.utils.db_manager import (
Neo4jManager,
ChromaDBManager,
extract_post_data,
)
        import csv
# Initialize database managers
neo4j_manager = Neo4jManager()
chroma_manager = ChromaDBManager()
# Get all worker results from state
all_worker_results = state.get("worker_results", [])
# Statistics
total_posts = 0
unique_posts = 0
duplicate_posts = 0
stored_neo4j = 0
stored_chroma = 0
stored_csv = 0
# Setup CSV dataset
dataset_dir = os.getenv("DATASET_PATH", "./datasets/social_feeds")
os.makedirs(dataset_dir, exist_ok=True)
csv_filename = f"social_feeds_{datetime.now().strftime('%Y%m')}.csv"
csv_path = os.path.join(dataset_dir, csv_filename)
# CSV headers
csv_headers = [
"post_id",
"timestamp",
"platform",
"category",
"scope",
"poster",
"post_url",
"title",
"text",
"content_hash",
"engagement_score",
"engagement_likes",
"engagement_shares",
"engagement_comments",
"source_tool",
]
# Check if CSV exists to determine if we need to write headers
file_exists = os.path.exists(csv_path)
try:
# Open CSV file in append mode
with open(csv_path, "a", newline="", encoding="utf-8") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
# Write headers if new file
if not file_exists:
writer.writeheader()
print(f" ✓ Created new CSV dataset: {csv_path}")
else:
print(f" ✓ Appending to existing CSV: {csv_path}")
# Process each worker result
for worker_result in all_worker_results:
category = worker_result.get("category", "unknown")
platform = worker_result.get("platform", "unknown")
source_tool = worker_result.get("source_tool", "")
scope = worker_result.get("scope", "")
# Parse raw content
raw_content = worker_result.get("raw_content", "")
if not raw_content:
continue
try:
# Try to parse JSON content
if isinstance(raw_content, str):
data = json.loads(raw_content)
else:
data = raw_content
# Handle different data structures
posts = []
if isinstance(data, list):
posts = data
elif isinstance(data, dict):
# Check for common result keys
posts = (
data.get("results")
or data.get("data")
or data.get("posts")
or data.get("items")
or []
)
# If still empty, treat the dict itself as a post
if not posts and (data.get("title") or data.get("text")):
posts = [data]
# Process each post
for raw_post in posts:
total_posts += 1
# Skip if error object
if isinstance(raw_post, dict) and "error" in raw_post:
continue
# Extract normalized post data
post_data = extract_post_data(
raw_post=raw_post,
category=category,
platform=platform,
source_tool=source_tool,
)
if not post_data:
continue
# Check uniqueness with Neo4j
is_dup = neo4j_manager.is_duplicate(
post_url=post_data["post_url"],
content_hash=post_data["content_hash"],
)
if is_dup:
duplicate_posts += 1
continue
# Unique post - store it
unique_posts += 1
# Store in Neo4j
if neo4j_manager.store_post(post_data):
stored_neo4j += 1
# Store in ChromaDB
if chroma_manager.add_document(post_data):
stored_chroma += 1
# Store in CSV
try:
csv_row = {
"post_id": post_data["post_id"],
"timestamp": post_data["timestamp"],
"platform": post_data["platform"],
"category": post_data["category"],
"scope": scope,
"poster": post_data["poster"],
"post_url": post_data["post_url"],
"title": post_data["title"],
"text": post_data["text"],
"content_hash": post_data["content_hash"],
"engagement_score": post_data["engagement"].get(
"score", 0
),
"engagement_likes": post_data["engagement"].get(
"likes", 0
),
"engagement_shares": post_data["engagement"].get(
"shares", 0
),
"engagement_comments": post_data["engagement"].get(
"comments", 0
),
"source_tool": post_data["source_tool"],
}
writer.writerow(csv_row)
stored_csv += 1
except Exception as e:
print(f" ⚠️ CSV write error: {e}")
except Exception as e:
print(f" ⚠️ Error processing worker result: {e}")
continue
except Exception as e:
print(f" ⚠️ CSV file error: {e}")
        # Get database totals BEFORE closing connections
        neo4j_total = neo4j_manager.get_post_count() if neo4j_manager.driver else 0
        chroma_total = (
            chroma_manager.get_document_count() if chroma_manager.collection else 0
        )
        # Close database connections now that all reads and writes are done
        neo4j_manager.close()
        # Print statistics
        print("\n 📊 AGGREGATION STATISTICS")
        print(f" Total Posts Processed: {total_posts}")
        print(f" Unique Posts: {unique_posts}")
        print(f" Duplicate Posts: {duplicate_posts}")
        print(f" Stored in Neo4j: {stored_neo4j}")
        print(f" Stored in ChromaDB: {stored_chroma}")
        print(f" Stored in CSV: {stored_csv}")
        print(f" Dataset Path: {csv_path}")
print("\n 💾 DATABASE TOTALS")
print(f" Neo4j Total Posts: {neo4j_total}")
print(f" ChromaDB Total Docs: {chroma_total}")
return {
"aggregator_stats": {
"total_processed": total_posts,
"unique_posts": unique_posts,
"duplicate_posts": duplicate_posts,
"stored_neo4j": stored_neo4j,
"stored_chroma": stored_chroma,
"stored_csv": stored_csv,
"neo4j_total": neo4j_total,
"chroma_total": chroma_total,
},
"dataset_path": csv_path,
}
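

if __name__ == "__main__":
    # Minimal local smoke-test sketch (illustrative only, not the production
    # entry point). Assumptions: SocialAgentState tolerates a plain dict at
    # runtime, scraper tools and database credentials are configured via the
    # environment, and a sequential run outside the LangGraph subgraph is
    # acceptable for a dry run.
    agent = SocialAgentNode()
    state: Dict[str, Any] = {"worker_results": []}
    for step in (
        agent.collect_sri_lanka_trends,
        agent.collect_sri_lanka_social_media,
        agent.collect_user_defined_targets,
    ):
        update = step(state)
        # Mimic an additive merge for the worker_results channel.
        state["worker_results"] = state.get("worker_results", []) + update.get(
            "worker_results", []
        )
    state.update(agent.categorize_by_geography(state))
    state.update(agent.generate_llm_summary(state))
    state.update(agent.format_final_output(state))
    print(state.get("final_feed", ""))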