import asyncpraw
import asyncio
import os
import torch
import torch.nn.functional as F  # needed for F.softmax in predict_urgency
import pickle
import sys
from datetime import datetime
from dotenv import load_dotenv
from flask import Flask
from models import db, DisasterPost
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from ner_extractor import extract_entities
from huggingface_hub import hf_hub_download
# Force prints to appear immediately in Hugging Face logs
def log(msg):
    print(msg, flush=True)

log("🚀 INGEST SCRIPT LAUNCHED! Initializing...")
# 1. Config & Setup
SUBREDDITS = "AlistoSimulation"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Load .env from the alisto_project folder one level up
env_path_1 = os.path.join(BASE_DIR, '../.env')
if os.path.exists(env_path_1):
    load_dotenv(env_path_1)
    log("✅ Loaded .env from alisto_project folder")
else:
    log("⚠️ No .env file found in alisto_project folder")

app = Flask(__name__)
DB_PATH = os.path.join(BASE_DIR, 'alisto.db')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'connect_args': {'timeout': 15}}
db.init_app(app)
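# Note: Flask is used here only to give SQLAlchemy an application context for the
# database writes below; this script never starts a web server itself.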
# 2. Load Models
# FIXED: points to the cloud repository, not a local folder
MODEL_ID = "Quivara/alisto-brain"

log("🧠 Loading ALISTO Brains from Cloud (this takes 1-2 mins)...")
try:
    # Load tokenizer & model from the Hugging Face Hub
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, subfolder="roberta_model")
    roberta_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_ID, subfolder="roberta_model", num_labels=2
    )
    device = torch.device("cpu")
    roberta_model.to(device)
    roberta_model.eval()
    log(f"✅ Context Expert loaded from {MODEL_ID}")
except Exception as e:
    log(f"❌ Error loading Model: {e}")
    # Exit here because the app is useless without the brain
    sys.exit(1)
try:
    log("📥 Downloading Gatekeeper (TF-IDF)...")
    tfidf_path = hf_hub_download(repo_id=MODEL_ID, filename="tfidf_ensemble.pkl")
    with open(tfidf_path, 'rb') as f:
        tfidf_model = pickle.load(f)
    log("✅ Gatekeeper (TF-IDF) loaded")
except Exception as e:
    log(f"❌ Error loading TF-IDF (non-fatal, continuing without the gatekeeper): {e}")
    tfidf_model = None
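# The two models play different roles: the pickled TF-IDF ensemble is a cheap "gatekeeper"
# that rejects obviously non-urgent text, and the fine-tuned RoBERTa checkpoint is the
# "context expert" that makes the final urgency call (see predict_urgency below).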
# 3. Helpers (Logic & Filters)
PHILIPPINE_LOCATIONS = [
    "Philippines", "PH", "Luzon", "Visayas", "Mindanao", "Metro Manila", "NCR",
    "Manila", "Quezon City", "Makati", "Taguig", "Pasig", "Mandaluyong",
    "Marikina", "Las Pinas", "Las Piñas", "Muntinlupa", "Caloocan",
    "Paranaque", "Parañaque", "Valenzuela", "Pasay", "Malabon",
    "Navotas", "San Juan", "Pateros",
    "Cavite", "Naic", "Bacoor", "Imus", "Dasmarinas", "Dasmariñas",
    "General Trias", "Tagaytay", "Kawit", "Noveleta", "Rosario", "Tanza",
    "Silang", "Trece Martires", "Laguna", "Calamba", "Santa Rosa", "Binan",
    "Biñan", "San Pedro", "Cabuyao", "Los Banos", "Los Baños", "Rizal",
    "Antipolo", "Cainta", "Taytay", "San Mateo", "Binangonan", "Batangas",
    "Bulacan", "Pampanga", "Tarlac", "Cebu", "Iloilo", "Tacloban",
    "Davao", "Cagayan", "Bicol", "Albay", "Isabela"
]
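# NOTE: PHILIPPINE_LOCATIONS is not referenced by the functions in this script; it appears
# to be kept as a reference whitelist for location filtering done elsewhere or for future use.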
def is_news_or_irrelevant(text):
    text_lower = text.lower()
    news_indicators = ["breaking:", "just in:", "news:", "update:", "report:", "mmda", "pagasa"]
    financial_indicators = ["gcash", "paypal", "budget", "loan", "selling", "donate"]
    irrelevant_contexts = ["how can i help", "thoughts and prayers", "discussion:", "opinion:"]

    if any(ind in text_lower for ind in news_indicators):
        return True, "News/Report"

    has_financial = any(ind in text_lower for ind in financial_indicators)
    is_life_death = any(k in text_lower for k in ["trapped", "lubog", "roof", "rescue", "drowning"])
    if has_financial and not is_life_death:
        return True, "Financial/Non-Urgent"

    if any(ctx in text_lower for ctx in irrelevant_contexts):
        return True, "Context/NotUrgent"

    return False, None
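# Two-stage urgency check: the TF-IDF gatekeeper rejects anything it scores below 0.20,
# and the remaining posts must clear a 0.4 softmax probability from RoBERTa to count as urgent.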
def predict_urgency(text):
    if tfidf_model:
        tfidf_probs = tfidf_model.predict_proba([text])[0]
        if tfidf_probs[1] < 0.20:
            return False, tfidf_probs[1], "TF-IDF Reject"

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        outputs = roberta_model(**inputs)
        probs = F.softmax(outputs.logits, dim=-1)
        roberta_conf = probs[0][1].item()
    return (roberta_conf > 0.4), roberta_conf, "RoBERTa"
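# Keyword matching below is plain substring search, checked in dict insertion order; the first
# category with a hit wins (note that e.g. "ash" also matches "flash", and Volcano is tested
# before Flood), and anything with no hit falls back to "General Emergency".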
def get_disaster_type(text):
    text_lower = text.lower()
    mapping = {
        "Earthquake": ["quake", "lindol", "shake"],
        "Landslide": ["landslide", "guho"],
        "Volcano": ["volcano", "lava", "ash", "taal"],
        "Fire": ["fire", "sunog", "burn"],
        "Typhoon": ["typhoon", "bagyo", "storm"],
        "Flood": ["flood", "baha", "water", "lubog"],
    }
    for dtype, keywords in mapping.items():
        if any(k in text_lower for k in keywords):
            return dtype
    return "General Emergency"
def get_assistance_type(text):
    text = text.lower()
    if any(k in text for k in ["rescue", "trapped", "roof"]):
        return "Rescue"
    if any(k in text for k in ["medical", "doctor", "hospital"]):
        return "Medical"
    if any(k in text for k in ["evacuate", "shelter"]):
        return "Evacuation"
    if any(k in text for k in ["food", "water"]):
        return "Food/Water"
    return "General Assistance"
def assign_dynamic_urgency(text):
    text_lower = text.lower()
    high_keywords = ["bleeding", "unconscious", "life threatening", "trap", "trapped", "drowning", "lubog"]
    medium_keywords = ["stranded", "running out", "evacuate", "lowbat", "senior"]
    if any(k in text_lower for k in high_keywords):
        return "High"
    if any(k in text_lower for k in medium_keywords):
        return "Medium"
    return "Low"
# 4. Processing Logic
async def process_post(post):
    try:
        full_text = f"{post.title} {post.selftext}"

        # Skip posts already stored
        with app.app_context():
            exists = DisasterPost.query.filter_by(reddit_id=post.id).first()
            if exists:
                return

        # Filters
        is_bad, reason = is_news_or_irrelevant(full_text)
        if is_bad:
            return

        is_urgent, score, source = predict_urgency(full_text)
        if not is_urgent:
            return

        # Extraction
        ner_results = extract_entities(full_text)
        city_location = ner_results.get('location', "Unknown Location")
        if isinstance(city_location, list):
            location = city_location[0] if city_location else "Unknown Location"
        else:
            location = city_location

        disaster_type = get_disaster_type(full_text)
        dynamic_urgency = assign_dynamic_urgency(full_text)

        # Determine author: prefer a contact person extracted by NER, else the Reddit username
        contact_person = ner_results.get('contact_person_name', None)
        final_author = contact_person if contact_person else str(post.author)

        log(f"🚨 ALERT SAVED: {disaster_type} in {location} ({dynamic_urgency})")

        # Save to DB
        new_post = DisasterPost(
            reddit_id=post.id,
            title=post.title,
            content=post.selftext or post.title,
            author=final_author,
            location=location,
            full_address=ner_results.get('full_address', "Check Post"),
            contact_number=ner_results.get('contact', None),
            disaster_type=disaster_type,
            assistance_type=get_assistance_type(full_text),
            urgency_level=dynamic_urgency,
            is_help_request=True,
            status='New',
            timestamp=datetime.utcfromtimestamp(post.created_utc)
        )
        with app.app_context():
            db.session.add(new_post)
            db.session.commit()
    except Exception as e:
        log(f"Processing Error: {e}")
# 5. Main Loop (POLLING MODE - The Fix for Hugging Face)
async def scrape_reddit():
    log("🔌 Connecting to Reddit API (Polling Mode)...")
    client_id = os.getenv("REDDIT_CLIENT_ID")
    client_secret = os.getenv("REDDIT_CLIENT_SECRET")
    if not client_id or not client_secret:
        log("❌ CRITICAL ERROR: Client ID or Secret missing in .env")
        return

    # Authenticate
    reddit = asyncpraw.Reddit(
        client_id=client_id,
        client_secret=client_secret,
        user_agent=os.getenv("REDDIT_USER_AGENT"),
        username=os.getenv("REDDIT_USERNAME"),
        password=os.getenv("REDDIT_PASSWORD")
    )

    log(f"👁️ ALISTO ACTIVE: Polling r/{SUBREDDITS} every 60s...")
    last_id = None

    while True:
        try:
            subreddit = await reddit.subreddit(SUBREDDITS)
            # Fetch ONLY 1 post to minimize bandwidth and look like a normal client
            async for post in subreddit.new(limit=1):
                if post.id != last_id:
                    log(f"📥 New Post Detected: {post.title}")
                    await process_post(post)
                    last_id = post.id
                else:
                    # Silence "no new post" messages to keep logs clean
                    pass

            # Wait 60 seconds between polls (the fix for the 403 error)
            await asyncio.sleep(60)
        except Exception as e:
            log(f"⚠️ Connection glitch (retrying in 2 min): {e}")
            await asyncio.sleep(120)

    # Only reached if the loop above ever exits
    await reddit.close()
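# Expected .env keys read above: REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT,
# REDDIT_USERNAME, REDDIT_PASSWORD.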
if __name__ == "__main__":
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(scrape_reddit())
    except KeyboardInterrupt:
        log("\n🛑 Stopped by user")