# Module imports and dataset load.
# The original lines carried table-export pipes ("| ... |") and mixed imports
# with executable statements; every import is kept, only grouped and ordered.

# Standard library
import pickle
import time
from collections import defaultdict

# Third-party
import gensim
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from cogworks_data.language import get_data_path
from gensim.models.keyedvectors import KeyedVectors
from huggingface_hub import hf_hub_download
from sklearn.decomposition import TruncatedSVD

# Hugging Face dataset repository holding the CSV and the pickled artifacts.
repo_id = "eka416/movies"

# TMDB metadata table; the rest of the file looks rows up by each movie
# object's `.index` attribute via `df.loc[...]`.
df = pd.read_csv(
    hf_hub_download(repo_id=repo_id, filename="TMDB_movie_dataset_v11.csv", repo_type="dataset")
)
class movie:
    """Plain record describing one movie.

    NOTE(review): the lowercase class name is kept on purpose. This file
    unpickles a list of movie objects, and pickle presumably resolves them by
    this exact name, so renaming would break loading -- confirm before
    changing it.
    """

    def __init__(self, name, idd, keywords_vector, hot, index):
        """Store the fields as given.

        Args:
            name: Display title of the movie.
            idd: Identifier, stored as ``self.id``.
            keywords_vector: Text embedding, stored as ``self.text_vector``.
            hot: Genre vector, stored as ``self.genre_vector``
                (presumably a multi-hot encoding -- confirm).
            index: Row label of this movie in the metadata DataFrame.
        """
        self.name = name
        self.id = idd
        self.text_vector = keywords_vector
        self.genre_vector = hot
        self.index = index
class Filter:
    """Bundle of optional constraints applied when picking recommendations.

    A fresh instance filters nothing; each method switches on or configures
    one constraint.
    """

    def __init__(self):
        self.lang = []          # allowed original-language codes; empty = any
        self.date = False       # True once a release-year window is set
        self.before = None      # latest allowed release year (inclusive)
        self.after = None       # earliest allowed release year (inclusive)
        self.pop = 0            # vote-count threshold
        self.rat = 0            # vote-average threshold
        self.runtime = False    # True once a runtime window is set
        self.more = None        # minimum runtime
        self.less = None        # maximum runtime
        self.no_adult = False   # True to drop adult titles
        self.company = []       # production companies (stored only)
        self.rev = 0            # revenue threshold

    def add_lang(self, langu):
        """Restrict results to the language codes in ``langu``."""
        self.lang = langu

    def dates(self, after, before = 2030):
        """Enable the release-year window ``[after, before]``."""
        self.date = True
        self.after = after
        self.before = before

    def popp(self, pop):
        """Set the vote-count threshold."""
        self.pop = pop

    def ratt(self, rat):
        """Set the vote-average threshold."""
        self.rat = rat

    def length(self, less, more = 0):
        """Enable the runtime window ``[more, less]`` (note the argument order)."""
        self.runtime = True
        self.less = less
        self.more = more

    def adult(self):
        """Exclude adult titles."""
        self.no_adult = True

    def add_comp(self, comp):
        """Store the list of production companies."""
        self.company = comp

    def revenue(self, num):
        """Set the revenue threshold."""
        self.rev = num
# Load the GloVe vectors (word2vec text format); t0/t1 bracket the load so
# the elapsed time can be inspected as t1 - t0.
# NOTE(review): `glove` does not appear to be used by the functions in this
# file -- confirm before removing, the load is slow.
path = get_data_path("glove.6B.50d.txt.w2v")
t0 = time.time()
glove = KeyedVectors.load_word2vec_format(path, binary=False)
t1 = time.time()

from sentence_transformers import SentenceTransformer

# Sentence-embedding model.
# NOTE(review): presumably the model that produced the stored text vectors;
# it is not called by the functions in this file -- confirm it is needed.
model = SentenceTransformer('all-MiniLM-L6-v2')

# Genre vocabulary; its length is the expected size of a genre vector.
all_genres = ['TV Movie', 'Animation', 'Adventure', 'Science Fiction', 'Action', 'Horror', 'History', 'Family', 'Western', 'Drama', 'Mystery', 'Romance', 'Music', 'Fantasy', 'Crime', 'War', 'Documentary', 'Thriller', 'Comedy']
def _load_pickle(filename):
    """Download ``filename`` from the dataset repo and return the unpickled object.

    SECURITY: ``pickle.load`` can execute arbitrary code embedded in the file.
    This is only acceptable while the Hugging Face repository is fully
    trusted; prefer a data-only format if that ever changes.
    """
    local_path = hf_hub_download(repo_id=repo_id, filename=filename, repo_type="dataset")
    with open(local_path, "rb") as handle:
        return pickle.load(handle)


