|
from __future__ import annotations
|
|
import logging
|
|
from datetime import datetime
|
|
from pyrogram import Client
|
|
from typing import Any, Optional
|
|
|
|
from pyrogram.enums import ParseMode, ChatType
|
|
from pyrogram.types import Message
|
|
from pyrogram.file_id import FileId
|
|
from FileStream.bot import FileStream
|
|
from FileStream.utils.database import Database
|
|
from FileStream.config import Telegram, Server
|
|
|
|
# Module-level database handle shared by the helpers in this file
# (get_file_ids reads and updates file records through it).
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
|
|
|
|
|
|
async def get_file_ids(client: Client | bool, db_id: str, multi_clients, message) -> Optional[FileId]:
    """Return the decoded ``FileId`` that ``client`` should use for record ``db_id``.

    The DB record's ``"file_ids"`` mapping is keyed by ``str(client.id)``.
    When the mapping is missing, or when ``client`` is falsy, the file is
    re-sent to the log channel through the main ``FileStream`` bot and the
    per-client file ids of every entry in ``multi_clients`` are stored.

    Args:
        client: The client that will serve the file, or a falsy value to only
            (re)populate the stored per-client ids.
        db_id: Identifier of the file record passed to ``db.get_file``.
        multi_clients: Mapping whose values are clients; forwarded to
            ``update_file_id``.
        message: Original message; forwarded to ``send_file``.

    Returns:
        The decoded ``FileId`` with ``file_size``, ``mime_type``,
        ``file_name`` and ``unique_id`` attached from the DB record, or
        ``None`` when ``client`` is falsy.
    """
    logging.debug("Starting of get_file_ids")
    file_info = await db.get_file(db_id)

    if "file_ids" not in file_info or not client:
        logging.debug("Storing file_id of all clients in DB")
        log_msg = await send_file(FileStream, db_id, file_info['file_id'], message)
        await db.update_file_ids(db_id, await update_file_id(log_msg.id, multi_clients))
        logging.debug("Stored file_id of all clients in DB")
        if not client:
            # Nothing to decode without a concrete client.
            return None
        # Re-read the record so the freshly stored "file_ids" are visible.
        file_info = await db.get_file(db_id)

    file_id_info = file_info.setdefault("file_ids", {})
    client_key = str(client.id)

    if client_key not in file_id_info:
        # This client has no stored id yet: send the file to the log channel
        # and read it back through this client to learn its own file_id.
        logging.debug("Storing file_id in DB")
        log_msg = await send_file(FileStream, db_id, file_info['file_id'], message)
        msg = await client.get_messages(Telegram.LOG_CHANNEL, log_msg.id)
        media = get_media_from_message(msg)
        file_id_info[client_key] = getattr(media, "file_id", "")
        await db.update_file_ids(db_id, file_id_info)
        logging.debug("Stored file_id in DB")

    logging.debug("Middle of get_file_ids")
    file_id = FileId.decode(file_id_info[client_key])

    # Attach the stored metadata to the decoded id (attribute name, record key).
    for attr_name, record_key in (
        ("file_size", "file_size"),
        ("mime_type", "mime_type"),
        ("file_name", "file_name"),
        ("unique_id", "file_unique_id"),
    ):
        setattr(file_id, attr_name, file_info[record_key])

    logging.debug("Ending of get_file_ids")
    return file_id
|
|
|
|
|
|
def get_media_from_message(message: "Message") -> Any:
    """Return the first truthy media attribute found on ``message``.

    Attributes are probed in a fixed order (audio, document, photo, sticker,
    animation, video, voice, video_note); missing attributes count as
    ``None``. Returns ``None`` when none of them is set.
    """
    probe_order = (
        "audio",
        "document",
        "photo",
        "sticker",
        "animation",
        "video",
        "voice",
        "video_note",
    )
    # Lazy generator: attributes are read one at a time and probing stops at
    # the first truthy value.
    candidates = (getattr(message, kind, None) for kind in probe_order)
    return next((found for found in candidates if found), None)
|
|
|
|
|
|
def get_media_file_size(m):
    """Return the ``file_size`` of the media carried by message ``m``.

    Falls back to the string ``"None"`` when the message has no media or the
    media object has no ``file_size`` attribute.
    """
    return getattr(get_media_from_message(m), "file_size", "None")
