| |
| import re |
| import unicodedata |
| from typing import Dict, List, Tuple |
| from datetime import datetime |
| import torch |
| from transformers import pipeline |
|
|
class SimpleModerator:
    """Two-stage text moderator: fast lexical pattern matching plus two
    transformer classifiers (general toxicity and hate speech).

    Decisions escalate: "allow" -> "flag" -> "delete". Pattern matches on
    explicit threats/slurs always win; model scores decide otherwise. The
    models are optional — if :meth:`load_models` fails, pattern matching
    alone still works (model scores stay 0.0).
    """

    def __init__(self):
        # Prefer GPU when available; mapped to pipeline device 0 vs -1 later.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Populated lazily by load_models(); None means "pattern-only" mode.
        self.toxic_model = None
        self.hate_model = None

        # Leet-speak substitutions applied during normalization so that
        # obfuscations like "k!ll" still hit the phrase lists.
        self.leet_map = {
            "@": "a", "4": "a", "3": "e", "1": "i", "!": "i",
            "0": "o", "$": "s", "5": "s", "7": "t", "+": "t",
            "2": "to", "&": "and", "8": "ate", "6": "g"
        }

        # Accented Latin characters folded to plain ASCII before matching.
        self.char_normalize = {
            'à': 'a', 'á': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a',
            'è': 'e', 'é': 'e', 'ê': 'e', 'ë': 'e',
            'ì': 'i', 'í': 'i', 'î': 'i', 'ï': 'i',
            'ò': 'o', 'ó': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
            'ù': 'u', 'ú': 'u', 'û': 'u', 'ü': 'u',
            'ý': 'y', 'ÿ': 'y',
            'ç': 'c', 'ñ': 'n'
        }

        # Phrases that force an immediate DELETE (threats, self-harm).
        self.delete_patterns = [
            "kill yourself", "kill urself", "kys", "commit suicide", "end your life",
            "take your life", "suicide", "kill myself", "kill himself", "kill herself",
            "i will kill you", "ill kill you", "gonna kill you", "murder you",
            "i will murder", "going to kill", "gonna murder", "i will hurt you",
            "should die", "deserve to die", "hope you die", "wish you were dead",
            "should be dead", "drop dead", "go die", "just die",
        ]

        # Slurs that force an immediate DELETE.
        self.slur_patterns = [
            "nigger", "nigga", "faggot", "fag", "retard", "spic", "kike",
            "chink", "cunt", "whore", "slut"
        ]

        # Insults/phrases that FLAG (human review) rather than delete.
        self.flag_patterns = [
            "stupid", "idiot", "dumb", "moron", "loser", "ugly", "fat",
            "worthless", "pathetic", "useless", "failure", "imbecile",
            "disgusting", "terrible", "awful", "despise",
            "shut up", "shut the fuck", "piss off"
        ]

        # Profanity checked as whole words; each occurrence raises confidence.
        self.toxic_words = [
            "hate", "hating", "hated", "hater",
            "fuck", "fucking", "fucked", "bitch", "asshole", "ass",
            "damn", "hell", "crap", "shit", "piss"
        ]

        # Precomputed lookups, hoisted out of the per-message hot path:
        # - one translate() table replacing ~40 sequential str.replace passes.
        #   The two source maps have disjoint keys and no substitution output
        #   contains another key, so a single pass is output-identical.
        # - compiled regexes used by normalize_text().
        # - O(1) membership set for the toxic word scan.
        self._normalize_table = str.maketrans({**self.char_normalize, **self.leet_map})
        self._ws_re = re.compile(r'\s+')
        self._punct_re = re.compile(r'[^\w\s]')
        self._repeat_re = re.compile(r'(.)\1{2,}')
        self._toxic_word_set = frozenset(self.toxic_words)

        print(f"🖥️ Device: {self.device}")

    def load_models(self):
        """Load the ML models.

        Returns True when both pipelines are ready, False on any failure
        (the traceback is printed; the moderator then runs pattern-only).
        """
        print("📥 Loading models...")
        try:
            print("Loading toxic-bert model...")
            self.toxic_model = pipeline(
                "text-classification",
                model="unitary/toxic-bert",
                device=0 if self.device == "cuda" else -1,
                truncation=True,
                max_length=512
            )
            print("✅ Toxic-BERT loaded")

            print("Loading dehatebert model...")
            self.hate_model = pipeline(
                "text-classification",
                model="Hate-speech-CNERG/dehatebert-mono-english",
                device=0 if self.device == "cuda" else -1,
                truncation=True,
                max_length=512
            )
            print("✅ DeHateBERT loaded")

            print("🎉 Models ready!")
            return True
        except Exception as e:
            print(f"❌ Error loading models: {e}")
            import traceback
            traceback.print_exc()
            return False

    def normalize_text(self, text: str) -> str:
        """Enhanced normalization for better pattern matching.

        Lowercases, folds accents and leet-speak in one translate() pass,
        applies NFKD, strips punctuation, collapses whitespace, and caps
        repeated characters at two (e.g. "loooser" -> "looser").
        """
        text = text.lower()
        # Single pass over the string; equivalent to the accent map followed
        # by the leet map applied sequentially (see __init__ note).
        text = text.translate(self._normalize_table)

        text = unicodedata.normalize("NFKD", text)
        text = self._ws_re.sub(' ', text)
        text = self._punct_re.sub(' ', text)
        text = self._ws_re.sub(' ', text)
        text = self._repeat_re.sub(r'\1\1', text)

        return text.strip()

    def check_patterns(self, text: str) -> Tuple[str, List[str], float]:
        """Check text against patterns and return (decision, matches, confidence).

        decision is "DELETE", "FLAG", or "ALLOW"; matches are tagged strings
        like "DELETE:slur:<word>" in the order they were found.
        """
        normalized = self.normalize_text(text)
        words = normalized.split()
        matched_patterns = []
        max_confidence = 0.0

        # NOTE(review): phrase and slur checks use substring matching on the
        # normalized text, so embedded matches (Scunthorpe-style false
        # positives, e.g. a slur inside a longer innocent word) are possible.
        # Left as-is deliberately to preserve current decisions; a
        # word-boundary regex would tighten this.
        for pattern in self.delete_patterns:
            if pattern in normalized:
                matched_patterns.append(f"DELETE:{pattern}")
                max_confidence = 1.0

        for slur in self.slur_patterns:
            if slur in words or slur in normalized:
                matched_patterns.append(f"DELETE:slur:{slur}")
                max_confidence = 1.0

        if matched_patterns and max_confidence == 1.0:
            return "DELETE", matched_patterns, 1.0

        # Whole-word profanity scan; every occurrence bumps the confidence.
        toxic_count = 0
        for word in words:
            if word in self._toxic_word_set:
                toxic_count += 1
                matched_patterns.append(f"FLAG:toxic_word:{word}")

        for pattern in self.flag_patterns:
            if pattern in normalized:
                matched_patterns.append(f"FLAG:{pattern}")

        if matched_patterns:
            # Base 0.7, +0.1 per toxic word, capped at 0.95.
            confidence = min(0.7 + (toxic_count * 0.1), 0.95)
            return "FLAG", matched_patterns, confidence

        return "ALLOW", [], 0.0

    def get_model_scores(self, text: str) -> Dict:
        """Get model predictions with proper error handling.

        Returns a dict with toxic/hate scores in [0, 1] and their labels.
        Unloaded or failing models contribute 0.0 / "unknown" so the caller
        never has to special-case them.
        """
        scores = {
            "toxic_score": 0.0,
            "toxic_label": "unknown",
            "hate_score": 0.0,
            "hate_label": "unknown"
        }

        # text[:512] truncates characters, not tokens — the pipelines also
        # truncate at 512 tokens, so this is just a cheap pre-trim.
        if self.toxic_model is not None:
            try:
                toxic_result = self.toxic_model(text[:512])[0]
                scores["toxic_score"] = float(toxic_result["score"])
                scores["toxic_label"] = toxic_result["label"]
                print(f"Toxic score: {scores['toxic_score']:.3f} ({scores['toxic_label']})")
            except Exception as e:
                print(f"Toxic model error: {e}")
        else:
            print("Toxic model not loaded")

        if self.hate_model is not None:
            try:
                hate_result = self.hate_model(text[:512])[0]
                hate_score = float(hate_result["score"])

                # DeHateBERT reports the winning class; convert to a single
                # "probability of hate" so thresholds compare uniformly.
                if hate_result["label"] == "NON_HATE":
                    scores["hate_score"] = 1.0 - hate_score
                    scores["hate_label"] = "non_hate"
                else:
                    scores["hate_score"] = hate_score
                    scores["hate_label"] = "hate"

                print(f"Hate score: {scores['hate_score']:.3f} ({scores['hate_label']})")
            except Exception as e:
                print(f"Hate model error: {e}")
        else:
            print("Hate model not loaded")

        return scores

    def moderate(self, text: str) -> Dict:
        """Main moderation function - combines pattern matching and ML models.

        Precedence: pattern DELETE > pattern FLAG > model thresholds; a
        pattern FLAG is escalated to delete when a model strongly agrees.
        Returns a dict with action, reason, all scores, and a timestamp.
        """
        pattern_decision, matched, pattern_confidence = self.check_patterns(text)

        scores = self.get_model_scores(text)

        toxic_score = scores["toxic_score"]
        hate_score = scores["hate_score"]

        action = "allow"
        reason = "No issues detected"
        final_confidence = 0.0

        if pattern_decision == "DELETE":
            action = "delete"
            reason = f"Pattern match: {matched[0].replace('DELETE:', '')}"
            final_confidence = 1.0
        elif pattern_decision == "FLAG":
            action = "flag"
            reason = f"Pattern match: {matched[0].replace('FLAG:', '')}"
            final_confidence = pattern_confidence

        elif toxic_score > 0.90:
            action = "delete"
            reason = f"Extreme toxicity detected: {toxic_score:.2f}"
            final_confidence = toxic_score
        elif hate_score > 0.85:
            action = "delete"
            reason = f"Extreme hate speech detected: {hate_score:.2f}"
            final_confidence = hate_score
        elif toxic_score > 0.70:
            action = "flag"
            reason = f"High toxicity: {toxic_score:.2f}"
            final_confidence = toxic_score
        elif hate_score > 0.50:
            action = "flag"
            reason = f"Hate speech indicators: {hate_score:.2f}"
            final_confidence = hate_score

        # Escalate a pattern FLAG to delete when a model strongly agrees.
        if pattern_decision == "FLAG" and (toxic_score > 0.95 or hate_score > 0.90):
            action = "delete"
            reason = f"Pattern + Model agreement: {reason}"
            final_confidence = max(pattern_confidence, toxic_score, hate_score)

        normalized_text = self.normalize_text(text)

        return {
            "action": action,
            "reason": reason,
            "toxic_score": toxic_score,
            "hate_score": hate_score,
            "pattern_matches": matched,
            "pattern_confidence": pattern_confidence,
            "model_confidence": max(toxic_score, hate_score),
            "final_confidence": final_confidence,
            "normalized_text": normalized_text,
            "timestamp": datetime.now().isoformat()
        }
|
|
| |
# Process-wide singleton; populated on first call to get_moderator().
_moderator_instance = None


def get_moderator():
    """Return the shared moderator, creating it and loading models on first use."""
    global _moderator_instance
    if _moderator_instance is not None:
        return _moderator_instance

    print("🔄 Creating new moderator instance...")
    moderator = SimpleModerator()
    if not moderator.load_models():
        # Keep the instance anyway: pattern matching still works without models.
        print("⚠️ Warning: Models failed to load, using pattern matching only")
    _moderator_instance = moderator
    return _moderator_instance