# Precomputed artifacts, loaded in the same order as before.
all_vectors_genre = _load_pickle("all_vectors_genre.pkl")  # genre vectors, one row per movie
index_map = _load_pickle("index_map.pkl")                  # lower-cased title -> position(s) in `movies`
movies = _load_pickle("movies3.pkl")                       # list of `movie` objects
all_vectors_text = _load_pickle("all_vectors_text.pkl")    # text embeddings, one row per movie
w = _load_pickle("weights.pkl")                            # blend weights: text, genre, date
def _release_year_or_zero(release_date):
    """Return the 4-digit year prefix of ``release_date``, or 0 if unavailable.

    Missing dates come back from pandas as non-string values (NaN), and a
    malformed string cannot be parsed; both map to 0.
    """
    if not isinstance(release_date, str):
        return 0
    try:
        return int(release_date[0:4])
    except ValueError:
        return 0


# Release year per entry of `movies`, in the same order (0 = unknown).
# The loop variable is deliberately not called `movie`: the original loop
# rebound the module-level name `movie` from the class to the last instance.
all_vectors_year = np.array(
    [_release_year_or_zero(df.loc[entry.index, "release_date"]) for entry in movies]
)
# Two-letter language code -> display name, used to populate the language
# dropdown. Entries are unchanged; only the table-export pipes were removed.
languages = {
    "ab": "Abkhazian",
    "aa": "Afar",
    "af": "Afrikaans",
    "ak": "Akan",
    "sq": "Albanian",
    "am": "Amharic",
    "ar": "Arabic",
    "an": "Aragonese",
    "hy": "Armenian",
    "as": "Assamese",
    "av": "Avaric",
    "ae": "Avestan",
    "ay": "Aymara",
    "az": "Azerbaijani",
    "bm": "Bambara",
    "ba": "Bashkir",
    "eu": "Basque",
    "be": "Belarusian",
    "bn": "Bengali",
    "bi": "Bislama",
    "bs": "Bosnian",
    "br": "Breton",
    "bg": "Bulgarian",
    "my": "Burmese",
    "ca": "Catalan",
    "ch": "Chamorro",
    "ce": "Chechen",
    "ny": "Chichewa",
    "cn": "Cantonese",
    "zh": "Chinese",
    "cu": "Church Slavic",
    "cv": "Chuvash",
    "kw": "Cornish",
    "co": "Corsican",
    "cr": "Cree",
    "hr": "Croatian",
    "cs": "Czech",
    "da": "Danish",
    "dv": "Divehi",
    "nl": "Dutch",
    "dz": "Dzongkha",
    "en": "English",
    "eo": "Esperanto",
    "et": "Estonian",
    "ee": "Ewe",
    "fo": "Faroese",
    "fj": "Fijian",
    "fi": "Finnish",
    "fr": "French",
    "ff": "Fulah",
    "gd": "Scottish Gaelic",
    "gl": "Galician",
    "lg": "Ganda",
    "ka": "Georgian",
    "de": "German",
    "el": "Greek",
    "gn": "Guarani",
    "gu": "Gujarati",
    "ht": "Haitian Creole",
    "ha": "Hausa",
    "he": "Hebrew",
    "hz": "Herero",
    "hi": "Hindi",
    "ho": "Hiri Motu",
    "hu": "Hungarian",
    "is": "Icelandic",
    "io": "Ido",
    "ig": "Igbo",
    "id": "Indonesian",
    "ia": "Interlingua",
    "ie": "Interlingue",
    "iu": "Inuktitut",
    "ik": "Inupiaq",
    "ga": "Irish",
    "it": "Italian",
    "ja": "Japanese",
    "jv": "Javanese",
    "kl": "Kalaallisut (Greenlandic)",
    "kn": "Kannada",
    "kr": "Kanuri",
    "ks": "Kashmiri",
    "kk": "Kazakh",
    "km": "Khmer",
    "ki": "Kikuyu",
    "rw": "Kinyarwanda",
    "ky": "Kyrgyz",
    "kv": "Komi",
    "kg": "Kongo",
    "ko": "Korean",
    "kj": "Kuanyama",
    "ku": "Kurdish",
    "lo": "Lao",
    "la": "Latin",
    "lv": "Latvian",
    "li": "Limburgish",
    "ln": "Lingala",
    "lt": "Lithuanian",
    "lu": "Luba-Katanga",
    "lb": "Luxembourgish",
    "mk": "Macedonian",
    "mg": "Malagasy",
    "ms": "Malay",
    "ml": "Malayalam",
    "mt": "Maltese",
    "gv": "Manx",
    "mi": "Maori",
    "mr": "Marathi",
    "mh": "Marshallese",
    "mo": "Moldovan",
    "mn": "Mongolian",
    "na": "Nauru",
    "nv": "Navajo",
    "nd": "North Ndebele",
    "nr": "South Ndebele",
    "ng": "Ndonga",
    "ne": "Nepali",
    "se": "Northern Sami",
    "no": "Norwegian",
    "nb": "Norwegian Bokmål",
    "nn": "Norwegian Nynorsk",
    "ii": "Sichuan Yi",
    "oc": "Occitan",
    "oj": "Ojibwa",
    "or": "Oriya",
    "om": "Oromo",
    "os": "Ossetian",
    "pi": "Pali",
    "pa": "Punjabi",
    "ps": "Pashto",
    "fa": "Persian",
    "pl": "Polish",
    "pt": "Portuguese",
    "qu": "Quechua",
    "rm": "Romansh",
    "ro": "Romanian",
    "rn": "Kirundi",
    "ru": "Russian",
    "sm": "Samoan",
    "sg": "Sango",
    "sa": "Sanskrit",
    "sc": "Sardinian",
    "sr": "Serbian",
    "sn": "Shona",
    "sh": "Serbo-Croatian",
    "sd": "Sindhi",
    "si": "Sinhala",
    "sk": "Slovak",
    "sl": "Slovenian",
    "so": "Somali",
    "st": "Southern Sotho",
    "es": "Spanish",
    "su": "Sundanese",
    "sw": "Swahili",
    "ss": "Swati",
    "sv": "Swedish",
    "tl": "Tagalog",
    "ty": "Tahitian",
    "tg": "Tajik",
    "ta": "Tamil",
    "tt": "Tatar",
    "te": "Telugu",
    "th": "Thai",
    "bo": "Tibetan",
    "ti": "Tigrinya",
    "to": "Tongan",
    "ts": "Tsonga",
    "tn": "Tswana",
    "tr": "Turkish",
    "tk": "Turkmen",
    "tw": "Twi",
    "ug": "Uighur",
    "uk": "Ukrainian",
    "ur": "Urdu",
    "uz": "Uzbek",
    "ve": "Venda",
    "vi": "Vietnamese",
    "vo": "Volapük",
    "wa": "Walloon",
    "cy": "Welsh",
    "wo": "Wolof",
    "xh": "Xhosa",
    "xx": "Unknown/Other",
    "yi": "Yiddish",
    "yo": "Yoruba",
    "za": "Zhuang",
    "zu": "Zulu",
}
def ml(movies_list, k, *ratings):
    """Update the blend weights from the user's ratings of the last recommendations.

    For every recommended movie, text, genre and release-year similarity to
    the user's seed movies are aggregated with ``algo_type`` into one
    3-feature row. The rows and ratings are then fed to ``update_session``.

    Fixes relative to the previous version:
      * The similarity matrices are laid out as (seed movies, recommendations),
        so ``algo_type`` (which reduces axis 0) yields one feature per
        recommendation. Before, the reduction ran over the recommendations,
        so the feature rows did not correspond to the ratings.
      * Sizes come from the stored ``rec_global`` / ``user_inp`` rather than
        from ``k`` / ``movies_list``, which may have changed in the UI since
        the recommendations were produced.
      * Non-finite weights are never written to disk.

    Args:
        movies_list: Current movie-list state (kept for interface compatibility).
        k: Current value of the "how many" slider (kept for compatibility).
        *ratings: One value per rating dropdown; "N/A" or None counts as 3.

    Returns:
        A tuple with one hide-update per rating dropdown followed by a
        hide-update for the submit button.
    """
    from sklearn.metrics.pairwise import cosine_similarity

    ratings = [3 if r in ("N/A", None) else r for r in ratings]
    hide_everything = (
        *[gr.update(visible=False) for _ in ratings],
        gr.update(visible=False),
    )

    recs = list(rec_global or [])[: len(ratings)]
    seeds = list(user_inp)
    if not recs or not seeds:
        return hide_everything

    def _stack(indices, attr):
        # One row per movie; reshape guards against column-shaped vectors.
        return np.vstack([getattr(movies[i], attr).reshape(1, -1) for i in indices])

    def _year(movie_idx):
        release = df.loc[movies[movie_idx].index, "release_date"]
        return float(int(release[0:4])) if isinstance(release, str) else np.nan

    # Shape (len(seeds), len(recs)) for all three matrices.
    text_sim = cosine_similarity(_stack(seeds, "text_vector"), _stack(recs, "text_vector"))
    genre_sim = cosine_similarity(_stack(seeds, "genre_vector"), _stack(recs, "genre_vector"))
    seed_years = np.array([_year(i) for i in seeds])
    rec_years = np.array([_year(i) for i in recs])
    date_sim = np.exp(-np.abs(seed_years[:, None] - rec_years[None, :]) / 5)
    date_sim = np.nan_to_num(date_sim, nan=0.0)  # unknown year -> no date similarity

    t = algo_type(algo, text_sim)
    g = algo_type(algo, genre_sim)
    d = algo_type(algo, date_sim)
    features = np.stack([t, g, d], axis=1)  # one row per recommendation

    print(w)
    neww = update_session(w, features, ratings[: len(recs)], eta=0.1, passes=1)
    print(neww)
    print(np.sum(neww))

    # NOTE(review): the weights are written to a local file, but the
    # in-memory `w` is not updated and startup loads weights from the hub,
    # so the update does not affect later recommendations -- confirm intent.
    if np.all(np.isfinite(neww)):
        with open("weights.pkl", "wb") as f:
            pickle.dump(neww, f)

    return hide_everything
