| |
|
| | import asyncio |
| | import functools |
| | from typing import (Union) |
| | from pyrogram.errors import UserNotParticipant, FloodWait |
| | from pyrogram.enums.parse_mode import ParseMode |
| | from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message,WebAppInfo |
| |
|
| |
|
| | from FileStream.utils.FileProcessors.translation import LANG |
| | from FileStream.Database import Database |
| | from FileStream.utils.FileProcessors.human_readable import humanbytes |
| | from FileStream.config import Telegram, Server |
| | from FileStream.bot import FileStream |
| | from FileStream.Tools.cleanup import Get_Title_Year |
| | |
| |
|
| |
|
# Module-wide database handle used by every helper below.
db = Database(
    Telegram.DATABASE_URL,
    Telegram.SESSION_NAME,
)
| |
|
| |
|
async def get_invite_link(bot, chat_id: Union[str, int]):
    """Create an invite link for ``chat_id``, waiting out flood limits.

    Calls ``bot.create_chat_invite_link`` and returns its result unchanged.
    When that call raises ``FloodWait`` a notice is printed, the coroutine
    sleeps for the number of seconds carried by the exception, and the call
    is attempted again; there is no retry limit.  Any other exception
    propagates to the caller.
    """
    while True:
        try:
            return await bot.create_chat_invite_link(chat_id=chat_id)
        except FloodWait as flood:
            delay = flood.value
            print(f"Sleep of {delay}s caused by FloodWait ...")
            await asyncio.sleep(delay)
| |
|
| |
|
async def is_user_joined(bot, message: Message):
    """Check that the sender belongs to the force-subscribe channel.

    Args:
        bot: Client used for the membership lookup and invite-link creation.
        message: Incoming message; its ``from_user.id`` is the user checked
            and all prompts are sent as replies to it.

    Returns:
        ``200`` when ``Telegram.FORCE_SUB_ID`` is not configured (truthy, so
        callers testing truthiness treat it as "allowed"), ``True`` when the
        membership lookup succeeds for a user who is not banned, and
        ``False`` when the user is banned from the channel, has not joined
        it, or the lookup fails for any other reason.
    """
    force_sub_id = Telegram.FORCE_SUB_ID
    if not force_sub_id:
        return 200

    # Imported locally because the file's import block does not provide it.
    from pyrogram.enums import ChatMemberStatus

    # "-100..." ids must be passed as int; anything else that is a string is
    # used as given.  A value already stored as int no longer crashes on
    # ``.startswith``.
    if isinstance(force_sub_id, str) and not force_sub_id.startswith("-100"):
        channel_chat_id = force_sub_id
    else:
        channel_chat_id = int(force_sub_id)

    try:
        member = await bot.get_chat_member(
            chat_id=channel_chat_id,
            user_id=message.from_user.id,
        )
        # ``status`` is a ChatMemberStatus enum member; comparing it with the
        # plain string "BANNED" never matched, so banned users slipped through.
        if member.status == ChatMemberStatus.BANNED:
            await message.reply_text(
                text=LANG.BAN_TEXT.format(Telegram.OWNER_ID),
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True,
            )
            return False
    except UserNotParticipant:
        invite_link = await get_invite_link(bot, chat_id=channel_chat_id)
        join_markup = InlineKeyboardMarkup(
            [[InlineKeyboardButton("❆ Jᴏɪɴ Oᴜʀ Cʜᴀɴɴᴇʟ ❆", url=invite_link.invite_link)]]
        )

        if Telegram.VERIFY_PIC:
            ver = await message.reply_photo(
                photo=Telegram.VERIFY_PIC,
                caption="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
                parse_mode=ParseMode.HTML,
                reply_markup=join_markup,
            )
        else:
            ver = await message.reply_text(
                text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
                reply_markup=join_markup,
                parse_mode=ParseMode.HTML,
            )

        # Leave the prompt visible for 30 seconds, then tidy up both messages.
        await asyncio.sleep(30)
        try:
            await ver.delete()
            await message.delete()
        except Exception:
            # Best-effort cleanup: a failed delete must not change the result.
            pass
        return False
    except Exception:
        await message.reply_text(
            text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Telegram.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
        )
        return False
    return True
| | |
async def send_polls_func(file_info, replied_message):
    """Placeholder: accepts its arguments, does nothing and returns ``None``."""
    return None
