# Spaces: Running
# By @TrueSaiyan
# Usage: create a new file, paste this code into it, set the environment
# variables it reads (API_ID, API_HASH, BOT_TOKEN), save it as
# whatever_you_like.py, then run it with Python.
import asyncio
import logging
import os
import re
from datetime import datetime, timedelta

import pyrogram
from pyrogram import Client, filters
from pyrogram.enums import ChatMemberStatus
from pyrogram.errors.exceptions.bad_request_400 import UserAdminInvalid
from pyrogram.handlers import CallbackQueryHandler, MessageHandler
from pyrogram.types import ChatPermissions, InlineKeyboardButton, InlineKeyboardMarkup
# Telegram credentials come from the environment; a missing variable raises
# KeyError as soon as the module is loaded.
API_ID, API_HASH, BOT_TOKEN = (
    os.environ[name] for name in ("API_ID", "API_HASH", "BOT_TOKEN")
)

# Root logging configuration shared by this script and the libraries it uses.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("TruesAutoBlacklister")
logger.info("Logging has been set up.")

# Bot client. API_ID is read as a string and converted to int here.
app = Client(
    "TruesBlacklister",
    bot_token=BOT_TOKEN,  # Get bot_token from @botfather
    api_hash=API_HASH,  # Get API_HASH from my.telegram.org
    api_id=int(API_ID),  # Get API_ID from my.telegram.org
)
logger.info("Initialized Pyrogram client 1")

# Words that trigger moderation (matched as whole words, case-insensitively,
# by check_blacklist).
blacklist = [
    "counterfeit",
    "meth",
    "shard",
    "heroin",
    "ghb",
]
async def check_blacklist(client, message):
    """Delete a blacklisted message, mute its sender and offer follow-up actions.

    The message text is searched for a whole-word, case-insensitive match of
    any entry in the module-level ``blacklist``. When a non-admin sender
    matches, the message is deleted, the sender is restricted from sending
    messages, and a prompt with "Unmute" / "Ban" inline buttons is posted to
    the chat.

    Args:
        client: Client used for the member lookup, the restriction and the
            prompt message.
        message: Incoming message to inspect.

    Returns:
        None. Messages without text, messages without a user sender, and
        messages from administrators/owners are left untouched.
    """
    text = message.text
    if not text:
        # Messages without text (media, service messages) have text=None;
        # re.search would raise TypeError on it, and there is nothing to scan.
        return

    if not any(
        re.search(rf"\b{re.escape(word)}\b", text, re.IGNORECASE)
        for word in blacklist
    ):
        logger.info("Message does not contain a blacklisted word.")
        return

    logger.info("Message contains a blacklisted word.")

    sender = message.from_user
    if sender is None:
        # from_user is optional on a message (e.g. posts sent on behalf of a
        # chat), so there is no member to look up or mute.
        logger.info("Message has no user sender. Skipping action.")
        return

    chat_id = message.chat.id
    chat_member = await client.get_chat_member(chat_id, sender.id)
    logger.info(
        f"User status: {chat_member.status}, user id: {chat_member.user.id}, username: @{chat_member.user.username}"
    )

    if chat_member.status in (
        ChatMemberStatus.ADMINISTRATOR,
        ChatMemberStatus.OWNER,
    ):
        logger.info("User is an admin. Skipping action.")
        return

    try:
        await message.delete()
        logger.info("User is not an admin. Deleted the message.")
        await client.restrict_chat_member(
            chat_id=chat_id,
            user_id=sender.id,
            permissions=ChatPermissions(can_send_messages=False),
        )
    except UserAdminInvalid:
        # The mute did not go through, so offering "Unmute"/"Ban" would be
        # misleading; stop here.
        logger.error("Failed to mute the user. Skipping action.")
        return

    # Users without a public username would otherwise be addressed as "@None".
    if sender.username:
        mention = f"@{sender.username}"
    else:
        mention = sender.first_name or str(sender.id)

    reply_markup = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("Unmute", callback_data="unmute")],
            [InlineKeyboardButton("Ban", callback_data="ban")],
        ]
    )
    await client.send_message(
        chat_id=chat_id,
        text=f"{mention}, Your message has been deleted due to a blacklisted word. Please select an action:",
        reply_to_message_id=message.id,
        reply_markup=reply_markup,
    )
async def handle_button(client, callback_query):
    """Apply the "Unmute" or "Ban" choice to the user who pressed the button.

    The action always targets ``callback_query.from_user`` (the presser):
    "unmute" re-enables sending messages for them, "ban" bans them from the
    chat. After a successful action the prompt message is edited to show the
    outcome and deleted ten seconds later.

    Args:
        client: Client used to restrict or ban the chat member.
        callback_query: Callback query produced by pressing an inline button.

    Returns:
        None. ``UserAdminInvalid`` is answered with "This button is not for
        you!" (presumably raised when the presser is an admin - confirm);
        any other exception is logged with its traceback.
    """
    presser = callback_query.from_user
    chat_id = callback_query.message.chat.id

    # Users without a public username would otherwise be shown as "@None".
    if presser.username:
        mention = f"@{presser.username}"
    else:
        mention = presser.first_name or str(presser.id)

    try:
        if callback_query.data == "unmute":
            await client.restrict_chat_member(
                chat_id=chat_id,
                user_id=presser.id,
                permissions=ChatPermissions(can_send_messages=True),
            )
            outcome = f"{mention} has been unmuted."
        elif callback_query.data == "ban":
            await client.ban_chat_member(chat_id=chat_id, user_id=presser.id)
            logger.info(f"{mention} has banned themselves.")
            outcome = f"{mention} has been banned."
        else:
            # Unknown button: acknowledge it so the client stops showing the
            # loading indicator, and do nothing else.
            await callback_query.answer()
            return

        # Acknowledge the press before the 10-second pause below; an
        # unanswered callback query leaves the button in a loading state.
        await callback_query.answer()
        await callback_query.message.edit_text(outcome)
        await asyncio.sleep(10)  # Keep the outcome visible for 10 seconds
        await callback_query.message.delete()
    except UserAdminInvalid:
        await callback_query.answer("This button is not for you!")
    except Exception as e:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception("An error occurred: %s", e)
# Register the handlers before starting the client: nothing else in this file
# attaches check_blacklist or handle_button to the client, so without these
# calls the bot would connect but never react to updates.
# Text messages in groups/supergroups only: check_blacklist reads message.text
# and looks up the sender's chat-member status.
app.add_handler(MessageHandler(check_blacklist, filters.group & filters.text))
app.add_handler(CallbackQueryHandler(handle_button))

# Start the client and block until the process is stopped.
app.run()