def get_top_10(similarities, user_inp, filterr, movies, pool):
    """Return up to ``pool`` positions of the best-scoring movies passing ``filterr``.

    Candidates are visited from highest to lowest similarity. The user's own
    movies are skipped. A candidate must have a vote average above
    ``filterr.rat`` and either a vote count above ``filterr.pop`` or a revenue
    above ``filterr.rev``, and must pass every enabled language, adult,
    release-year and runtime constraint.

    Previously the function fell off the end and returned ``None`` whenever
    fewer than ``pool`` movies qualified; it now returns what it found.

    Args:
        similarities: 1-D score array aligned with ``movies``.
        user_inp: Positions (into ``movies``) of the user's seed movies.
        filterr: ``Filter``-like object carrying the constraints.
        movies: Sequence of objects exposing ``.index`` (row label in ``df``).
        pool: Maximum number of results.

    Returns:
        List of positions into ``movies``, best first (possibly empty).
    """
    chosen = []
    if pool <= 0:
        return chosen

    seeds = set(user_inp)  # O(1) membership instead of scanning a list
    for num in np.argsort(similarities)[::-1]:
        if num in seeds:
            continue
        ind = movies[num].index

        known_enough = (
            df.loc[ind, "vote_count"] > filterr.pop
            or df.loc[ind, "revenue"] > filterr.rev
        )
        if not (known_enough and df.loc[ind, "vote_average"] > filterr.rat):
            continue
        if len(filterr.lang) > 0 and df.loc[ind, "original_language"] not in filterr.lang:
            continue
        if filterr.no_adult and df.loc[ind, "adult"]:
            continue
        if filterr.date:
            release = df.loc[ind, "release_date"]
            if not isinstance(release, str):
                continue  # a missing date cannot satisfy a year window
            year = int(release[0:4])
            if year < filterr.after or year > filterr.before:
                continue
        if filterr.runtime:
            runtime = df.loc[ind, "runtime"]
            if runtime < filterr.more or runtime > filterr.less:
                continue

        chosen.append(num)
        if len(chosen) >= pool:
            break
    return chosen
