| | from pathlib import Path |
| | import sys |
| | import config, dill |
| | from pyrogram.types import InputMediaDocument, Message |
| | import os, random, string, asyncio |
| | from utils.logger import Logger |
| | from datetime import datetime, timezone |
| | import os |
| | import signal |
| |
|
| | logger = Logger(__name__) |
| |
|
| | cache_dir = Path("./cache") |
| | cache_dir.mkdir(parents=True, exist_ok=True) |
| | drive_cache_path = cache_dir / "drive.data" |
| |
|
| |
|
| | def getRandomID(): |
| | global DRIVE_DATA |
| | while True: |
| | id = "".join(random.choices(string.ascii_uppercase + string.digits, k=6)) |
| | if not DRIVE_DATA: |
| | return id |
| | if id not in DRIVE_DATA.used_ids: |
| | DRIVE_DATA.used_ids.append(id) |
| | return id |
| |
|
| |
|
| | def get_current_utc_time(): |
| | return datetime.now(timezone.utc).strftime("Date - %Y-%m-%d | Time - %H:%M:%S") |
| |
|
| |
|
| | class Folder: |
| | def __init__(self, name: str, path: str) -> None: |
| | self.name = name |
| | self.contents = {} |
| | if name == "/": |
| | self.id = "root" |
| | else: |
| | self.id = getRandomID() |
| | self.type = "folder" |
| | self.trash = False |
| | self.path = ("/" + path.strip("/") + "/").replace("//", "/") |
| | self.upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| | self.auth_hashes = [] |
| |
|
| |
|
| | class File: |
| | def __init__( |
| | self, |
| | name: str, |
| | file_id: int, |
| | size: int, |
| | path: str, |
| | ) -> None: |
| | self.name = name |
| | self.file_id = file_id |
| | self.id = getRandomID() |
| | self.size = size |
| | self.type = "file" |
| | self.trash = False |
| | self.path = path[:-1] if path[-1] == "/" else path |
| | self.upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| |
|
| |
|
| | class NewDriveData: |
| | def __init__(self, contents: dict, used_ids: list) -> None: |
| | self.contents = contents |
| | self.used_ids = used_ids |
| | self.isUpdated = False |
| |
|
| | def save(self) -> None: |
| | with open(drive_cache_path, "wb") as f: |
| | dill.dump(self, f) |
| | self.isUpdated = True |
| | logger.info("Drive data saved successfully.") |
| |
|
| | def new_folder(self, path: str, name: str) -> None: |
| | logger.info(f"Creating new folder '{name}' in path '{path}'.") |
| |
|
| | folder = Folder(name, path) |
| | if path == "/": |
| | directory_folder: Folder = self.contents[path] |
| | directory_folder.contents[folder.id] = folder |
| | else: |
| | paths = path.strip("/").split("/") |
| | directory_folder: Folder = self.contents["/"] |
| | for path in paths: |
| | directory_folder = directory_folder.contents[path] |
| | directory_folder.contents[folder.id] = folder |
| |
|
| | self.save() |
| | return folder.path + folder.id |
| |
|
| | def new_file(self, path: str, name: str, file_id: int, size: int) -> None: |
| | logger.info(f"Creating new file '{name}' in path '{path}'.") |
| |
|
| | file = File(name, file_id, size, path) |
| | if path == "/": |
| | directory_folder: Folder = self.contents[path] |
| | directory_folder.contents[file.id] = file |
| | else: |
| | paths = path.strip("/").split("/") |
| | directory_folder: Folder = self.contents["/"] |
| | for path in paths: |
| | directory_folder = directory_folder.contents[path] |
| | directory_folder.contents[file.id] = file |
| |
|
| | self.save() |
| |
|
| | def get_directory( |
| | self, path: str, is_admin: bool = True, auth: str = None |
| | ) -> Folder: |
| | folder_data: Folder = self.contents["/"] |
| | auth_success = False |
| | auth_home_path = None |
| |
|
| | if path != "/": |
| | path = path.strip("/") |
| |
|
| | if "/" in path: |
| | path = path.split("/") |
| | else: |
| | path = [path] |
| |
|
| | for folder in path: |
| | folder_data = folder_data.contents[folder] |
| |
|
| | if auth in folder_data.auth_hashes: |
| | auth_success = True |
| | auth_home_path = ( |
| | "/" + folder_data.path.strip("/") + "/" + folder_data.id |
| | ) |
| |
|
| | if not is_admin and not auth_success: |
| | logger.warning(f"Unauthorized access attempt to path '{path}'.") |
| | return None |
| |
|
| | if auth_success: |
| | logger.info(f"Authorization successful for path '{path}'.") |
| | return folder_data, auth_home_path |
| |
|
| | return folder_data |
| |
|
| | def get_folder_auth(self, path: str) -> None: |
| | auth = getRandomID() |
| | folder_data: Folder = self.contents["/"] |
| |
|
| | if path != "/": |
| | path = path.strip("/") |
| |
|
| | if "/" in path: |
| | path = path.split("/") |
| | else: |
| | path = [path] |
| |
|
| | for folder in path: |
| | folder_data = folder_data.contents[folder] |
| |
|
| | folder_data.auth_hashes.append(auth) |
| | self.save() |
| | logger.info(f"Authorization hash generated for path '{path}'.") |
| | return auth |
| |
|
| | def get_file(self, path) -> File: |
| | if len(path.strip("/").split("/")) > 0: |
| | folder_path = "/" + "/".join(path.strip("/").split("/")[:-1]) |
| | file_id = path.strip("/").split("/")[-1] |
| | else: |
| | folder_path = "/" |
| | file_id = path.strip("/") |
| |
|
| | folder_data = self.get_directory(folder_path) |
| | return folder_data.contents[file_id] |
| |
|
| | def rename_file_folder(self, path: str, new_name: str) -> None: |
| | if len(path.strip("/").split("/")) > 0: |
| | folder_path = "/" + "/".join(path.strip("/").split("/")[:-1]) |
| | file_id = path.strip("/").split("/")[-1] |
| | else: |
| | folder_path = "/" |
| | file_id = path.strip("/") |
| | folder_data = self.get_directory(folder_path) |
| | folder_data.contents[file_id].name = new_name |
| | self.save() |
| | logger.info(f"Item at path '{path}' renamed to '{new_name}'.") |
| |
|
| | def trash_file_folder(self, path: str, trash: bool) -> None: |
| | action = "Trashing" if trash else "Restoring" |
| |
|
| | if len(path.strip("/").split("/")) > 0: |
| | folder_path = "/" + "/".join(path.strip("/").split("/")[:-1]) |
| | file_id = path.strip("/").split("/")[-1] |
| | else: |
| | folder_path = "/" |
| | file_id = path.strip("/") |
| | folder_data = self.get_directory(folder_path) |
| | folder_data.contents[file_id].trash = trash |
| | self.save() |
| | logger.info(f"Item at path '{path}' {action.lower()} successfully.") |
| |
|
| | def get_trashed_files_folders(self): |
| | root_dir = self.get_directory("/") |
| | trash_data = {} |
| |
|
| | def traverse_directory(folder): |
| | for item in folder.contents.values(): |
| | if item.type == "folder": |
| | if item.trash: |
| | trash_data[item.id] = item |
| | else: |
| | |
| | traverse_directory(item) |
| | elif item.type == "file": |
| | if item.trash: |
| | trash_data[item.id] = item |
| |
|
| | traverse_directory(root_dir) |
| | return trash_data |
| |
|
| | def delete_file_folder(self, path: str) -> None: |
| |
|
| | if len(path.strip("/").split("/")) > 0: |
| | folder_path = "/" + "/".join(path.strip("/").split("/")[:-1]) |
| | file_id = path.strip("/").split("/")[-1] |
| | else: |
| | folder_path = "/" |
| | file_id = path.strip("/") |
| |
|
| | folder_data = self.get_directory(folder_path) |
| | del folder_data.contents[file_id] |
| | self.save() |
| | logger.info(f"Item at path '{path}' deleted successfully.") |
| |
|
| | def search_file_folder(self, query: str): |
| | logger.info(f"Searching for items matching query '{query}'.") |
| |
|
| | root_dir = self.get_directory("/") |
| | search_results = {} |
| |
|
| | def traverse_directory(folder): |
| | for item in folder.contents.values(): |
| | if query.lower() in item.name.lower(): |
| | search_results[item.id] = item |
| | if item.type == "folder": |
| | traverse_directory(item) |
| |
|
| | traverse_directory(root_dir) |
| | logger.info(f"Search completed. Found {len(search_results)} matching items.") |
| | return search_results |
| |
|
| |
|
| | class NewBotMode: |
| | def __init__(self, drive_data: NewDriveData) -> None: |
| | self.drive_data = drive_data |
| |
|
| | |
| | self.current_folder = "/" |
| | self.current_folder_name = "/ (root directory)" |
| |
|
| | def set_folder(self, folder_path: str, name: str) -> None: |
| | self.current_folder = folder_path |
| | self.current_folder_name = name |
| | self.drive_data.save() |
| | logger.info(f"Current folder set to '{name}' at path '{folder_path}'.") |
| |
|
| |
|
| | DRIVE_DATA: NewDriveData = None |
| | BOT_MODE: NewBotMode = None |
| |
|
| |
|
| | |
| | async def backup_drive_data(loop=True): |
| | global DRIVE_DATA |
| | logger.info("Starting backup drive data task.") |
| |
|
| | while True: |
| | try: |
| | if not DRIVE_DATA.isUpdated: |
| | if not loop: |
| | break |
| | await asyncio.sleep(config.DATABASE_BACKUP_TIME) |
| | continue |
| |
|
| | logger.info("Backing up drive data to Telegram.") |
| | from utils.clients import get_client |
| |
|
| | client = get_client() |
| | time_text = f"๐
**Last Updated :** {get_current_utc_time()} (UTC +00:00)" |
| | caption = ( |
| | f"๐ **TG Drive Data Backup File**\n\n" |
| | "Do not edit or delete this message. This is a backup file for the tg drive data.\n\n" |
| | f"{time_text}" |
| | ) |
| |
|
| | media_doc = InputMediaDocument(drive_cache_path, caption=caption) |
| | msg = await client.edit_message_media( |
| | config.STORAGE_CHANNEL, |
| | config.DATABASE_BACKUP_MSG_ID, |
| | media=media_doc, |
| | file_name="drive.data", |
| | ) |
| |
|
| | DRIVE_DATA.isUpdated = False |
| | logger.info("Drive data backed up to Telegram successfully.") |
| |
|
| | try: |
| | await msg.pin() |
| | except Exception as pin_e: |
| | logger.error(f"Error pinning backup message: {pin_e}") |
| |
|
| | if not loop: |
| | break |
| |
|
| | await asyncio.sleep(config.DATABASE_BACKUP_TIME) |
| | except Exception as e: |
| | logger.error(f"Backup Error: {e}") |
| | await asyncio.sleep(10) |
| |
|
| |
|
| | async def init_drive_data(): |
| | global DRIVE_DATA |
| |
|
| | logger.info("Initializing drive data.") |
| | root_dir = DRIVE_DATA.get_directory("/") |
| | if not hasattr(root_dir, "auth_hashes"): |
| | root_dir.auth_hashes = [] |
| |
|
| | def traverse_directory(folder): |
| | for item in folder.contents.values(): |
| | if item.type == "folder": |
| | traverse_directory(item) |
| |
|
| | if not hasattr(item, "auth_hashes"): |
| | item.auth_hashes = [] |
| |
|
| | traverse_directory(root_dir) |
| | DRIVE_DATA.save() |
| | logger.info("Drive data initialization completed.") |
| |
|
| |
|
| | async def loadDriveData(): |
| | global DRIVE_DATA, BOT_MODE |
| |
|
| | logger.info("Loading drive data.") |
| | from utils.clients import get_client |
| |
|
| | try: |
| | client = get_client() |
| | try: |
| | msg: Message = await client.get_messages( |
| | config.STORAGE_CHANNEL, config.DATABASE_BACKUP_MSG_ID |
| | ) |
| | except Exception as e: |
| | logger.error(f"Error fetching backup message: {e}") |
| | raise |
| |
|
| | if not msg.document: |
| | logger.error("Backup message does not contain a document") |
| | raise Exception("Backup message does not contain a document") |
| |
|
| | if msg.document.file_name == "drive.data": |
| | dl_path = await msg.download() |
| | with open(dl_path, "rb") as f: |
| | DRIVE_DATA = dill.load(f) |
| |
|
| | logger.info("Drive data loaded from Telegram backup.") |
| | else: |
| | raise Exception("Backup drive.data file not found on Telegram.") |
| | except Exception as e: |
| | logger.warning(f"Backup load failed: {e}") |
| | logger.info("Creating new drive.data file.") |
| | DRIVE_DATA = NewDriveData({"/": Folder("/", "/")}, []) |
| | DRIVE_DATA.save() |
| |
|
| | await init_drive_data() |
| |
|
| | if config.MAIN_BOT_TOKEN: |
| | from utils.bot_mode import start_bot_mode |
| |
|
| | BOT_MODE = NewBotMode(DRIVE_DATA) |
| | await start_bot_mode(DRIVE_DATA, BOT_MODE) |
| | logger.info("Bot mode started.") |
| |
|
| |
|
| | async def initDriveDataWithoutClients(): |
| | """Initialize DRIVE_DATA without requiring Telegram clients (offline mode)""" |
| | global DRIVE_DATA, BOT_MODE |
| | |
| | logger.info("Initializing drive data in offline mode (without Telegram clients).") |
| | |
| | |
| | if drive_cache_path.exists(): |
| | try: |
| | with open(drive_cache_path, "rb") as f: |
| | DRIVE_DATA = dill.load(f) |
| | logger.info("Drive data loaded from local cache.") |
| | except Exception as e: |
| | logger.warning(f"Failed to load from local cache: {e}") |
| | logger.info("Creating new drive.data file.") |
| | DRIVE_DATA = NewDriveData({"/": Folder("/", "/")}, []) |
| | DRIVE_DATA.save() |
| | else: |
| | logger.info("No local cache found. Creating new drive.data file.") |
| | DRIVE_DATA = NewDriveData({"/": Folder("/", "/")}, []) |
| | DRIVE_DATA.save() |
| | |
| | await init_drive_data() |
| | logger.info("Drive data initialized in offline mode.") |
| |
|