# @SPACE_X_BOTS | @Clinton_Abraham import logging, requests, urllib.parse, os, time, shutil, asyncio, json, math logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger("pyrogram").setLevel(logging.WARNING) logger = logging.getLogger(__name__) from config import Config from pyrogram import filters from database.access import clinton from translation import Translation from database.adduser import AddUser from pyrogram import Client as Clinton from hachoir.parser import createParser from hachoir.metadata import extractMetadata from helper_funcs.display_progress import humanbytes from helper_funcs.help_uploadbot import DownLoadFile from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton from helper_funcs.display_progress import progress_for_pyrogram, humanbytes, TimeFormatter @Clinton.on_message(filters.private & ~filters.via_bot & filters.regex(pattern=".*http.*")) async def echo(bot, update): await AddUser(bot, update) imog = await update.reply_text("Processing...⚡", reply_to_message_id=update.message_id) youtube_dl_username = None youtube_dl_password = None file_name = None url = update.text if "|" in url: url_parts = url.split("|") if len(url_parts) == 2: url = url_parts[0] file_name = url_parts[1] elif len(url_parts) == 4: url = url_parts[0] file_name = url_parts[1] youtube_dl_username = url_parts[2] youtube_dl_password = url_parts[3] else: for entity in update.entities: if entity.type == "text_link": url = entity.url elif entity.type == "url": o = entity.offset l = entity.length url = url[o:o + l] if url is not None: url = url.strip() if file_name is not None: file_name = file_name.strip() # https://stackoverflow.com/a/761825/4723940 if youtube_dl_username is not None: youtube_dl_username = youtube_dl_username.strip() if youtube_dl_password is not None: youtube_dl_password = youtube_dl_password.strip() logger.info(url) logger.info(file_name) else: for entity in update.entities: if entity.type == "text_link": url = entity.url elif entity.type == "url": o = entity.offset l = entity.length url = url[o:o + l] if Config.HTTP_PROXY != "": command_to_exec = [ "yt-dlp", "--no-warnings", "--youtube-skip-dash-manifest", "-j", url, "--proxy", Config.HTTP_PROXY ] else: command_to_exec = [ "yt-dlp", "--no-warnings", "--youtube-skip-dash-manifest", "-j", url ] if youtube_dl_username is not None: command_to_exec.append("--username") command_to_exec.append(youtube_dl_username) if youtube_dl_password is not None: command_to_exec.append("--password") command_to_exec.append(youtube_dl_password) process = await asyncio.create_subprocess_exec(*command_to_exec, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await process.communicate() e_response = stderr.decode().strip() t_response = stdout.decode().strip() if e_response and "nonnumeric port" not in e_response: error_message = e_response.replace(Translation.ERROR_YTDLP, "") if "This video is only available for registered users." in error_message: error_message = Translation.SET_CUSTOM_USERNAME_PASSWORD else: error_message = "Invalid url 🚸" await bot.send_message(chat_id=update.chat.id, text=Translation.NO_VOID_FORMAT_FOUND.format(str(error_message)), disable_web_page_preview=True, parse_mode="html", reply_to_message_id=update.message_id) await imog.delete(True) return False if t_response: # logger.info(t_response) x_reponse = t_response if "\n" in x_reponse: x_reponse, _ = x_reponse.split("\n") response_json = json.loads(x_reponse) save_ytdl_json_path = Config.DOWNLOAD_LOCATION + \ "/" + str(update.from_user.id) + ".json" with open(save_ytdl_json_path, "w", encoding="utf8") as outfile: json.dump(response_json, outfile, ensure_ascii=False) # logger.info(response_json) inline_keyboard = [] duration = None if "duration" in response_json: duration = response_json["duration"] if "formats" in response_json: for formats in response_json["formats"]: format_id = formats.get("format_id") format_string = formats.get("format_note") if format_string is None: format_string = formats.get("format") format_ext = formats.get("ext") approx_file_size = "" if "filesize" in formats: approx_file_size = humanbytes(formats["filesize"]) cb_string_video = "{}|{}|{}".format( "video", format_id, format_ext) cb_string_file = "{}|{}|{}".format( "file", format_id, format_ext) if format_string is not None and not "audio only" in format_string: ikeyboard = [ InlineKeyboardButton( "S " + format_string + " video " + approx_file_size + " ", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "D " + format_ext + " " + approx_file_size + " ", callback_data=(cb_string_file).encode("UTF-8") ) ] """if duration is not None: cb_string_video_message = "{}|{}|{}".format( "vm", format_id, format_ext) ikeyboard.append( InlineKeyboardButton( "VM", callback_data=( cb_string_video_message).encode("UTF-8") ) )""" else: # special weird case :\ ikeyboard = [ InlineKeyboardButton( "SVideo [" + "] ( " + approx_file_size + " )", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "DFile [" + "] ( " + approx_file_size + " )", callback_data=(cb_string_file).encode("UTF-8") ) ] inline_keyboard.append(ikeyboard) if duration is not None: cb_string_64 = "{}|{}|{}".format("audio", "64k", "mp3") cb_string_128 = "{}|{}|{}".format("audio", "128k", "mp3") cb_string = "{}|{}|{}".format("audio", "320k", "mp3") inline_keyboard.append([ InlineKeyboardButton( "MP3 " + "(" + "64 kbps" + ")", callback_data=cb_string_64.encode("UTF-8")), InlineKeyboardButton( "MP3 " + "(" + "128 kbps" + ")", callback_data=cb_string_128.encode("UTF-8")) ]) inline_keyboard.append([ InlineKeyboardButton( "MP3 " + "(" + "320 kbps" + ")", callback_data=cb_string.encode("UTF-8")) ]) else: format_id = response_json["format_id"] format_ext = response_json["ext"] cb_string_file = "{}|{}|{}".format( "file", format_id, format_ext) cb_string_video = "{}|{}|{}".format( "video", format_id, format_ext) inline_keyboard.append([ InlineKeyboardButton( "SVideo", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "DFile", callback_data=(cb_string_file).encode("UTF-8") ) ]) cb_string_file = "{}={}={}".format( "file", format_id, format_ext) cb_string_video = "{}={}={}".format( "video", format_id, format_ext) inline_keyboard.append([ InlineKeyboardButton( "video", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "file", callback_data=(cb_string_file).encode("UTF-8") ) ]) reply_markup = InlineKeyboardMarkup(inline_keyboard) await imog.delete(True) await bot.send_message( chat_id=update.chat.id, text=Translation.FORMAT_SELECTION + "\n" + Translation.SET_CUSTOM_USERNAME_PASSWORD, reply_markup=reply_markup, parse_mode="html", reply_to_message_id=update.message_id ) else: inline_keyboard = [] cb_string_file = "{}={}={}".format( "file", "LFO", "NONE") cb_string_video = "{}={}={}".format( "video", "OFL", "ENON") inline_keyboard.append([ InlineKeyboardButton( "SVideo", callback_data=(cb_string_video).encode("UTF-8") ), InlineKeyboardButton( "DFile", callback_data=(cb_string_file).encode("UTF-8") ) ]) reply_markup = InlineKeyboardMarkup(inline_keyboard) await imog.delete(True) await bot.send_message( chat_id=update.chat.id, text=Translation.FORMAT_SELECTION, reply_markup=reply_markup, parse_mode="html", reply_to_message_id=update.message_id)