# Spaces: Running
import logging
import os
import random
from typing import Any, Dict, List, Optional

import requests

from services.omdb_client import omdb_client
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TMDBClient:
    """Simple TMDB API client.

    Wraps the TMDB v3 REST endpoints used by this module (genre list,
    external IDs, watch providers and movie discovery) and enriches the
    discovered movies with OMDB details via ``omdb_client``.
    """

    # Seconds to wait on a single HTTP call. Without a timeout, requests
    # blocks indefinitely when the remote end stops responding.
    REQUEST_TIMEOUT = 10

    # Highest page number requested from the discover endpoint; TMDB rejects
    # pages above 500 even when ``total_pages`` reports more.
    MAX_DISCOVER_PAGE = 500

    # Region whose streaming offers are reported (see _get_watch_providers).
    WATCH_REGION = "US"

    # Base URL for TMDB-hosted images (posters, provider logos).
    IMAGE_BASE_URL = "https://image.tmdb.org/t/p"

    def __init__(self):
        """Initialize with API key and base URL."""
        # Read from the TMDB_API_KEY environment variable; None when unset.
        self.api_key = os.environ.get("TMDB_API_KEY")
        self.base_url = "https://api.themoviedb.org/3"
        # Lazily filled genre-name -> genre-id mapping (None = not fetched yet).
        self._genre_cache: Optional[Dict[str, int]] = None

    def _get_json(self, path: str, extra_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Perform an authenticated GET against the TMDB API and decode the JSON body.

        Args:
            path: Endpoint path appended to ``base_url`` (starts with "/")
            extra_params: Query parameters to send in addition to the API key

        Returns:
            The decoded JSON response

        Raises:
            requests.RequestException: On connection errors, timeouts or
                non-2xx status codes (via ``raise_for_status``)
        """
        params: Dict[str, Any] = {"api_key": self.api_key}
        if extra_params:
            params.update(extra_params)
        response = requests.get(
            f"{self.base_url}{path}",
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def _get_genres(self) -> Dict[str, int]:
        """
        Get genre name to ID mapping from TMDB API.

        The mapping is fetched once and cached on the instance. A failed
        fetch is NOT cached, so a transient error does not disable genre
        filtering for the lifetime of the client.

        Returns:
            Mapping of lower-cased genre name to TMDB genre ID; empty when
            the request fails
        """
        if self._genre_cache is not None:
            return self._genre_cache

        try:
            data = self._get_json("/genre/movie/list")
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching genres: {e}")
            return {}

        # Create mapping from genre name to ID (case insensitive)
        self._genre_cache = {
            genre["name"].lower(): genre["id"]
            for genre in data.get("genres", [])
        }
        return self._genre_cache

    def _get_external_ids(self, tmdb_id: int) -> Dict[str, str]:
        """
        Get external IDs and construct URLs for a movie using its TMDB ID.

        Args:
            tmdb_id: TMDB movie ID; when None no request is made

        Returns:
            Dict with "imdb_id" and "imdb_url" when TMDB reports an IMDB ID,
            otherwise an empty dict (also on request failure)
        """
        external_urls: Dict[str, str] = {}
        if tmdb_id is None:
            # Nothing to look up; avoid requesting ".../movie/None/...".
            return external_urls

        try:
            data = self._get_json(f"/movie/{tmdb_id}/external_ids")
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching external IDs for TMDB ID {tmdb_id}: {e}")
            return external_urls

        # IMDB URL
        imdb_id = data.get("imdb_id")
        if imdb_id:
            external_urls["imdb_id"] = imdb_id
            external_urls["imdb_url"] = f"https://www.imdb.com/title/{imdb_id}/"
        return external_urls

    @classmethod
    def _build_service_info(cls, provider: Dict[str, Any], service_type: str) -> Dict[str, Any]:
        """Convert one raw TMDB provider entry into the service dict exposed to callers."""
        logo_path = provider.get('logo_path')
        return {
            'name': provider.get('provider_name', ''),
            'logo_url': f"{cls.IMAGE_BASE_URL}/original{logo_path}" if logo_path else None,
            'type': service_type
        }

    def _get_watch_providers(self, tmdb_id: int, country: str) -> Dict[str, Any]:
        """
        Get watch providers (streaming services) for a movie in the US.

        Args:
            tmdb_id: TMDB movie ID; when None no request is made
            country: Accepted for interface compatibility but not used;
                offers are always read for the US region regardless of the
                movie's origin country

        Returns:
            Dict with the lists "free_services", "subscription_services" and
            "all_services"; each entry has "name", "logo_url" and "type".
            All lists are empty when nothing is available or the request fails.
        """
        streaming_data: Dict[str, Any] = {
            'free_services': [],
            'subscription_services': [],
            'all_services': []
        }
        if tmdb_id is None:
            return streaming_data

        try:
            data = self._get_json(f"/movie/{tmdb_id}/watch/providers")

            # Always get data for US regardless of movie origin country
            country_data = data.get("results", {}).get(self.WATCH_REGION, {})
            if not country_data:
                return streaming_data

            # "free" = completely free, "flatrate" = included in a subscription
            categories = (
                ("free", 'free_services', 'free'),
                ("flatrate", 'subscription_services', 'subscription'),
            )
            for tmdb_key, bucket, service_type in categories:
                for provider in country_data.get(tmdb_key, []):
                    service_info = self._build_service_info(provider, service_type)
                    streaming_data[bucket].append(service_info)
                    streaming_data['all_services'].append(service_info)
        except requests.RequestException as e:
            logger.error(f"Error fetching watch providers for TMDB ID {tmdb_id}: {e}")
        except Exception as e:
            # Best effort: streaming info is optional, never fail the movie for it.
            logger.error(f"Unexpected error fetching watch providers for TMDB ID {tmdb_id}: {e}")
        return streaming_data

    @classmethod
    def _format_movie(cls, movie: Dict[str, Any]) -> Dict[str, Any]:
        """Build the base movie dict from a raw TMDB result (no network access)."""
        release_date = movie.get("release_date")
        formatted_movie = {
            "title": movie.get("title", "Unknown"),
            "original_title": movie.get("original_title", ""),
            # The first four characters of the release date are the year.
            "year": release_date[:4] if release_date else "Unknown",
            "rating": movie.get("vote_average", 0),
            "vote_count": movie.get("vote_count", 0),
            "overview": movie.get("overview", "No description available"),
            "poster_path": movie.get("poster_path"),
            "genre_ids": movie.get("genre_ids", []),
            "tmdb_id": movie.get("id")
        }
        # Add poster URL if available
        if formatted_movie["poster_path"]:
            formatted_movie["poster_url"] = f"{cls.IMAGE_BASE_URL}/w500{formatted_movie['poster_path']}"
        return formatted_movie

    @staticmethod
    def _omdb_fields(omdb_data: Dict[str, Any], overview: str) -> Dict[str, Any]:
        """Map an OMDB detail response onto the keys added to an enriched movie."""
        fields = {
            'director': omdb_data.get('Director', 'N/A'),
            'actors': omdb_data.get('Actors', 'N/A'),
            # Fall back to the TMDB overview when OMDB has no plot.
            'plot': omdb_data.get('Plot', overview),
            'runtime': omdb_data.get('Runtime', 'N/A'),
            'awards': omdb_data.get('Awards', 'N/A'),
            'box_office': omdb_data.get('BoxOffice', 'N/A'),
            'country': omdb_data.get('Country', 'N/A'),
            'language': omdb_data.get('Language', 'N/A'),
            'rated': omdb_data.get('Rated', 'N/A'),
            'metascore': omdb_data.get('Metascore', 'N/A'),
            'imdb_rating': omdb_data.get('imdbRating', 'N/A'),
            'imdb_votes': omdb_data.get('imdbVotes', 'N/A'),
            'rotten_tomatoes': 'N/A',
            'has_omdb_data': True
        }
        # Extract Rotten Tomatoes rating from ratings array (first match wins)
        for rating in omdb_data.get('Ratings') or []:
            if rating.get('Source') == 'Rotten Tomatoes':
                fields['rotten_tomatoes'] = rating.get('Value', 'N/A')
                break
        return fields

    def _enrich_movie_data(self, movie: Dict[str, Any], country: str) -> Dict[str, Any]:
        """
        Format and enrich a single movie with OMDB data and streaming information.

        Args:
            movie: Raw movie data from TMDB API
            country: Country code passed on to the watch-provider lookup

        Returns:
            Fully enriched movie data dictionary. "has_omdb_data" is True only
            when an IMDB ID was found and OMDB returned data for it.
        """
        formatted_movie = self._format_movie(movie)
        tmdb_id = formatted_movie["tmdb_id"]

        # Get external IDs and construct URLs
        formatted_movie.update(self._get_external_ids(tmdb_id))

        # Enrich with OMDB data if IMDB ID is available
        omdb_data = None
        if formatted_movie.get('imdb_id'):
            logger.info(f"Fetching OMDB data for {formatted_movie['title']} ({formatted_movie['imdb_id']})")
            omdb_data = omdb_client.get_movie_detail(formatted_movie['imdb_id'])

        if omdb_data:
            formatted_movie.update(self._omdb_fields(omdb_data, formatted_movie['overview']))
        else:
            # Either no IMDB ID is known or OMDB returned nothing for it.
            formatted_movie['has_omdb_data'] = False

        # Get watch providers for the movie
        formatted_movie.update(self._get_watch_providers(tmdb_id, country))
        return formatted_movie

    @classmethod
    def _pick_random_page(cls, total_pages: int) -> int:
        """Pick a random result page in 1..min(total_pages, MAX_DISCOVER_PAGE); 1 when fewer than 2 pages."""
        last_page = min(total_pages, cls.MAX_DISCOVER_PAGE)
        if last_page < 2:
            return 1
        return random.randint(1, last_page)

    def get_random_movies(self, decade: int, country: str, genre: Optional[str] = None, n: int = 5) -> List[Dict[str, Any]]:
        """
        Get random movies from the TMDB API with simplified random selection:
        1. Search full decade using date range
        2. If multiple pages: randomly select 1 page (capped at the highest
           page TMDB serves)
        3. Randomly select n movies from the selected page

        Args:
            decade: First year of the decade; the search covers
                ``decade`` through ``decade + 9``
            country: Origin country code (upper-cased before being sent)
            genre: Optional genre name, matched case-insensitively against
                the TMDB genre list
            n: Number of movies to return

        Returns:
            Up to ``n`` enriched movie dicts (all movies on the page when it
            holds fewer than ``n``). Empty list when the genre is unknown or
            any error occurs.
        """
        try:
            # Calculate decade boundaries
            start_year = decade
            end_year = decade + 9

            # Setup search parameters for full decade
            discover_path = "/discover/movie"
            params: Dict[str, Any] = {
                "primary_release_date.gte": f"{start_year}-01-01",
                "primary_release_date.lte": f"{end_year}-12-31",
                "with_origin_country": country.upper(),
                "vote_count.gte": 1.1,  # > 1
                "include_adult": True,
                "with_runtime.gte": 50,
                "page": 1
            }

            # Add genre filter only if provided (for top countries)
            if genre:
                genres = self._get_genres()
                genre_id = genres.get(genre.lower())
                if not genre_id:
                    logger.warning(f"Genre '{genre}' not found. Available genres: {list(genres.keys())}")
                    return []
                params["with_genres"] = genre_id

            # Get first page to check total pages
            data = self._get_json(discover_path, params)
            total_pages = data.get("total_pages", 1)
            logger.info(f"Found {total_pages} pages of results for {start_year}s")

            # Randomly select page if multiple pages available. Only pages up
            # to MAX_DISCOVER_PAGE can be requested, so the draw is capped.
            reachable_pages = min(total_pages, self.MAX_DISCOVER_PAGE)
            if reachable_pages >= 2:
                selected_page = self._pick_random_page(total_pages)
                logger.info(f"Randomly selected page {selected_page} out of {reachable_pages}")
                if selected_page != 1:
                    # Page 1 is already in hand; only fetch when another page was drawn.
                    params["page"] = selected_page
                    data = self._get_json(discover_path, params)
            else:
                logger.info("Only 1 page available, using page 1")

            movies = data.get("results", [])

            # Randomly select n movies from the selected page
            logger.info(f"Found {len(movies)} movies on selected page")
            if len(movies) < n:
                logger.info(f"Only found {len(movies)} movies, returning all")
                selected_movies = movies
            else:
                selected_movies = random.sample(movies, n)

            # Format and enrich each movie with OMDB data
            return [self._enrich_movie_data(movie, country) for movie in selected_movies]
        except requests.RequestException as e:
            logger.error(f"Error fetching movies: {e}")
            return []
        except Exception as e:
            # Top-level boundary: callers always get a list, never an exception.
            logger.error(f"Unexpected error: {e}")
            return []
# Shared module-level client instance, created at import time.
tmdb_client = TMDBClient()