|
import logging
|
|
import math
|
|
from FileStream import __version__
|
|
from FileStream.bot import FileStream
|
|
from FileStream.server.exceptions import FIleNotFound
|
|
from FileStream.utils.bot_utils import gen_linkx, verify_user
|
|
from FileStream.config import Telegram
|
|
from FileStream.utils.database import Database
|
|
from FileStream.utils.translation import LANG, BUTTON
|
|
from pyrogram import filters, Client
|
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message
|
|
from pyrogram.enums.parse_mode import ParseMode
|
|
|
|
# Module-level database handle built from the Telegram config values; the
# handlers in this file use it through db.get_file() and db.find_files().
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
|
|
|
|
@FileStream.on_message(filters.command('start') & filters.private)
async def start(bot: Client, message: Message):
    """Handle ``/start`` in a private chat, with or without a payload.

    Behaviour, in order:

    * Nothing is sent when ``verify_user`` returns a falsy value.
    * A bare ``/start`` is answered with the start text and buttons.
    * Text containing ``stream_`` looks the trailing id up via
      ``db.get_file`` and replies with the text/markup from ``gen_linkx``.
    * Text containing ``file_`` looks the trailing id up and re-sends the
      stored media, captioned with its file name.
    * Anything else is answered with "Invalid Command".

    Lookup failures are reported to the user; unexpected errors are also
    logged with their traceback.

    Args:
        bot: Client that received the update.
        message: Incoming command message.
    """
    if not await verify_user(bot, message):
        return

    # filters.command matches captions as well as text, so message.text can
    # be None here; fall back to the caption instead of crashing on .split().
    command_text = message.text or message.caption or ""
    # The id is whatever follows the last underscore, e.g. "/start stream_<id>".
    usr_cmd = command_text.split("_")[-1]

    if usr_cmd == "/start":
        await message.reply_text(
            text=LANG.START_TEXT.format(message.from_user.mention, FileStream.username),
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
            reply_markup=BUTTON.START_BUTTONS
        )
    elif "stream_" in command_text:
        try:
            file_check = await db.get_file(usr_cmd)
            file_id = str(file_check['_id'])
            if file_id == usr_cmd:
                reply_markup, stream_text = await gen_linkx(
                    m=message,
                    _id=file_id,
                    name=[FileStream.username, FileStream.fname],
                )
                await message.reply_text(
                    text=stream_text,
                    parse_mode=ParseMode.HTML,
                    disable_web_page_preview=True,
                    reply_markup=reply_markup,
                    quote=True
                )
        except FIleNotFound:
            await message.reply_text("File Not Found")
        except Exception as e:
            # Log first (with traceback) so the record survives even if the
            # follow-up reply itself fails.
            logging.exception(e)
            await message.reply_text("Something Went Wrong")
    elif "file_" in command_text:
        try:
            file_check = await db.get_file(usr_cmd)
            db_id = str(file_check['_id'])
            file_id = file_check['file_id']
            file_name = file_check['file_name']
            if db_id == usr_cmd:
                await message.reply_cached_media(file_id=file_id, caption=f'**{file_name}**')
        except FIleNotFound:
            await message.reply_text("**File Not Found**")
        except Exception as e:
            # Same ordering as above: keep the traceback before replying.
            logging.exception(e)
            await message.reply_text("Something Went Wrong")
    else:
        await message.reply_text("**Invalid Command**")
|
|
|
|
@FileStream.on_message(filters.private & filters.command(["about"]))
async def start(bot, message):
    """Answer ``/about`` in a private chat with the about text and buttons.

    The text is ``LANG.ABOUT_TEXT`` formatted with ``FileStream.fname`` and
    the package ``__version__``. Nothing is sent when ``verify_user``
    returns a falsy value.

    NOTE(review): this function reuses the name ``start`` already taken by
    the ``/start`` handler earlier in this module, so the module attribute
    ``start`` ends up bound to this function. Presumably both handlers stay
    registered because registration happens in the decorator -- confirm
    before renaming.
    """
    user_ok = await verify_user(bot, message)
    if user_ok:
        about_text = LANG.ABOUT_TEXT.format(FileStream.fname, __version__)
        await message.reply_text(
            text=about_text,
            disable_web_page_preview=True,
            reply_markup=BUTTON.ABOUT_BUTTONS,
        )
|
|
|
|
|
|
@FileStream.on_message((filters.command('help')) & filters.private)
async def help_handler(bot, message):
    """Answer ``/help`` in a private chat with the help text and buttons.

    The text is ``LANG.HELP_TEXT`` formatted with ``Telegram.OWNER_ID`` and
    is sent with HTML parsing and link previews disabled. Nothing is sent
    when ``verify_user`` returns a falsy value.
    """
    user_ok = await verify_user(bot, message)
    if user_ok:
        help_text = LANG.HELP_TEXT.format(Telegram.OWNER_ID)
        await message.reply_text(
            text=help_text,
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
            reply_markup=BUTTON.HELP_BUTTONS,
        )
|
|
|
|
|
|
|
|
@FileStream.on_message(filters.command('files') & filters.private)
async def my_files(bot: Client, message: Message):
    """Answer ``/files`` with a photo whose inline keyboard lists the user's files.

    The keyboard holds one row per document yielded by ``db.find_files``,
    then a pager row when more than 10 files exist, an "empty" placeholder
    row when there are no rows at all, and finally a close button. Nothing
    is sent when ``verify_user`` returns a falsy value.

    Args:
        bot: Client that received the update.
        message: Incoming command message.
    """
    if not await verify_user(bot, message):
        return

    # NOTE(review): [1, 10] presumably selects the first page of ten entries;
    # confirm against Database.find_files.
    cursor, total_files = await db.find_files(message.from_user.id, [1, 10])

    # One single-button row per file document yielded by the async cursor.
    keyboard = [
        [InlineKeyboardButton(doc["file_name"], callback_data=f"myfile_{doc['_id']}_{1}")]
        async for doc in cursor
    ]

    if total_files > 10:
        # Pager row: inert "previous", page indicator, and "next" to page 2.
        pager_row = [
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ]
        keyboard.append(pager_row)

    if not keyboard:
        keyboard.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    keyboard.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.IMAGE_FILEID,
        caption="Total files: {}".format(total_files),
        reply_markup=InlineKeyboardMarkup(keyboard),
    )
|
|
|
|
|
|
|