def normal1(w):
    """Project ``w`` onto the probability simplex (non-negative, sums to 1).

    A common shift ``theta`` is subtracted from every component and negatives
    are clipped to zero. The previous version always used
    ``theta = (sum(w) - 1) / 3``. That is only correct for exactly three
    weights, and only while nothing gets clipped; once a component was
    clipped the result no longer summed to 1. ``theta`` is now chosen by the
    standard sort-based simplex projection, which gives the same value
    whenever no clipping occurs.

    Args:
        w: Array-like of weights (any length).

    Returns:
        ``numpy.ndarray`` of the same length as ``w``.
    """
    w = np.asarray(w, dtype=float)
    n = w.size
    if n == 0:
        return w

    if np.all(np.isfinite(w)):
        ordered = np.sort(w)[::-1]
        excess = np.cumsum(ordered) - 1.0
        counts = np.arange(1, n + 1)
        # Largest prefix whose members all stay positive after the shift;
        # the first entry always qualifies, so `support` is at least 1.
        support = counts[ordered - excess / counts > 0][-1]
        theta = excess[support - 1] / support
    else:
        # Non-finite input cannot be projected; keep the plain mean shift so
        # the NaN/inf propagates visibly instead of raising here.
        theta = (np.sum(w) - 1) / n
    print(theta)
    return np.maximum(w - theta, 0.0)