| | |
| | |
async def upload_type_func(file_info, replied_message):
    """Build the reply shown for a received file.

    If ``db.get_file_by_fileuniqueid_only`` finds a stored record for the
    same unique id and privacy type, the output of ``gen_link`` for that
    record is returned with ``"type": "ExistingFile"``.  Otherwise the result
    offers the PUBLIC / PRIVATE / TEMPORARY upload choices, carries
    ``"type": "OPTIONS"`` and has no ``"poster"`` key.

    ``replied_message`` is accepted but not used here.
    """
    media = file_info['file']
    duplicate = await db.get_file_by_fileuniqueid_only(media['file_unique_id'], file_info['privacy_type'])

    if duplicate:
        links = await gen_link(duplicate['_id'])
        return {
            "reply_markup": links["reply_markup"],
            "stream_text": links["stream_text"],
            "poster": links["poster"],
            "type": "ExistingFile",
        }

    # The caption, when present, is preferred over the file name for parsing.
    display_name = media['caption'] or media['file_name']
    title, year = Get_Title_Year(display_name)
    text = LANG.STREAM_TEXT_Y.format(
        media['file_name'],
        title + " " + str(year),
        humanbytes(media['file_size']),
    )

    # Callback data has the form "<action>_<user id>_<message id>".
    choice_row = [
        InlineKeyboardButton("PUBLIC", callback_data=f"pubup_{file_info['user_id']}_{file_info['message_id']}"),
        InlineKeyboardButton("PRIVATE", callback_data=f"privup_{file_info['user_id']}_{file_info['message_id']}"),
        InlineKeyboardButton("TEMPORARY", callback_data=f"tempup_{file_info['user_id']}_{file_info['message_id']}"),
    ]
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return {
        "reply_markup": InlineKeyboardMarkup([choice_row, close_row]),
        "stream_text": text,
        "type": "OPTIONS",
    }
| |
|
| |
|
async def priv_func(file_name, file_size):
    """Return ``(reply_markup, stream_text)`` for the private-file rename text.

    ``file_size`` is passed through ``humanbytes`` before being formatted into
    ``LANG.PRIV_FILE_RENAME``; the keyboard holds a single close button.
    """
    readable_size = humanbytes(file_size)
    text = LANG.PRIV_FILE_RENAME.format(file_name, readable_size)
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]
    return InlineKeyboardMarkup([close_row]), text
| |
|
| |
|
| | |
async def gen_priv_file_link(_id):
    """Return ``(reply_markup, stream_text)`` for the private file ``_id``.

    The record comes from ``db.get_privfile``.  Records whose mime type
    contains "video" get a stream button and ``LANG.STREAM_TEXT``; all others
    get only the download button and ``LANG.STREAM_TEXT_X``.  Both layouts
    end with the get-file / revoke row and a close row.
    """
    record = await db.get_privfile(_id)
    name = record['file_name']
    size = humanbytes(record['file_size'])
    mime = record['mime_type']

    watch_url = f"{Server.URL}app/watch/{_id}"
    download_url = f"{Server.URL}api/dl/{_id}"
    bot_url = f"https://t.me/{FileStream.username}?start=privfile_{_id}"

    if "video" in mime:
        text = LANG.STREAM_TEXT.format(name, size, download_url, watch_url, bot_url)
        top_row = [
            InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=watch_url),
            InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=download_url),
        ]
    else:
        text = LANG.STREAM_TEXT_X.format(name, size, download_url, bot_url)
        top_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=download_url)]

    # These two rows are the same for video and non-video files.
    manage_row = [
        InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=bot_url),
        InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}"),
    ]
    close_row = [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]

    return InlineKeyboardMarkup([top_row, manage_row, close_row]), text
