Spaces:
Running
Running
| from __future__ import annotations | |
| import logging | |
| from datetime import datetime | |
| from pyrogram import Client | |
| from typing import Any, Optional | |
| from pyrogram.enums import ParseMode, ChatType | |
| from pyrogram.types import Message | |
| from pyrogram.file_id import FileId | |
| #----------------------------------------------------- | |
| from FileStream.bot import FileStream | |
| from FileStream.bot import MULTI_CLIENTS | |
| from FileStream.Database import Database | |
| from FileStream.config import Telegram, Server | |
| from FileStream.Tools import Time_ISTKolNow | |
| from FileStream.Tools.cleanup import clean_text | |
| db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
async def send_file(client: Client, db_id, file_id: str, message, send_to):
    """Send a cached media file to ``send_to`` and reply to it with request details.

    The media is sent by ``file_id`` with a bold caption: the caption of
    ``message`` when it has one, otherwise the name produced by ``get_name``.
    A reply is then attached to the sent message. For private chats it names
    the requesting user; for every other chat type it names the chat.

    Args:
        client: Client used to send the media.
        db_id: Identifier of the file, echoed in the reply text.
        file_id: File id handed to ``send_cached_media``.
        message: Message that triggered the request.
        send_to: Chat that receives the media.

    Returns:
        A dict with ``"message"`` (the sent media message) and ``"sent_to"``.
    """
    caption = getattr(message, 'caption', None) or get_name(message)
    log_msg = await client.send_cached_media(
        chat_id=send_to,
        file_id=file_id,
        caption=f"**{caption}**",
    )

    # Only the reply text differs between the two chat kinds.
    if message.chat.type == ChatType.PRIVATE:
        details = (
            f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n"
            f"**Uꜱᴇʀ ɪᴅ :** `{message.from_user.id}`\n"
            f"**Fɪʟᴇ ɪᴅ :** `{db_id}`"
        )
    else:
        details = (
            f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.chat.title} \n"
            f"**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n"
            f"**Fɪʟᴇ ɪᴅ :** `{db_id}`"
        )

    await log_msg.reply_text(
        text=details,
        disable_web_page_preview=True,
        parse_mode=ParseMode.MARKDOWN,
        quote=True,
    )
    return {"message": log_msg, "sent_to": send_to}
async def update_file_id(message, MULTI_CLIENTS):
    """Collect the file id of one stored message as seen by every client.

    Args:
        message: Mapping with ``'location'`` and ``'message_id'``; both are
            passed to each client's ``get_messages`` call.
        MULTI_CLIENTS: Mapping whose values are the clients to query.

    Returns:
        A dict mapping ``str(client.id)`` to the ``file_id`` of the media found
        in the fetched message, or ``""`` when no file id is available.
    """
    per_client = {}
    for worker in MULTI_CLIENTS.values():
        fetched = await worker.get_messages(message['location'], message['message_id'])
        found = get_media_from_message(fetched)
        per_client[str(worker.id)] = getattr(found, "file_id", "")
    return per_client
| #------------------------------------Update Private File IDs------------------------------------# | |
async def get_private_file_ids(client: Client | bool, db_id: str, message) -> Optional[FileId]:
    """Return the file id of a private file as seen by ``client``.

    The record is loaded with ``db.get_private_file``. When ``client`` is
    falsy, the file id stored under ``record['file']['file_id']`` is returned.
    Otherwise the id cached for this client under ``record['file_ids']`` is
    returned. If it is missing, the message is fetched from
    ``Telegram.PFLOG_CHANNEL``, the id is cached, and the cache is persisted
    with ``db.update_private_file_ids``.

    Args:
        client: Client to resolve the id for, or a falsy value.
        db_id: Identifier of the private file record.
        message: Unused here; kept so the signature matches ``get_file_ids``.

    Returns:
        The stored file id value for the client, or the record's original
        file id when no client is given. The declared return annotation is
        kept unchanged for callers.
    """
    file_info = await db.get_private_file(db_id)

    # This guard must come before any ``client.id`` access: a falsy client
    # (e.g. ``False``) has no ``id`` attribute.
    if not client:
        return file_info['file']['file_id']

    client_key = str(client.id)
    file_id_info = file_info.setdefault("file_ids", {})
    if client_key not in file_id_info:
        logging.debug("Storing file_id in DB")
        media = get_media_from_message(
            await client.get_messages(Telegram.PFLOG_CHANNEL, file_info['message_id'])
        )
        file_id_info[client_key] = getattr(media, "file_id", "")
        await db.update_private_file_ids(db_id, file_id_info)
        logging.debug("Stored file_id in DB")

    # NOTE(review): per-client ids are kept under "file_ids"; the previous
    # lookup used the top level of the record. Confirm against the DB schema.
    return file_id_info[client_key]
