|
import datetime
|
|
import math
|
|
from FileStream import __version__
|
|
from FileStream.bot import FileStream
|
|
from FileStream.config import Telegram, Server
|
|
from FileStream.utils.translation import LANG, BUTTON
|
|
from FileStream.utils.bot_utils import gen_link
|
|
from FileStream.utils.database import Database
|
|
from FileStream.utils.human_readable import humanbytes
|
|
from FileStream.server.exceptions import FIleNotFound
|
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
|
|
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES
|
|
from pyrogram.enums.parse_mode import ParseMode
|
|
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
|
|
|
|
|
|
@FileStream.on_callback_query()
|
|
async def cb_data(bot, update: CallbackQuery):
|
|
usr_cmd = update.data.split("_")
|
|
if usr_cmd[0] == "home":
|
|
await update.message.edit_text(
|
|
text=LANG.START_TEXT.format(update.from_user.mention, FileStream.username),
|
|
disable_web_page_preview=True,
|
|
reply_markup=BUTTON.START_BUTTONS
|
|
)
|
|
elif usr_cmd[0] == "help":
|
|
await update.message.edit_text(
|
|
text=LANG.HELP_TEXT.format(Telegram.OWNER_ID),
|
|
disable_web_page_preview=True,
|
|
reply_markup=BUTTON.HELP_BUTTONS
|
|
)
|
|
elif usr_cmd[0] == "about":
|
|
await update.message.edit_text(
|
|
text=LANG.ABOUT_TEXT.format(FileStream.fname, __version__),
|
|
disable_web_page_preview=True,
|
|
reply_markup=BUTTON.ABOUT_BUTTONS
|
|
)
|
|
|
|
|
|
|
|
elif usr_cmd[0] == "N/A":
|
|
await update.answer("N/A", True)
|
|
elif usr_cmd[0] == "close":
|
|
await update.message.delete()
|
|
elif usr_cmd[0] == "msgdelete":
|
|
await update.message.edit_caption(
|
|
caption= "**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
|
|
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("ʏᴇs", callback_data=f"msgdelyes_{usr_cmd[1]}_{usr_cmd[2]}"), InlineKeyboardButton("ɴᴏ", callback_data=f"myfile_{usr_cmd[1]}_{usr_cmd[2]}")]])
|
|
)
|
|
elif usr_cmd[0] == "msgdelyes":
|
|
await delete_user_file(usr_cmd[1], int(usr_cmd[2]), update)
|
|
return
|
|
elif usr_cmd[0] == "msgdelpvt":
|
|
await update.message.edit_caption(
|
|
caption= "**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
|
|
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("ʏᴇs", callback_data=f"msgdelpvtyes_{usr_cmd[1]}"), InlineKeyboardButton("ɴᴏ", callback_data=f"mainstream_{usr_cmd[1]}")]])
|
|
)
|
|
elif usr_cmd[0] == "msgdelpvtyes":
|
|
await delete_user_filex(usr_cmd[1], update)
|
|
return
|
|
|
|
elif usr_cmd[0] == "mainstream":
|
|
_id = usr_cmd[1]
|
|
reply_markup, stream_text = await gen_link(_id=_id)
|
|
await update.message.edit_text(
|
|
text=stream_text,
|
|
parse_mode=ParseMode.HTML,
|
|
disable_web_page_preview=True,
|
|
reply_markup=reply_markup,
|
|
)
|
|
|
|
elif usr_cmd[0] == "userfiles":
|
|
file_list, total_files = await gen_file_list_button(int(usr_cmd[1]), update.from_user.id)
|
|
await update.message.edit_caption(
|
|
caption="Total files: {}".format(total_files),
|
|
reply_markup=InlineKeyboardMarkup(file_list)
|
|
)
|
|
elif usr_cmd[0] == "myfile":
|
|
await gen_file_menu(usr_cmd[1], usr_cmd[2], update)
|
|
return
|
|
elif usr_cmd[0] == "sendfile":
|
|
myfile = await db.get_file(usr_cmd[1])
|
|
file_name = myfile['file_name']
|
|
await update.answer(f"Sending File {file_name}")
|
|
await update.message.reply_cached_media(myfile['file_id'], caption=f'**{file_name}**')
|
|
else:
|
|
await update.message.delete()
|
|
|
|
|
|
|
|
|
|
|
|
async def gen_file_list_button(file_list_no: int, user_id: int):
|
|
|
|
file_range=[file_list_no*10-10+1, file_list_no*10]
|
|
user_files, total_files=await db.find_files(user_id, file_range)
|
|
|
|
file_list=[]
|
|
async for x in user_files:
|
|
file_list.append([InlineKeyboardButton(x["file_name"], callback_data=f"myfile_{x['_id']}_{file_list_no}")])
|
|
if total_files > 10:
|
|
file_list.append(
|
|
[InlineKeyboardButton("◄", callback_data="{}".format("userfiles_"+str(file_list_no-1) if file_list_no > 1 else 'N/A')),
|
|
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", callback_data="N/A"),
|
|
InlineKeyboardButton("►", callback_data="{}".format("userfiles_"+str(file_list_no+1) if total_files > file_list_no*10 else 'N/A'))]
|
|
)
|
|
if not file_list:
|
|
file_list.append(
|
|
[InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
|
|
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
|
|
return file_list, total_files
|
|
|
|
async def gen_file_menu(_id, file_list_no, update: CallbackQuery):
|
|
try:
|
|
myfile_info=await db.get_file(_id)
|
|
except FIleNotFound:
|
|
await update.answer("File Not Found")
|
|
return
|
|
|
|
file_id=FileId.decode(myfile_info['file_id'])
|
|
|
|
if file_id.file_type in PHOTO_TYPES:
|
|
file_type = "Image"
|
|
elif file_id.file_type == FileType.VOICE:
|
|
file_type = "Voice"
|
|
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE):
|
|
file_type = "Video"
|
|
elif file_id.file_type == FileType.DOCUMENT:
|
|
file_type = "Document"
|
|
elif file_id.file_type == FileType.STICKER:
|
|
file_type = "Sticker"
|
|
elif file_id.file_type == FileType.AUDIO:
|
|
file_type = "Audio"
|
|
else:
|
|
file_type = "Unknown"
|
|
|
|
page_link = f"{Server.URL}watch/{myfile_info['_id']}"
|
|
stream_link = f"{Server.URL}dl/{myfile_info['_id']}"
|
|
if "video" in file_type.lower():
|
|
MYFILES_BUTTONS = InlineKeyboardMarkup(
|
|
[
|
|
[InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
|
|
[InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
|
|
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")],
|
|
[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))]
|
|
]
|
|
)
|
|
else:
|
|
MYFILES_BUTTONS = InlineKeyboardMarkup(
|
|
[
|
|
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
|
|
[InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
|
|
InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")],
|
|
[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))]
|
|
]
|
|
)
|
|
|
|
TiMe = myfile_info['time']
|
|
if type(TiMe) == float:
|
|
date = datetime.datetime.fromtimestamp(TiMe)
|
|
await update.edit_message_caption(
|
|
caption="**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`".format(myfile_info['file_name'],
|
|
humanbytes(int(myfile_info['file_size'])),
|
|
file_type,
|
|
TiMe if isinstance(TiMe,str) else date.date()),
|
|
reply_markup=MYFILES_BUTTONS )
|
|
|
|
|
|
async def delete_user_file(_id, file_list_no: int, update:CallbackQuery):
|
|
|
|
try:
|
|
myfile_info=await db.get_file(_id)
|
|
except FIleNotFound:
|
|
await update.answer("File Already Deleted")
|
|
return
|
|
|
|
await db.delete_one_file(myfile_info['_id'])
|
|
await db.count_links(update.from_user.id, "-")
|
|
await update.message.edit_caption(
|
|
caption= "**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" + update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", ""),
|
|
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]])
|
|
)
|
|
|
|
async def delete_user_filex(_id, update:CallbackQuery):
|
|
|
|
try:
|
|
myfile_info=await db.get_file(_id)
|
|
except FIleNotFound:
|
|
await update.answer("File Already Deleted")
|
|
return
|
|
|
|
await db.delete_one_file(myfile_info['_id'])
|
|
await db.count_links(update.from_user.id, "-")
|
|
await update.message.edit_caption(
|
|
caption= "**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n",
|
|
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]])
|
|
)
|
|
|
|
|