"""Streamlit front end for the movie recommendation demo.

The app offers two pages:

* "Login": a user enters a numeric user ID and gets personalised
  recommendations from either the user-similarity matrix or an XGBoost model
  (both implemented in ``Helpers``).
* "Movie Similarity": pick a movie and list the ``k`` most similar movies
  according to the item-similarity matrix.
"""

import os
import pickle
import re

import gdown
import pandas as pd
import requests
import streamlit as st

from Helpers import get_user_recommendation , train_model , get_user_recommendation_XGBoost , seen_movies

# Set page configuration
st.set_page_config(page_title="Movie Recommendation", page_icon="🎬", layout="wide")

st.markdown(
    """ """,
    unsafe_allow_html=True
)

# CSV files, relative to the working directory the app is started from
moviesCSV = "Data/movies.csv"
ratingsCSV = "Data/ratings.csv"
linksCSV = "Data/links.csv"

# Google Drive sources and local targets for the artefacts that are downloaded
# on first run when they do not exist locally.
file_url = 'https://drive.google.com/uc?id=1-1bpusE96_Hh0rUxU7YmBo6RiwYLQGVy'
DataBaseCSV = "https://drive.google.com/uc?id=11Soimwc1uKS5VGy_QROifwkdIzl8MZaV"
output_path = 'Models/similarity_matrix.pkl'
output_path_DataBase = 'Data/XGBoost_database.csv'
user_matrix_path = 'Models/User_based_matrix.pkl'

# NOTE(security): API keys are hard-coded in the source. They can be overridden
# through the environment; the literals are kept only as backward-compatible
# defaults and should be rotated/removed.
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "23f109b2")
TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "b8c96e534866701532768a313b978c8b")

# Upper bound (seconds) for every outbound HTTP call so a slow API cannot
# block a Streamlit rerun indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

# User IDs accepted by login(): 1..609 inclusive.
MAX_USER_ID = 609

# Trailing " (YYYY)" release-year suffix carried by the dataset titles.
_YEAR_SUFFIX_RE = re.compile(r"\s*\(\d{4}\)\s*$")