| #------------------------Update Public File IDs --------------------------- # | |
async def _store_file_ids_for_all_clients(db_id: str) -> None:
    """Recompute the file's id for every client in MULTI_CLIENTS and persist them."""
    current = await db.get_file(db_id)
    await db.update_file_ids(db_id, await update_file_id(current, MULTI_CLIENTS))
    logging.debug("Stored file_id of all clients in DB")


async def get_file_ids(client: Client | bool, db_id: str, message) -> Optional[FileId]:
    """Resolve the ``FileId`` of a stored file for ``client``.

    Steps:
      1. Load the record with ``db.get_file``.
      2. If the record has no ``"file_ids"`` entry, or ``client`` is falsy,
         refresh the ids for all clients. For ``"TELEGRAM"`` records whose
         location is not in ``Telegram.DATA_SOURCES``, the file is first sent
         to ``Telegram.FLOG_CHANNEL`` and the record is updated from that
         message. With a falsy client the function stops here.
      3. Make sure this client's id is cached, fetching the stored message
         through ``client`` when it is not.
      4. Decode the cached id and attach size, MIME type, name and unique id
         from the record's ``"file"`` entry.

    Args:
        client: Client to resolve the id for, or a falsy value to only
            refresh the stored ids.
        db_id: Identifier of the file record.
        message: Message forwarded to ``send_file`` when the file has to be
            sent to ``Telegram.FLOG_CHANNEL``.

    Returns:
        The decoded ``FileId`` with extra attributes set, or ``None`` when
        ``client`` is falsy.
    """
    logging.debug("Starting of get_file_ids")
    file_info = await db.get_file(db_id)

    # NOTE(review): the original indentation was lost; the nesting below is
    # reconstructed. Confirm that the all-clients refresh is meant to run for
    # both TELEGRAM sub-cases.
    if "file_ids" not in file_info or not client:
        if file_info['user_type'] == "TELEGRAM":
            if file_info['location'] in Telegram.DATA_SOURCES:
                logging.debug("Already Present in Data Sources %s", Telegram.DATA_SOURCES)
            else:
                log_msg = await send_file(
                    FileStream,
                    db_id,
                    file_info['file']['file_id'],
                    message,
                    Telegram.FLOG_CHANNEL,
                )
                await db.update_file_info(db_id, update_file_info(log_msg))
            await _store_file_ids_for_all_clients(db_id)
            if not client:
                return None
            file_info = await db.get_file(db_id)

        if file_info['user_type'] == "WEB":
            await _store_file_ids_for_all_clients(db_id)
            if not client:
                return None
            file_info = await db.get_file(db_id)

    # A falsy client with any other user_type would otherwise reach the
    # ``client.id`` access below and raise AttributeError.
    if not client:
        return None

    client_key = str(client.id)
    file_id_info = file_info.setdefault("file_ids", {})
    if client_key not in file_id_info:
        logging.debug("Storing file_id in DB")
        msg = await client.get_messages(file_info['location'], file_info['message_id'])
        media = get_media_from_message(msg)
        file_id_info[client_key] = getattr(media, "file_id", "")
        await db.update_file_ids(db_id, file_id_info)
        logging.debug("Stored file_id in DB")

    logging.debug("Middle of get_file_ids")
    file_id = FileId.decode(file_id_info[client_key])
    # Attach the stored metadata to the decoded id.
    stored = file_info['file']
    file_id.file_size = stored['file_size']
    file_id.mime_type = stored['mime_type']
    file_id.file_name = stored['file_name']
    file_id.unique_id = stored['file_unique_id']
    logging.debug("Ending of get_file_ids")
    return file_id
| #------------------------------ | |
def get_media_from_message(message: "Message") -> Any:
    """Return the first truthy media attribute found on ``message``.

    Attributes are checked in this fixed order: audio, document, photo,
    sticker, animation, video, voice, video_note. Missing attributes count
    as absent.

    Args:
        message: Object to inspect.

    Returns:
        The first truthy media object, or ``None`` when there is none.
    """
    lookup_order = (
        "audio",
        "document",
        "photo",
        "sticker",
        "animation",
        "video",
        "voice",
        "video_note",
    )
    candidates = (getattr(message, kind, None) for kind in lookup_order)
    return next((found for found in candidates if found), None)
def get_media_file_size(m):
    """Return the ``file_size`` of the media in ``m``.

    Returns the string ``"None"`` when the media object is missing or has no
    ``file_size`` attribute.
    """
    return getattr(get_media_from_message(m), "file_size", "None")