# Star rating (1-5) -> regression target for the blended similarity score.
_RATING_TARGETS = {1: 0.10, 2: 0.30, 3: 0.50, 4: 0.70, 5: 0.9}


def target(r):
    """Map a 1-5 rating to its target score.

    Args:
        r: Rating; anything ``int()`` accepts (e.g. ``4``, ``"4"``, ``4.0``).

    Returns:
        Target value between 0.10 and 0.9.

    Raises:
        KeyError: If the rating is outside 1-5.
        ValueError: If ``r`` cannot be converted to ``int``.
    """
    return _RATING_TARGETS[int(r)]
def update_one(w, x, r, eta):
    """Apply one multiplicative weight update and renormalise.

    The prediction is ``w . x``. Each weight is scaled by
    ``exp(-eta * (prediction - r) * x_i)``, and the result is passed through
    ``normal1``.

    Args:
        w: Current weights (array-like).
        x: Feature vector of the same length as ``w``.
        r: Target value for this example.
        eta: Learning rate.

    Returns:
        The updated weights as returned by ``normal1``.
    """
    w = np.asarray(w, float)
    x = np.asarray(x, float)
    prediction = float(np.dot(w, x))
    grad = (prediction - r) * x
    return normal1(w * np.exp(-eta * grad))
def update_session(w, X_session, ratings, eta=0.2, passes=1):
    """Run ``update_one`` over a session of rated examples.

    Args:
        w: Starting weights; copied, never modified in place. Any array-like
            is accepted (previously only objects with ``.copy()``).
        X_session: Iterable of feature vectors, one per example.
        ratings: Ratings (1-5) aligned with ``X_session``. Pairing stops at
            the shorter of the two, but every rating is converted with
            ``target`` up front and must therefore be valid.
        eta: Learning rate passed to ``update_one``.
        passes: Number of sweeps over the session.

    Returns:
        The weights after all updates.
    """
    w_cur = np.array(w, dtype=float)  # np.array copies, so the caller's w is untouched
    targets = np.array([target(r) for r in ratings], float)
    for _ in range(passes):
        for x, goal in zip(X_session, targets):
            w_cur = update_one(w_cur, x, goal, eta = eta)
    return w_cur
def algo_type(algo, user):
    """Aggregate a 2-D score matrix over axis 0 using the chosen mean.

    Modes:
        1: mean of ``exp(score / 0.5)``, which emphasises high scores
        2: arithmetic mean
        4: geometric mean
        5: harmonic mean
        anything else: average of the arithmetic and geometric means

    The geometric and harmonic means are undefined for negative scores and
    previously produced NaN or arbitrary values plus runtime warnings. For
    those two means negative entries are now treated as 0, and the resulting
    divide-by-zero warnings are silenced. Results for strictly positive input
    are unchanged; a zero entry still yields 0, as before.

    Args:
        algo: Integer mode selector.
        user: 2-D array-like; rows are the items being averaged.

    Returns:
        1-D ``numpy.ndarray`` with one aggregate per column.
    """
    user = np.asarray(user, dtype=float)

    if algo == 1:
        return np.exp(user / 0.5).mean(axis=0)
    if algo == 2:
        return user.mean(axis=0)

    non_negative = np.maximum(user, 0.0)
    with np.errstate(divide="ignore"):
        if algo == 5:
            # 1/0 -> inf, and n/inf -> 0, the correct limiting value.
            return user.shape[0] / np.sum(1 / non_negative, axis=0)
        # log(0) -> -inf, and exp(-inf) -> 0.
        geometric = np.exp(np.mean(np.log(non_negative), axis=0))

    if algo == 4:
        return geometric
    arithmetic = user.mean(axis=0)
    return 0.5 * arithmetic + 0.5 * geometric
