|
from pyrogram.types import * |
|
from pyrogram.errors import FloodWait |
|
from pyrogram import Client, filters, enums |
|
from pyrogram.errors.exceptions.forbidden_403 import ChatWriteForbidden |
|
from pyrogram.errors.exceptions.bad_request_400 import ChatAdminRequired, UserAdminInvalid |
|
|
|
from utils import extract_time, extract_user, admin_check, admin_filter |
|
from info import ADMINS |
|
from Script import script |
|
from time import time |
|
import asyncio |
|
|
|
|
|
|
|
@Client.on_message(filters.command("ban"))
async def ban_user(_, message):
    """Handle ``/ban``: ban the targeted user from the current chat.

    The command is silently ignored unless ``admin_check`` (project helper)
    approves the sender. The target is whatever ``extract_user`` resolves
    from the message, as a ``(user_id, first_name)`` pair.

    Any error raised by the ban call is reported back to the chat as text
    instead of propagating.
    """
    if not await admin_check(message):
        return

    user_id, user_first_name = extract_user(message)

    try:
        await message.chat.ban_member(user_id=user_id)
    except Exception as error:
        await message.reply_text(str(error))
        return

    # An id starting with "@" is a username, which cannot be used in a
    # tg://user?id= link, so that case is reported as plain text.
    if str(user_id).lower().startswith("@"):
        notice = f"Someone else is dusting off..! \n{user_first_name} \nIs forbidden."
    else:
        notice = f"Someone else is dusting off..! \n<a href='tg://user?id={user_id}'>{user_first_name}</a> Is forbidden"
    await message.reply_text(notice)
|
|
|
|
|
@Client.on_message(filters.command("tban"))
async def temp_ban_user(_, message):
    """Handle ``/tban <duration>``: ban the targeted user until a deadline.

    The command is silently ignored when ``admin_check`` rejects the sender
    or when no argument follows the command. The first argument is handed to
    ``extract_time`` (project helper); a ``None`` result is reported as an
    invalid time type and that reply message is returned.

    Errors raised by the ban call are reported back to the chat as text.
    """
    if not await admin_check(message):
        return
    if len(message.command) <= 1:
        return

    user_id, user_first_name = extract_user(message)
    until_date_val = extract_time(message.command[1])

    if until_date_val is None:
        return await message.reply_text(text=f"Invalid time type specified. \nExpected m, h, or d, Got it: {message.command[1][-1]}")

    try:
        await message.chat.ban_member(user_id=user_id, until_date=until_date_val)
    except Exception as error:
        await message.reply_text(str(error))
        return

    # Usernames ("@name") cannot be used in a tg://user?id= link.
    if str(user_id).lower().startswith("@"):
        notice = f"Someone else is dusting off..!\n{user_first_name}\nbanned for {message.command[1]}!"
    else:
        notice = f"Someone else is dusting off..!\n<a href='tg://user?id={user_id}'>Lavane</a>\n banned for {message.command[1]}!"
    await message.reply_text(notice)
|
|
|
|
|
@Client.on_message(filters.command(["unban", "unmute"]))
async def un_ban_user(_, message):
    """Handle ``/unban`` and ``/unmute``: call ``unban_member`` on the target.

    Both commands share this handler. The command is silently ignored unless
    ``admin_check`` approves the sender. Errors raised by the unban call are
    reported back to the chat as text.
    """
    if not await admin_check(message):
        return

    user_id, user_first_name = extract_user(message)

    try:
        await message.chat.unban_member(user_id=user_id)
    except Exception as error:
        await message.reply_text(str(error))
        return

    # Usernames ("@name") cannot be used in a tg://user?id= link.
    if str(user_id).lower().startswith("@"):
        notice = f"Okay, changed ... now {user_first_name} To You can join the group!"
    else:
        notice = f"Okay, changed ... now <a href='tg://user?id={user_id}'>{user_first_name}</a> To You can join the group!"
    await message.reply_text(notice)
