Spaces:
Running
Running
from __future__ import annotations | |
import logging | |
from datetime import datetime | |
from pyrogram import Client | |
from typing import Any, Optional | |
from pyrogram.enums import ParseMode, ChatType | |
from pyrogram.types import Message | |
from pyrogram.file_id import FileId | |
from FileStream.bot import FileStream | |
from FileStream.utils.database import Database | |
from FileStream.config import Telegram, Server | |
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
async def get_file_ids(client: Client | bool, db_id: str, multi_clients, message) -> Optional[FileId]: | |
logging.debug("Starting of get_file_ids") | |
file_info = await db.get_file(db_id) | |
if (not "file_ids" in file_info) or not client: | |
logging.debug("Storing file_id of all clients in DB") | |
log_msg = await send_file(FileStream, db_id, file_info['file_id'], message) | |
await db.update_file_ids(db_id, await update_file_id(log_msg.id, multi_clients)) | |
logging.debug("Stored file_id of all clients in DB") | |
if not client: | |
return | |
file_info = await db.get_file(db_id) | |
file_id_info = file_info.setdefault("file_ids", {}) | |
if not str(client.id) in file_id_info: | |
logging.debug("Storing file_id in DB") | |
log_msg = await send_file(FileStream, db_id, file_info['file_id'], message) | |
msg = await client.get_messages(Telegram.LOG_CHANNEL, log_msg.id) | |
media = get_media_from_message(msg) | |
file_id_info[str(client.id)] = getattr(media, "file_id", "") | |
await db.update_file_ids(db_id, file_id_info) | |
logging.debug("Stored file_id in DB") | |
logging.debug("Middle of get_file_ids") | |
file_id = FileId.decode(file_id_info[str(client.id)]) | |
setattr(file_id, "file_size", file_info['file_size']) | |
setattr(file_id, "mime_type", file_info['mime_type']) | |
setattr(file_id, "file_name", file_info['file_name']) | |
setattr(file_id, "unique_id", file_info['file_unique_id']) | |
logging.debug("Ending of get_file_ids") | |
return file_id | |
def get_media_from_message(message: "Message") -> Any: | |
media_types = ( | |
"audio", | |
"document", | |
"photo", | |
"sticker", | |
"animation", | |
"video", | |
"voice", | |
"video_note", | |
) | |
for attr in media_types: | |
media = getattr(message, attr, None) | |
if media: | |
return media | |
def get_media_file_size(m): | |
media = get_media_from_message(m) | |
return getattr(media, "file_size", "None") | |
def get_name(media_msg: Message | FileId) -> str: | |
if isinstance(media_msg, Message): | |
media = get_media_from_message(media_msg) | |
file_name = getattr(media, "file_name", "") | |
elif isinstance(media_msg, FileId): | |
file_name = getattr(media_msg, "file_name", "") | |
if not file_name: | |
if isinstance(media_msg, Message) and media_msg.media: | |
media_type = media_msg.media.value | |
elif media_msg.file_type: | |
media_type = media_msg.file_type.name.lower() | |
else: | |
media_type = "file" | |
formats = { | |
"photo": "jpg", "audio": "mp3", "voice": "ogg", | |
"video": "mp4", "animation": "mp4", "video_note": "mp4", | |
"sticker": "webp" | |
} | |
ext = formats.get(media_type) | |
ext = "." + ext if ext else "" | |
date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
file_name = f"{media_type}-{date}{ext}" | |
return file_name | |
def get_file_info(message): | |
media = get_media_from_message(message) | |
if message.chat.type == ChatType.PRIVATE: | |
user_idx = message.from_user.id | |
else: | |
user_idx = message.chat.id | |
return { | |
"user_id": user_idx, | |
"file_id": getattr(media, "file_id", ""), | |
"file_unique_id": getattr(media, "file_unique_id", ""), | |
"file_name": get_name(message), | |
"file_size": getattr(media, "file_size", 0), | |
"mime_type": getattr(media, "mime_type", "None/unknown") | |
} | |
async def update_file_id(msg_id, multi_clients): | |
file_ids = {} | |
for client_id, client in multi_clients.items(): | |
log_msg = await client.get_messages(Telegram.LOG_CHANNEL, msg_id) | |
media = get_media_from_message(log_msg) | |
file_ids[str(client.id)] = getattr(media, "file_id", "") | |
return file_ids | |
async def send_file(client: Client, db_id, file_id: str, message): | |
file_caption = message.caption | |
if file_caption is None: | |
file_caption = 'okay' | |
log_msg = await client.send_cached_media(chat_id=Telegram.LOG_CHANNEL, file_id=file_id, | |
caption=f'**{file_caption}**') | |
if message.chat.type == ChatType.PRIVATE: | |
await log_msg.reply_text( | |
text=f"reguested by : [{message.from_user.first_name}](tg://user?id={message.from_user.id})\nuser id : `{message.from_user.id}`\n file id : `{db_id}`\nStream Link : `https://telegbotnavpan2.onrender.com/watch/{db_id}`\nDownload Link : `https://telegbotnavpan2.onrender.com/dl/{db_id}`", | |
disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN, quote=True) | |
else: | |
await log_msg.reply_text( | |
text=f"file id : `{db_id}`\nStream Link : `https://telegbotnavpan2.onrender.com/watch/{db_id}`\nDownload Link : `https://telegbotnavpan2.onrender.com/dl/{db_id}`", | |
disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN, quote=True) | |
return log_msg | |
# return await client.send_cached_media(Telegram.BIN_CHANNEL, file_id) | |