# State shared between `recommend` (writer) and `ml` (reader) through
# `global` statements.
# NOTE(review): these are process-wide, so concurrent users of the app
# overwrite each other's values -- consider per-session state.
user_inp = []    # positions in `movies` of the user's seed movies
user = []        # per-seed similarity vectors from the last `recommend` call
algo = 0         # aggregation mode passed to `algo_type`
rec_global = []  # positions in `movies` of the last recommendations
# Popularity checkbox label -> (vote-count threshold, revenue threshold),
# ordered from most to least inclusive; the first selected label wins.
_POPULARITY_TIERS = (
    ("Unheard", 10, 10000),
    ("Hidden Gems", 60, 100000),
    ("Mid Tier", 90, 1000000),
    ("Popular Picks", 500, 5000000),
)
# Thresholds when none of the labels above is selected ("Blockbusters" only).
_BLOCKBUSTER_THRESHOLDS = (3000, 90000000)

# Radio label -> mode understood by `algo_type` (unknown label -> 0).
_BLEND_MODES = {
    "Spotlight Matches": 1,
    "Strong Picks": 2,
    "Balanced Blend": 3,
    "Common Ground": 4,
    "Strong Agreement": 5,
}

# Number of rating dropdowns wired as outputs of this handler.
_RATING_SLOTS = 20


def recommend(movies_list, langs, after, before, rating, rt_min, rt_max, no_adult, k, pop_list, mod):
    """Score the catalogue against the user's movies and build the UI updates.

    For each seed movie a score vector over all movies is computed as a
    weighted sum (weights ``w``) of text similarity, genre similarity and
    release-year closeness, then rescaled with ``(x + 1) / 2``. The per-seed
    vectors are aggregated by ``algo_type``, and ``get_top_10`` picks the
    best ``k`` that pass the filters.

    Side effects: rebinds the globals ``user_inp``, ``user``, ``algo`` and
    ``rec_global``, which ``ml`` reads later.

    Fixes relative to the previous version:
      * An empty movie list returns a full set of output updates instead of
        a bare ``[]``, which did not match the 22 wired outputs.
      * Fewer than ``k`` recommendations, or a ``None`` from ``get_top_10``,
        no longer raises.
      * A seed movie without a release date contributes no date similarity
        instead of raising ``TypeError``.
      * Cleared number inputs (``None``) are treated as "no bound".
      * Rating dropdowns are reset to "N/A" for each new recommendation set.

    Returns:
        ``(gallery_items, *dropdown_updates, submit_button_update)`` where
        ``gallery_items`` is a list of ``[poster_url, caption]`` pairs.
    """
    global user_inp
    global user
    global algo
    global rec_global
    user_inp = []
    user = []
    algo = 0
    rec_global = []

    if not movies_list:
        hidden = [gr.update(visible=False) for _ in range(_RATING_SLOTS)]
        return ([], *hidden, gr.update(visible=False))
    k = int(k)

    from sklearn.metrics.pairwise import cosine_similarity

    per_seed = []
    for user_movie in movies_list:
        num = index_map[user_movie.lower()]
        if isinstance(num, list):
            num = num[0]  # several movies share this title; take the first
        user_inp.append(num)

        similarities_text = cosine_similarity(all_vectors_text, movies[num].text_vector.reshape(1, -1)).flatten()
        similarities_genre = cosine_similarity(all_vectors_genre, movies[num].genre_vector.reshape(1, -1)).flatten()

        release = df.loc[movies[num].index, "release_date"]
        if isinstance(release, str):
            year = int(release[0:4])
            date = np.exp(-np.abs(all_vectors_year - year) / 5)
        else:
            date = np.zeros(len(all_vectors_year))

        blended = similarities_text*w[0]+similarities_genre*w[1]+date*w[2]
        per_seed.append((blended+1)/2)
    user = np.array(per_seed)

    filterr = Filter()
    if no_adult:
        filterr.adult()
    if langs:
        filterr.add_lang(langs)
    filterr.ratt(0 if rating is None else rating)
    filterr.length(
        float("inf") if rt_max is None else rt_max,
        0 if rt_min is None else rt_min,
    )
    filterr.dates(
        float("-inf") if after is None else after,
        float("inf") if before is None else before,
    )

    selected_tiers = pop_list or []
    min_votes, min_revenue = _BLOCKBUSTER_THRESHOLDS
    for label, votes, revenue in _POPULARITY_TIERS:
        if label in selected_tiers:
            min_votes, min_revenue = votes, revenue
            break
    filterr.popp(min_votes)
    filterr.revenue(min_revenue)

    algo = _BLEND_MODES.get(mod, 0)
    similarities = algo_type(algo, user)

    rec_global = get_top_10(similarities, user_inp, filterr, movies, k) or []

    titles = [movies[i].name for i in rec_global]
    results = []
    for rank, position in enumerate(rec_global, start=1):
        row = movies[position].index
        results.append([
            f"https://image.tmdb.org/t/p/w342{df.loc[row, 'poster_path']}",
            f"{rank}. {df.loc[row, 'title']} ({df.loc[row, 'release_date'][:4]})",
        ])

    slider_updates = []
    for i in range(_RATING_SLOTS):
        if i < len(titles):
            slider_updates.append(gr.update(visible=True, label=f"Rate: {titles[i]}", value="N/A"))
        else:
            slider_updates.append(gr.update(visible=False))

    return results, *slider_updates, gr.update(visible = bool(titles))
