|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import json |
|
|
import pickle |
|
|
import io |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
|
|
class MovieRecommender:
    """Rank movies by cosine similarity between a text prompt and embeddings.

    The constructor reads three artifacts from ``model_path``:

    * ``embeddings.npy`` - the embedding matrix,
    * ``tokenizer_vocab.json`` - a token -> id mapping (rebuilt from
      ``tokenizer.pkl`` when the JSON file does not exist yet),
    * ``movies.json`` - the movie metadata table.

    NOTE(review): rows of ``embeddings`` are indexed by token id when the
    query vector is built *and* by movie row position when results are looked
    up in ``movies``. That only works if both index spaces line up - confirm
    against the training code.
    """

    # Columns returned by recommend(), in output order.
    _RESULT_COLUMNS = ["title", "release_date", "vote_average", "vote_count", "status"]
    # Prompts are truncated to this many whitespace-separated tokens.
    _MAX_PROMPT_TOKENS = 32

    def __init__(self, model_path="."):
        """Load embeddings, vocabulary and movie metadata from ``model_path``.

        Args:
            model_path: Directory containing the model artifacts.

        Raises:
            FileNotFoundError: If ``embeddings.npy`` is missing, or if neither
                ``tokenizer_vocab.json`` nor ``tokenizer.pkl`` exists.
        """
        # NaN/inf entries would poison every similarity score, so replace
        # them with finite numbers once at load time.
        self.embeddings = np.nan_to_num(np.load(f"{model_path}/embeddings.npy"))

        vocab_path = f"{model_path}/tokenizer_vocab.json"
        try:
            with open(vocab_path, "r", encoding="utf-8") as f:
                self.tokenizer = json.load(f)
        except FileNotFoundError:
            self.tokenizer = self._extract_vocab_from_pickle(
                f"{model_path}/tokenizer.pkl"
            )
            # Cache only a successful extraction: persisting the empty
            # fallback would make the failure permanent, because later runs
            # would load the empty JSON and never retry the pickle.
            if self.tokenizer:
                with open(vocab_path, "w", encoding="utf-8") as f:
                    json.dump(self.tokenizer, f)

        self.movies = pd.read_json(f"{model_path}/movies.json")

    def _extract_vocab_from_pickle(self, filepath):
        """Recover a token -> id dict from a pickled tokenizer.

        The pickle is loaded with every referenced class or function replaced
        by an inert placeholder, so the tokenizer's own library does not have
        to be installed and nothing the pickle names is imported or called.
        The resulting object graph is then searched for a vocabulary-shaped
        mapping (see ``_find_vocab``).

        Args:
            filepath: Path to the pickle file.

        Returns:
            The extracted ``dict`` of token -> id, or ``{}`` (after printing a
            warning) when the file cannot be decoded or holds no such mapping.

        Raises:
            FileNotFoundError: If ``filepath`` does not exist.
        """
        with open(filepath, "rb") as f:
            pickle_data = f.read()

        class _Placeholder(dict):
            """Stand-in for anything the pickle refers to by name.

            Being a dict subclass lets it absorb both attribute state (BUILD)
            and item assignments (SETITEM/SETITEMS) without knowing the real
            type.
            """

            def __init__(self, *args, **kwargs):
                # Constructor arguments of the real type are irrelevant here.
                super().__init__()

            def __setstate__(self, state):
                if isinstance(state, dict):
                    self.__dict__.update(state)
                else:
                    # Non-dict state (e.g. tuples) is kept so it can still be
                    # searched for a nested vocabulary.
                    self.__dict__["_state"] = state

            def __hash__(self):
                # dict is unhashable; identity hashing keeps placeholders
                # usable as dict keys or set members inside the pickle.
                return id(self)

        class _StubbingUnpickler(pickle.Unpickler):
            """Unpickler that resolves every global to ``_Placeholder``."""

            def find_class(self, module, name):
                return _Placeholder

        try:
            root = _StubbingUnpickler(io.BytesIO(pickle_data)).load()
        except Exception:
            # Unpickling foreign data can fail in many ways (UnpicklingError,
            # EOFError, TypeError, ...); all of them mean "no vocabulary",
            # which is reported below instead of crashing the constructor.
            # NOTE: stubbing globals limits what a pickle can do, but
            # tokenizer files should still come from a trusted source.
            root = None

        vocab = self._find_vocab(root)
        if vocab is not None:
            return vocab

        print("Warning: Could not extract vocabulary from pickle. Using empty tokenizer.")
        print("Recommendation quality will be limited.")
        return {}

    @staticmethod
    def _find_vocab(root):
        """Breadth-first search ``root`` for a vocabulary-shaped mapping.

        A mapping qualifies when it is non-empty, every key is a ``str`` and
        the values are distinct non-negative ints. Distinctness is what tells
        an id table apart from e.g. a word-frequency table of the same shape.

        Returns:
            A plain ``dict`` copy of the first qualifying mapping, or ``None``.
        """
        from collections import deque

        def is_vocab(candidate):
            if not isinstance(candidate, dict) or not candidate:
                return False
            ids = list(candidate.values())
            return (
                all(isinstance(token, str) for token in candidate)
                and all(
                    isinstance(i, int) and not isinstance(i, bool) and i >= 0
                    for i in ids
                )
                and len(set(ids)) == len(ids)
            )

        pending = deque([root])
        seen = set()  # ids of visited containers; guards against cycles
        while pending:
            node = pending.popleft()
            if not isinstance(node, (dict, list, tuple)) or id(node) in seen:
                continue
            seen.add(id(node))
            if is_vocab(node):
                return dict(node)
            if isinstance(node, dict):
                # Placeholders carry attribute state in __dict__ in addition
                # to any items; plain dicts have no __dict__.
                pending.extend(getattr(node, "__dict__", {}).values())
                pending.extend(node.values())
            else:
                pending.extend(node)
        return None

    def _encode(self, prompt):
        """Convert ``prompt`` into an array of embedding row indices.

        The prompt is lower-cased, split on whitespace and truncated to
        ``_MAX_PROMPT_TOKENS`` tokens. Tokens missing from the vocabulary, and
        ids outside ``[0, len(self.embeddings))``, are mapped to id 0.

        Returns:
            An ``int64`` array of shape ``(1, n_tokens)``; ``(1, 0)`` for a
            prompt with no tokens.
        """
        tokens = prompt.lower().split()[: self._MAX_PROMPT_TOKENS]
        n_rows = len(self.embeddings)
        ids = []
        for token in tokens:
            token_id = self.tokenizer.get(token, 0)
            ids.append(token_id if 0 <= token_id < n_rows else 0)
        # An explicit dtype keeps the empty case usable as an index array
        # (np.array([]) would default to float64).
        return np.array(ids, dtype=np.int64)[None, :]

    def recommend(self, prompt, topk=10):
        """Return the ``topk`` movies most similar to ``prompt``.

        The query vector is the mean of the embedding rows selected by
        ``_encode(prompt)``; movies are ordered by descending cosine
        similarity to it.

        Args:
            prompt: Free-text query.
            topk: Maximum number of rows to return.

        Returns:
            A DataFrame with the columns in ``_RESULT_COLUMNS``. It is empty
            when the prompt has no tokens or there are no embeddings, since
            no query vector can be formed in either case.
        """
        q_ids = self._encode(prompt)[0]
        if q_ids.size == 0 or len(self.embeddings) == 0:
            # The mean of an empty selection is NaN, which cosine_similarity
            # cannot handle; return an empty result instead.
            return self.movies.iloc[0:0][self._RESULT_COLUMNS]

        query_vec = self.embeddings[q_ids].mean(axis=0, keepdims=True)
        sims = cosine_similarity(query_vec, self.embeddings).flatten()
        idx = sims.argsort()[::-1][:topk]
        return self.movies.iloc[idx][self._RESULT_COLUMNS]
|
|
|