Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import json | |
| import logging | |
| from collections import defaultdict | |
| from telegram import Update | |
| from telegram.ext import ( | |
| ApplicationBuilder, | |
| MessageHandler, | |
| CommandHandler, | |
| ContextTypes, | |
| filters, | |
| ) | |
| # ------------------ CONFIG ------------------ | |
| TOKEN = os.getenv("BOT_TOKEN") # Set this in your environment | |
| DATA_FILE = "bot_data.json" | |
| logging.basicConfig( | |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| level=logging.INFO, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # ------------------ ABUSIVE WORD FILTER ------------------ | |
| ABUSIVE_WORDS = [ | |
| "fuck", "shit", "bitch", "asshole", "cunt", "motherfucker", | |
| "dick", "pussy", "slut", "whore", "bastard", | |
| "mumu", "ode", "werey", "weyre", "olodo", "ashawo", "oloshi", | |
| "agbaya", "oloriburuku", "shege", "dan iska", "apoda", | |
| "obun", "didirin", "ewu", "ndisime" | |
| ] | |
| abusive_regex = re.compile( | |
| r"(?i)\b(" + "|".join(re.escape(word) for word in ABUSIVE_WORDS) + r")\b" | |
| ) | |
| # ------------------ PERSISTENT STORAGE ------------------ | |
| def load_data(): | |
| if not os.path.exists(DATA_FILE): | |
| return {"groups": [], "warnings": {}} | |
| with open(DATA_FILE, "r") as f: | |
| return json.load(f) | |
| def save_data(data): | |
| with open(DATA_FILE, "w") as f: | |
| json.dump(data, f) | |
| data_store = load_data() | |
| # Structure: | |
| # data_store["groups"] = [group_ids] | |
| # data_store["warnings"] = { "chat_id:user_id": warning_count } | |
| # ------------------ ABUSE HANDLER ------------------ | |
| async def track_groups_and_check_abuse(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| message = update.message | |
| if not message or not message.text: | |
| return | |
| chat = message.chat | |
| # Track groups | |
| if chat.type in ["group", "supergroup"]: | |
| if chat.id not in data_store["groups"]: | |
| data_store["groups"].append(chat.id) | |
| save_data(data_store) | |
| # Check abuse | |
| if not abusive_regex.search(message.text): | |
| return | |
| user = message.from_user | |
| key = f"{chat.id}:{user.id}" | |
| try: | |
| await message.delete() | |
| except Exception as e: | |
| logger.warning(f"Failed to delete message: {e}") | |
| # Increase warning count | |
| data_store["warnings"][key] = data_store["warnings"].get(key, 0) + 1 | |
| warnings = data_store["warnings"][key] | |
| save_data(data_store) | |
| if warnings == 1: | |
| await chat.send_message( | |
| f"⚠️ {user.first_name}, first warning. Do not use abusive language." | |
| ) | |
| elif warnings == 2: | |
| await chat.send_message( | |
| f"⚠️ {user.first_name}, FINAL warning. Next time you will be removed." | |
| ) | |
| else: | |
| await chat.send_message( | |
| f"🚫 {user.first_name} has been removed for repeated abusive language." | |
| ) | |
| try: | |
| await context.bot.ban_chat_member(chat.id, user.id) | |
| except Exception as e: | |
| logger.error(f"Failed to ban user: {e}") | |
| data_store["warnings"][key] = 0 | |
| save_data(data_store) | |
| # ------------------ PRIVATE BROADCAST ------------------ | |
| async def vikingkull_omega(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| message = update.message | |
| if message.chat.type != "private": | |
| return | |
| if not data_store["groups"]: | |
| await message.reply_text("No groups stored yet.") | |
| return | |
| post_text = "" | |
| photo_id = None | |
| if message.photo: | |
| photo_id = message.photo[-1].file_id | |
| post_text = (message.caption or "").replace("/vikingkull_omega", "").strip() | |
| elif message.text: | |
| post_text = message.text.replace("/vikingkull_omega", "").strip() | |
| if not post_text and not photo_id: | |
| await message.reply_text( | |
| "Usage:\n" | |
| "/vikingkull_omega your message\n" | |
| "OR send photo with caption." | |
| ) | |
| return | |
| success = 0 | |
| for group_id in list(data_store["groups"]): | |
| try: | |
| if photo_id: | |
| await context.bot.send_photo( | |
| chat_id=group_id, | |
| photo=photo_id, | |
| caption=post_text | |
| ) | |
| else: | |
| await context.bot.send_message( | |
| chat_id=group_id, | |
| text=post_text | |
| ) | |
| success += 1 | |
| except Exception as e: | |
| logger.warning(f"Removing invalid group {group_id}: {e}") | |
| data_store["groups"].remove(group_id) | |
| save_data(data_store) | |
| await message.reply_text(f"✅ Broadcast sent to {success} group(s).") | |
| # ------------------ DAILY COMMAND ------------------ | |
| async def daily_broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| message = update.message | |
| chat = message.chat | |
| # Admin-only in groups | |
| if chat.type in ["group", "supergroup"]: | |
| member = await context.bot.get_chat_member(chat.id, message.from_user.id) | |
| if member.status not in ["administrator", "creator"]: | |
| return | |
| if not context.args: | |
| await message.reply_text("Usage: /daily Your message here") | |
| return | |
| text = " ".join(context.args) | |
| try: | |
| await message.delete() | |
| except: | |
| pass | |
| await chat.send_message( | |
| f"📢 Daily Update\n\n{text}" | |
| ) | |
| # ------------------ MAIN ------------------ | |
| def main(): | |
| if not TOKEN: | |
| raise ValueError("BOT_TOKEN environment variable not set.") | |
| app = ApplicationBuilder().token(TOKEN).build() | |
| app.add_handler(CommandHandler("daily", daily_broadcast)) | |
| app.add_handler(CommandHandler("vikingkull_omega", vikingkull_omega)) | |
| app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, track_groups_and_check_abuse)) | |
| logger.info("Bot is running...") | |
| app.run_polling() | |
| if __name__ == "__main__": | |
| main() |