def get_name(media_msg: Message | FileId) -> str:
    """Return a file name for a message or file id, generating one if needed.

    The ``file_name`` of the message's media (for a ``Message``) or of the
    object itself (for a ``FileId``) is used when it is non-empty. Otherwise a
    name of the form ``<media_type>-<YYYY-MM-DD_HH-MM-SS><ext>`` is built from
    the current local time. The extension comes from a fixed table and is
    empty for media types not listed in it.

    Args:
        media_msg: Message or file id to name.

    Returns:
        The existing or generated file name.
    """
    # Default to empty so unsupported inputs get a generated name instead of
    # raising UnboundLocalError.
    file_name = ""
    if isinstance(media_msg, Message):
        media = get_media_from_message(media_msg)
        file_name = getattr(media, "file_name", "")
    elif isinstance(media_msg, FileId):
        file_name = getattr(media_msg, "file_name", "")

    if file_name:
        return file_name

    if isinstance(media_msg, Message) and media_msg.media:
        media_type = media_msg.media.value
    elif getattr(media_msg, "file_type", None):
        # Use getattr because a media-less Message has no ``file_type``.
        media_type = media_msg.file_type.name.lower()
    else:
        media_type = "file"

    formats = {
        "photo": "jpg",
        "audio": "mp3",
        "voice": "ogg",
        "video": "mp4",
        "animation": "mp4",
        "video_note": "mp4",
        "sticker": "webp",
    }
    ext = formats.get(media_type)
    suffix = f".{ext}" if ext else ""
    date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    return f"{media_type}-{date}{suffix}"
def get_file_info(message, instruction):
    """Build the record describing the media carried by ``message``.

    Args:
        message: Message whose media is described.
        instruction: Mapping providing ``'user_id'``, ``'user_type'`` and
            ``'privacy_type'``.

    Returns:
        A dict with the user fields from ``instruction``, the message id, a
        ``"location"``, a ``"file"`` sub-dict, a ``"time"`` value from
        ``Time_ISTKolNow()`` and the privacy type. ``"location"`` is the
        sender id for private chats and the chat id otherwise. The ``"file"``
        sub-dict holds id, caption, unique id, name, size, MIME type and
        tagged users.
    """
    media = get_media_from_message(message)
    # Resolve the name once: the generated fallback embeds the current time,
    # so two separate calls could disagree.
    file_name = get_name(message)
    in_private = message.chat.type == ChatType.PRIVATE
    location = message.from_user.id if in_private else message.chat.id
    tagged_key = str(message.from_user.id) if in_private else str(Telegram.OWNER_ID)
    # ``caption`` may be present but empty/None, so fall back with ``or``
    # rather than relying on a getattr default.
    caption = getattr(message, "caption", None) or file_name

    return {
        "user_id": instruction['user_id'],
        "user_type": instruction['user_type'],
        "message_id": message.id,
        "location": location,
        "file": {
            "file_id": getattr(media, "file_id", ""),
            "caption": clean_text(caption),
            "file_unique_id": getattr(media, "file_unique_id", ""),
            "file_name": file_name,
            "file_size": getattr(media, "file_size", 0),
            "mime_type": getattr(media, "mime_type", "None/unknown"),
            "tagged_users": {
                tagged_key: instruction['privacy_type'],
            },
        },
        "time": Time_ISTKolNow(),
        "privacy_type": instruction['privacy_type'],
    }
def update_file_info(updates):
    """Build the record fields describing a re-sent copy of a file.

    Args:
        updates: Mapping with ``'message'`` (the sent message) and
            ``'sent_to'`` (where it was sent), as returned by ``send_file``.

    Returns:
        A dict with ``"message_id"``, ``"location"`` and a ``"file"`` sub-dict.
        The sub-dict holds id, caption, unique id, name, size, MIME type and
        an empty ``"tagged_users"`` mapping.
    """
    sent = updates['message']
    media = get_media_from_message(sent)
    # Resolve the name once: the generated fallback embeds the current time,
    # so two separate calls could disagree.
    file_name = get_name(sent)
    # ``caption`` may be present but empty/None, so fall back with ``or``
    # rather than relying on a getattr default.
    caption = getattr(sent, "caption", None) or file_name

    return {
        "message_id": sent.id,
        "location": updates['sent_to'],
        "file": {
            "file_id": getattr(media, "file_id", ""),
            "caption": clean_text(caption),
            "file_unique_id": getattr(media, "file_unique_id", ""),
            "file_name": file_name,
            "file_size": getattr(media, "file_size", 0),
            "mime_type": getattr(media, "mime_type", "None/unknown"),
            "tagged_users": {},
        },
    }