import os

# Must be set before TensorFlow is imported (transitively, by the ML components)
# to silence its C++ log spam.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import gc
import random
import time
from datetime import datetime
from typing import Any, Dict, List, Tuple

import nltk
import numpy as np
from ddgs import DDGS
from googlesearch import search

from deploy.main.claim_verifier import ClaimVerifier
from deploy.main.network_analyzer import NetworkAnalyzer
from deploy.main.predict_clickbait import ClickbaitPredictor
from deploy.main.source_credibility_analyzer import SourceCredibilityAnalyzer
from deploy.utils.general_utils import extract_domain

# Ensure the NLTK sentence-tokenizer data is available; download once if missing.
try:
    nltk.data.find("tokenizers/punkt")
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download("punkt")
    nltk.download("punkt_tab")


class FakeNewsDetector:
    """Main enhanced fact checker with ML integration.

    Orchestrates four analysis components (source credibility, claim
    verification, network propagation, clickbait detection) and combines
    their scores into a weighted final verdict. Results are cached
    per-headline in ``self.analysis_cache``.
    """

    def __init__(self) -> None:
        """Initialize all analysis components and the in-memory result cache.

        Raises:
            Exception: re-raised if any component fails to initialize, since
                the detector cannot function without them.
        """
        try:
            self.source_analyzer = SourceCredibilityAnalyzer()
            self.claim_verifier = ClaimVerifier()
            self.network_analyzer = NetworkAnalyzer()
            self.clickbait_predictor = ClickbaitPredictor()
            # Maps headline -> full analysis-results dict (see comprehensive_verify).
            # NOTE(review): unbounded cache — fine for short-lived processes,
            # consider an LRU bound for long-running services.
            self.analysis_cache: Dict[str, Dict] = {}
        except Exception as e:
            print(f"āŒ Error initializing components: {e}")
            raise

    def _to_float(self, value: Any, default: float = 0.0) -> float:
        """Safely convert any numeric value (incl. numpy scalars) to Python float.

        Args:
            value: Candidate numeric value.
            default: Returned when ``value`` is not numeric or conversion fails.

        Returns:
            ``float(value)`` for int/float/numpy numerics, else ``default``.
        """
        try:
            if isinstance(value, (np.integer, np.floating)):
                return float(value)
            elif isinstance(value, (int, float)):
                return float(value)
            else:
                return default
        except (ValueError, TypeError):
            return default

    def _analyze_clickbait(self, headline: str) -> float:
        """Analyzes the headline for clickbait characteristics.

        Returns:
            Clickbait probability in [0, 1]; 0.5 (neutral) on predictor failure.
        """
        print("🧠 ML Clickbait Analysis...")
        try:
            # Predictor returns (label, score, extra); only the score is used here.
            _, clickbait_score, _ = self.clickbait_predictor.predict(headline)
            clickbait_score = self._to_float(clickbait_score, 0.5)
            print(f" Clickbait Score: {clickbait_score:.2f}")
            return clickbait_score
        except Exception as e:
            print(f" āŒ Clickbait analysis error: {e}")
            return 0.5  # Default moderate score

    def _search_for_sources_old(self, headline: str, num_results: int) -> List[str]:
        """Searches the web for sources related to the headline.

        Legacy Google-based search, retained for reference; the active path is
        :meth:`_search_for_sources` (DuckDuckGo).

        Returns:
            List of result URLs; empty list on any search failure.
        """
        print("šŸ”Ž Searching and analyzing sources...")
        try:
            # Randomized delay to reduce the chance of being rate-limited.
            time.sleep(random.uniform(1.5, 3.0))
            search_results = list(search(headline, num_results=num_results, lang="en"))
            print(f" Found {len(search_results)} search results")
            return search_results
        except Exception as e:
            print(f" āŒ Search error: {e}")
            return []

    def _search_for_sources(self, headline: str, num_results: int) -> List[str]:
        """Searches the web for sources related to the headline with DuckDuckGo.

        Returns:
            List of result URLs; empty list when the search fails or yields
            nothing.
        """
        print("šŸ”Ž Searching and analyzing sources...")
        try:
            search_results = []
            with DDGS() as ddgs:
                results = ddgs.text(
                    query=headline,
                    max_results=num_results,
                    region="us-en",
                    safesearch="moderate",
                )
                # The result-dict URL key has varied across ddgs versions;
                # probe the known spellings in order.
                for result in results:
                    if 'href' in result:
                        search_results.append(result['href'])
                    elif 'link' in result:
                        search_results.append(result['link'])
                    elif 'url' in result:
                        search_results.append(result['url'])
            if search_results:
                print(f" Found {len(search_results)} search results via DuckDuckGo")
                return search_results
            else:
                print(" No results returned from DuckDuckGo")
                return []
        # NOTE: an `except ImportError` branch was removed here — DDGS is a
        # module-level import, so a missing package fails at import time and
        # could never be caught inside this method.
        except Exception as e:
            print(f" āŒ DuckDuckGo search error: {e}")
            return []

    def _analyze_source_credibility(
        self, search_results: List[str]
    ) -> Tuple[float, int, int]:
        """Analyzes the credibility of the found source domains.

        Args:
            search_results: Result URLs from the web search.

        Returns:
            Tuple of (average credibility in [0, 1], count of trusted domains
            (score > 0.7), count of suspicious domains (score < 0.3)). Returns
            (0.1, 0, 0) when there are no results.
        """
        print("šŸ“Š Analyzing source credibility...")
        if not search_results:
            print(" āŒ No search results to analyze")
            return 0.1, 0, 0
        source_scores = []
        trusted_count = 0
        suspicious_count = 0
        for i, url in enumerate(search_results):
            try:
                domain = extract_domain(url)
                credibility_score = self.source_analyzer.analyze_domain_credibility(
                    domain
                )
                credibility_score = self._to_float(credibility_score, 0.5)
                source_scores.append(credibility_score)
                if credibility_score > 0.7:
                    trusted_count += 1
                    print(f" {i+1}. {domain} āœ… ({credibility_score:.2f})")
                elif credibility_score < 0.3:
                    suspicious_count += 1
                    print(f" {i+1}. {domain} āŒ ({credibility_score:.2f})")
                else:
                    print(f" {i+1}. {domain} ā“ ({credibility_score:.2f})")
            except Exception as e:
                print(f" āŒ Error analyzing {url}: {e}")
                source_scores.append(0.3)  # Default neutral score
        # Use regular Python mean instead of np.mean
        avg_credibility = (
            sum(source_scores) / len(source_scores) if source_scores else 0.1
        )
        return avg_credibility, trusted_count, suspicious_count

    def _analyze_network_propagation(
        self, search_results: List[str]
    ) -> Dict[str, float]:
        """Analyzes the propagation pattern of the news across the network.

        Returns:
            Dict with ``score`` and ``domain_diversity`` as Python floats;
            ``{"score": 0.1, "domain_diversity": 0.0}`` on empty input or error.
        """
        print("🌐 Network Propagation Analysis...")
        if not search_results:
            print(" āŒ No search results for network analysis")
            return {"score": 0.1, "domain_diversity": 0.0}
        try:
            network_analysis = self.network_analyzer.analyze_propagation_pattern(
                search_results
            )
            # Convert all values to Python floats (analyzer may return numpy types).
            result = {
                "score": self._to_float(network_analysis.get("score", 0.1)),
                "domain_diversity": self._to_float(
                    network_analysis.get("domain_diversity", 0.0)
                ),
            }
            print(f" Propagation Score: {result['score']:.2f}")
            print(f" Domain Diversity: {result['domain_diversity']:.2f}")
            return result
        except Exception as e:
            print(f" āŒ Network analysis error: {e}")
            return {"score": 0.1, "domain_diversity": 0.0}

    def _verify_claim(self, headline: str, search_results: List[str]) -> Dict[str, Any]:
        """Verifies the claim against the content of the found sources.

        Returns:
            Dict with ``score`` (float) and ``source_details`` (list);
            ``{"score": 0.3, "source_details": []}`` on empty input or error.
        """
        print("āœ… Verifying Claims...")
        if not search_results:
            print(" āŒ No search results for claim verification")
            return {"score": 0.3, "source_details": []}
        try:
            verification = self.claim_verifier.verify_claim_against_sources(
                headline, search_results
            )
            claim_verification_score = self._to_float(verification.get("score", 0.3))
            source_details = verification.get("source_details", [])
            print(f" '{headline}': {claim_verification_score:.2f}")
            return {"score": claim_verification_score, "source_details": source_details}
        except Exception as e:
            print(f" āŒ Claim verification error: {e}")
            return {"score": 0.3, "source_details": []}

    def _calculate_final_score_and_verdict(
        self, component_scores: Dict[str, float]
    ) -> Tuple[float, str, str]:
        """Calculates the final weighted score and determines the verdict.

        Args:
            component_scores: Per-component scores in [0, 1], keyed by the
                names in ``weights`` below; missing keys count as 0.0.

        Returns:
            Tuple of (final score, verdict label, confidence label).
        """
        # Weights sum to 1.0; source credibility and claim verification dominate.
        weights = {
            "source_credibility": 0.35,
            "claim_verification": 0.35,
            "network_propagation": 0.20,
            "clickbait_detection": 0.10,
        }
        final_score = sum(
            component_scores.get(component, 0.0) * weight
            for component, weight in weights.items()
        )
        # Verdict bands are checked from most to least credible.
        if final_score >= 0.75:
            verdict = "Credible — Backed by Evidence"
            confidence = "Very High"
        elif final_score >= 0.60:
            verdict = "Likely True — Supported by Sources"
            confidence = "High"
        elif final_score >= 0.45:
            verdict = "Unclear — Conflicting Information"
            confidence = "Moderate"
        elif final_score >= 0.30:
            verdict = "Doubtful — Weak or Biased Evidence"
            confidence = "Low"
        else:
            verdict = "False or Misleading — No Basis Found"
            confidence = "Very Low"
        return final_score, verdict, confidence

    def _print_summary(self, results: Dict) -> None:
        """Prints a formatted summary of the analysis results.

        Args:
            results: Full analysis-results dict as built by
                :meth:`comprehensive_verify` (must contain ``final_verdict``
                and ``components``).
        """
        final_verdict = results["final_verdict"]
        components = results["components"]
        print("šŸ“ˆ COMPREHENSIVE ANALYSIS RESULTS:")
        print(
            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
        )
        print(f"šŸŽÆ Final Score: {final_verdict['score']:.2f}/1.000")
        print(f"šŸ† Verdict: {final_verdict['verdict']}")
        print(f"šŸ“Š Confidence: {final_verdict['confidence']}")
        print("šŸ” Component Breakdown:")
        for component, score in final_verdict["components"].items():
            print(f" • {component.replace('_', ' ').title()}: {score:.2f}")
        print("šŸ“‹ Summary:")
        print(
            f" • Trusted Sources: {components['source_credibility']['trusted_count']}"
        )
        print(
            f" • Suspicious Sources: {components['source_credibility']['suspicious_count']}"
        )
        print(
            f" • Clickbait Score: {components['clickbait']['score']:.2f} (lower is better)"
        )
        print(f" • Domain Diversity: {components['network']['domain_diversity']:.2f}")
        # [0:1] keeps output short and is safe on an empty details list.
        print(
            f" • Source Details: {components['claim_verification']['source_details'][0:1]}"
        )

    def comprehensive_verify(
        self, raw_headline: str, results_to_check: int = 8
    ) -> Dict:
        """
        Comprehensive fact-checking with ML integration.

        This method orchestrates the analysis by calling various specialized
        components: web search, clickbait detection, source credibility,
        network propagation, and claim verification.

        Args:
            raw_headline: The headline/claim to verify.
            results_to_check: Maximum number of search results to analyze.

        Returns:
            Analysis-results dict with ``headline``, ``timestamp``,
            ``final_verdict`` (verdict/confidence/score/components) and
            ``components`` (per-analyzer details). Successful analyses are
            cached; the fallback results for invalid input or zero search
            results are not.
        """
        if raw_headline in self.analysis_cache:
            print(f'\nāœ… Using Cached Analysis: "{raw_headline}"')
            print("=" * 80)
            cached_result = self.analysis_cache[raw_headline]
            self._print_summary(cached_result)
            return cached_result
        print(f'\nšŸ”Ž Comprehensive Analysis: "{raw_headline}"')
        print("=" * 80)
        if not raw_headline or not raw_headline.strip():
            print("āŒ Empty or invalid headline provided")
            return {
                "headline": "",
                "timestamp": datetime.now().isoformat(),
                "final_verdict": {
                    "verdict": "āŒ Invalid Input",
                    "confidence": "Very High",
                    "score": 0.0,
                    "components": {
                        "claim_verification": 0.0,
                        "source_credibility": 0.0,
                        "clickbait_detection": 0.0,
                        "network_propagation": 0.0,
                    },
                },
                "components": {
                    "clickbait": {"score": 0.0},
                    "source_credibility": {
                        "score": 0.0,
                        "trusted_count": 0,
                        "suspicious_count": 0,
                    },
                    "network": {"score": 0.0, "domain_diversity": 0.0},
                    "claim_verification": {"score": 0.0, "source_details": []},
                },
            }
        # Step 1: Search for sources
        search_results = self._search_for_sources(raw_headline, results_to_check)
        if not search_results:
            # No corroborating sources at all — treat as highly questionable.
            # Deliberately NOT cached: a later retry may find results.
            print("āš ļø No search results found. Assigning low credibility by default.")
            return {
                "headline": raw_headline,
                "timestamp": datetime.now().isoformat(),
                "final_verdict": {
                    "verdict": "🚫 HIGHLY QUESTIONABLE",
                    "confidence": "Very High",
                    "score": 0.1,
                    "components": {
                        "claim_verification": 0.1,
                        "source_credibility": 0.1,
                        "clickbait_detection": 0.1,
                        "network_propagation": 0.1,
                    },
                },
                "components": {
                    "clickbait": {"score": 0.5},
                    "source_credibility": {
                        "score": 0.1,
                        "trusted_count": 0,
                        "suspicious_count": 0,
                    },
                    "network": {"score": 0.1, "domain_diversity": 0.0},
                    "claim_verification": {"score": 0.1, "source_details": []},
                },
            }
        # Step 2: Run all analysis components
        clickbait_score = self._analyze_clickbait(raw_headline)
        avg_source_credibility, trusted_count, suspicious_count = (
            self._analyze_source_credibility(search_results)
        )
        network_analysis = self._analyze_network_propagation(search_results)
        claim_verification_result = self._verify_claim(raw_headline, search_results)
        claim_verification_score = claim_verification_result["score"]
        # Step 3: Consolidate component scores (ensure all are Python floats)
        component_scores = {
            "claim_verification": claim_verification_score,
            "source_credibility": avg_source_credibility,
            # Invert: a high clickbait probability should LOWER credibility.
            "clickbait_detection": 1.0 - clickbait_score,
            "network_propagation": network_analysis["score"],
        }
        # Step 4: Calculate final score and verdict
        final_score, verdict, confidence = self._calculate_final_score_and_verdict(
            component_scores
        )
        # Step 5: Build the result structure consumed by _print_summary / callers
        analysis_results = {
            "headline": raw_headline,
            "timestamp": datetime.now().isoformat(),
            "final_verdict": {
                "verdict": verdict,
                "confidence": confidence,
                "score": round(final_score, 2),
                "components": {
                    "claim_verification": round(
                        component_scores["claim_verification"], 2
                    ),
                    "source_credibility": round(
                        component_scores["source_credibility"], 2
                    ),
                    "clickbait_detection": round(
                        component_scores["clickbait_detection"], 2
                    ),
                    "network_propagation": round(
                        component_scores["network_propagation"], 2
                    ),
                },
            },
            "components": {
                "clickbait": {"score": round(clickbait_score, 2)},
                "source_credibility": {
                    "score": round(avg_source_credibility, 2),
                    "trusted_count": trusted_count,
                    "suspicious_count": suspicious_count,
                },
                "network": {
                    "score": round(network_analysis["score"], 2),
                    "domain_diversity": round(network_analysis["domain_diversity"], 2),
                },
                "claim_verification": {
                    "score": round(claim_verification_score, 2),
                    "source_details": claim_verification_result["source_details"],
                },
            },
        }
        self._print_summary(analysis_results)
        self.analysis_cache[raw_headline] = analysis_results
        # Analysis components can hold large intermediate objects; reclaim now.
        gc.collect()
        return analysis_results