|
|
|
|
|
|
def get_name(media_msg: Message | FileId) -> str:
    """Return a file name for ``media_msg``, generating one when none is set.

    For a ``Message`` the name comes from its media object; for a ``FileId``
    it comes from a ``file_name`` attribute on the id itself. When no name is
    available, one is built as ``<media_type>-<YYYY-MM-DD_HH-MM-SS><ext>``
    using the local time and a default extension for known media types.

    Args:
        media_msg: A ``Message`` or a ``FileId``.

    Returns:
        The stored file name, or the generated fallback name.
    """
    # Start from an empty name so an argument of any other type falls through
    # to the generated name instead of raising UnboundLocalError.
    file_name = ""
    if isinstance(media_msg, Message):
        media = get_media_from_message(media_msg)
        file_name = getattr(media, "file_name", "")
    elif isinstance(media_msg, FileId):
        file_name = getattr(media_msg, "file_name", "")

    if file_name:
        return file_name

    if isinstance(media_msg, Message) and media_msg.media:
        media_type = media_msg.media.value
    elif getattr(media_msg, "file_type", None):
        # getattr guards the case of a Message whose ``media`` is falsy:
        # ``file_type`` is only read here when the object actually has it.
        media_type = media_msg.file_type.name.lower()
    else:
        media_type = "file"

    # Default extension per media type; unknown types get no extension.
    formats = {
        "photo": "jpg", "audio": "mp3", "voice": "ogg",
        "video": "mp4", "animation": "mp4", "video_note": "mp4",
        "sticker": "webp",
    }
    ext = formats.get(media_type)
    ext = "." + ext if ext else ""

    date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    return f"{media_type}-{date}{ext}"
|
|
|
|
|
|
def get_file_info(message):
    """Build the file-record dict for the media carried by ``message``.

    ``user_id`` is the sender's id in a private chat and the chat's id
    otherwise. Media fields missing from the media object fall back to
    ``""`` (ids), ``0`` (size) and ``"None/unknown"`` (mime type).
    """
    media = get_media_from_message(message)
    in_private_chat = message.chat.type == ChatType.PRIVATE
    owner_id = message.from_user.id if in_private_chat else message.chat.id

    return {
        "user_id": owner_id,
        "file_id": getattr(media, "file_id", ""),
        "file_unique_id": getattr(media, "file_unique_id", ""),
        "file_name": get_name(message),
        "file_size": getattr(media, "file_size", 0),
        "mime_type": getattr(media, "mime_type", "None/unknown"),
    }
|
|
|
|
|
|
async def update_file_id(msg_id, multi_clients):
    """Collect, for every client, its own ``file_id`` of a log-channel message.

    Each client in ``multi_clients`` fetches message ``msg_id`` from
    ``Telegram.LOG_CHANNEL``; the ``file_id`` of the media it sees is stored
    under ``str(client.id)``. Clients are queried one after another.

    Args:
        msg_id: Id of the message in the log channel.
        multi_clients: Mapping whose values are clients; the keys are not used.

    Returns:
        Dict mapping ``str(client.id)`` to that client's ``file_id`` (``""``
        when the fetched message has no media or the media has no ``file_id``).
    """
    file_ids = {}
    # Only the client objects are needed, so iterate the values directly.
    for client in multi_clients.values():
        log_msg = await client.get_messages(Telegram.LOG_CHANNEL, msg_id)
        media = get_media_from_message(log_msg)
        file_ids[str(client.id)] = getattr(media, "file_id", "")

    return file_ids
|
|
|
|
|
|
async def send_file(client: Client, db_id, file_id: str, message):
    """Send the cached media to the log channel and reply to it with its links.

    The media is sent to ``Telegram.LOG_CHANNEL`` with the original caption in
    bold (``okay`` when the message has no caption). A Markdown reply is then
    attached to that log message containing the record id and the stream and
    download links; for private chats it also names the requesting user.

    Returns:
        The message that was sent to the log channel.
    """
    original_caption = message.caption
    caption_text = 'okay' if original_caption is None else original_caption

    log_msg = await client.send_cached_media(
        chat_id=Telegram.LOG_CHANNEL,
        file_id=file_id,
        caption=f'**{caption_text}**',
    )

    # Pick the reply text first so the reply itself is issued in one place.
    if message.chat.type == ChatType.PRIVATE:
        details = f"reguested by : [{message.from_user.first_name}](tg://user?id={message.from_user.id})\nuser id : `{message.from_user.id}`\n file id : `{db_id}`\nStream Link : `https://telegbotnavpan2.onrender.com/watch/{db_id}`\nDownload Link : `https://telegbotnavpan2.onrender.com/dl/{db_id}`"
    else:
        details = f"file id : `{db_id}`\nStream Link : `https://telegbotnavpan2.onrender.com/watch/{db_id}`\nDownload Link : `https://telegbotnavpan2.onrender.com/dl/{db_id}`"

    await log_msg.reply_text(
        text=details,
        disable_web_page_preview=True,
        parse_mode=ParseMode.MARKDOWN,
        quote=True,
    )

    return log_msg
|
|
|
|
|
|
|