import gradio as gr
def add_item_mov(txt, items):
    """Add a movie title to the list if it is known and not already present.

    The title must exist in ``index_map`` (looked up lower-cased). The
    duplicate check is now case-insensitive as well: previously "Alien" and
    "alien" could both be added although they resolve to the same movie.

    Args:
        txt: Raw textbox content (may be None).
        items: Current list of titles.

    Returns:
        ``(items, checkbox_group_update, textbox_update)``; the textbox is
        always cleared.
    """
    txt = (txt or "").strip()
    items = items or []
    key = txt.lower()
    already_listed = any(existing.lower() == key for existing in items)
    if txt and not already_listed and key in index_map:
        items = items + [txt]
    return items, gr.update(choices=items, value=[]), gr.update(value="")
def add_item(txt, items):
    """Append ``txt`` to ``items`` unless it is blank or already present.

    Args:
        txt: Value to add (may be None); surrounding whitespace is stripped.
        items: Current list (None is treated as empty).

    Returns:
        ``(items, checkbox_group_update, input_update)``; the input is always
        reset to an empty string.
    """
    txt = (txt or "").strip()
    items = items or []
    if txt and txt not in items:
        items = items + [txt]  # new list, the incoming state is not mutated
    return items, gr.update(choices=items, value=[]), gr.update(value="")
def remove_items(selected, items):
    """Drop every entry of ``selected`` from ``items``.

    Args:
        selected: Entries ticked in the checkbox group (None = nothing).
        items: Current list (None is treated as empty).

    Returns:
        ``(items, checkbox_group_update)`` with the selection cleared.
    """
    selected = selected or []
    items = [entry for entry in (items or []) if entry not in selected]
    return items, gr.update(choices=items, value=[])
def clear_items():
    """Return an empty list and a checkbox-group update with no choices."""
    return [], gr.update(choices=[], value=[])
# Popularity tiers from best-known to most obscure.
_POPULARITY_ORDER = ["Blockbusters", "Popular Picks", "Mid Tier", "Hidden Gems", "Unheard"]


def enforce(selected):
    """Expand a tier selection so that it includes every better-known tier.

    The most obscure selected tier decides the result: all tiers up to and
    including it are returned. With nothing (or only "Blockbusters")
    selected, the result is ``["Blockbusters"]``.

    Args:
        selected: Currently ticked tier labels (None is treated as empty).

    Returns:
        A new list holding the leading part of the tier order.
    """
    selected = selected or []
    for depth in range(len(_POPULARITY_ORDER) - 1, 0, -1):
        if _POPULARITY_ORDER[depth] in selected:
            return _POPULARITY_ORDER[: depth + 1]
    return _POPULARITY_ORDER[:1]
def setup_ratings(titles, total=20):
    """Build updates that show one rating control per title and hide the rest.

    The previous version read an undefined global ``N`` and raised
    ``NameError`` when called. The number of controls is now the ``total``
    parameter, defaulting to the 20 rating dropdowns the interface creates.

    Args:
        titles: Titles to label the visible controls with.
        total: Total number of rating controls.

    Returns:
        List of updates: one visible, labelled and cleared update per title,
        followed by ``total - len(titles)`` hide-updates (none if negative).
    """
    shown = [gr.update(label=f"Rate: {t}", visible=True, value=None) for t in titles]
    hidden = [gr.update(visible=False) for _ in range(total - len(titles))]
    return shown + hidden