| |
|
| |
|
| | |
| |
|
| |
|
async def gen_link(_id):
    """Build the link message for the stored file ``_id``.

    Reads the record returned by ``db.get_file`` and returns a dict with:

    * ``"reply_markup"`` - inline keyboard (stream + download for records
      whose mime type contains "video", download only otherwise, followed by
      the get-file / revoke row and a close row),
    * ``"stream_text"`` - ``LANG.STREAM_TEXT`` for video records,
      ``LANG.STREAM_TEXT_X`` otherwise,
    * ``"poster"`` - the record's ``poster`` value.

    Raises:
        KeyError: if the record lacks one of the keys read below.
    """
    # No trailing commas on these assignments: with them each name was bound
    # to a 1-tuple and the first ``file_info['file']`` lookup raised TypeError.
    file_info = await db.get_file(_id)
    file_name = file_info['file']['file_name']
    file_size = humanbytes(file_info['file']['file_size'])
    mime_type = file_info['file']['mime_type']
    poster = file_info['poster']
    title = file_info["title"]
    description = file_info['description']
    release_date = str(file_info["release_date"])
    page_link = f"{Server.URL}app/watch/{_id}"
    stream_link = f"{Server.URL}api/dl/{_id}"
    file_link = f"https://t.me/{FileStream.username}?start=file_{_id}"

    if "video" in mime_type:
        stream_text = LANG.STREAM_TEXT.format(
            poster, description, title, release_date,
            file_name, file_size, stream_link, page_link, file_link,
        )
        reply_markup = InlineKeyboardMarkup([
            [
                InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
                InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link),
            ],
            [
                InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link),
                InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}"),
            ],
            [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")],
        ])
    else:
        stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, file_link)
        reply_markup = InlineKeyboardMarkup([
            [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
            [
                InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link),
                InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}"),
            ],
            [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")],
        ])
    return {"reply_markup": reply_markup, "stream_text": stream_text, "poster": poster}
| |
|
| |
|
| | |
| |
|
| |
|
async def gen_linkx(m: Message, _id, name: list):
    """Return ``(reply_markup, stream_text)`` with stream/download buttons only.

    Both branches format ``LANG.STREAM_TEXT_X``; records whose mime type
    contains "video" get the watch-page link as the last argument and an
    extra stream button, all others get the bot deep link and only the
    download button.  ``m`` and ``name`` are accepted but not used.

    NOTE(review): this reads ``file_name`` / ``mime_type`` / ``file_size`` at
    the top level of the ``db.get_file`` record, while ``gen_link`` reads them
    under ``['file']`` - confirm which schema is current.
    """
    record = await db.get_file(_id)
    file_name = record['file_name']
    mime_type = record['mime_type']
    file_size = humanbytes(record['file_size'])

    watch_url = f"{Server.URL}app/watch/{_id}"
    download_url = f"{Server.URL}api/dl/{_id}"
    bot_url = f"https://t.me/{FileStream.username}?start=file_{_id}"

    is_video = "video" in mime_type
    last_link = watch_url if is_video else bot_url
    text = LANG.STREAM_TEXT_X.format(file_name, file_size, download_url, last_link)

    row = []
    if is_video:
        row.append(InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=watch_url))
    row.append(InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=download_url))
    return InlineKeyboardMarkup([row]), text
| |
|
| |
|
| | |
| |
|
| |
|
async def is_user_banned(message):
    """Return ``True`` (after replying with the ban text) if the sender is banned.

    The ban state comes from ``db.is_user_banned`` for ``message.from_user.id``;
    a sender who is not banned gets no reply and ``False`` is returned.
    """
    banned = await db.is_user_banned(message.from_user.id)
    if not banned:
        return False

    await message.reply_text(
        text=LANG.BAN_TEXT.format(Telegram.OWNER_ID),
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return True
| |
|
| |
|
| | |
| |
|
| |
|
async def is_channel_banned(bot, message):
    """Return ``True`` if the message's chat is banned, marking the message.

    The chat id is looked up with ``db.is_user_banned``.  For a banned chat
    the message's reply markup is replaced by a single "channel is banned"
    button; otherwise nothing is edited and ``False`` is returned.
    """
    chat_id = message.chat.id
    if not await db.is_user_banned(chat_id):
        return False

    banned_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("ᴄʜᴀɴɴᴇʟ ɪs ʙᴀɴɴᴇᴅ", callback_data="N/A")]]
    )
    await bot.edit_message_reply_markup(
        chat_id=message.chat.id,
        message_id=message.id,
        reply_markup=banned_markup,
    )
    return True