|
|
|
|
|
@Client.on_message(filters.command("mute"))
async def mute_user(_, message):
    """Handle ``/mute``: restrict the target with a default ``ChatPermissions()``.

    The command is silently ignored unless ``admin_check`` approves the
    sender. Errors raised by the restrict call are reported back to the chat
    as text.
    """
    if not await admin_check(message):
        return

    user_id, user_first_name = extract_user(message)

    try:
        await message.chat.restrict_member(user_id=user_id, permissions=ChatPermissions())
    except Exception as error:
        await message.reply_text(str(error))
        return

    # Usernames ("@name") cannot be used in a tg://user?id= link.
    if str(user_id).lower().startswith("@"):
        notice = f"ππ» {user_first_name} Lavender's mouth is shut! π€"
    else:
        notice = f"ππ» <a href='tg://user?id={user_id}'>Of lavender</a> The mouth is closed! π€"
    await message.reply_text(notice)
|
|
|
|
|
@Client.on_message(filters.command("tmute"))
async def temp_mute_user(_, message):
    """Handle ``/tmute <duration>``: restrict the target until a deadline.

    The command is silently ignored when ``admin_check`` rejects the sender
    or when no argument follows the command. The first argument is handed to
    ``extract_time`` (project helper); a ``None`` result is reported as an
    invalid time type and that reply message is returned.

    Errors raised by the restrict call are reported back to the chat as text.
    """
    if not await admin_check(message):
        return
    if len(message.command) <= 1:
        return

    user_id, user_first_name = extract_user(message)
    until_date_val = extract_time(message.command[1])

    if until_date_val is None:
        return await message.reply_text(f"Invalid time type specified. Expected m, h, or d, Got it: {message.command[1][-1]}")

    try:
        await message.chat.restrict_member(
            user_id=user_id,
            permissions=ChatPermissions(),
            until_date=until_date_val,
        )
    except Exception as error:
        await message.reply_text(str(error))
        return

    # Usernames ("@name") cannot be used in a tg://user?id= link.
    if str(user_id).lower().startswith("@"):
        notice = f"Be quiet for a while! π {user_first_name} muted for {message.command[1]}!"
    else:
        notice = f"Be quiet for a while! π <a href='tg://user?id={user_id}'>Of lavender</a> Mouth muted for {message.command[1]}!"
    await message.reply_text(notice)
|
|
|
|
|
@Client.on_message(filters.command("pin") & filters.create(admin_filter))
async def pin(_, message: Message):
    """Handle ``/pin``: pin the message that the command replies to.

    Does nothing when the command was not sent as a reply. Access is gated
    by the ``admin_filter`` project filter in the decorator.
    """
    replied = message.reply_to_message
    if replied:
        await replied.pin()
|
|
|
|
|
@Client.on_message(filters.command("unpin") & filters.create(admin_filter))
async def unpin(_, message: Message):
    """Handle ``/unpin``: unpin the message that the command replies to.

    Does nothing when the command was not sent as a reply. Access is gated
    by the ``admin_filter`` project filter in the decorator.
    """
    replied = message.reply_to_message
    if replied:
        await replied.unpin()
|
|
|
|
|
|
|
@Client.on_message(filters.command("purge") & (filters.group | filters.channel))
async def purge(client, message):
    """Handle ``/purge``: delete messages from the replied-to one up to the command.

    Only runs in supergroups and channels, and only when ``admin_check``
    (project helper) approves the sender. The command message itself is
    deleted first. If the command was a reply, every message id from the
    replied-to message up to (but excluding) the command is then deleted in
    batches.

    The number shown in the status message is the count of ids *requested*
    for deletion; the return value of ``delete_messages`` is not inspected.
    """
    if message.chat.type not in (enums.ChatType.SUPERGROUP, enums.ChatType.CHANNEL):
        return
    if not await admin_check(message):
        return

    status_message = await message.reply_text("...", quote=True)
    await message.delete()

    batch_size = 100
    pending_ids = []
    deleted_count = 0

    if message.reply_to_message:
        for message_id in range(message.reply_to_message.id, message.id):
            pending_ids.append(message_id)
            # Flush a full batch. The original compared the length with the
            # string "100", which is never equal to an int, so batches were
            # never flushed and every id went out in one oversized request.
            if len(pending_ids) == batch_size:
                await client.delete_messages(chat_id=message.chat.id, message_ids=pending_ids, revoke=True)
                deleted_count += len(pending_ids)
                pending_ids = []

        # Flush whatever is left over after the last full batch.
        if pending_ids:
            await client.delete_messages(chat_id=message.chat.id, message_ids=pending_ids, revoke=True)
            deleted_count += len(pending_ids)

    await status_message.edit_text(f"deleted {deleted_count} messages")
    await status_message.delete()
