"""Cache instance: downloads media into a local cache and reports to a load balancer."""

import json
import logging
import os
import time
import urllib.request
from threading import Event, Thread

import requests
from requests.exceptions import RequestException
from tqdm import tqdm

from indexer import indexer

CACHE_DIR = os.getenv("CACHE_DIR")

# Shared progress table keyed by download id (film id or episode id).
# Each value is a dict with "total", "downloaded", "status", "start_time"
# and, once the download has finished or failed, "end_time".
download_progress = {}


class Instance:
    """A cache node that stores films/episodes locally and registers itself
    with a load balancer.

    Construction creates the cache directory and the two store JSON files if
    they are missing, runs the indexer, loads the index file, registers with
    the load balancer and starts a daemon thread that re-registers when no
    report has been compiled for more than a minute.
    """

    def __init__(self, id, url, cache_dir, index_file, token, repo, load_balancer_api,
                 max_retries=20, initial_delay=1):
        """
        Args:
            id: Identifier sent to the load balancer for this instance.
            url: URL sent to the load balancer for this instance.
            cache_dir (str): Directory holding cached files and the store JSONs.
            index_file (str): Path of the JSON index loaded into `file_structure`.
            token: Stored as `TOKEN`; not used by the methods in this class.
            repo: Stored as `REPO`; not used by the methods in this class.
            load_balancer_api: Object exposing `register_instance(id, url)`.
            max_retries (int | None): Registration attempts before giving up;
                `None` retries forever.
            initial_delay (int | float): First back-off delay in seconds.

        Raises:
            FileNotFoundError: If `index_file` does not exist after indexing.
        """
        self.version = "0.2.4 V Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE_JSON_PATH = os.path.join(cache_dir, "film_store.json")
        self.TV_STORE_JSON_PATH = os.path.join(cache_dir, "tv_store.json")
        self.download_threads = {}
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.last_report_time = time.time()  # Initialize the last report time
        self.re_register_event = Event()

        # Ensure CACHE_DIR and the (initially empty) store files exist
        os.makedirs(self.CACHE_DIR, exist_ok=True)
        for path in (self.FILM_STORE_JSON_PATH, self.TV_STORE_JSON_PATH):
            if not os.path.exists(path):
                with open(path, 'w') as json_file:
                    json.dump({}, json_file)

        # Index the file structure
        indexer()

        # Load the file structure JSON
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)

        # Register now, then keep the registration fresh in the background
        self.register_to_load_balancer()
        registration_thread = Thread(target=self.monitor_registration)
        registration_thread.daemon = True
        registration_thread.start()

    def compile_report(self):
        """Build the status report for this instance and refresh `last_report_time`.

        Returns:
            dict: Instance id and URL, the contents of both store JSON files
            and the cache size.
        """
        self.last_report_time = time.time()  # Update the last report time
        return {
            "instance_id": self.id,
            "instance_url": self.url,
            "film_store": self.read_json(self.FILM_STORE_JSON_PATH),
            "tv_store": self.read_json(self.TV_STORE_JSON_PATH),
            # NOTE: get_cache_size() itself returns {"cache_size": "..."}, so
            # this value is a nested dict. Kept as-is because report consumers
            # may rely on that shape.
            "cache_size": self.get_cache_size(),
        }

    def register_to_load_balancer(self):
        """Register this instance with the load balancer, retrying with back-off.

        The delay starts at `initial_delay` seconds and doubles after each
        failed attempt up to 120 seconds. A falsy result and a raised
        exception both count as a failed attempt.

        Returns:
            The truthy result of `register_instance` on success, or `None`
            once `max_retries` attempts have failed.
        """
        retries = 0
        delay = self.initial_delay
        max_delay = 120

        while True:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(f'Successfully registered instance {self.id} to load balancer.')
                    return result
            except Exception as e:
                # Best-effort boundary: any failure of the API call is retried.
                logging.error(f'Error during registration: {e}')

            retries += 1
            if self.max_retries is not None and retries >= self.max_retries:
                logging.error(
                    f'Failed to register instance {self.id} to load balancer after {retries} attempts.'
                )
                return None

            logging.warning(
                f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...'
            )
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # Exponential backoff with maximum delay

    def monitor_registration(self):
        """Loop forever, re-registering when no report was compiled for over a minute.

        Intended to run in a daemon thread; it never returns.
        """
        while True:
            if time.time() - self.last_report_time > 60:  # Check if 1 minute has passed
                logging.info('1 minute passed since last report. Re-registering...')
                self.register_to_load_balancer()
                self.last_report_time = time.time()  # Reset the last report time
            time.sleep(30)  # Check every 30 seconds

    def get_cache_size(self):
        """Sum the sizes of all files under this instance's cache directory.

        Returns:
            dict: `{"cache_size": "<size> GB"}` with two decimals.
        """
        total_size = 0
        for dirpath, _dirnames, filenames in os.walk(self.CACHE_DIR):
            for filename in filenames:
                try:
                    total_size += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    # File disappeared or is unreadable (e.g. broken symlink);
                    # skip it rather than fail the whole report.
                    continue
        return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}

    @staticmethod
    def read_json(file_path):
        """Return the parsed JSON content of `file_path`, or `{}` if it does not exist."""
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                return json.load(json_file)
        return {}

    @staticmethod
    def get_system_proxies():
        """
        Retrieves the system's HTTP and HTTPS proxies.

        The HTTPS entry uses the system's HTTPS proxy when one is configured
        and falls back to the HTTP proxy otherwise.

        Returns:
            dict: A dictionary with "http" and "https" keys, or an empty
            dictionary if the proxies could not be read.
        """
        try:
            proxies = urllib.request.getproxies()
            print("System proxies:", proxies)
            http_proxy = proxies.get("http")
            return {
                "http": http_proxy,
                "https": proxies.get("https") or http_proxy,
            }
        except Exception as e:
            print(f"Error getting system proxies: {e}")
            return {}

    @staticmethod
    def _resolve_store_path(store_path, filename):
        """Return `store_path` if given, else `filename` inside the module-level CACHE_DIR.

        Raises:
            ValueError: If neither `store_path` nor CACHE_DIR is available.
        """
        if store_path:
            return store_path
        if not CACHE_DIR:
            raise ValueError("no store path given and CACHE_DIR is not set")
        return os.path.join(CACHE_DIR, filename)

    @staticmethod
    def _write_film_store(store_path, title, cache_path):
        """Record `title -> cache_path` in the film store JSON at `store_path`."""
        film_store_data = {}
        if os.path.exists(store_path):
            with open(store_path, 'r') as json_file:
                film_store_data = json.load(json_file)

        film_store_data[title] = cache_path

        with open(store_path, 'w') as json_file:
            json.dump(film_store_data, json_file, indent=2)
        print(f'Film store updated with {title}.')

    @staticmethod
    def _write_tv_store(store_path, title, cache_path):
        """Record an episode under title / season / file name in the TV store JSON."""
        tv_store_data = {}
        if os.path.exists(store_path):
            with open(store_path, 'r') as json_file:
                tv_store_data = json.load(json_file)

        # The season is the name of the directory holding the file
        # (e.g. 'Season 1'); the episode key is the file name itself.
        season_part = os.path.basename(os.path.dirname(cache_path))
        episode_part = os.path.basename(cache_path)

        # Assuming episode_part is unique for each episode within a season
        tv_store_data.setdefault(title, {}).setdefault(season_part, {})[episode_part] = cache_path

        with open(store_path, 'w') as json_file:
            json.dump(tv_store_data, json_file, indent=2)
        print(f'TV store updated with {title}, {season_part}, {episode_part}.')

    @staticmethod
    def _download_to_cache(file_url, token, cache_path, proxies, download_id, chunk_size, on_cached):
        """Stream `file_url` into `cache_path`, tracking progress under `download_id`.

        `on_cached` is called with no arguments once the file has been written
        completely; it is expected to update the matching store JSON.
        Request and file errors are reported and mark the download "Failed";
        any other exception also marks it "Failed" and then propagates.
        """
        print(f"Downloading file from URL: {file_url} to {cache_path} with proxies: {proxies}")
        headers = {'Authorization': f'Bearer {token}'}

        # Create the entry before the request so every error path below can
        # update it, even when the request itself fails.
        progress = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        download_progress[download_id] = progress

        try:
            with requests.get(file_url, headers=headers, proxies=proxies, stream=True) as response:
                response.raise_for_status()

                total_size = int(response.headers.get('content-length', 0))
                progress["total"] = total_size
                progress["start_time"] = time.time()  # measure ETA from the first byte

                target_dir = os.path.dirname(cache_path)
                if target_dir:  # os.makedirs('') would raise
                    os.makedirs(target_dir, exist_ok=True)

                with open(cache_path, 'wb') as file, \
                        tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
                    for data in response.iter_content(chunk_size=chunk_size):
                        file.write(data)
                        pbar.update(len(data))
                        progress["downloaded"] += len(data)

            print(f'File cached to {cache_path} successfully.')
        except RequestException as e:
            # RequestException derives from IOError, so it must be handled first.
            print(f"Error downloading file: {e}")
            progress["status"] = "Failed"
        except IOError as e:
            print(f"Error writing file {cache_path}: {e}")
            progress["status"] = "Failed"
        else:
            try:
                on_cached()
                progress["status"] = "Completed"
            except (OSError, ValueError) as e:
                # ValueError covers an unresolvable store path and corrupt store JSON.
                print(f"Error updating store for {cache_path}: {e}")
                progress["status"] = "Failed"
        finally:
            if progress["status"] == "Downloading":
                # An unexpected exception is propagating; do not leave the
                # entry looking like an active download.
                progress["status"] = "Failed"
            progress["end_time"] = time.time()

    @staticmethod
    def download_film(file_url, token, cache_path, proxies, film_id, title,
                      chunk_size=100 * 1024 * 1024, store_path=None):
        """
        Downloads a file from the specified URL and saves it to the cache path.
        Tracks the download progress and records the film in the film store.

        Args:
            file_url (str): The URL of the file to download.
            token (str): The authorization token for the request.
            cache_path (str): The path to save the downloaded file.
            proxies (dict): Proxies for the request.
            film_id (str): Unique identifier for the film download.
            title (str): The title of the film.
            chunk_size (int): Size of each chunk to download.
            store_path (str | None): Film store JSON to update. Defaults to
                "film_store.json" inside the module-level CACHE_DIR.
        """
        def record():
            Instance._write_film_store(
                Instance._resolve_store_path(store_path, "film_store.json"), title, cache_path
            )

        Instance._download_to_cache(file_url, token, cache_path, proxies, film_id, chunk_size, record)

    @staticmethod
    def get_download_progress(id):
        """
        Gets the download progress for a specific film or episode.

        Args:
            id (str): The unique identifier of the download.

        Returns:
            dict: A dictionary containing the total size, downloaded size,
            progress percentage, status, and ETA in seconds. ETA is None when
            it cannot be estimated (nothing downloaded yet, unknown total
            size, or a failed download) and 0 for a completed download.
        """
        entry = download_progress.get(id)
        if entry is None:
            return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}

        total = entry["total"]
        downloaded = entry["downloaded"]
        status = entry.get("status", "In Progress")
        progress = (downloaded / total) * 100 if total > 0 else 0

        eta = None
        if status == "Downloading" and downloaded > 0 and total > 0:
            elapsed_time = time.time() - entry["start_time"]
            estimated_total_time = elapsed_time * (total / downloaded)
            # Clamp: more bytes than the announced total would otherwise
            # produce a negative ETA.
            eta = max(0.0, estimated_total_time - elapsed_time)
        elif status == "Completed":
            eta = 0

        return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}

    def update_film_store_json(self, title, cache_path):
        """
        Updates this instance's film store JSON with the new file.

        Args:
            title (str): The title of the film.
            cache_path (str): The local path where the file is saved.
        """
        self._write_film_store(self.FILM_STORE_JSON_PATH, title, cache_path)

    @staticmethod
    def download_episode(file_url, token, cache_path, proxies, episode_id, title,
                         chunk_size=100 * 1024 * 1024, store_path=None):
        """
        Downloads a file from the specified URL and saves it to the cache path.
        Tracks the download progress and records the episode in the TV store.

        Args:
            file_url (str): The URL of the file to download.
            token (str): The authorization token for the request.
            cache_path (str): The path to save the downloaded file.
            proxies (dict): Proxies for the request.
            episode_id (str): Unique identifier for the episode download.
            title (str): The title of the TV show.
            chunk_size (int): Size of each chunk to download.
            store_path (str | None): TV store JSON to update. Defaults to
                "tv_store.json" inside the module-level CACHE_DIR.
        """
        def record():
            Instance._write_tv_store(
                Instance._resolve_store_path(store_path, "tv_store.json"), title, cache_path
            )

        Instance._download_to_cache(file_url, token, cache_path, proxies, episode_id, chunk_size, record)

    def update_tv_store_json(self, title, cache_path):
        """
        Updates this instance's TV store JSON with the new file, organizing by
        title, season, and episode.

        Args:
            title (str): The title of the TV show.
            cache_path (str): The local path where the file is saved.
        """
        self._write_tv_store(self.TV_STORE_JSON_PATH, title, cache_path)

    def load_json(self, file_path):
        """Load JSON data from a file."""
        with open(file_path, 'r') as file:
            return json.load(file)

    def _category_entries(self, category):
        """Yield the entries inside each top-level index directory whose path is `category`."""
        for directory in self.file_structure or []:
            if directory['type'] == 'directory' and directory['path'] == category:
                yield from directory['contents']

    def find_movie_path(self, title):
        """Find the path of the first film file whose path contains the title
        (case-insensitive), or None if there is no match."""
        needle = title.lower()
        for film_dir in self._category_entries('films'):
            if film_dir['type'] != 'directory':
                continue
            for item in film_dir['contents']:
                if item['type'] == 'file' and needle in item['path'].lower():
                    return item['path']
        return None

    def find_tv_path(self, title):
        """Find the path of the first TV show directory whose path contains the
        title (case-insensitive), or None if there is no match."""
        show = self.get_tv_structure(title)
        return show['path'] if show is not None else None

    def get_tv_structure(self, title):
        """Return the index entry (directory dict) of the first TV show whose
        path contains the title (case-insensitive), or None if there is no match."""
        needle = title.lower()
        for show in self._category_entries('tv'):
            if show['type'] == 'directory' and needle in show['path'].lower():
                return show
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title."""
        return title.replace(" ", "_").lower()

    def bytes_to_human_readable(self, num, suffix="B"):
        """Format a byte count using 1024-based units, e.g. 1536 -> '1.5 KB'."""
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1024.0:
                return f"{num:3.1f} {unit}{suffix}"
            num /= 1024.0
        return f"{num:.1f} Y{suffix}"

    def encode_episodeid(self, title, season, episode):
        """Build an episode download id of the form '<title>_<season>_<episode>'."""
        return f"{title}_{season}_{episode}"

    def get_all_tv_shows(self):
        """Get all TV shows from the indexed cache structure JSON file.

        Returns:
            dict: Show title -> list of {"season", "episode", "path"} dicts,
            one per episode file found in the show's season directories.
        """
        tv_shows = {}
        for show in self._category_entries('tv'):
            if show['type'] != 'directory':
                continue
            show_title = show['path'].split('/')[-1]
            episodes = tv_shows[show_title] = []
            for season_directory in show['contents']:
                if season_directory['type'] != 'directory':
                    continue
                season = season_directory['path'].split('/')[-1]
                for episode in season_directory['contents']:
                    if episode['type'] == 'file':
                        episodes.append({
                            "season": season,
                            "episode": episode['path'].split('/')[-1],
                            "path": episode['path'],
                        })
        return tv_shows

    def get_all_films(self):
        """Get all films from the indexed cache structure JSON file.

        Returns:
            list: Paths of the directories found under the 'films' index entry.
        """
        return [
            film_dir['path']
            for film_dir in self._category_entries('films')
            if film_dir['type'] == 'directory'
        ]