load-balancer / app.py
ChandimaPrabath's picture
season metadata api update
97c0fdb
from fastapi import FastAPI,HTTPException, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Optional, List
from LoadBalancer import LoadBalancer
import logging
import os
import urllib.parse
from utils import read_json_file, is_valid_url
from tvdb import recent_list, genre_list
# Runtime configuration, read once from the environment at import time.
# Each value is None when its variable is unset; no fallback is applied here.
CACHE_DIR = os.environ.get("CACHE_DIR")
TOKEN = os.environ.get("TOKEN")
REPO = os.environ.get("REPO")

app = FastAPI()

# CORS policy: any origin, method and header is accepted, with credentials
# allowed.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
async def startup_event():
    """Create the shared LoadBalancer when the application starts.

    The instance is bound to the module-level name ``load_balancer`` so the
    route handlers in this module can read it.
    """
    global load_balancer
    balancer_options = {"cache_dir": CACHE_DIR, "token": TOKEN, "repo": REPO}
    load_balancer = LoadBalancer(**balancer_options)
@app.get("/")
def greet_json():
    """Report the load balancer's version under the ``Version`` key."""
    current_version = load_balancer.version
    return {"Version": current_version}
@app.post("/api/post/register")
async def register_instance(request: Request):
    """Register a backend instance with the load balancer.

    The request body must be a JSON object containing a ``url`` key.

    Returns:
        200 with a confirmation message once the instance is registered.
        400 when the body is not valid JSON, is not an object carrying a
        ``url`` key, or the URL is rejected by ``is_valid_url``.
        500 when anything else fails while handling the request.
    """
    try:
        # A malformed body is a client error; report it as 400 rather than
        # letting it fall through to the generic 500 handler below.
        try:
            data = await request.json()
        except ValueError:
            return JSONResponse(content={"error": "Invalid JSON body"}, status_code=400)

        # Only a JSON object can carry a "url" key. Without the type check a
        # string body would match by substring and a list by membership, and
        # the lookup below would then fail.
        if not isinstance(data, dict) or "url" not in data:
            return JSONResponse(content={"error": "No URL provided"}, status_code=400)

        url = data["url"]
        if not is_valid_url(url):
            return JSONResponse(content={"error": "Invalid URL"}, status_code=400)

        # Register the instance
        load_balancer.register_instance(url)
        logging.info(f"Instance registered: {url}")
        return JSONResponse(content={"message": f"Instance {url} registered successfully"}, status_code=200)
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception(f"Error registering instance: {e}")
        return JSONResponse(content={"error": "Failed to register instance"}, status_code=500)
@app.get("/api/get/file_structure")
async def get_file_structure():
    """Return the load balancer's ``file_structure`` attribute as-is."""
    structure = load_balancer.file_structure
    return structure
@app.get("/api/get/movie/store")
async def get_movie_store():
    """Return the load balancer's ``FILM_STORE`` attribute as-is."""
    film_store = load_balancer.FILM_STORE
    return film_store
@app.get("/api/get/series/store")
async def get_series_store():
    """Return the load balancer's ``TV_STORE`` attribute as-is."""
    tv_store = load_balancer.TV_STORE
    return tv_store
@app.get("/api/get/movie/all")
async def get_all_movies_api():
    """Return the result of ``load_balancer.get_all_films()``."""
    all_films = load_balancer.get_all_films()
    return all_films
@app.get("/api/get/series/all")
async def get_all_tvshows_api():
    """Return the result of ``load_balancer.get_all_tv_shows()``."""
    all_shows = load_balancer.get_all_tv_shows()
    return all_shows
@app.get("/api/get/recent")
async def get_recent_items(limit: int = 5):
    """Return the first ``limit`` sorted entries for films and for series.

    :param limit: Maximum number of items to return per media type.
    :return: A JSON response with ``movies`` and ``series`` lists.
    :raises HTTPException: 400 when ``limit`` is negative.
    """
    # A negative limit would slice as [:-n] and return everything except the
    # last n entries, which is never what "limit" means.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be zero or greater")

    # Get sorted entries
    recent_films = recent_list.get_sorted_entries('film')
    recent_series = recent_list.get_sorted_entries('series')

    # Return only the requested number of items from each list
    return JSONResponse(content={
        'movies': recent_films[:limit],
        'series': recent_series[:limit],
    })