|
|
|
|
|
@Client.on_message(filters.group & filters.command('inkick'))
async def inkick(client, message):
    """Handle ``/inkick <status> [...]``: remove members by last-seen status.

    Only chat administrators and the owner may run the command; anyone else
    gets a short-lived ``script.CREATOR_REQUIRED`` notice. Without arguments
    ``script.INPUT_REQUIRED`` is sent.

    Each argument names a last-seen status, case-insensitively. Accepted
    values are the ``enums.UserStatus`` member names (for example
    ``recently``, ``last_week``, ``last_month``, ``long_ago``) plus the
    older spellings ``within_week``, ``within_month`` and ``long_time_ago``.

    Every non-admin member whose status matches is banned for 45 seconds,
    and the number of successful bans is reported via ``script.KICKED``.
    """
    # Function-scope import so this handler does not depend on the module's
    # import block providing datetime.
    from datetime import datetime, timedelta

    privileged = (enums.ChatMemberStatus.ADMINISTRATOR, enums.ChatMemberStatus.OWNER)

    user = await client.get_chat_member(message.chat.id, message.from_user.id)
    if user.status not in privileged:
        note = await message.reply_text(script.CREATOR_REQUIRED)
        await asyncio.sleep(3)
        await note.delete()
        return await message.delete()

    if len(message.command) <= 1:
        await message.reply_text(script.INPUT_REQUIRED)
        return

    # member.user.status is an enums.UserStatus, so comparing it directly with
    # the command's strings (as the original did) could never match. Normalise
    # the arguments to lower-cased enum member names instead.
    legacy_aliases = {
        "within_week": "last_week",
        "within_month": "last_month",
        "long_time_ago": "long_ago",
    }
    wanted_statuses = set()
    for argument in message.command[1:]:
        key = argument.lower()
        wanted_statuses.add(legacy_aliases.get(key, key))

    sent_message = await message.reply_text(script.START_KICK)
    await asyncio.sleep(2)
    await message.delete()

    count = 0
    # get_chat_members is an async generator in Pyrogram 2.x and must be
    # consumed with "async for".
    async for member in client.get_chat_members(message.chat.id):
        if member.status in privileged:
            continue
        last_seen = member.user.status
        if last_seen is None or last_seen.name.lower() not in wanted_statuses:
            continue
        try:
            # Pyrogram 2.x takes until_date as a datetime, and the call is a
            # coroutine that must be awaited (the original did neither).
            await client.ban_chat_member(
                message.chat.id,
                member.user.id,
                datetime.now() + timedelta(seconds=45),
            )
            count += 1
        except (ChatAdminRequired, UserAdminInvalid):
            await sent_message.edit(script.ADMIN_REQUIRED)
            await client.leave_chat(message.chat.id)
            break
        except FloodWait as e:
            # Wait out the flood limit; the current member is skipped, not retried.
            await asyncio.sleep(e.value)

    try:
        await sent_message.edit(script.KICKED.format(count))
    except ChatWriteForbidden:
        # Best effort: the bot may no longer be allowed to write here.
        pass
