|
import os |
|
import json |
|
from indexer import indexer |
|
import re |
|
from tvdb import fetch_and_cache_json |
|
from threading import Event, Thread |
|
import time |
|
import logging |
|
from utils import convert_to_gb |
|
from api import InstancesAPI |
|
|
|
# Cache directory taken from the environment; may be None if unset
# (the LoadBalancer below receives its cache dir explicitly instead).
CACHE_DIR = os.getenv("CACHE_DIR")

# Module-level registry of in-flight download progress.
# NOTE(review): not read or written anywhere in this file — presumably
# populated/consumed by another module; confirm before removing.
download_progress = {}
|
|
|
class LoadBalancer:
    """Coordinate a pool of media-serving instances.

    Responsibilities:
      * keep a registry of instance URLs and their reported cache usage,
      * poll every instance for film/TV reports and rebuild the lookup
        stores (FILM_STORE / TV_STORE) from them,
      * watch the on-disk index file and re-index / re-prefetch metadata
        when it changes,
      * route download requests to the instance with the most free space.
    """

    def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Initialise state, ensure the cache dir exists, build and load the
        index, then start the background polling and file-watching threads.

        Args:
            cache_dir (str): Directory used for cached data.
            index_file (str): Path to the JSON index of the media library.
            token (str): Access token; stored only — consumer not visible here, TODO confirm.
            repo (str): Repository identifier; stored only — consumer not visible here, TODO confirm.
            polling_interval (int): Seconds between report-polling rounds.
            max_retries (int): Stored retry budget (not used in this class).
            initial_delay (int): Stored initial delay (not used in this class).

        Raises:
            FileNotFoundError: Propagated from load_file_structure() if the
                index file does not exist after indexing.
        """
        self.version = "0.0.2.10 V Beta"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        # InstancesAPI receives a live reference to self.instances, so later
        # register/remove calls are visible to it without re-wiring.
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}
        self.TV_STORE = {}
        self.file_structure = None
        self.index_file_last_modified = None

        # exist_ok avoids the check-then-create race of the previous
        # os.path.exists() + os.makedirs() pair.
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        # Build the on-disk index, then load it into memory.
        indexer()
        self.load_file_structure()

        # Daemon threads so they never block interpreter shutdown.
        polling_thread = Thread(target=self.start_polling, daemon=True)
        polling_thread.start()

        file_checking_thread = Thread(target=self.check_file_updates, daemon=True)
        file_checking_thread.start()

    def load_file_structure(self):
        """Load the JSON index file into self.file_structure.

        Raises:
            FileNotFoundError: If INDEX_FILE is missing.
        """
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")

        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)
        logging.info("File structure loaded successfully.")

    def check_file_updates(self):
        """Watch INDEX_FILE and re-index / reload / re-prefetch on change.

        Runs in a daemon thread until stop_event is set, checking every
        120 seconds. On first run index_file_last_modified is None, so a
        full re-index always happens once at startup.
        """
        while not self.stop_event.is_set():
            if self.index_file_last_modified != os.path.getmtime(self.INDEX_FILE):
                logging.info(f"{self.INDEX_FILE} has been updated. Re-indexing...")
                indexer()
                self.load_file_structure()
                # Re-read the mtime AFTER indexer() ran: if indexing rewrites
                # the index file, storing the post-index mtime prevents a
                # spurious re-index on the very next cycle.
                self.index_file_last_modified = os.path.getmtime(self.INDEX_FILE)

                # Allow only one metadata-prefetch thread at a time: wait for
                # the previous run to finish before starting a new one.
                if hasattr(self, 'prefetch_thread') and self.prefetch_thread.is_alive():
                    self.prefetch_thread.join()

                self.prefetch_thread = Thread(target=self.start_prefetching, daemon=True)
                self.prefetch_thread.start()

            time.sleep(120)

    def register_instance(self, instance_url):
        """Add instance_url to the pool unless it is already registered."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Drop instance_url from the pool and its health record, if present."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def get_reports(self):
        """Fetch reports from all instances and rebuild FILM_STORE / TV_STORE.

        Instances that fail to report are removed from the pool. The stores
        are rebuilt into temporaries and swapped in at the end, so readers
        never observe a half-built mapping.
        """
        reports = self.instances_api.fetch_reports()

        temp_film_store = {}
        temp_tv_store = {}

        # Iterate a copy: remove_instance() mutates self.instances.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        """Merge one instance's report into the temporary stores.

        Args:
            instance_url (str): Base URL of the reporting instance.
            report (dict): Expected keys 'film_store', 'tv_store', 'cache_size'.
            temp_film_store (dict): title -> stream URL; mutated in place.
            temp_tv_store (dict): title -> season -> episode -> URL; mutated in place.
        """
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Map each reported film title to a stream URL on this instance.
        # Only the titles matter here; the reported local paths are unused.
        for title in film_store:
            temp_film_store[title] = f"{instance_url}/api/film/{title.replace(' ', '%20')}"

        # Same for TV: title -> season -> episode -> URL.
        for title, seasons in tv_store.items():
            show = temp_tv_store.setdefault(title, {})
            for season, episodes in seasons.items():
                season_map = show.setdefault(season, {})
                for episode in episodes:
                    season_map[episode] = (
                        f"{instance_url}/api/tv/{title.replace(' ', '%20')}"
                        f"/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}"
                    )

        logging.info("Film and TV Stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Poll instance reports every polling_interval seconds until stopped."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal every background loop (polling and file watching) to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def start_prefetching(self):
        """Start the metadata prefetching in a separate thread."""
        self.prefetch_metadata()

    def update_instances_health(self, instance, cache_size):
        """Record an instance's reported cache usage.

        Args:
            instance (str): Instance URL.
            cache_size (dict): Must contain a 'cache_size' entry, e.g. "3.33 GB".
        """
        # Total is a fixed 50 GB quota per instance (hard-coded here).
        self.instances_health[instance] = {"used": cache_size["cache_size"],
                                           "total": "50 GB"}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def _select_best_instance(self):
        """Return the instance URL with the most free space, or None.

        Shared by the film and episode download methods (previously
        duplicated in both). instances_health entries look like
        {"total": "50 GB", "used": "3.33 GB"}; sizes are parsed with
        convert_to_gb and free space is total minus used.
        """
        best_instance = None
        max_free_space = -1

        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info['total'])
            used_space = convert_to_gb(space_info['used'])
            free_space = total_space - used_space

            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url

        return best_instance

    def download_film_to_best_instance(self, title):
        """Ask the instance with the most free space to download a film.

        Args:
            title (str): The title of the film.

        Returns:
            dict: {"film_id", "status", "progress_url"} on success, or
            {"error": ...} when no instance is available.
        """
        best_instance = self._select_best_instance()

        if best_instance:
            result = self.instances_api.download_film(best_instance, title)
            film_id = result["film_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/progress/{film_id}'
            return {
                "film_id": film_id,
                "status": status,
                "progress_url": progress_url
            }
        else:
            logging.error("No suitable instance found for downloading the film.")
            return {"error": "No suitable instance found for downloading the film."}

    def download_episode_to_best_instance(self, title, season, episode):
        """Ask the instance with the most free space to download an episode.

        Args:
            title (str): The title of the TV show.
            season (str): The season of the TV show.
            episode (str): The episode identifier.

        Returns:
            dict: {"episode_id", "status", "progress_url"} on success, or
            {"error": ...} when no instance is available.
        """
        best_instance = self._select_best_instance()

        if best_instance:
            result = self.instances_api.download_episode(best_instance, title, season, episode)
            episode_id = result["episode_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/progress/{episode_id}'
            return {
                "episode_id": episode_id,
                "status": status,
                "progress_url": progress_url
            }
        else:
            # Bug fix: the original message said "film" in this episode path.
            logging.error("No suitable instance found for downloading the episode.")
            return {"error": "No suitable instance found for downloading the episode."}

    def find_movie_path(self, title):
        """Return the file path of the first film whose path contains title
        (case-insensitive substring match), or None if not found."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        for item in sub_directory['contents']:
                            if item['type'] == 'file' and title.lower() in item['path'].lower():
                                return item['path']
        return None

    def find_tv_path(self, title):
        """Return the directory path of the TV show matching title
        (case-insensitive substring match), or None if not found."""
        # Delegates to get_tv_structure, which performs the identical search.
        show = self.get_tv_structure(title)
        return show['path'] if show else None

    def get_tv_structure(self, title):
        """Return the directory node (dict) of the TV show matching title
        (case-insensitive substring match), or None if not found."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title (spaces -> underscores, lowercased)."""
        return title.replace(" ", "_").lower()

    def prefetch_metadata(self):
        """Prefetch metadata for every item in the file structure.

        Extracts a title and an optional 4-digit year from each entry name,
        supporting both "Title (2010)" and a bare trailing "Title 2010",
        then hands them to fetch_and_cache_json.
        """
        for item in self.file_structure:
            if 'contents' in item:
                for sub_item in item['contents']:
                    original_title = sub_item['path'].split('/')[-1]
                    media_type = 'series' if item['path'].startswith('tv') else 'movie'
                    title = original_title
                    year = None

                    # Prefer a parenthesised year: "Title (2010)".
                    match = re.search(r'\((\d{4})\)', original_title)
                    if match:
                        year_str = match.group(1)
                        if year_str.isdigit() and len(year_str) == 4:
                            title = original_title[:match.start()].strip()
                            year = int(year_str)
                    else:
                        # Fall back to a bare trailing year: "Title 2010".
                        parts = original_title.rsplit(' ', 1)
                        if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
                            title = parts[0].strip()
                            year = int(parts[-1])

                    fetch_and_cache_json(original_title, title, media_type, year)

    def get_all_tv_shows(self):
        """Return {show_title: [{"season", "episode", "path"}, ...]} built
        from the indexed cache structure."""
        tv_shows = {}
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        show_title = sub_directory['path'].split('/')[-1]
                        tv_shows[show_title] = []
                        for season_directory in sub_directory['contents']:
                            if season_directory['type'] == 'directory':
                                season = season_directory['path'].split('/')[-1]
                                for episode in season_directory['contents']:
                                    if episode['type'] == 'file':
                                        tv_shows[show_title].append({
                                            "season": season,
                                            "episode": episode['path'].split('/')[-1],
                                            "path": episode['path']
                                        })
        return tv_shows

    def get_all_films(self):
        """Return the directory paths of all films in the indexed cache structure."""
        films = []
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        films.append(sub_directory['path'])
        return films