# Build the Gradio interface and wire its events.
# NOTE(review): the original indentation was lost, so the nesting of the
# layout containers below is a reconstruction from the order of the
# statements -- verify the rendered layout against the intended design.
with gr.Blocks(title="Movie Recommender") as demo:
    gr.Markdown("## Movie Recommender")

    # Per-session state holders.
    movies_list = gr.State([])
    langs = gr.State([])
    titles_state = gr.State([])

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Movies")
            m_in = gr.Textbox(placeholder="Add a movie", label = "Add Movie")
            with gr.Row():
                m_add = gr.Button("Add", variant="primary")
                m_clear = gr.Button("Clear")
            m_list = gr.CheckboxGroup(choices=[], label="Current (select to remove)")
            m_rm = gr.Button("Remove Selected")

            gr.Markdown("### Filters")
            with gr.Accordion("Filters", open=False):
                with gr.Column():
                    gr.Markdown("### Languages")
                    l_in = gr.Dropdown(
                        # Shown by name, sorted alphabetically; the value is the code.
                        choices=[(name, code) for code, name in sorted(languages.items(), key=lambda x: x[1])],
                        label="Select language",
                        multiselect=False,
                        interactive=True,
                    )
                    with gr.Row():
                        l_add = gr.Button("Add", variant="primary")
                        l_clear = gr.Button("Clear")
                    l_list = gr.CheckboxGroup(choices=[], label="Languages (select to remove)")
                    l_rm = gr.Button("Remove Selected")

                    with gr.Row():
                        after = gr.Number(label="After year", value=1900)
                        before = gr.Number(label="Before year", value=2025)
                    with gr.Row():
                        rating = gr.Slider(0.0, 10.0, value=0.0, step=0.1, label="Min rating")
                    with gr.Row():
                        rt_min = gr.Number(label="Min runtime (min)", value=0)
                        rt_max = gr.Number(label="Max runtime (min)", value=1000)

                    no_adult = gr.Checkbox(label="Exclude adult content", value=False)

            gr.Markdown("### Recommendation Models")
            with gr.Accordion("Recommendation Models", open=False):
                with gr.Column():
                    pop_list = gr.CheckboxGroup(choices=["Blockbusters", "Popular Picks", "Mid Tier", "Hidden Gems", "Unheard"],
                                                value = ["Blockbusters", "Popular Picks"],
                                                label="Which ones do you want to include?", interactive=True)

                    # Named `model_choice` (previously `model`) so the radio no
                    # longer rebinds the module-level `model` defined earlier.
                    model_choice = gr.Radio(choices=["Spotlight Matches", "Strong Picks", "Balanced Blend", "Common Ground", "Strong Agreement"], value = "Balanced Blend", label="Pick one", interactive=True)

    gr.Markdown("---")
    with gr.Row():
        k = gr.Slider(1, 20, value=10, step=1, label="How many recommendations?")
        go = gr.Button("Get Recommendations", variant="primary")
    gallery = gr.Gallery(columns=5, object_fit="contain")

    # 20 rating dropdowns in 4 rows of 5, hidden until recommendations exist.
    sliders = []
    with gr.Column():
        for start in range(0, 20, 5):
            with gr.Row():
                for i in range(start, start + 5):
                    s = gr.Dropdown(choices=["N/A", 1, 2, 3, 4, 5], value = "N/A", visible=False, interactive = True, label=f"Rate: {i+1}")
                    sliders.append(s)
        save = gr.Button("Submit ratings", visible = False)

    # Movie list management.
    m_add.click(add_item_mov, [m_in, movies_list], [movies_list, m_list, m_in])
    m_in.submit(add_item_mov, [m_in, movies_list], [movies_list, m_list, m_in])
    m_rm.click(remove_items, [m_list, movies_list], [movies_list, m_list])
    m_clear.click(clear_items, None, [movies_list, m_list])

    # Language list management.
    l_add.click(add_item, [l_in, langs], [langs, l_list, l_in])
    l_rm.click(remove_items, [l_list, langs], [langs, l_list])
    l_clear.click(clear_items, None, [langs, l_list])

    # Keep the popularity selection cumulative.
    pop_list.change(enforce, pop_list, pop_list)

    # Recommendation and rating round-trip.
    go.click(recommend, [movies_list, langs, after, before, rating, rt_min, rt_max, no_adult, k, pop_list, model_choice],[gallery, *sliders, save])
    save.click(ml, [movies_list, k, *sliders], [*sliders, save])
# Launch the app only when run as a script.
# NOTE: share=True asks Gradio to create a publicly reachable link.
if __name__ == "__main__":
    demo.launch(share = True)