@app.get("/api/get/genre_categories")
async def get_genre_categories(media_type: Optional[str] = Query(None, description="Filter by media type: 'movie' or 'series'")):
    """
    List every genre with its density (the number of entries counted for it).

    Query Parameters:
        media_type: Optional. When given, only entries whose fourth field
            equals this value are counted; otherwise every entry is counted.

    Returns:
        A JSON response of the form
        {"genres": [{"name": "Comedy", "density": 12}, ...]},
        ordered by sorting the items of ``genre_list.genres``.

    Raises:
        HTTPException: 500 if anything fails while building the response.
    """
    try:
        categories = []
        for genre_name, genre_data in sorted(genre_list.genres.items()):
            density = 0
            for entry in genre_data["entries"].values():
                # NOTE(review): index 3 is presumably the media type in the
                # stored entry layout - confirm against genre_list.
                if media_type is None or entry[3] == media_type:
                    density += 1
            categories.append({"name": genre_name, "density": density})
        return JSONResponse(content={"genres": categories})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving genre categories: {str(e)}")
@app.get("/api/get/genre")
async def get_genre_items(
    genre: List[str] = Query(...),
    media_type: Optional[str] = None,
    limit: int = 5,
    page: int = 1
):
    """
    Get items from the specified genres, split by media type and paginated.

    :param genre: The genres to filter by (e.g., 'Comedy').
    :param media_type: Optional. Filter by media type ('movie' or 'series').
    :param limit: The maximum number of items to return for each media type.
    :param page: The 1-based page number for pagination.
    :return: A JSON response with the ``movies`` and ``series`` pages (titles
        only), the echoed ``page`` and ``limit``, and the unpaginated totals.
    :raises HTTPException: 400 when ``page`` is below 1 or ``limit`` is negative.
    """
    # Non-positive pages or negative limits turn into negative slice bounds,
    # which silently return items counted from the end of the list.
    if page < 1:
        raise HTTPException(status_code=400, detail="page must be 1 or greater")
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be zero or greater")

    # Get entries based on genres and media type
    entries = genre_list.get_entries_by_multiple_genres(genre, media_type=media_type)

    # Separate entries by media type and include only the title.
    # NOTE(review): assumes index 0 is the title and index 4 the media type
    # in the tuples returned above - confirm against genre_list.
    movies = [{'title': entry[0]} for entry in entries if entry[4] == 'movie']
    series = [{'title': entry[0]} for entry in entries if entry[4] == 'series']

    # Calculate the pagination window; it is applied to each media type separately
    start = (page - 1) * limit
    end = start + limit

    # Organize the results by media type
    results = {
        'movies': movies[start:end],
        'series': series[start:end],
        'page': page,
        'limit': limit,
        'total_movies': len(movies),
        'total_series': len(series)
    }

    # Return the results in a JSON response
    return JSONResponse(content=results)
