Spaces:
Running
Running
import logging | |
import math | |
from FileStream import __version__ | |
from FileStream.bot import FileStream | |
from FileStream.server.exceptions import FIleNotFound | |
from FileStream.utils.bot_utils import gen_linkx, verify_user | |
from FileStream.config import Telegram | |
from FileStream.utils.database import Database | |
from FileStream.utils.translation import LANG, BUTTON | |
from pyrogram import filters, Client | |
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message | |
from pyrogram.enums.parse_mode import ParseMode | |
# Module-level database handle; the handlers below call db.get_file() and
# db.find_files() on it.
db = Database(
    Telegram.DATABASE_URL,
    Telegram.SESSION_NAME,
)
async def start(bot: Client, message: Message):
    """Answer a ``/start`` message, with or without a payload.

    Behaviour, in order:

    * If ``verify_user`` rejects the sender, nothing is sent.
    * A bare ``/start`` gets the welcome text and start buttons.
    * Text containing ``stream_`` looks the trailing id up in the database
      and replies with the links produced by ``gen_linkx``.
    * Text containing ``file_`` looks the trailing id up and re-sends the
      stored media with its file name as caption.
    * Anything else gets an "Invalid Command" reply.

    Args:
        bot: The client that received the message.
        message: The incoming message.
    """
    if not await verify_user(bot, message):
        return

    # ``Message.text`` is optional in Pyrogram (None for non-text messages);
    # fall back to an empty string so the parsing below cannot raise.
    text = message.text or ""

    # The id is whatever follows the last underscore; for a bare "/start"
    # there is no underscore, so this is the whole text.
    usr_cmd = text.split("_")[-1]

    if usr_cmd == "/start":
        await message.reply_text(
            text=LANG.START_TEXT.format(message.from_user.mention, FileStream.username),
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
            reply_markup=BUTTON.START_BUTTONS,
        )
        return

    if "stream_" in text:
        try:
            file_check = await db.get_file(usr_cmd)
            file_id = str(file_check['_id'])
            # Only answer when the stored id matches the requested one.
            if file_id == usr_cmd:
                reply_markup, stream_text = await gen_linkx(
                    m=message,
                    _id=file_id,
                    name=[FileStream.username, FileStream.fname],
                )
                await message.reply_text(
                    text=stream_text,
                    parse_mode=ParseMode.HTML,
                    disable_web_page_preview=True,
                    reply_markup=reply_markup,
                    quote=True,
                )
        except FIleNotFound:
            await message.reply_text("File Not Found")
        except Exception:
            # Log first (with traceback) so the failure is recorded even if
            # sending the reply below fails as well.
            logging.exception("Failed to handle stream request for id %r", usr_cmd)
            await message.reply_text("Something Went Wrong")
    elif "file_" in text:
        try:
            file_check = await db.get_file(usr_cmd)
            db_id = str(file_check['_id'])
            file_id = file_check['file_id']
            file_name = file_check['file_name']
            # Only answer when the stored id matches the requested one.
            if db_id == usr_cmd:
                await message.reply_cached_media(file_id=file_id, caption=f'**{file_name}**')
        except FIleNotFound:
            await message.reply_text("**File Not Found**")
        except Exception:
            # Log first (with traceback) so the failure is recorded even if
            # sending the reply below fails as well.
            logging.exception("Failed to handle file request for id %r", usr_cmd)
            await message.reply_text("Something Went Wrong")
    else:
        await message.reply_text("**Invalid Command**")
async def start(bot, message):
    """Reply with the formatted "about" text and the about buttons.

    Nothing is sent when ``verify_user`` rejects the sender.

    NOTE(review): this definition reuses the name ``start`` of the handler
    defined earlier in this file, so the module-level name ends up bound to
    this function. Confirm that is intended before renaming either one.
    """
    is_allowed = await verify_user(bot, message)
    if not is_allowed:
        return

    # Fill the template with the bot's display name and the package version.
    about_text = LANG.ABOUT_TEXT.format(FileStream.fname, __version__)
    await message.reply_text(
        text=about_text,
        disable_web_page_preview=True,
        reply_markup=BUTTON.ABOUT_BUTTONS,
    )
async def help_handler(bot, message):
    """Reply with the help text (HTML) and the help buttons.

    Nothing is sent when ``verify_user`` rejects the sender.
    """
    is_allowed = await verify_user(bot, message)
    if not is_allowed:
        return

    # The help template takes the configured owner id as its only argument.
    help_text = LANG.HELP_TEXT.format(Telegram.OWNER_ID)
    await message.reply_text(
        text=help_text,
        parse_mode=ParseMode.HTML,
        disable_web_page_preview=True,
        reply_markup=BUTTON.HELP_BUTTONS,
    )
# --------------------------------------------------------------------------------------------------- | |
async def my_files(bot: Client, message: Message):
    """Send the sender a photo whose inline keyboard lists their files.

    The keyboard holds one button per file returned by ``db.find_files``,
    a navigation row when more than 10 files exist, an "empty" placeholder
    when there are no rows at all, and a closing button. The caption shows
    the total file count.

    Nothing is sent when ``verify_user`` rejects the sender.

    Args:
        bot: The client that received the message.
        message: The incoming message.
    """
    if not await verify_user(bot, message):
        return

    # NOTE(review): [1, 10] is presumably the range of the first page
    # (items 1..10) — confirm against Database.find_files.
    cursor, total_files = await db.find_files(message.from_user.id, [1, 10])

    # One single-button row per file; the trailing "1" in the callback data
    # is presumably the page number — confirm against the callback handler.
    keyboard = [
        [InlineKeyboardButton(doc["file_name"], callback_data=f"myfile_{doc['_id']}_{1}")]
        async for doc in cursor
    ]

    if total_files > 10:
        page_label = f"1/{math.ceil(total_files / 10)}"
        navigation_row = [
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(page_label, callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ]
        keyboard.append(navigation_row)

    if not keyboard:
        # No rows so far: show a placeholder button instead.
        keyboard.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    keyboard.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    caption = "Total files: {}".format(total_files)
    await message.reply_photo(
        photo=Telegram.IMAGE_FILEID,
        caption=caption,
        reply_markup=InlineKeyboardMarkup(keyboard),
    )