|
import logging |
|
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid |
|
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORTLINK_URL, SHORTLINK_API, IS_SHORTLINK, LOG_CHANNEL, TUTORIAL, GRP_LNK, CHNL_LNK, CUSTOM_FILE_CAPTION, SECOND_SHORTLINK_URL, SECOND_SHORTLINK_API |
|
from imdb import Cinemagoer |
|
import asyncio |
|
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup |
|
from pyrogram.errors import FloodWait, UserIsBlocked, MessageNotModified, PeerIdInvalid |
|
from pyrogram import enums |
|
from typing import Union |
|
from Script import script |
|
import pytz |
|
import random |
|
import re |
|
import os |
|
from datetime import datetime, date |
|
import string |
|
from typing import List |
|
from database.users_chats_db import db |
|
from bs4 import BeautifulSoup |
|
import requests |
|
import aiohttp |
|
from shortzy import Shortzy |
|
import http.client |
|
import json |
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.INFO) |
|
|
|
BTN_URL_REGEX = re.compile( |
|
r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))" |
|
) |
|
|
|
imdb = Cinemagoer() |
|
TOKENS = {} |
|
VERIFIED = {} |
|
BANNED = {} |
|
SECOND_SHORTENER = {} |
|
SMART_OPEN = '“' |
|
SMART_CLOSE = '”' |
|
START_CHAR = ('\'', '"', SMART_OPEN) |
|
|
|
|
|
class temp(object): |
|
BANNED_USERS = [] |
|
BANNED_CHATS = [] |
|
ME = None |
|
CURRENT=int(os.environ.get("SKIP", 2)) |
|
CANCEL = False |
|
MELCOW = {} |
|
U_NAME = None |
|
B_NAME = None |
|
GETALL = {} |
|
SHORT = {} |
|
SETTINGS = {} |
|
|
|
async def is_subscribed(bot, query): |
|
try: |
|
user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id) |
|
except UserNotParticipant: |
|
pass |
|
except Exception as e: |
|
logger.exception(e) |
|
else: |
|
if user.status != enums.ChatMemberStatus.BANNED: |
|
return True |
|
|
|
return False |
|
|
|
async def get_poster(query, bulk=False, id=False, file=None): |
|
if not id: |
|
|
|
query = (query.strip()).lower() |
|
title = query |
|
year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE) |
|
if year: |
|
year = list_to_str(year[:1]) |
|
title = (query.replace(year, "")).strip() |
|
elif file is not None: |
|
year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE) |
|
if year: |
|
year = list_to_str(year[:1]) |
|
else: |
|
year = None |
|
movieid = imdb.search_movie(title.lower(), results=10) |
|
if not movieid: |
|
return None |
|
if year: |
|
filtered=list(filter(lambda k: str(k.get('year')) == str(year), movieid)) |
|
if not filtered: |
|
filtered = movieid |
|
else: |
|
filtered = movieid |
|
movieid=list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered)) |
|
if not movieid: |
|
movieid = filtered |
|
if bulk: |
|
return movieid |
|
movieid = movieid[0].movieID |
|
else: |
|
movieid = query |
|
movie = imdb.get_movie(movieid) |
|
if movie.get("original air date"): |
|
date = movie["original air date"] |
|
elif movie.get("year"): |
|
date = movie.get("year") |
|
else: |
|
date = "N/A" |
|
plot = "" |
|
if not LONG_IMDB_DESCRIPTION: |
|
plot = movie.get('plot') |
|
if plot and len(plot) > 0: |
|
plot = plot[0] |
|
else: |
|
plot = movie.get('plot outline') |
|
if plot and len(plot) > 800: |
|
plot = plot[0:800] + "..." |
|
|
|
return { |
|
'title': movie.get('title'), |
|
'votes': movie.get('votes'), |
|
"aka": list_to_str(movie.get("akas")), |
|
"seasons": movie.get("number of seasons"), |
|
"box_office": movie.get('box office'), |
|
'localized_title': movie.get('localized title'), |
|
'kind': movie.get("kind"), |
|
"imdb_id": f"tt{movie.get('imdbID')}", |
|
"cast": list_to_str(movie.get("cast")), |
|
"runtime": list_to_str(movie.get("runtimes")), |
|
"countries": list_to_str(movie.get("countries")), |
|
"certificates": list_to_str(movie.get("certificates")), |
|
"languages": list_to_str(movie.get("languages")), |
|
"director": list_to_str(movie.get("director")), |
|
"writer":list_to_str(movie.get("writer")), |
|
"producer":list_to_str(movie.get("producer")), |
|
"composer":list_to_str(movie.get("composer")) , |
|
"cinematographer":list_to_str(movie.get("cinematographer")), |
|
"music_team": list_to_str(movie.get("music department")), |
|
"distributors": list_to_str(movie.get("distributors")), |
|
'release_date': date, |
|
'year': movie.get('year'), |
|
'genres': list_to_str(movie.get("genres")), |
|
'poster': movie.get('full-size cover url'), |
|
'plot': plot, |
|
'rating': str(movie.get("rating")), |
|
'url':f'https://www.imdb.com/title/tt{movieid}' |
|
} |
|
|
|
|
|
async def broadcast_messages(user_id, message): |
|
try: |
|
await message.copy(chat_id=user_id) |
|
return True, "Success" |
|
except FloodWait as e: |
|
await asyncio.sleep(e.x) |
|
return await broadcast_messages(user_id, message) |
|
except InputUserDeactivated: |
|
await db.delete_user(int(user_id)) |
|
logging.info(f"{user_id}-Removed from Database, since deleted account.") |
|
return False, "Deleted" |
|
except UserIsBlocked: |
|
logging.info(f"{user_id} -Blocked the bot.") |
|
return False, "Blocked" |
|
except PeerIdInvalid: |
|
await db.delete_user(int(user_id)) |
|
logging.info(f"{user_id} - PeerIdInvalid") |
|
return False, "Error" |
|
except Exception as e: |
|
return False, "Error" |
|
|
|
async def broadcast_messages_group(chat_id, message): |
|
try: |
|
kd = await message.copy(chat_id=chat_id) |
|
try: |
|
await kd.pin() |
|
except: |
|
pass |
|
return True, "Success" |
|
except FloodWait as e: |
|
await asyncio.sleep(e.x) |
|
return await broadcast_messages_group(chat_id, message) |
|
except Exception as e: |
|
return False, "Error" |
|
|
|
async def search_gagala(text): |
|
usr_agent = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' |
|
'Chrome/61.0.3163.100 Safari/537.36' |
|
} |
|
text = text.replace(" ", '+') |
|
url = f'https://www.google.com/search?q={text}' |
|
response = requests.get(url, headers=usr_agent) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
titles = soup.find_all( 'h3' ) |
|
return [title.getText() for title in titles] |
|
|
|
async def get_settings(group_id): |
|
settings = temp.SETTINGS.get(group_id) |
|
if not settings: |
|
settings = await db.get_settings(group_id) |
|
temp.SETTINGS[group_id] = settings |
|
return settings |
|
|
|
async def save_group_settings(group_id, key, value): |
|
current = await get_settings(group_id) |
|
current[key] = value |
|
temp.SETTINGS[group_id] = current |
|
await db.update_settings(group_id, current) |
|
|
|
def get_size(size): |
|
"""Get size in readable format""" |
|
|
|
units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"] |
|
size = float(size) |
|
i = 0 |
|
while size >= 1024.0 and i < len(units): |
|
i += 1 |
|
size /= 1024.0 |
|
return "%.2f %s" % (size, units[i]) |
|
|
|
def split_list(l, n): |
|
for i in range(0, len(l), n): |
|
yield l[i:i + n] |
|
|
|
def get_file_id(msg: Message): |
|
if msg.media: |
|
for message_type in ( |
|
"photo", |
|
"animation", |
|
"audio", |
|
"document", |
|
"video", |
|
"video_note", |
|
"voice", |
|
"sticker" |
|
): |
|
obj = getattr(msg, message_type) |
|
if obj: |
|
setattr(obj, "message_type", message_type) |
|
return obj |
|
|
|
def extract_user(message: Message) -> Union[int, str]: |
|
"""extracts the user from a message""" |
|
|
|
user_id = None |
|
user_first_name = None |
|
if message.reply_to_message: |
|
user_id = message.reply_to_message.from_user.id |
|
user_first_name = message.reply_to_message.from_user.first_name |
|
|
|
elif len(message.command) > 1: |
|
if ( |
|
len(message.entities) > 1 and |
|
message.entities[1].type == enums.MessageEntityType.TEXT_MENTION |
|
): |
|
|
|
required_entity = message.entities[1] |
|
user_id = required_entity.user.id |
|
user_first_name = required_entity.user.first_name |
|
else: |
|
user_id = message.command[1] |
|
|
|
user_first_name = user_id |
|
try: |
|
user_id = int(user_id) |
|
except ValueError: |
|
pass |
|
else: |
|
user_id = message.from_user.id |
|
user_first_name = message.from_user.first_name |
|
return (user_id, user_first_name) |
|
|
|
def list_to_str(k): |
|
if not k: |
|
return "N/A" |
|
elif len(k) == 1: |
|
return str(k[0]) |
|
elif MAX_LIST_ELM: |
|
k = k[:int(MAX_LIST_ELM)] |
|
return ' '.join(f'{elem}, ' for elem in k) |
|
else: |
|
return ' '.join(f'{elem}, ' for elem in k) |
|
|
|
def last_online(from_user): |
|
time = "" |
|
if from_user.is_bot: |
|
time += "🤖 Bot :(" |
|
elif from_user.status == enums.UserStatus.RECENTLY: |
|
time += "Recently" |
|
elif from_user.status == enums.UserStatus.LAST_WEEK: |
|
time += "Within the last week" |
|
elif from_user.status == enums.UserStatus.LAST_MONTH: |
|
time += "Within the last month" |
|
elif from_user.status == enums.UserStatus.LONG_AGO: |
|
time += "A long time ago :(" |
|
elif from_user.status == enums.UserStatus.ONLINE: |
|
time += "Currently Online" |
|
elif from_user.status == enums.UserStatus.OFFLINE: |
|
time += from_user.last_online_date.strftime("%a, %d %b %Y, %H:%M:%S") |
|
return time |
|
|
|
|
|
def split_quotes(text: str) -> List: |
|
if not any(text.startswith(char) for char in START_CHAR): |
|
return text.split(None, 1) |
|
counter = 1 |
|
while counter < len(text): |
|
if text[counter] == "\\": |
|
counter += 1 |
|
elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE): |
|
break |
|
counter += 1 |
|
else: |
|
return text.split(None, 1) |
|
|
|
|
|
key = remove_escapes(text[1:counter].strip()) |
|
|
|
rest = text[counter + 1:].strip() |
|
if not key: |
|
key = text[0] + text[0] |
|
return list(filter(None, [key, rest])) |
|
|
|
def gfilterparser(text, keyword): |
|
if "buttonalert" in text: |
|
text = (text.replace("\n", "\\n").replace("\t", "\\t")) |
|
buttons = [] |
|
note_data = "" |
|
prev = 0 |
|
i = 0 |
|
alerts = [] |
|
for match in BTN_URL_REGEX.finditer(text): |
|
|
|
n_escapes = 0 |
|
to_check = match.start(1) - 1 |
|
while to_check > 0 and text[to_check] == "\\": |
|
n_escapes += 1 |
|
to_check -= 1 |
|
|
|
|
|
if n_escapes % 2 == 0: |
|
note_data += text[prev:match.start(1)] |
|
prev = match.end(1) |
|
if match.group(3) == "buttonalert": |
|
|
|
if bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton( |
|
text=match.group(2), |
|
callback_data=f"gfilteralert:{i}:{keyword}" |
|
)) |
|
else: |
|
buttons.append([InlineKeyboardButton( |
|
text=match.group(2), |
|
callback_data=f"gfilteralert:{i}:{keyword}" |
|
)]) |
|
i += 1 |
|
alerts.append(match.group(4)) |
|
elif bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton( |
|
text=match.group(2), |
|
url=match.group(4).replace(" ", "") |
|
)) |
|
else: |
|
buttons.append([InlineKeyboardButton( |
|
text=match.group(2), |
|
url=match.group(4).replace(" ", "") |
|
)]) |
|
|
|
else: |
|
note_data += text[prev:to_check] |
|
prev = match.start(1) - 1 |
|
else: |
|
note_data += text[prev:] |
|
|
|
try: |
|
return note_data, buttons, alerts |
|
except: |
|
return note_data, buttons, None |
|
|
|
def parser(text, keyword): |
|
if "buttonalert" in text: |
|
text = (text.replace("\n", "\\n").replace("\t", "\\t")) |
|
buttons = [] |
|
note_data = "" |
|
prev = 0 |
|
i = 0 |
|
alerts = [] |
|
for match in BTN_URL_REGEX.finditer(text): |
|
|
|
n_escapes = 0 |
|
to_check = match.start(1) - 1 |
|
while to_check > 0 and text[to_check] == "\\": |
|
n_escapes += 1 |
|
to_check -= 1 |
|
|
|
|
|
if n_escapes % 2 == 0: |
|
note_data += text[prev:match.start(1)] |
|
prev = match.end(1) |
|
if match.group(3) == "buttonalert": |
|
|
|
if bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton( |
|
text=match.group(2), |
|
callback_data=f"alertmessage:{i}:{keyword}" |
|
)) |
|
else: |
|
buttons.append([InlineKeyboardButton( |
|
text=match.group(2), |
|
callback_data=f"alertmessage:{i}:{keyword}" |
|
)]) |
|
i += 1 |
|
alerts.append(match.group(4)) |
|
elif bool(match.group(5)) and buttons: |
|
buttons[-1].append(InlineKeyboardButton( |
|
text=match.group(2), |
|
url=match.group(4).replace(" ", "") |
|
)) |
|
else: |
|
buttons.append([InlineKeyboardButton( |
|
text=match.group(2), |
|
url=match.group(4).replace(" ", "") |
|
)]) |
|
|
|
else: |
|
note_data += text[prev:to_check] |
|
prev = match.start(1) - 1 |
|
else: |
|
note_data += text[prev:] |
|
|
|
try: |
|
return note_data, buttons, alerts |
|
except: |
|
return note_data, buttons, None |
|
|
|
def remove_escapes(text: str) -> str: |
|
res = "" |
|
is_escaped = False |
|
for counter in range(len(text)): |
|
if is_escaped: |
|
res += text[counter] |
|
is_escaped = False |
|
elif text[counter] == "\\": |
|
is_escaped = True |
|
else: |
|
res += text[counter] |
|
return res |
|
|
|
|
|
def humanbytes(size): |
|
if not size: |
|
return "" |
|
power = 2**10 |
|
n = 0 |
|
Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'} |
|
while size > power: |
|
size /= power |
|
n += 1 |
|
return str(round(size, 2)) + " " + Dic_powerN[n] + 'B' |
|
|
|
async def get_shortlink(chat_id, link, second=False): |
|
settings = await get_settings(chat_id) |
|
if 'shortlink' in settings.keys(): |
|
URL = settings['shortlink'] |
|
API = settings['shortlink_api'] |
|
else: |
|
URL = SHORTLINK_URL |
|
API = SHORTLINK_API |
|
if URL.startswith("shorturllink") or URL.startswith("terabox.in") or URL.startswith("urlshorten.in") or second: |
|
URL = SECOND_SHORTLINK_URL |
|
API = SECOND_SHORTLINK_API |
|
if URL == "api.shareus.io": |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
url = f'https://{URL}/easy_api' |
|
params = { |
|
"key": API, |
|
"link": link, |
|
} |
|
try: |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: |
|
data = await response.text() |
|
return data |
|
except Exception as e: |
|
logger.error(e) |
|
return link |
|
else: |
|
shortzy = Shortzy(api_key=API, base_site=URL) |
|
link = await shortzy.convert(link) |
|
return link |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_tutorial(chat_id): |
|
settings = await get_settings(chat_id) |
|
if 'tutorial' in settings.keys(): |
|
TUTORIAL_URL = settings['tutorial'] |
|
else: |
|
TUTORIAL_URL = TUTORIAL |
|
return TUTORIAL_URL |
|
|
|
async def get_verify_shorted_link(link): |
|
API = SHORTLINK_API |
|
URL = SHORTLINK_URL |
|
https = link.split(":")[0] |
|
if "http" == https: |
|
https = "https" |
|
link = link.replace("http", https) |
|
|
|
if URL == "api.shareus.in": |
|
url = f"https://{URL}/shortLink" |
|
params = {"token": API, |
|
"format": "json", |
|
"link": link, |
|
} |
|
try: |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: |
|
data = await response.json(content_type="text/html") |
|
if data["status"] == "success": |
|
return data["shortlink"] |
|
else: |
|
logger.error(f"Error: {data['message']}") |
|
return f'https://{URL}/shortLink?token={API}&format=json&link={link}' |
|
|
|
except Exception as e: |
|
logger.error(e) |
|
return f'https://{URL}/shortLink?token={API}&format=json&link={link}' |
|
else: |
|
url = f'https://{URL}/api' |
|
params = {'api': API, |
|
'url': link, |
|
} |
|
try: |
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(url, params=params, raise_for_status=True, ssl=False) as response: |
|
data = await response.json() |
|
if data["status"] == "success": |
|
return data['shortenedUrl'] |
|
else: |
|
logger.error(f"Error: {data['message']}") |
|
return f'https://{URL}/api?api={API}&link={link}' |
|
|
|
except Exception as e: |
|
logger.error(e) |
|
return f'{URL}/api?api={API}&link={link}' |
|
|
|
async def check_token(bot, userid, token): |
|
user = await bot.get_users(userid) |
|
if not await db.is_user_exist(user.id): |
|
await db.add_user(user.id, user.first_name) |
|
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) |
|
if user.id in TOKENS.keys(): |
|
TKN = TOKENS[user.id] |
|
if token in TKN.keys(): |
|
is_used = TKN[token] |
|
if is_used == True: |
|
return False |
|
else: |
|
return True |
|
else: |
|
return False |
|
|
|
async def get_token(bot, userid, link): |
|
user = await bot.get_users(userid) |
|
if not await db.is_user_exist(user.id): |
|
await db.add_user(user.id, user.first_name) |
|
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) |
|
token = ''.join(random.choices(string.ascii_letters + string.digits, k=7)) |
|
TOKENS[user.id] = {token: False} |
|
link = f"{link}verify-{user.id}-{token}" |
|
shortened_verify_url = await get_verify_shorted_link(link) |
|
return str(shortened_verify_url) |
|
|
|
async def verify_user(bot, userid, token): |
|
user = await bot.get_users(userid) |
|
if not await db.is_user_exist(user.id): |
|
await db.add_user(user.id, user.first_name) |
|
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) |
|
TOKENS[user.id] = {token: True} |
|
tz = pytz.timezone('Asia/Kolkata') |
|
today = date.today() |
|
VERIFIED[user.id] = str(today) |
|
|
|
async def check_verification(bot, userid): |
|
user = await bot.get_users(userid) |
|
if not await db.is_user_exist(user.id): |
|
await db.add_user(user.id, user.first_name) |
|
await bot.send_message(LOG_CHANNEL, script.LOG_TEXT_P.format(user.id, user.mention)) |
|
tz = pytz.timezone('Asia/Kolkata') |
|
today = date.today() |
|
if user.id in VERIFIED.keys(): |
|
EXP = VERIFIED[user.id] |
|
years, month, day = EXP.split('-') |
|
comp = date(int(years), int(month), int(day)) |
|
if comp<today: |
|
return False |
|
else: |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
async def send_all(bot, userid, files, ident, chat_id, user_name, query): |
|
settings = await get_settings(chat_id) |
|
if 'is_shortlink' in settings.keys(): |
|
ENABLE_SHORTLINK = settings['is_shortlink'] |
|
else: |
|
await save_group_settings(message.chat.id, 'is_shortlink', False) |
|
ENABLE_SHORTLINK = False |
|
try: |
|
if ENABLE_SHORTLINK: |
|
for file in files: |
|
title = file.file_name |
|
size = get_size(file.file_size) |
|
await bot.send_message(chat_id=userid, text=f"<b>Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}</b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}"))]])) |
|
else: |
|
for file in files: |
|
f_caption = file.caption |
|
title = file.file_name |
|
size = get_size(file.file_size) |
|
if CUSTOM_FILE_CAPTION: |
|
try: |
|
f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, |
|
file_size='' if size is None else size, |
|
file_caption='' if f_caption is None else f_caption) |
|
except Exception as e: |
|
print(e) |
|
f_caption = f_caption |
|
if f_caption is None: |
|
f_caption = f"{title}" |
|
await bot.send_cached_media( |
|
chat_id=userid, |
|
file_id=file.file_id, |
|
caption=f_caption, |
|
protect_content=True if ident == "filep" else False, |
|
reply_markup=InlineKeyboardMarkup( |
|
[ |
|
[ |
|
InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK), |
|
InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK) |
|
],[ |
|
InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="t.me/Kgashok04") |
|
] |
|
] |
|
) |
|
) |
|
except UserIsBlocked: |
|
await query.answer('Uɴʙʟᴏᴄᴋ ᴛʜᴇ ʙᴏᴛ ᴍᴀʜɴ !', show_alert=True) |
|
except PeerIdInvalid: |
|
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) |
|
except Exception as e: |
|
await query.answer('Hᴇʏ, Sᴛᴀʀᴛ Bᴏᴛ Fɪʀsᴛ Aɴᴅ Cʟɪᴄᴋ Sᴇɴᴅ Aʟʟ', show_alert=True) |
|
'''if IS_SHORTLINK == True: |
|
for file in files: |
|
title = file.file_name |
|
size = get_size(file.file_size) |
|
await bot.send_message(chat_id=userid, text=f"<b>Hᴇʏ ᴛʜᴇʀᴇ {user_name} 👋🏽 \n\n✅ Sᴇᴄᴜʀᴇ ʟɪɴᴋ ᴛᴏ ʏᴏᴜʀ ғɪʟᴇ ʜᴀs sᴜᴄᴄᴇssғᴜʟʟʏ ʙᴇᴇɴ ɢᴇɴᴇʀᴀᴛᴇᴅ ᴘʟᴇᴀsᴇ ᴄʟɪᴄᴋ ᴅᴏᴡɴʟᴏᴀᴅ ʙᴜᴛᴛᴏɴ\n\n🗃️ Fɪʟᴇ Nᴀᴍᴇ : {title}\n🔖 Fɪʟᴇ Sɪᴢᴇ : {size}</b>", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("📤 Dᴏᴡɴʟᴏᴀᴅ 📥", url=await get_shortlink(chat_id, f"https://telegram.me/{temp.U_NAME}?start=files_{file.file_id}"))]]) |
|
) |
|
else: |
|
for file in files: |
|
f_caption = file.caption |
|
title = file.file_name |
|
size = get_size(file.file_size) |
|
if CUSTOM_FILE_CAPTION: |
|
try: |
|
f_caption = CUSTOM_FILE_CAPTION.format(file_name='' if title is None else title, |
|
file_size='' if size is None else size, |
|
file_caption='' if f_caption is None else f_caption) |
|
except Exception as e: |
|
print(e) |
|
f_caption = f_caption |
|
if f_caption is None: |
|
f_caption = f"{title}" |
|
await bot.send_cached_media( |
|
chat_id=userid, |
|
file_id=file.file_id, |
|
caption=f_caption, |
|
protect_content=True if ident == "filep" else False, |
|
reply_markup=InlineKeyboardMarkup( |
|
[ |
|
[ |
|
InlineKeyboardButton('Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ', url=GRP_LNK), |
|
InlineKeyboardButton('Uᴘᴅᴀᴛᴇs Cʜᴀɴɴᴇʟ', url=CHNL_LNK) |
|
],[ |
|
InlineKeyboardButton("Bᴏᴛ Oᴡɴᴇʀ", url="t.me/Kgashok04") |
|
] |
|
] |
|
) |
|
)''' |