import logging from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORTLINK_URL, SHORTLINK_API, IS_SHORTLINK, LOG_CHANNEL, TUTORIAL, GRP_LNK, CHNL_LNK, CUSTOM_FILE_CAPTION, SECOND_SHORTLINK_URL, SECOND_SHORTLINK_API from imdb import Cinemagoer import asyncio from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup from pyrogram.errors import FloodWait, UserIsBlocked, MessageNotModified, PeerIdInvalid from pyrogram import enums from typing import Union from Script import script import pytz import random import re import os from datetime import datetime, date import string from typing import List from database.users_chats_db import db from bs4 import BeautifulSoup import requests import aiohttp from shortzy import Shortzy import http.client import json logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) BTN_URL_REGEX = re.compile( r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))" ) imdb = Cinemagoer() TOKENS = {} VERIFIED = {} BANNED = {} SECOND_SHORTENER = {} SMART_OPEN = '“' SMART_CLOSE = '”' START_CHAR = ('\'', '"', SMART_OPEN) # temp db for banned class temp(object): BANNED_USERS = [] BANNED_CHATS = [] ME = None CURRENT=int(os.environ.get("SKIP", 2)) CANCEL = False MELCOW = {} U_NAME = None B_NAME = None GETALL = {} SHORT = {} SETTINGS = {} async def is_subscribed(bot, query): try: user = await bot.get_chat_member(AUTH_CHANNEL, except UserNotParticipant: pass except Exception as e: logger.exception(e) else: if user.status != enums.ChatMemberStatus.BANNED: return True return False async def get_poster(query, bulk=False, id=False, file=None): if not id: # query = (query.strip()).lower() title = query year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE) if year: year = list_to_str(year[:1]) title = (query.replace(year, "")).strip() elif file is not None: year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE) if year: year = list_to_str(year[:1]) else: year = None movieid = imdb.search_movie(title.lower(), results=10) if not movieid: return None if year: filtered=list(filter(lambda k: str(k.get('year')) == str(year), movieid)) if not filtered: filtered = movieid else: filtered = movieid movieid=list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered)) if not movieid: movieid = filtered if bulk: return movieid movieid = movieid[0].movieID else: movieid = query movie = imdb.get_movie(movieid) if movie.get("original air date"): date = movie["original air date"] elif movie.get("year"): date = movie.get("year") else: date = "N/A" plot = "" if not LONG_IMDB_DESCRIPTION: plot = movie.get('plot') if plot and len(plot) > 0: plot = plot[0] else: plot = movie.get('plot outline') if plot and len(plot) > 800: plot = plot[0:800] + "..." return { 'title': movie.get('title'), 'votes': movie.get('votes'), "aka": list_to_str(movie.get("akas")), "seasons": movie.get("number of seasons"), "box_office": movie.get('box office'), 'localized_title': movie.get('localized title'), 'kind': movie.get("kind"), "imdb_id": f"tt{movie.get('imdbID')}", "cast": list_to_str(movie.get("cast")), "runtime": list_to_str(movie.get("runtimes")), "countries": list_to_str(movie.get("countries")), "certificates": list_to_str(movie.get("certificates")), "languages": list_to_str(movie.get("languages")), "director": list_to_str(movie.get("director")), "writer":list_to_str(movie.get("writer")), "producer":list_to_str(movie.get("producer")), "composer":list_to_str(movie.get("composer")) , "cinematographer":list_to_str(movie.get("cinematographer")), "music_team": list_to_str(movie.get("music department")), "distributors": list_to_str(movie.get("distributors")), 'release_date': date, 'year': movie.get('year'), 'genres': list_to_str(movie.get("genres")), 'poster': movie.get('full-size cover url'), 'plot': plot, 'rating': str(movie.get("rating")), 'url':f'{movieid}' } # async def broadcast_messages(user_id, message): try: await message.copy(chat_id=user_id) return True, "Success" except FloodWait as e: await asyncio.sleep(e.x) return await broadcast_messages(user_id, message) except InputUserDeactivated: await db.delete_user(int(user_id))"{user_id}-Removed from Database, since deleted account.") return False, "Deleted" except UserIsBlocked:"{user_id} -Blocked the bot.") return False, "Blocked" except PeerIdInvalid: await db.delete_user(int(user_id))"{user_id} - PeerIdInvalid") return False, "Error" except Exception as e: return False, "Error" async def broadcast_messages_group(chat_id, message): try: kd = await message.copy(chat_id=chat_id) try: await except: pass return True, "Success" except FloodWait as e: await asyncio.sleep(e.x) return await broadcast_messages_group(chat_id, message) except Exception as e: return False, "Error" async def search_gagala(text): usr_agent = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/61.0.3163.100 Safari/537.36' } text = text.replace(" ", '+') url = f'{text}' response = requests.get(url, headers=usr_agent) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') titles = soup.find_all( 'h3' ) return [title.getText() for title in titles] async def get_settings(group_id): settings = temp.SETTINGS.get(group_id) if not settings: settings = await db.get_settings(group_id) temp.SETTINGS[group_id] = settings return settings async def save_group_settings(group_id, key, value): current = await get_settings(group_id) current[key] = value temp.SETTINGS[group_id] = current await db.update_settings(group_id, current) def get_size(size): """Get size in readable format""" units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"] size = float(size) i = 0 while size >= 1024.0 and i < len(units): i += 1 size /= 1024.0 return "%.2f %s" % (size, units[i]) def split_list(l, n): for i in range(0, len(l), n): yield l[i:i + n] def get_file_id(msg: Message): if for message_type in ( "photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker" ): obj = getattr(msg, message_type) if obj: setattr(obj, "message_type", message_type) return obj def extract_user(message: Message) -> Union[int, str]: """extracts the user from a message""" # user_id = None user_first_name = None if message.reply_to_message: user_id = user_first_name = message.reply_to_message.from_user.first_name elif len(message.command) > 1: if ( len(message.entities) > 1 and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION ): required_entity = message.entities[1] user_id = user_first_name = required_entity.user.first_name else: user_id = message.command[1] # don't want to make a request -_- user_first_name = user_id try: user_id = int(user_id) except ValueError: pass else: user_id = user_first_name = message.from_user.first_name return (user_id, user_first_name) def list_to_str(k): if not k: return "N/A" elif len(k) == 1: return str(k[0]) elif MAX_LIST_ELM: k = k[:int(MAX_LIST_ELM)] return ' '.join(f'{elem}, ' for elem in k) else: return ' '.join(f'{elem}, ' for elem in k) def last_online(from_user): time = "" if from_user.is_bot: time += "🤖 Bot :(" elif from_user.status == enums.UserStatus.RECENTLY: time += "Recently" elif from_user.status == enums.UserStatus.LAST_WEEK: time += "Within the last week" elif from_user.status == enums.UserStatus.LAST_MONTH: time += "Within the last month" elif from_user.status == enums.UserStatus.LONG_AGO: time += "A long time ago :(" elif from_user.status == enums.UserStatus.ONLINE: time += "Currently Online" elif from_user.status == enums.UserStatus.OFFLINE: time += from_user.last_online_date.strftime("%a, %d %b %Y, %H:%M:%S") return time def split_quotes(text: str) -> List: if not any(text.startswith(char) for char in START_CHAR): return text.split(None, 1) counter = 1 # ignore first char -> is some kind of quote while counter < len(text): if text[counter] == "\\": counter += 1 elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE): break counter += 1 else: return text.split(None, 1) # 1 to avoid starting quote, and counter is exclusive so avoids ending key = remove_escapes(text[1:counter].strip()) # index will be in range, or `else` would have been executed and returned rest = text[counter + 1:].strip() if not key: key = text[0] + text[0] return list(filter(None, [key, rest])) def gfilterparser(text, keyword): if "buttonalert" in text: text = (text.replace("\n", "\\n").replace("\t", "\\t")) buttons = [] note_data = "" prev = 0 i = 0 alerts = [] for match in BTN_URL_REGEX.finditer(text): # Check if btnurl is escaped n_escapes = 0 to_check = match.start(1) - 1 while to_check > 0 and text[to_check] == "\\": n_escapes += 1 to_check -= 1 # if even, not escaped -> create button if n_escapes % 2 == 0: note_data += text[prev:match.start(1)] prev = match.end(1) if == "buttonalert": # create a thruple with button label, url, and newline status if bool( and buttons: buttons[-1].append(InlineKeyboardButton(, callback_data=f"gfilteralert:{i}:{keyword}" )) else: buttons.append([InlineKeyboardButton(, callback_data=f"gfilteralert:{i}:{keyword}" )]) i += 1 alerts.append( elif bool( and buttons: buttons[-1].append(InlineKeyboardButton(," ", "") )) else: buttons.append([InlineKeyboardButton(," ", "") )]) else: note_data += text[prev:to_check] prev = match.start(1) - 1 else: note_data += text[prev:] try: return note_data, buttons, alerts except: return note_data, buttons, None def parser(text, keyword): if "buttonalert" in text: text = (text.replace("\n", "\\n").replace("\t", "\\t")) buttons = [] note_data = "" prev = 0 i = 0 alerts = [] for match in BTN_URL_REGEX.finditer(text): # Check if btnurl is escaped n_escapes = 0 to_check = match.start(1) - 1 while to_check > 0 and text[to_check] == "\\": n_escapes += 1 to_check -= 1 # if even, not escaped -> create button if n_escapes % 2 == 0: note_data += text[prev:match.start(1)] prev = match.end(1) if == "buttonalert": # create a thruple with button label, url, and newline status if bool( and buttons: buttons[-1].append(InlineKeyboardButton(, callback_data=f"alertmessage:{i}:{keyword}" )) else: buttons.append([InlineKeyboardButton(, callback_data=f"alertmessage:{i}:{keyword}" )]) i += 1 alerts.append( elif bool( and buttons: buttons[-1].append(InlineKeyboardButton(," ", "") )) else: buttons.append([InlineKeyboardButton(," ", "") )]) else: note_data += text[prev:to_check] prev = match.start(1) - 1 else: note_data += text[prev:] try: return note_data, buttons, alerts except: return note_data, buttons, None def remove_escapes(text: str) -> str: res = "" is_escaped = False for counter in range(len(text)): if is_escaped: res += text[counter] is_escaped = False elif text[counter] == "\\": is_escaped = True else: res += text[counter] return res def humanbytes(size): if not size: return "" power = 2**10 n = 0 Dic_powerN = async def get_shortlink(chat_id, link, second=False):
    settings = await get_settings(chat_id) #fetching settings for group
    if 'shortlink' in settings.keys():
        URL = settings['shortlink']
        API = settings['shortlink_api']
    else:
        URL = SHORTLINK_URL
        API = SHORTLINK_API
    if URL.startswith("shorturllink") or URL.startswith("") or URL.startswith("") or second:
        URL = SECOND_SHORTLINK_URL
        API = SECOND_SHORTLINK_API
    if URL == "":
        url = f'https://{URL}/easy_api'
        params = {
            "key": API,
            "link": link,
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
                    data = await response.text()
                    return data
        except Exception as e:
            logger.error(e)
            return link
    else:
        shortzy = Shortzy(api_key=API, base_site=URL)
        link = await shortzy.convert(link)
        return link async def get_tutorial(chat_id):    base_site=URL) # # link = await shortzy.convert(link) # # return link async def get_tutorial(chat_id): settings = await get_settings(chat_id) #fetching settings for group if 'tutorial' in settings.keys(): TUTORIAL_URL = settings['tutorial'] else: TUTORIAL_URL = TUTORIAL return TUTORIAL_URL async def get_verify_shorted_link(link): API = SHORTLINK_API URL = SHORTLINK_URL https = link.split(":")[0] if "http" == https: https = "https" link = link.replace("http", https) if URL == "": url = f"https://{URL}/shortLink" params = {"token": API, "format": "json", "link": link, } try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: data = await response.json(content_type="text/html") if data["status"] == "success": return data["shortlink"] else: logger.error(f"Error: {data['message']}") return f'https://{URL}/shortLink?token={API}&format=json&link={link}' except Exception as e: logger.error(e) return f'https://{URL}/shortLink?token={API}&format=json&link={link}' else: url = f'https://{URL}/api' params = {'api': API, 'url': link, } try: async with aiohttp.ClientSession() as session: async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: data = await response.json() if data["status"] == "success": return data['shortenedUrl'] else: logger.error(f"Error: {data['message']}") return f'https://{URL}/api?api={API}&link={link}' except Exception as e: logger.error(e) return f'{URL}/api?api={API}&link={link}' async def check_token(bot, userid, token): user = await bot.get_users(userid) if not await db.is_user_exist( await db.add_user(, user.first_name) await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(, user.mention)) if in TOKENS.keys(): TKN = TOKENS[] if token in TKN.keys(): is_used = TKN[token] if is_used == True: return False else: return True else: return False async def get_token(bot, userid, link): user = await bot.get_users(userid) if not await db.is_user_exist( await db.add_user(, user.first_name) await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(, user.mention)) token = ''.join(random.choices(string.ascii_letters + string.digits, k=7)) TOKENS[] = {token: False} link = f"{link}verify-{}-{token}" shortened_verify_url = await get_verify_shorted_link(link) return str(shortened_verify_url) async def verify_user(bot, userid, token): user = await bot.get_users(userid) if not await db.is_user_exist( await db.add_user(, user.first_name) await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(, user.mention)) TOKENS[] = {token: True} tz = pytz.timezone('Asia/Kolkata') today = VERIFIED[] = str(today) async def check_verification(bot, userid): user = await bot.get_users(userid) if not await db.is_user_exist( await db.add_user(, user.first_name) await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(, user.mention)) tz = pytz.timezone('Asia/Kolkata') today = if in VERIFIED.keys(): EXP = VERIFIED[] years, month, day = EXP.split('-') comp = date(int(years), int(month), int(day)) if compHᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"{temp.U_NAME}?start=files_{file.file_id}"))]])) else: for file in files: f_caption = file.caption title = file.file_name size = get_size(file.file_size) if CUSTOM_FILE_CAPTION: try: f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, file_size='' if size is None else size, file_caption='' if f_caption is None else f_caption) except Exception as e: print(e) f_caption = f_caption if f_caption is None: f_caption = f"{title}" await bot.send_cached_media( chat_id=userid, file_id=file.file_id, caption=f_caption, protect_content=True if ident == "filep" else False, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK), InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK) ],[ InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="") ] ] ) ) except UserIsBlocked: await query.answer('Uɴʙʟᴏᴄᴋ ᴛʜᴇ ʙᴏᴛ ᴍᴀʜɴ !', show_alert=True) except PeerIdInvalid: await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) except Exception as e: await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) '''if IS_SHORTLINK == True: for file in files: title = file.file_name size = get_size(file.file_size) await bot.send_message(chat_id=userid, text=f"Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"{temp.U_NAME}?start=files_{file.file_id}"))]]) ) else: for file in files: f_caption = file.caption title = file.file_name size = get_size(file.file_size) if CUSTOM_FILE_CAPTION: try: f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, file_size='' if size is None else size, file_caption='' if f_caption is None else f_caption) except Exception as e: print(e) f_caption = f_caption if f_caption is None: f_caption = f"{title}" await bot.send_cached_media( chat_id=userid, file_id=file.file_id, caption=f_caption, protect_content=True if ident == "filep" else False, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK), InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK) ],[ InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="") ] ] ) )'''