In [1]:
from typing import List, Optional, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import torch
from sklearn.cluster import KMeans
from transformers import DistilBertTokenizer, DistilBertModel, \
    BertTokenizer, BertModel, \
    RobertaTokenizer, RobertaModel, \
    AutoTokenizer, AutoModel, AutoModelForMaskedLM

 from .autonotebook import tqdm as notebook_tqdm


In [2]:
# global variables
# Encoder checkpoints that can be selected through `current_encoder`.
encoder_options = [
    'distilbert-base-uncased',
    'bert-base-uncased',
    # The trailing comma is required: without it Python concatenates this
    # literal with the next one into a single bogus entry.
    'bert-base-cased',
    'roberta-base',
    'xlm-roberta-base',
]

# Checkpoint currently in use; defaults to the first option.
current_encoder = encoder_options[0]
# Populated later, once the selected encoder has been loaded.
tokenizer = None
model = None

# Genre names, read from the "genre" column of the CSV, as a plain Python list.
genres = pd.read_csv("./all_genres.csv")["genre"].to_list()

In [3]:
# Map every supported checkpoint name to its (tokenizer class, model class).
# All model classes here are bare encoders, i.e. their output exposes
# `last_hidden_state`. 'xlm-roberta-base' therefore uses AutoModel rather than
# AutoModelForMaskedLM, whose output carries vocabulary logits instead.
_ENCODER_CLASSES = {
    'distilbert-base-uncased': (DistilBertTokenizer, DistilBertModel),
    'bert-base-uncased': (BertTokenizer, BertModel),
    'bert-base-cased': (BertTokenizer, BertModel),
    'roberta-base': (RobertaTokenizer, RobertaModel),
    'xlm-roberta-base': (AutoTokenizer, AutoModel),
}

if current_encoder not in _ENCODER_CLASSES:
    # Fail loudly instead of leaving `tokenizer` / `model` unset and hitting a
    # confusing error much later.
    raise ValueError(
        f"Unsupported encoder {current_encoder!r}; "
        f"expected one of {sorted(_ENCODER_CLASSES)}"
    )

_tokenizer_cls, _model_cls = _ENCODER_CLASSES[current_encoder]
tokenizer = _tokenizer_cls.from_pretrained(current_encoder)
model = _model_cls.from_pretrained(current_encoder)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_projector.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_layer_norm.bias', 'vocab_transform.bias', 'vocab_projector.weight']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [10]:
def embed_string() -> np.ndarray:
    """Embed every entry of the global ``genres`` list with the global encoder.

    Each genre string is tokenized and passed through ``model`` on its own;
    the hidden state at the last sequence position is used as its embedding.

    NOTE(review): with BERT-style tokenizers the last position is presumably
    the trailing special token rather than a word piece -- confirm that this
    is the intended pooling strategy.

    Returns:
        A float64 array with one row per genre, in the order of ``genres``.
        An empty ``(0, 0)`` array is returned when ``genres`` is empty.
    """
    vectors = []
    # Inference only: disabling autograd avoids building a graph per genre.
    with torch.no_grad():
        for text in genres:
            encoded_input = tokenizer(text, return_tensors='pt')
            # forward pass
            hidden_states = model(**encoded_input).last_hidden_state
            last_position = hidden_states[:, -1, :]  # Take the last element
            vectors.append(last_position.flatten().cpu().numpy())

    if not vectors:
        return np.zeros((0, 0))

    # Stack into a (num_genres, hidden_size) matrix; float64 matches what the
    # previous np.zeros-based copy produced.
    return np.stack(vectors).astype(np.float64)