def download_model_from_google_drive(file_url, output_path):
    """Download ``file_url`` to ``output_path`` with gdown.

    The parent directory is created when missing. The function is deliberately
    not cached: a cached call would be skipped even after the downloaded file
    has been deleted from disk.
    """
    directory = os.path.dirname(output_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    gdown.download(file_url, output_path, quiet=False)


# Download each artefact that is missing. Every file is checked on its own so
# a missing database CSV is fetched even when the similarity matrix exists.
_missing_downloads = [
    (url, path)
    for url, path in ((file_url, output_path), (DataBaseCSV, output_path_DataBase))
    if not os.path.exists(path)
]
if _missing_downloads:
    print("Downloading the similarity matrix from Googlr Drive...")
    for _url, _path in _missing_downloads:
        download_model_from_google_drive(_url, _path)
    print("Download completed......")


def display_user_history(history):
    """Render the user's watch history inside a fixed-height container."""
    st.write("Your Watch History:")
    container = st.container(height=300)
    for movie in history:
        container.write(movie)


def get_user_history(dataBase, user_id):
    """Return the ``title`` values of all rows of ``dataBase`` for ``user_id``."""
    return dataBase[dataBase['userId'] == user_id]['title'].values


# Function to hash passwords
def hash_password(password):
    """Placeholder: not implemented, always returns ``None``."""
    pass


# Login function
def login(username, password = None):
    """Return ``True`` when ``username`` is an int in ``1..609``.

    ``password`` is accepted for interface compatibility and is not checked.
    """
    return isinstance(username, int) and 0 < username <= MAX_USER_ID


def _parse_user_id(raw):
    """Convert the text typed in the login box to an int, or ``None`` if invalid."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def _get_json(url, params):
    """GET ``url`` and return the decoded JSON object, or ``None`` on any failure."""
    try:
        response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
        payload = response.json()
    except (requests.RequestException, ValueError):
        # Network problem, timeout, or a body that is not valid JSON.
        return None
    return payload if isinstance(payload, dict) else None


def _movie_not_found():
    """Build the 'not found' result shared by every failure path."""
    return {'Response': 'False', 'Error': 'Movie not found'}


def fetch_movie_details(title, api_key_omdb=OMDB_API_KEY, api_key_tmdb=TMDB_API_KEY):
    """Look up a movie by title, first on OMDb and then on TMDb.

    Args:
        title: Dataset title, normally ending in a " (YYYY)" year suffix.
        api_key_omdb: OMDb API key.
        api_key_tmdb: TMDb API key.

    Returns:
        A dict with an OMDb-style layout. ``'Response'`` is ``'True'`` on
        success; otherwise the dict is
        ``{'Response': 'False', 'Error': 'Movie not found'}``. Network and
        decoding failures are reported as "not found" instead of raising.
    """
    # Remove only a real " (YYYY)" suffix; a blind slice would truncate titles
    # that carry no year. '+' is dropped as in the original query building.
    title = _YEAR_SUFFIX_RE.sub('', title).replace('+', '')

    # First, try the OMDb API. Passing `params` lets requests URL-encode the title.
    movie = _get_json("https://www.omdbapi.com/", {"t": title, "apikey": api_key_omdb})
    if movie is not None and movie.get('Response') == 'True':
        return movie

    # If OMDb doesn't find the movie, try the TMDb API.
    search_results = _get_json(
        "https://api.themoviedb.org/3/search/movie",
        {"api_key": api_key_tmdb, "query": title},
    )
    if not search_results:
        return _movie_not_found()
    results = search_results.get('results') or []
    if not results or not search_results.get('total_results', 0) > 0:
        return _movie_not_found()

    movie_id = results[0].get('id')
    tmdb_movie = _get_json(
        f"https://api.themoviedb.org/3/movie/{movie_id}",
        {"api_key": api_key_tmdb},
    )
    if not tmdb_movie or 'title' not in tmdb_movie:
        # Error payloads carry no movie fields.
        return _movie_not_found()

    # Convert TMDb response to a similar structure as OMDb response.
    release_date = tmdb_movie.get('release_date') or ''
    poster_path = tmdb_movie.get('poster_path')
    return {
        'Title': tmdb_movie['title'],
        'Year': release_date.split('-')[0] or 'N/A',
        'Rated': 'N/A',  # TMDb doesn't provide rating info in the same way
        'Genre': ', '.join(genre['name'] for genre in tmdb_movie.get('genres') or []),
        'Plot': tmdb_movie.get('overview') or '',
        # poster_path may be present but null; avoid building ".../w500None".
        'Poster': f"https://image.tmdb.org/t/p/w500{poster_path}" if poster_path else '',
        'imdbRating': tmdb_movie.get('vote_average', 'N/A'),
        'imdbID': tmdb_movie.get('imdb_id'),
        'Response': 'True'
    }


def display_movie_details(movie):
    """Render an OMDb-style movie dict as returned by ``fetch_movie_details``.

    The input dict is not modified.
    """
    if movie['Response'] == 'False':
        st.write(f"Movie not found: {movie['Error']}")
        return

    raw_rating = movie.get('imdbRating', 'N/A')
    if raw_rating in ('N/A', None, ''):
        raw_rating = 0
    imdb_rating = float(raw_rating)

    # NOTE(review): `url` is not referenced by the template text visible here;
    # presumably it is meant for a link to the IMDb page - confirm.
    url = f"https://www.imdb.com/title/{movie.get('imdbID')}/"

    # Split the plot into lines based on . or , and keep the first three parts.
    plot_lines = re.split(r'[.,]', movie['Plot'])
    short_plot = '. '.join(plot_lines[:3]).strip() + '.'

    st.markdown(
        f"""

{movie['Title']}

Year: {movie['Year']} Rated: {movie['Rated']}
Genre: {movie['Genre'].replace(',', ' |')}

{short_plot}
{imdb_rating}
""",
        unsafe_allow_html=True
    )


def print_movie_details(movie):
    """Render a movie dict as returned by ``get_movie_details``.

    The last space-separated token of ``movie['title']`` is shown as the year
    and the remaining tokens as the title.
    """
    st.markdown(
        f"""

{' '.join(movie['title'].split(" ")[:-1])}

Year: {movie['title'].split(" ")[-1]}
Genre: {', '.join(movie['genres'])}
Number of Ratings: {movie['num_ratings']}
IMDb Rating: {round(movie["imdb_rating"],1)}

{movie['avg_rating']}
""",
        unsafe_allow_html=True
    )


# Function to load data
@st.cache_data
def load_data():
    """Read the movies, ratings, links and XGBoost database CSV files.

    Returns:
        Tuple ``(movies_df, ratings_df, links_df, DataBase)``.
    """
    movies_df = pd.read_csv(moviesCSV)
    ratings_df = pd.read_csv(ratingsCSV)
    links_df = pd.read_csv(linksCSV)
    DataBase = pd.read_csv(output_path_DataBase)
    return movies_df, ratings_df, links_df , DataBase


# Function to load similarity matrix
@st.cache_data
def load_similarity_matrix(path):
    """Unpickle and return the object stored at ``path``."""
    # NOTE(security): pickle.load executes arbitrary code embedded in the file.
    # The similarity matrix is downloaded from Google Drive, so only load files
    # from a source you trust.
    with open(path, 'rb') as f:
        similarity_df = pickle.load(f)
    return similarity_df


# Function to get movie details
def get_movie_details(movie_id, df_movies, df_ratings, df_links):
    """Collect dataset statistics and TMDb poster/rating for one movie.

    Args:
        movie_id: Value matched against the ``movieId`` column of all frames.
        df_movies: Frame with ``movieId``, ``title`` and ``genres`` columns.
        df_ratings: Frame with ``movieId`` and ``rating`` columns.
        df_links: Frame with ``movieId``, ``imdbId`` and ``tmdbId`` columns.

    Returns:
        A dict with keys ``title``, ``genres``, ``avg_rating``,
        ``num_ratings``, ``imdb_id``, ``tmdb_id``, ``poster_url`` and
        ``imdb_rating``, or ``None`` when anything fails (the error is shown
        with ``st.error``).
    """
    try:
        link_row = df_links[df_links['movieId'] == movie_id].iloc[0]
        imdb_id = link_row['imdbId']
        raw_tmdb_id = link_row['tmdbId']
        # A column with missing values is read as float, so the ID would be
        # formatted as e.g. "862.0" (or "nan") in the URL; normalise to int.
        tmdb_id = None if pd.isna(raw_tmdb_id) else int(raw_tmdb_id)

        movie_data = df_movies[df_movies['movieId'] == movie_id].iloc[0]
        genres = movie_data['genres'].split('|') if 'genres' in movie_data else []

        movie_ratings = df_ratings[df_ratings['movieId'] == movie_id]['rating']
        avg_rating = movie_ratings.mean()
        num_ratings = movie_ratings.shape[0]

        # Without a TMDb ID there is nothing to query; fall back to the same
        # defaults used when the response lacks the fields.
        tmdb_payload = {}
        if tmdb_id is not None:
            response = requests.get(
                f'https://api.themoviedb.org/3/movie/{tmdb_id}',
                params={'api_key': TMDB_API_KEY},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            tmdb_payload = response.json()

        poster_url = tmdb_payload.get('poster_path', '')
        full_poster_url = f'https://image.tmdb.org/t/p/w500{poster_url}' if poster_url else ''
        imdb_rating = tmdb_payload.get('vote_average', 0)

        return {
            "title": movie_data['title'],
            "genres": genres,
            "avg_rating": round(avg_rating, 2),
            "num_ratings": num_ratings,
            "imdb_id": imdb_id,
            "tmdb_id": tmdb_id,
            "poster_url": full_poster_url,
            "imdb_rating": imdb_rating
        }
    except Exception as e:
        # UI boundary: report the problem in the page instead of crashing the rerun.
        st.error(f"Error fetching details for movie ID {movie_id}: {e}")
        return None


# Function to recommend movies
def recommend(movie, similarity_df, movies_df, ratings_df, links_df, k=5):
    """Return details for up to ``k`` movies most similar to ``movie``.

    Candidates are visited in descending similarity order, skipping the first
    entry of the ranking, and only movies with more than 100 ratings are kept.

    Args:
        movie: Exact ``title`` value from ``movies_df``.
        similarity_df: Similarity matrix indexed positionally like ``movies_df``.
        movies_df, ratings_df, links_df: Dataset frames.
        k: Maximum number of recommendations.

    Returns:
        List of dicts from ``get_movie_details``; empty on error.
    """
    try:
        index = movies_df[movies_df['title'] == movie].index[0]
        distances = sorted(enumerate(similarity_df.iloc[index]), reverse=True, key=lambda x: x[1])

        # Count ratings per movie once instead of rescanning ratings_df for
        # every candidate.
        rating_counts = ratings_df['movieId'].value_counts()

        recommended_movies = []
        for position, _score in distances[1:]:
            movie_id = movies_df.iloc[position]['movieId']
            num_ratings = rating_counts.get(movie_id, 0)
            if num_ratings > 100:
                movie_details = get_movie_details(movie_id, movies_df, ratings_df, links_df)
                if movie_details:
                    recommended_movies.append(movie_details)
                    if len(recommended_movies) == k:
                        break
        return recommended_movies
    except Exception as e:
        # UI boundary: surface the error in the page and return no results.
        st.error(f"Error generating recommendations: {e}")
        return []


# Main app
def main():
    """Build the Streamlit page: sidebar navigation plus the selected view."""
    movies_df, ratings_df, links_df, DB_df = load_data()
    print("Data loaded successfully")
    print("Loading similarity matrix...")
    similarity_df = load_similarity_matrix(output_path)

    st.sidebar.title("Navigation")
    menu = ["Login", "Movie Similarity"]
    choice = st.sidebar.selectbox("Select an option", menu)

    if choice == "Login":
        num_cols = 2
        cols = st.columns(num_cols)
        with cols[0]:
            st.title("Movie Recommendations")
            st.write("Welcome to the Movie Recommendation App!")
            st.write("Please login to get personalized movie recommendations. username between (1 and 609)")
            # model selection
            C = st.selectbox("Select the model", ["User Similarity Matrix", "XGBoost"])

        # Login form
        st.sidebar.header("Login")
        # Non-numeric input becomes None, which login() rejects, instead of
        # raising ValueError and aborting the page.
        username = _parse_user_id(st.sidebar.text_input("Username"))

        if st.sidebar.button("Login"):
            if login(username):
                st.sidebar.success("Login successful!")

                # Fetch user history
                with cols[1 % num_cols]:
                    user_history = get_user_history(DB_df, username)
                    display_user_history(user_history)

                # Default keeps the loop below safe if no model branch matches.
                recommendations = []
                if C == "User Similarity Matrix":
                    user_matrix = load_similarity_matrix(user_matrix_path)
                    recommendations = get_user_recommendation(DB_df, user_matrix, username)
                elif C == "XGBoost":
                    model = train_model(DB_df, username)
                    recommendations, user_seen_movies = get_user_recommendation_XGBoost(DB_df, model, username)

                st.write(f"Recommendations for user number {username}:")
                num_cols = 2
                cols = st.columns(num_cols)
                for i, movie_title in enumerate(recommendations):
                    movie = fetch_movie_details(movie_title)
                    if movie['Response'] == 'True':
                        with cols[i % num_cols]:
                            display_movie_details(movie)
                    else:
                        st.write(f"Movie details for '{movie_title}' not found.")
            else:
                # The form only takes a numeric user ID, so say so.
                st.sidebar.error("Invalid username. Please enter a number between 1 and 609.")

    elif choice == "Movie Similarity":
        num_cols = 2
        cols = st.columns(num_cols)

        # Movie similarity search
        with cols[0]:
            st.title("Find Similar Movies")
            selected_movie = st.selectbox("Type or select a movie from the dropdown", movies_df['title'].unique())
            k = st.slider("Select the number of recommendations (k)", min_value=1, max_value=50, value=5)
            button = st.button("Find Similar Movies")

        with cols[1]:
            st.title("Choosen Movie Details:")
            if selected_movie:
                movie = fetch_movie_details(selected_movie)
                if movie['Response'] == 'True':
                    display_movie_details(movie)
                else:
                    st.write(f"Movie details for '{selected_movie}' not found.")

        if button:
            st.write("The rating bar here is token from our dataset and it's between 0 and 5.")
            if selected_movie:
                recommendations = recommend(selected_movie, similarity_df, movies_df, ratings_df, links_df, k)
                if recommendations:
                    st.write(f"Similar movies to '{selected_movie}':")
                    num_cols = 2
                    cols = st.columns(num_cols)
                    for i, movie in enumerate(recommendations):
                        with cols[i % num_cols]:
                            print_movie_details(movie)
                else:
                    st.write("No recommendations found.")
            else:
                st.write("Please select a movie.")


if __name__ == "__main__":
    main()