|
|
|
|
|
@Client.on_message(filters.group & filters.command('dkick'))
async def dkick(client, message):
    """Handle ``/dkick``: remove deleted accounts from the group.

    Only chat administrators and the owner may run the command; anyone else
    gets a short-lived ``script.CREATOR_REQUIRED`` notice. Every non-admin
    member whose account is flagged ``is_deleted`` is banned for 45 seconds,
    and the number of successful bans is reported via ``script.DKICK``.
    """
    # Function-scope import so this handler does not depend on the module's
    # import block providing datetime.
    from datetime import datetime, timedelta

    privileged = (enums.ChatMemberStatus.ADMINISTRATOR, enums.ChatMemberStatus.OWNER)

    user = await client.get_chat_member(message.chat.id, message.from_user.id)
    if user.status not in privileged:
        note = await message.reply_text(script.CREATOR_REQUIRED)
        await asyncio.sleep(3)
        await note.delete()
        return await message.delete()

    sent_message = await message.reply_text(script.START_KICK)
    await message.delete()

    count = 0
    # get_chat_members is an async generator in Pyrogram 2.x; a plain "for"
    # over it raises TypeError, so it must be consumed with "async for".
    async for member in client.get_chat_members(message.chat.id):
        if not member.user.is_deleted or member.status in privileged:
            continue
        try:
            # Pyrogram 2.x takes until_date as a datetime, not an int timestamp.
            await client.ban_chat_member(
                message.chat.id,
                member.user.id,
                datetime.now() + timedelta(seconds=45),
            )
            count += 1
        except (ChatAdminRequired, UserAdminInvalid):
            await sent_message.edit(script.ADMIN_REQUIRED)
            await client.leave_chat(message.chat.id)
            break
        except FloodWait as e:
            # Wait out the flood limit; the current member is skipped, not retried.
            await asyncio.sleep(e.value)

    try:
        await sent_message.edit(script.DKICK.format(count))
    except ChatWriteForbidden:
        # Best effort: the bot may no longer be allowed to write here.
        pass
|
|
|
|
|
@Client.on_message((filters.channel | filters.group) & filters.command('instatus'))
async def instatus(client, message):
    """Handle ``/instatus``: report how recently the chat's members were seen.

    Allowed for chat administrators, the chat owner, and senders whose id is
    listed in ``ADMINS``. Anyone else gets a short-lived refusal notice.

    The handler walks every chat member and counts deleted accounts, bots,
    and the ``enums.UserStatus`` buckets RECENTLY / LAST_WEEK / LAST_MONTH /
    LONG_AGO. Every other status is counted as "UnCached". The totals replace
    the progress message.
    """
    # NOTE(review): message.from_user is dereferenced unconditionally; confirm
    # it is always set for the messages this handler receives.
    user = await client.get_chat_member(message.chat.id, message.from_user.id)
    is_chat_admin = user.status in (enums.ChatMemberStatus.ADMINISTRATOR, enums.ChatMemberStatus.OWNER)
    # The original put ADMINS inside the status tuple, where a member status
    # can never equal it. ADMINS is presumably a collection of user ids
    # (verify in info.py), so test the sender's id against it instead.
    if not is_chat_admin and message.from_user.id not in ADMINS:
        note = await message.reply("you are not administrator in this chat")
        await asyncio.sleep(3)
        await message.delete()
        return await note.delete()

    sent_message = await message.reply_text("π Processing.....")

    recently = 0
    within_week = 0
    within_month = 0
    long_time_ago = 0
    deleted_acc = 0
    uncached = 0
    bot = 0

    # get_chat_members is an async generator in Pyrogram 2.x; a plain "for"
    # over it raises TypeError, so it must be consumed with "async for".
    async for member in client.get_chat_members(message.chat.id):
        member_user = member.user
        if member_user.is_deleted:
            deleted_acc += 1
        elif member_user.is_bot:
            bot += 1
        elif member_user.status == enums.UserStatus.RECENTLY:
            recently += 1
        elif member_user.status == enums.UserStatus.LAST_WEEK:
            within_week += 1
        elif member_user.status == enums.UserStatus.LAST_MONTH:
            within_month += 1
        elif member_user.status == enums.UserStatus.LONG_AGO:
            long_time_ago += 1
        else:
            uncached += 1

    # The original had two branches (channel vs. group/supergroup) that sent
    # exactly the same text, so they are merged here.
    if message.chat.type in (enums.ChatType.CHANNEL, enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
        await sent_message.edit(f"{message.chat.title}\nChat Member Status\n\nRecently - {recently}\nWithin Week - {within_week}\nWithin Month - {within_month}\nLong Time Ago - {long_time_ago}\n\nDeleted Account - {deleted_acc}\nBot - {bot}\nUnCached - {uncached}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|