@app.get("/api/get/movie/metadata/{title}")
async def get_movie_metadata_api(title: str):
    """Return the cached metadata JSON for a movie title.

    The cache file is ``<CACHE_DIR>/movie/<url-quoted title>.json``.
    Raises 400 for an empty title and 404 when no cache file exists.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    cache_file = os.path.join(CACHE_DIR, 'movie', f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        raise HTTPException(status_code=404, detail="Metadata not found")

    metadata = await read_json_file(cache_file)
    return JSONResponse(content=metadata)
@app.get("/api/get/movie/card/{title}")
async def get_movie_card_api(title: str):
    """Return a condensed "card" built from a movie's cached metadata.

    The card holds the English title (falling back to the requested title),
    year, image, portrait and banner artworks, an overview and the trailers.
    Raises 400 for an empty title and 404 when no cache file exists.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    cache_file = os.path.join(CACHE_DIR, 'movie', f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        raise HTTPException(status_code=404, detail="Card not found")

    data = await read_json_file(cache_file)
    details = data['data']
    image = details['image']
    trailers = details['trailers'] or []

    translations = details.get('translations') or {}

    # English name, if one of the name translations is tagged 'eng'.
    eng_title = None
    name_translations = translations.get('nameTranslations')
    if name_translations:
        eng_title = next(
            (item.get('name') for item in name_translations if item['language'] == 'eng'),
            None,
        )

    # Prefer the English overview; when none is found and exactly one
    # translation exists, use that one instead.
    overview = None
    overviews = translations.get('overviewTranslations')
    if overviews:
        overview = next(
            (item.get('overview') for item in overviews if item['language'] == 'eng'),
            None,
        )
        if not overview and len(overviews) == 1:
            overview = overviews[0].get('overview')

    # Artwork type 15 goes to the banner list, type 14 to the portrait list.
    artworks = details.get('artworks') or []
    banner = [art for art in artworks if art['type'] == 15]
    portrait = [art for art in artworks if art['type'] == 14]

    year = details['year']
    card = {'title':eng_title or title, 'year': year, 'image': image,'portrait':portrait, 'banner':banner, 'overview':overview, 'trailers': trailers}
    return JSONResponse(content=card)
@app.get("/api/get/series/metadata/{title}")
async def get_series_metadata_api(title: str):
    """Return the cached metadata JSON for a TV show title.

    The cache file is ``<CACHE_DIR>/series/<url-quoted title>.json``. When
    ``load_balancer.get_tv_structure(title)`` returns something truthy it is
    attached to the payload under ``file_structure``.
    Raises 400 for an empty title and 404 when no cache file exists.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    cache_file = os.path.join(CACHE_DIR, 'series', f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        raise HTTPException(status_code=404, detail="Metadata not found")

    metadata = await read_json_file(cache_file)

    # Attach the show's file structure when the load balancer has one.
    structure = load_balancer.get_tv_structure(title)
    if structure:
        metadata['file_structure'] = structure

    return JSONResponse(content=metadata)
@app.get("/api/get/series/card/{title}")
async def get_series_card_api(title: str):
    """Return a condensed "card" built from a TV show's cached metadata.

    The card holds the English title (falling back to the requested title),
    year, image, portrait and banner artworks, an overview and the trailers.
    Raises 400 for an empty title and 404 when no cache file exists.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    cache_file = os.path.join(CACHE_DIR, 'series', f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        raise HTTPException(status_code=404, detail="Card not found")

    data = await read_json_file(cache_file)
    details = data['data']
    image = details['image']
    trailers = details['trailers'] or []

    translations = details.get('translations') or {}

    # English name, if one of the name translations is tagged 'eng'.
    eng_title = None
    name_translations = translations.get('nameTranslations')
    if name_translations:
        eng_title = next(
            (item.get('name') for item in name_translations if item['language'] == 'eng'),
            None,
        )

    year = details['year']

    # Prefer the English overview; when none is found and exactly one
    # translation exists, use that one instead.
    overview = None
    overviews = translations.get('overviewTranslations')
    if overviews:
        overview = next(
            (item.get('overview') for item in overviews if item['language'] == 'eng'),
            None,
        )
        if not overview and len(overviews) == 1:
            overview = overviews[0].get('overview')

    # Artwork type 3 goes to the banner list, type 2 to the portrait list.
    artworks = details.get('artworks') or []
    banner = [art for art in artworks if art['type'] == 3]
    portrait = [art for art in artworks if art['type'] == 2]

    card = {'title':eng_title or title, 'year': year, 'image': image, 'portrait':portrait,'banner': banner, 'overview':overview, 'trailers': trailers}
    return JSONResponse(content=card)
@app.get("/api/get/series/metadata/{title}/{season}")
async def get_season_metadata_api(title: str, season: str):
    """Return the cached metadata JSON for one season of a TV show.

    The cache file is ``<CACHE_DIR>/metadata/<title>/<season>.json``.

    :param title: Show title; used verbatim as a directory name.
    :param season: Season identifier; used verbatim as the file stem.
    :return: A JSON response with the cached season metadata.
    :raises HTTPException: 400 when ``title`` or ``season`` is empty or is not
        a plain path component; 404 when no cache file exists.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")
    if not season:
        raise HTTPException(status_code=400, detail="Season must be provided and cannot be empty")

    # Both values come straight from the URL and are joined into a filesystem
    # path, so refuse anything that could step outside the metadata
    # directory ("." / ".." or an embedded path separator).
    separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
    for component in (title, season):
        if component in (".", "..") or any(sep in component for sep in separators):
            raise HTTPException(status_code=400, detail="Invalid title or season")

    json_cache_path = os.path.join(CACHE_DIR, "metadata", title, f"{season}.json")
    logging.debug("Season metadata cache path: %s", json_cache_path)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        return JSONResponse(content=data)

    raise HTTPException(status_code=404, detail="Metadata not found")
@app.get('/api/get/instances')
async def get_instances():
    """Return the load balancer's ``instances`` attribute as-is."""
    registered = load_balancer.instances
    return registered
@app.get('/api/get/instances/health')
async def get_instances_health():
    """Return the load balancer's ``instances_health`` attribute as-is."""
    health = load_balancer.instances_health
    return health
@app.get("/api/get/movie/{title}")
async def get_movie_api(title: str):
    """Return the URL of a stored movie, or start downloading it.

    :param title: Movie title to look up.
    :return: ``{"url": ...}`` when the title is already in ``FILM_STORE``;
        otherwise the response of
        ``load_balancer.download_film_to_best_instance``.
    :raises HTTPException: 400 for an empty title, 404 when the movie path
        cannot be found, 500 when the download request yields no response.
    """
    if not title:
        raise HTTPException(status_code=400, detail="Title parameter is required")

    # Check if the movie is already cached
    if title in load_balancer.FILM_STORE:
        url = load_balancer.FILM_STORE[title]
        return JSONResponse(content={"url": url})

    movie_path = load_balancer.find_movie_path(title)
    if not movie_path:
        raise HTTPException(status_code=404, detail="Movie not found")

    # Start the download in an instance
    response = load_balancer.download_film_to_best_instance(title=title)
    if not response:
        # Without this the handler would fall off the end and answer
        # 200 with a null body, hiding the failure from the client.
        raise HTTPException(status_code=500, detail="Failed to start movie download")

    return JSONResponse(content=response)
def _find_episode_path(file_structure, title, season, episode):
    """Return the path of the first matching episode file, or None.

    Walks the top-level ``tv`` directory of ``file_structure``. The show is
    matched by case-insensitive substring on the directory path; the season
    and the episode are matched by case-sensitive substring.
    """
    wanted_title = title.lower()
    for directory in file_structure:
        if directory['type'] != 'directory' or directory['path'] != 'tv':
            continue
        for show_dir in directory['contents']:
            if show_dir['type'] != 'directory' or wanted_title not in show_dir['path'].lower():
                continue
            for season_dir in show_dir['contents']:
                if season_dir['type'] != 'directory' or season not in season_dir['path']:
                    continue
                for episode_file in season_dir['contents']:
                    if episode_file['type'] == 'file' and episode in episode_file['path']:
                        # Stop at the first hit instead of scanning the rest
                        # of the tree.
                        return episode_file['path']
    return None


@app.get("/api/get/series/{title}/{season}/{episode}")
async def get_tv_show_api(title: str, season: str, episode: str):
    """Return the URL of a stored episode, or start downloading it.

    :param title: Show title.
    :param season: Season identifier, as used for the ``TV_STORE`` keys and
        the season directory paths.
    :param episode: Episode identifier; matched as a substring of the stored
        episode keys and of the episode file paths.
    :return: ``{"url": ...}`` when the episode is already in ``TV_STORE``;
        otherwise the response of
        ``load_balancer.download_episode_to_best_instance``.
    :raises HTTPException: 400 for a missing parameter, 404 when the show or
        the episode cannot be found, 500 when the download request yields no
        response.
    """
    if not title or not season or not episode:
        raise HTTPException(status_code=400, detail="Title, season, and episode parameters are required")

    # Check if the episode is already cached
    if title in load_balancer.TV_STORE and season in load_balancer.TV_STORE[title]:
        for ep in load_balancer.TV_STORE[title][season]:
            if episode in ep:
                url = load_balancer.TV_STORE[title][season][ep]
                return JSONResponse(content={"url": url})

    tv_path = load_balancer.find_tv_path(title)
    if not tv_path:
        raise HTTPException(status_code=404, detail="TV show not found")

    # The path is only needed to confirm that the episode exists.
    episode_path = _find_episode_path(load_balancer.file_structure, title, season, episode)
    if not episode_path:
        raise HTTPException(status_code=404, detail="Episode not found")

    # Start the download in an instance
    response = load_balancer.download_episode_to_best_instance(title=title, season=season, episode=episode)
    if not response:
        # Without this the handler would fall off the end and answer
        # 200 with a null body, hiding the failure from the client.
        raise HTTPException(status_code=500, detail="Failed to start episode download")

    return JSONResponse(content=response)