# random-movie-hf/services/tmdb_client.py
# gytcrt
# Add Gradio app files
# f00d260
import logging
import os
import random
from typing import Any, Dict, List, Optional

import requests

from services.omdb_client import omdb_client
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TMDBClient:
    """Simple TMDB API client.

    Picks random movies through TMDB's ``/discover/movie`` endpoint and
    enriches each one with external IDs, OMDB details and US streaming
    availability. The API key is read from the ``TMDB_API_KEY`` environment
    variable when the client is constructed.
    """

    # Seconds to wait on any single TMDB request. Without a timeout,
    # ``requests`` may block forever on a stalled connection.
    REQUEST_TIMEOUT = 10
    # TMDB refuses paginated requests beyond this page number even when the
    # reported ``total_pages`` is larger, so random page picks are capped.
    MAX_DISCOVER_PAGE = 500
    # Region whose streaming availability is reported.
    WATCH_REGION = "US"

    def __init__(self):
        """Initialize with API key and base URL."""
        self.api_key = os.environ.get("TMDB_API_KEY")
        self.base_url = "https://api.themoviedb.org/3"
        # Filled lazily by _get_genres() after the first successful fetch.
        self._genre_cache = None

    def _get_json(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send an authenticated GET request to TMDB and return the decoded JSON.

        Args:
            path: Endpoint path relative to ``base_url`` (with leading slash).
            params: Extra query parameters; the API key is added automatically.

        Returns:
            The decoded JSON body.

        Raises:
            requests.RequestException: On connection errors, timeouts or a
                non-2xx HTTP status.
            ValueError: If the response body is not valid JSON.
        """
        query: Dict[str, Any] = {"api_key": self.api_key}
        if params:
            query.update(params)
        response = requests.get(
            f"{self.base_url}{path}",
            params=query,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _format_provider(provider: Dict[str, Any], service_type: str) -> Dict[str, Any]:
        """Convert one raw TMDB provider entry into the service dict we expose."""
        logo_path = provider.get('logo_path')
        return {
            'name': provider.get('provider_name', ''),
            'logo_url': f"https://image.tmdb.org/t/p/original{logo_path}" if logo_path else None,
            'type': service_type,
        }

    @classmethod
    def _pick_random_page(cls, total_pages: int) -> int:
        """Pick a random result page, never beyond what TMDB will serve."""
        last_page = max(1, min(total_pages, cls.MAX_DISCOVER_PAGE))
        return random.randint(1, last_page)

    def _get_genres(self) -> Dict[str, int]:
        """Get genre name to ID mapping from TMDB API.

        Genre names are lower-cased so lookups are case insensitive. The
        mapping is cached after the first successful fetch. A failed fetch
        returns an empty mapping but is NOT cached, so a transient error does
        not disable genre filtering for the rest of the process.
        """
        if self._genre_cache is not None:
            return self._genre_cache

        try:
            data = self._get_json("/genre/movie/list")
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching genres: {e}")
            return {}

        self._genre_cache = {
            genre["name"].lower(): genre["id"]
            for genre in data.get("genres", [])
        }
        return self._genre_cache

    def _get_external_ids(self, tmdb_id: int) -> Dict[str, str]:
        """Get external IDs and construct URLs for a movie using its TMDB ID.

        Returns:
            A dict with ``imdb_id`` and ``imdb_url`` when TMDB knows the IMDB
            ID; an empty dict otherwise (including on request failure).
        """
        external_urls: Dict[str, str] = {}
        try:
            data = self._get_json(f"/movie/{tmdb_id}/external_ids")
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching external IDs for TMDB ID {tmdb_id}: {e}")
            return external_urls

        imdb_id = data.get("imdb_id")
        if imdb_id:
            external_urls["imdb_id"] = imdb_id
            external_urls["imdb_url"] = f"https://www.imdb.com/title/{imdb_id}/"
        return external_urls

    def _get_watch_providers(self, tmdb_id: int, country: str) -> Dict[str, Any]:
        """Get watch providers (streaming services) for a movie in the US.

        Args:
            tmdb_id: TMDB ID of the movie.
            country: Accepted for interface compatibility but not used; the
                lookup always targets the US regardless of the movie's
                origin country.

        Returns:
            A dict with ``free_services``, ``subscription_services`` and
            ``all_services`` lists. All lists are empty when no US data is
            available or the request fails.
        """
        streaming_data = {
            'free_services': [],
            'subscription_services': [],
            'all_services': []
        }
        # (TMDB offer key, exposed service type, result bucket), in the order
        # in which entries are appended to 'all_services'.
        offer_kinds = (
            ("free", 'free', 'free_services'),
            ("flatrate", 'subscription', 'subscription_services'),
        )
        try:
            data = self._get_json(f"/movie/{tmdb_id}/watch/providers")
            region_data = data.get("results", {}).get(self.WATCH_REGION, {})
            if not region_data:
                return streaming_data

            for offer_key, service_type, bucket in offer_kinds:
                for provider in region_data.get(offer_key, []):
                    service_info = self._format_provider(provider, service_type)
                    streaming_data[bucket].append(service_info)
                    streaming_data['all_services'].append(service_info)
        except requests.RequestException as e:
            logger.error(f"Error fetching watch providers for TMDB ID {tmdb_id}: {e}")
        except Exception as e:
            # Best effort: streaming info is optional, so a malformed payload
            # must not prevent the movie from being returned.
            logger.error(f"Unexpected error fetching watch providers for TMDB ID {tmdb_id}: {e}")
        return streaming_data

    def _enrich_movie_data(self, movie: Dict[str, Any], country: str) -> Dict[str, Any]:
        """
        Format and enrich a single movie with OMDB data and streaming information.

        Args:
            movie: Raw movie data from TMDB API
            country: Country code for streaming data

        Returns:
            Fully enriched movie data dictionary. ``has_omdb_data`` tells
            whether the OMDB-specific keys are present.
        """
        tmdb_id = movie.get("id")
        release_date = movie.get("release_date")
        formatted_movie = {
            "title": movie.get("title", "Unknown"),
            "original_title": movie.get("original_title", ""),
            "year": release_date[:4] if release_date else "Unknown",
            "rating": movie.get("vote_average", 0),
            "vote_count": movie.get("vote_count", 0),
            "overview": movie.get("overview", "No description available"),
            "poster_path": movie.get("poster_path"),
            "genre_ids": movie.get("genre_ids", []),
            "tmdb_id": tmdb_id
        }

        # Add poster URL if available
        if formatted_movie["poster_path"]:
            formatted_movie["poster_url"] = f"https://image.tmdb.org/t/p/w500{formatted_movie['poster_path']}"

        # Get external IDs and construct URLs
        formatted_movie.update(self._get_external_ids(tmdb_id))

        # Enrich with OMDB data if IMDB ID is available
        omdb_data = None
        imdb_id = formatted_movie.get('imdb_id')
        if imdb_id:
            logger.info(f"Fetching OMDB data for {formatted_movie['title']} ({imdb_id})")
            omdb_data = omdb_client.get_movie_detail(imdb_id)

        if omdb_data:
            # The Rotten Tomatoes score lives inside the 'Ratings' array.
            rotten_tomatoes = next(
                (
                    rating.get('Value', 'N/A')
                    for rating in (omdb_data.get('Ratings') or [])
                    if rating.get('Source') == 'Rotten Tomatoes'
                ),
                'N/A',
            )
            formatted_movie.update({
                'director': omdb_data.get('Director', 'N/A'),
                'actors': omdb_data.get('Actors', 'N/A'),
                'plot': omdb_data.get('Plot', formatted_movie['overview']),
                'runtime': omdb_data.get('Runtime', 'N/A'),
                'awards': omdb_data.get('Awards', 'N/A'),
                'box_office': omdb_data.get('BoxOffice', 'N/A'),
                'country': omdb_data.get('Country', 'N/A'),
                'language': omdb_data.get('Language', 'N/A'),
                'rated': omdb_data.get('Rated', 'N/A'),
                'metascore': omdb_data.get('Metascore', 'N/A'),
                'imdb_rating': omdb_data.get('imdbRating', 'N/A'),
                'imdb_votes': omdb_data.get('imdbVotes', 'N/A'),
                'rotten_tomatoes': rotten_tomatoes,
                'has_omdb_data': True
            })
        else:
            # Either no IMDB ID is known or OMDB returned nothing for it.
            formatted_movie['has_omdb_data'] = False

        # Get watch providers for the movie
        formatted_movie.update(self._get_watch_providers(tmdb_id, country))
        return formatted_movie

    def get_random_movies(self, decade: int, country: str, genre: Optional[str] = None, n: int = 5) -> List[Dict[str, Any]]:
        """
        Get random movies from the TMDB API with simplified random selection:
        1. Search full decade using date range
        2. If multiple pages: randomly select 1 page
        3. Randomly select n movies from the selected page

        Args:
            decade: First year of the decade (e.g. 1990 covers 1990-1999).
            country: Origin country code; upper-cased before being sent.
            genre: Optional genre name (case insensitive). An unknown genre
                yields an empty result.
            n: Maximum number of movies to return.

        Returns:
            A list of enriched movie dicts, or an empty list on any error.
        """
        try:
            # Calculate decade boundaries
            start_year = decade
            end_year = decade + 9

            # Setup search parameters for full decade
            params = {
                "primary_release_date.gte": f"{start_year}-01-01",
                "primary_release_date.lte": f"{end_year}-12-31",
                "with_origin_country": country.upper(),
                "vote_count.gte": 1.1,  # > 1
                "include_adult": True,
                "with_runtime.gte": 50,
                "page": 1
            }

            # Add genre filter only if provided (for top countries)
            if genre:
                genres = self._get_genres()
                genre_id = genres.get(genre.lower())
                # Compare against None so a (hypothetical) genre ID of 0 is
                # not mistaken for "not found".
                if genre_id is None:
                    logger.warning(f"Genre '{genre}' not found. Available genres: {list(genres.keys())}")
                    return []
                params["with_genres"] = genre_id

            # Get first page to check total pages
            data = self._get_json("/discover/movie", params)
            total_pages = data.get("total_pages", 1)
            logger.info(f"Found {total_pages} pages of results for {start_year}s")

            # Randomly select page if multiple pages available
            if total_pages >= 2:
                selected_page = self._pick_random_page(total_pages)
                logger.info(f"Randomly selected page {selected_page} out of {total_pages}")
                # Page 1 is already in hand; only fetch when a different page
                # was drawn.
                if selected_page != 1:
                    params["page"] = selected_page
                    data = self._get_json("/discover/movie", params)
            else:
                logger.info("Only 1 page available, using page 1")

            movies = data.get("results", [])

            # Randomly select n movies from the selected page
            logger.info(f"Found {len(movies)} movies on selected page")
            if len(movies) < n:
                logger.info(f"Only found {len(movies)} movies, returning all")
                selected_movies = movies
            else:
                selected_movies = random.sample(movies, n)

            # Format and enrich each movie with OMDB data
            return [self._enrich_movie_data(movie, country) for movie in selected_movies]
        except requests.RequestException as e:
            logger.error(f"Error fetching movies: {e}")
            return []
        except Exception as e:
            # Top-level boundary: callers expect a list, never an exception.
            logger.error(f"Unexpected error: {e}")
            return []
# Module-level TMDBClient instance, created once when this module is imported.
tmdb_client = TMDBClient()