| |
|
| |
|
| | |
| |
|
| |
|
async def is_user_authorized(message):
    """Return whether the sender may use the bot under ``Telegram.AUTH_USERS``.

    When ``Telegram.AUTH_USERS`` is missing or empty everyone is allowed.
    Otherwise only ``Telegram.OWNER_ID`` and the ids listed in ``AUTH_USERS``
    are allowed; anyone else gets a "not authorized" reply and ``False``.
    """
    allowed_ids = getattr(Telegram, 'AUTH_USERS', None)
    if not allowed_ids:
        return True

    sender_id = message.from_user.id
    if sender_id == Telegram.OWNER_ID or sender_id in allowed_ids:
        return True

    await message.reply_text(
        text="Yᴏᴜ ᴀʀᴇ ɴᴏᴛ ᴀᴜᴛʜᴏʀɪᴢᴇᴅ ᴛᴏ ᴜsᴇ ᴛʜɪs ʙᴏᴛ.",
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return False
| |
|
| |
|
| | |
| |
|
| |
|
async def is_user_exist(bot, message):
    """Post a #NewUser notice to ``Telegram.ULOG_GROUP`` for unknown senders.

    Nothing happens when ``db.get_user`` returns a truthy record for the
    sender.  Always returns ``None``.

    NOTE(review): unlike ``is_channel_exist`` this does not call
    ``db.add_user``, so the notice repeats until the user is stored some
    other way - confirm this is intended.
    """
    sender = message.from_user
    if await db.get_user(sender.id):
        return

    await bot.send_message(
        Telegram.ULOG_GROUP,
        f"**#NᴇᴡUsᴇʀ**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}`",
    )
| |
|
| |
|
| | |
async def user_status(bot, message):
    """Return whatever ``db.get_user`` yields for the sender (``bot`` is unused)."""
    return await db.get_user(message.from_user.id)
| |
|
| |
|
async def is_channel_exist(bot, message):
    """Register an unknown chat and announce it in ``Telegram.ULOG_GROUP``.

    When ``db.get_user`` has no truthy record for ``message.chat.id`` the chat
    id is stored with ``db.add_user``, its member count is fetched, and a
    #NewChannel notice is sent.  Known chats are left alone.  Returns ``None``.
    """
    chat = message.chat
    if await db.get_user(chat.id):
        return

    await db.add_user(chat.id)
    member_total = await bot.get_chat_members_count(chat.id)
    await bot.send_message(
        Telegram.ULOG_GROUP,
        f"**#NᴇᴡCʜᴀɴɴᴇʟ** \n**⬩ ᴄʜᴀᴛ ɴᴀᴍᴇ :** `{chat.title}`\n**⬩ ᴄʜᴀᴛ ɪᴅ :** `{chat.id}`\n**⬩ ᴛᴏᴛᴀʟ ᴍᴇᴍʙᴇʀs :** `{member_total}`",
    )
| |
|
| |
|
| | |
| | |
def verify_users(func):
    """Decorate a ``(bot, message, response)`` handler with a user-status check.

    The wrapper takes ``(bot, message)``, looks the sender up through
    ``user_status`` and fills ``response['status']``:

    * ``"AUTHORIZED"`` - stored ``tele_status.status`` is ``"ACTIVE"``.
    * ``"BANNED"`` - stored status is ``"BANNED"``; the attempt is logged to
      ``Telegram.ULOG_GROUP`` and the user is told they are banned.
    * ``"NOT EXIST"`` - no stored record; the attempt is logged and the user
      is told to ask the admins for authorisation.

    Any other stored status leaves ``response`` empty.  The wrapped handler
    is always called afterwards, whatever the status.
    """
    @functools.wraps(func)
    async def wrapper(bot, message):
        response = {}
        record = await user_status(bot, message)
        sender = message.from_user

        if not record:
            response['status'] = "NOT EXIST"
            await bot.send_message(
                Telegram.ULOG_GROUP,
                f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name} {sender.last_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n ** Status : {response['status']}**",
            )
            await bot.send_message(
                sender.id,
                f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n** Ask Admins to Authorise You**",
            )
        else:
            stored_state = record['tele_status']['status']
            if stored_state == "ACTIVE":
                response['status'] = "AUTHORIZED"
            elif stored_state == "BANNED":
                response['status'] = "BANNED"
                # Tell the admins first, then the banned user.
                await bot.send_message(
                    Telegram.ULOG_GROUP,
                    f"**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name} {sender.last_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n ** Status : {response['status']} \n Trying to access Services**",
                )
                await bot.send_message(
                    sender.id,
                    f"**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{sender.first_name}](tg://user?id={sender.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{sender.id}` \n** You are Banned by Admins Contact Admins to unban you**",
                )

        return await func(bot, message, response)

    return wrapper
| |
|
| |
|
async def verify_user(bot, message):
    """Run the access checks for ``message`` and return ``True`` if all pass.

    Order: authorisation list, ban state, new-user logging, then - only when
    ``Telegram.FORCE_SUB`` is truthy - force-subscribe membership.  The first
    failing check stops the chain and yields ``False``.
    """
    authorized = await is_user_authorized(message)
    if not authorized or await is_user_banned(message):
        return False

    await is_user_exist(bot, message)

    if Telegram.FORCE_SUB and not await is_user_joined(bot, message):
        return False

    return True
| |
|