load-balancer / LoadBalancer.py
ChandimaPrabath's picture
0.0.2.7 V Beta
98b82d8
import os
import json
from indexer import indexer
import re
from tvdb import fetch_and_cache_json
from threading import Event, Thread
import time
import logging
from utils import convert_to_gb
from api import InstancesAPI
CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}
class LoadBalancer:
def __init__(self, cache_dir, index_file, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
self.version = "0.0.2.10 V Beta"
self.instances = []
self.instances_health = {}
self.polling_interval = polling_interval
self.max_retries = max_retries
self.initial_delay = initial_delay
self.stop_event = Event()
self.instances_api = InstancesAPI(self.instances)
self.CACHE_DIR = cache_dir
self.INDEX_FILE = index_file
self.TOKEN = token
self.REPO = repo
self.FILM_STORE = {}
self.TV_STORE = {}
self.file_structure = None
self.index_file_last_modified = None
# Ensure CACHE_DIR exists
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
# Index the file structure initially
indexer()
# Load the file structure JSON
self.load_file_structure()
# Start polling and file checking in separate threads
polling_thread = Thread(target=self.start_polling)
polling_thread.daemon = True
polling_thread.start()
file_checking_thread = Thread(target=self.check_file_updates)
file_checking_thread.daemon = True
file_checking_thread.start()
def load_file_structure(self):
if not os.path.exists(self.INDEX_FILE):
raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
with open(self.INDEX_FILE, 'r') as f:
self.file_structure = json.load(f)
logging.info("File structure loaded successfully.")
def check_file_updates(self):
while not self.stop_event.is_set():
if self.index_file_last_modified != os.path.getmtime(self.INDEX_FILE):
logging.info(f"{self.INDEX_FILE} has been updated. Re-indexing...")
indexer() # Re-run the indexer
self.load_file_structure() # Reload the file structure
self.index_file_last_modified = os.path.getmtime(self.INDEX_FILE)
# Restart prefetching thread
if hasattr(self, 'prefetch_thread') and self.prefetch_thread.is_alive():
self.prefetch_thread.join()
self.prefetch_thread = Thread(target=self.start_prefetching)
self.prefetch_thread.daemon = True
self.prefetch_thread.start()
time.sleep(120) # Check every 2 minutes
def register_instance(self, instance_url):
if instance_url not in self.instances:
self.instances.append(instance_url)
logging.info(f"Registered instance {instance_url}")
else:
logging.info(f"Instance {instance_url} is already registered.")
def remove_instance(self, instance_url):
if instance_url in self.instances:
self.instances.remove(instance_url)
self.instances_health.pop(instance_url, None)
logging.info(f"Removed instance {instance_url}")
else:
logging.info(f"Instance {instance_url} not found for removal.")
def get_reports(self):
reports = self.instances_api.fetch_reports()
# Initialize temporary JSON data holders
temp_film_store = {}
temp_tv_store = {}
for instance_url in self.instances[:]: # Copy list to avoid modification during iteration
if instance_url in reports:
report = reports[instance_url]
logging.info(f"Report from {instance_url}: {report}")
self.process_report(instance_url, report, temp_film_store, temp_tv_store)
else:
logging.error(f"Failed to get report from {instance_url}. Removing instance.")
self.remove_instance(instance_url)
self.FILM_STORE = temp_film_store
self.TV_STORE = temp_tv_store
def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
film_store = report.get('film_store', {})
tv_store = report.get('tv_store', {})
cache_size = report.get('cache_size')
logging.info(f"Processing report from {instance_url}")
# Update temporary film store
for title, path in film_store.items():
url = f"{instance_url}/api/film/{title.replace(' ', '%20')}"
temp_film_store[title] = url
# Update temporary TV store
for title, seasons in tv_store.items():
if title not in temp_tv_store:
temp_tv_store[title] = {}
for season, episodes in seasons.items():
if season not in temp_tv_store[title]:
temp_tv_store[title][season] = {}
for episode, path in episodes.items():
url = f"{instance_url}/api/tv/{title.replace(' ', '%20')}/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}"
temp_tv_store[title][season][episode] = url
logging.info("Film and TV Stores processed successfully.")
self.update_instances_health(instance=instance_url, cache_size=cache_size)
def start_polling(self):
logging.info("Starting polling.")
while not self.stop_event.is_set():
self.get_reports()
time.sleep(self.polling_interval)
logging.info("Polling stopped.")
def stop_polling(self):
logging.info("Stopping polling.")
self.stop_event.set()
def start_prefetching(self):
"""Start the metadata prefetching in a separate thread."""
self.prefetch_metadata()
#################################################################
def update_instances_health(self, instance, cache_size):
self.instances_health[instance] = {"used":cache_size["cache_size"],
"total": "50 GB"}
logging.info(f"Updated instance {instance} with cache size {cache_size}")
def download_film_to_best_instance(self, title):
"""
Downloads a film to the first instance that has more free space on the self.instance_health list variable.
The instance_health looks like this:
{
"https://unicone-studio-instance1.hf.space": {
"total": "50 GB",
"used": "3.33 GB"
}
}
Args:
title (str): The title of the film.
"""
best_instance = None
max_free_space = -1
# Calculate free space for each instance
for instance_url, space_info in self.instances_health.items():
total_space = convert_to_gb(space_info['total'])
used_space = convert_to_gb(space_info['used'])
free_space = total_space - used_space
if free_space > max_free_space:
max_free_space = free_space
best_instance = instance_url
if best_instance:
result = self.instances_api.download_film(best_instance, title)
film_id = result["film_id"]
status = result["status"]
progress_url = f'{best_instance}/api/progress/{film_id}'
response = {
"film_id":film_id,
"status":status,
"progress_url":progress_url
}
return response
else:
logging.error("No suitable instance found for downloading the film.")
return {"error": "No suitable instance found for downloading the film."}
def download_episode_to_best_instance(self, title, season, episode):
"""
Downloads a episode to the first instance that has more free space on the self.instance_health list variable.
The instance_health looks like this:
{
"https://unicone-studio-instance1.hf.space": {
"total": "50 GB",
"used": "3.33 GB"
}
}
Args:
title (str): The title of the Tv show.
season (str): The season of the Tv show.
episode (str): The title of the Tv show.
"""
best_instance = None
max_free_space = -1
# Calculate free space for each instance
for instance_url, space_info in self.instances_health.items():
total_space = convert_to_gb(space_info['total'])
used_space = convert_to_gb(space_info['used'])
free_space = total_space - used_space
if free_space > max_free_space:
max_free_space = free_space
best_instance = instance_url
if best_instance:
result = self.instances_api.download_episode(best_instance, title, season, episode)
episode_id = result["episode_id"]
status = result["status"]
progress_url = f'{best_instance}/api/progress/{episode_id}'
response = {
"episode_id":episode_id,
"status":status,
"progress_url":progress_url
}
return response
else:
logging.error("No suitable instance found for downloading the film.")
return {"error": "No suitable instance found for downloading the film."}
#################################################################
def find_movie_path(self, title):
"""Find the path of the movie in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'films':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory':
for item in sub_directory['contents']:
if item['type'] == 'file' and title.lower() in item['path'].lower():
return item['path']
return None
def find_tv_path(self, title):
"""Find the path of the TV show in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_tv_structure(self, title):
"""Find the path of the TV show in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
return sub_directory
return None
def get_film_id(self, title):
"""Generate a film ID based on the title."""
return title.replace(" ", "_").lower()
def prefetch_metadata(self):
"""Prefetch metadata for all items in the file structure."""
for item in self.file_structure:
if 'contents' in item:
for sub_item in item['contents']:
original_title = sub_item['path'].split('/')[-1]
media_type = 'series' if item['path'].startswith('tv') else 'movie'
title = original_title
year = None
# Extract year from the title if available
match = re.search(r'\((\d{4})\)', original_title)
if match:
year_str = match.group(1)
if year_str.isdigit() and len(year_str) == 4:
title = original_title[:match.start()].strip()
year = int(year_str)
else:
parts = original_title.rsplit(' ', 1)
if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
title = parts[0].strip()
year = int(parts[-1])
fetch_and_cache_json(original_title, title, media_type, year)
def get_all_tv_shows(self):
"""Get all TV shows from the indexed cache structure JSON file."""
tv_shows = {}
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory':
show_title = sub_directory['path'].split('/')[-1]
tv_shows[show_title] = []
for season_directory in sub_directory['contents']:
if season_directory['type'] == 'directory':
season = season_directory['path'].split('/')[-1]
for episode in season_directory['contents']:
if episode['type'] == 'file':
tv_shows[show_title].append({
"season": season,
"episode": episode['path'].split('/')[-1],
"path": episode['path']
})
return tv_shows
def get_all_films(self):
"""Get all films from the indexed cache structure JSON file."""
films = []
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'films':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory':
films.append(sub_directory['path'])
return films