In [5]:
def gen_clusters(input_strs:np.ndarray, num_clusters:int) -> Tuple[KMeans, np.ndarray, float]:
    """Cluster embedding vectors with K-Means and measure the fit.

    Args:
        input_strs: 2-D array with one embedding per row. It is not modified.
        num_clusters: Number of clusters to fit.

    Returns:
        A tuple ``(clustering_algo, predicted_labels, cluster_error)``:
        the fitted KMeans instance, the cluster index assigned to every row,
        and the sum of Euclidean distances between each row and the centre of
        its assigned cluster.

    No ``random_state`` is passed to KMeans, so assignments may differ between
    runs.
    """
    clustering_algo = KMeans(n_clusters=num_clusters)
    predicted_labels = clustering_algo.fit_predict(input_strs)

    # Row i of `assigned_centers` is the centre of the cluster row i fell into.
    assigned_centers = clustering_algo.cluster_centers_[predicted_labels]
    # Distance must be taken on the *difference*. Passing the sample as the
    # second positional argument of np.square would make it the `out` buffer
    # and overwrite the caller's data.
    distances = np.linalg.norm(np.asarray(input_strs) - assigned_centers, axis=1)
    cluster_error = float(np.sum(distances))

    return clustering_algo, predicted_labels, cluster_error



In [16]:
def view_clusters(predicted_clusters:np.ndarray, genre_names:Optional[List[str]] = None) -> pd.DataFrame:
    """Lay out cluster membership as a table with one column per cluster.

    Args:
        predicted_clusters: Cluster index for each genre, aligned by position
            with ``genre_names``.
        genre_names: Names to place in the table. Defaults to the global
            ``genres`` list when omitted.

    Returns:
        A DataFrame with columns ``cluster_0`` .. ``cluster_<max index>``.
        Each column lists the members of that cluster, padded with empty
        strings so all columns have the same length. A cluster index with no
        members yields a column of empty strings. An empty DataFrame is
        returned when there is nothing to show.
    """
    if genre_names is None:
        genre_names = genres

    # cluster index -> names assigned to it, in input order
    mappings = dict()
    for predicted_cluster, movie in zip(predicted_clusters, genre_names):
        mappings.setdefault(int(predicted_cluster), []).append(movie)

    if not mappings:
        return pd.DataFrame()

    max_len = max(len(members) for members in mappings.values())
    max_cluster = max(mappings)

    columns = {}
    for i in range(max_cluster + 1):
        # .get() tolerates indices that received no members.
        members = mappings.get(i, [])
        columns[f"cluster_{i}"] = members + [''] * (max_len - len(members))

    return pd.DataFrame(columns)

In [17]:
def add_new_genre(clustering_algo:KMeans, new_genre:str, recompute:bool = False, num_clusters:int = 5) -> pd.DataFrame:
    """Add a genre to the global list and return the updated cluster table.

    Args:
        clustering_algo: Already-fitted KMeans model, used to assign labels
            when ``recompute`` is False.
        new_genre: Genre name to append to the global ``genres`` list.
        recompute: When True, fit a fresh clustering on all genres instead of
            reusing ``clustering_algo``.
        num_clusters: Number of clusters to fit when ``recompute`` is True.

    Returns:
        The table produced by ``view_clusters`` for the updated genre list.

    Note:
        Every call re-embeds the full genre list, and a clustering fitted with
        ``recompute=True`` is not returned to the caller.
    """
    # append() mutates the module-level list in place.
    genres.append(new_genre)
    embedded_genres = embed_string()

    if recompute:
        _, predicted_labels, _ = gen_clusters(embedded_genres, num_clusters)
    else:
        # Use the model that was passed in (parameter name: clustering_algo).
        predicted_labels = clustering_algo.predict(embedded_genres)

    output_df = view_clusters(predicted_labels)
    return output_df
 

In [18]:
# Embed all genres, cluster the embeddings into 5 groups, and tabulate the
# resulting cluster membership.
embedded_genres = embed_string()
clustering_algo, predicted_labels, cluster_error = gen_clusters(
    input_strs=embedded_genres,
    num_clusters=5,
)
output_df = view_clusters(predicted_clusters=predicted_labels)