|
from transformers import AutoModel, AutoTokenizer |
|
import pandas as pd |
|
import torch |
|
from torch.utils.data import Dataset |
|
import logging |
|
from tqdm import tqdm |
|
from torch.utils.data import DataLoader |
|
import torch.nn.functional as F |
|
import pickle |
|
import string |
|
from abc import abstractmethod |
|
import json |
|
|
|
|
|
class AbstractMoviesRanker:
    """Base class for rankers that score a query against an index matrix.

    Subclasses provide :meth:`encode_query`; scoring and top-n selection are
    implemented here and shared.
    """

    def __init__(self, df, index_matrix, score_name="score"):
        """Store the items frame, its index matrix and the score column name.

        Args:
            df: DataFrame of items; its index values are used as item ids.
            index_matrix: 2-D tensor that ``get_scores`` multiplies
                (transposed) with the encoded query. Presumably row ``i``
                describes row ``i`` of ``df`` -- ``get_top_ids`` pairs ids
                and scores positionally; confirm against the index builder.
            score_name: Name of the column that receives the scores in the
                frame returned by ``get_top_ids``.
        """
        self.df = df
        self.ids = self.df.index.values
        self.index_matrix = index_matrix
        self.score_name = score_name

    @abstractmethod
    def encode_query(self, query):
        """Encode ``query`` into the tensor consumed by ``get_scores``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        # This class does not use ABCMeta, so @abstractmethod alone does not
        # prevent instantiation. Fail loudly here instead of returning None
        # and letting torch.mm raise an unrelated error later.
        raise NotImplementedError(
            f"{type(self).__name__} must implement encode_query()"
        )

    def get_scores(self, encoded_query):
        """Return one score per index row as a plain Python list.

        Scores are the first row of ``encoded_query @ index_matrix.T``, so
        only the first query of ``encoded_query`` is scored.
        """
        return torch.mm(encoded_query, self.index_matrix.transpose(0, 1))[0].tolist()

    def get_top_ids(self, scores, topn=6):
        """Return the ``topn`` best-scoring rows of ``self.df``.

        Args:
            scores: Sequence of scores paired positionally with ``self.ids``.
            topn: Maximum number of rows to return.

        Returns:
            A new DataFrame ordered by descending score, carrying the scores
            in the ``self.score_name`` column.
        """
        ranked = sorted(
            zip(self.ids.tolist(), scores),
            key=lambda pair: pair[1],
            reverse=True,
        )[:topn]
        top_ids = [item_id for item_id, _ in ranked]
        top_scores = [score for _, score in ranked]
        # Explicit copy: the score column must only ever be written to the
        # returned frame, never back into self.df.
        sorted_df = self.df.loc[top_ids, :].copy()
        sorted_df.loc[:, self.score_name] = top_scores
        return sorted_df

    def run_query(self, query, topn=6):
        """Encode ``query``, score it against the index, return the top rows."""
        encoded_query = self.encode_query(query)
        scores = self.get_scores(encoded_query)
        return self.get_top_ids(scores, topn)

    @staticmethod
    def depunctuate(x):
        """Return ``x`` with every ``string.punctuation`` character removed."""
        return x.translate(str.maketrans('', '', string.punctuation))
|
|
|
class SparseTfIdfRanker(AbstractMoviesRanker):
    """Sparse ranking via TF-IDF.

    Queries are vectorized with a pickled vectorizer and scored against the
    densified index matrix; scores land in the ``'tfidf-score'`` column.
    """

    def __init__(self, df, index_matrix, vectorizer_path):
        """Load the vectorizer and densify the index.

        Args:
            df: DataFrame of items, forwarded to the base class.
            index_matrix: Tensor supporting ``.to_dense()`` (presumably a
                sparse TF-IDF matrix -- confirm against the index builder).
            vectorizer_path: Path of the pickled vectorizer; the loaded
                object must expose ``transform``.
        """
        super().__init__(df, index_matrix, score_name='tfidf-score')
        # NOTE(security): pickle.load can execute arbitrary code; only point
        # vectorizer_path at files from a trusted source.
        # The context manager closes the handle deterministically.
        with open(vectorizer_path, 'rb') as fh:
            self.vectorizer = pickle.load(fh)
        # Densify once so every query can use a plain torch.mm.
        self.index_matrix = self.index_matrix.to_dense()

    def encode_query(self, query):
        """Return the L2-normalised float32 vector of the depunctuated query."""
        vector = self.vectorizer.transform([self.depunctuate(query)]).todense()
        encoded_query = torch.tensor(vector, dtype=torch.float32)
        return F.normalize(encoded_query, p=2)
|
|
|
|
|
class BertRanker(AbstractMoviesRanker):
    """Dense ranking with an embedding matrix.

    Queries are embedded with a pretrained transformer (masked mean pooling
    followed by L2 normalisation); scores land in the ``'bert-score'`` column.
    """

    def __init__(self, df, index_matrix, modelpath):
        """Load the tokenizer and model found at ``modelpath``.

        Args:
            df: DataFrame of items, forwarded to the base class.
            index_matrix: Embedding matrix scored against the query.
            modelpath: Name or path handed to ``AutoTokenizer`` and
                ``AutoModel`` ``from_pretrained``.
        """
        super().__init__(df, index_matrix, score_name="bert-score")
        self.tokenizer = AutoTokenizer.from_pretrained(modelpath)
        self.model = AutoModel.from_pretrained(modelpath)

    def encode_query(self, query):
        """Return the L2-normalised, mean-pooled embedding of ``query``."""
        tok_q = self.tokenizer(
            query,
            return_tensors="pt",
            padding="max_length",
            max_length=128,
            truncation=True,
        )
        # Inference only: skip autograd bookkeeping so no graph is built and
        # kept alive for every query.
        with torch.no_grad():
            o = self.model(**tok_q)
        encoded_query = self.mean_pooling(o, tok_q['attention_mask'])
        return F.normalize(encoded_query, p=2)

    @staticmethod
    def mean_pooling(model_output, attention_mask):
        """Average token embeddings over the positions kept by the mask.

        Args:
            model_output: Model output exposing ``last_hidden_state``.
            attention_mask: Mask expanded to the shape of the token
                embeddings; positions with value 0 do not contribute.

        Returns:
            Masked sum over dimension 1 divided by the mask count, which is
            clamped to at least 1e-9 to avoid dividing by zero.
        """
        token_embeddings = model_output.last_hidden_state
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        summed = torch.sum(token_embeddings * input_mask_expanded, 1)
        counts = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return summed / counts
|
|
|
|
|
|
|
class SparseDenseMoviesRanker():
    """Two-stage ranker: TF-IDF shortlist, then dense re-ranking.

    The sparse engine selects ``first_ranking`` candidates; the dense engine
    re-scores only those candidates and returns the best ``topn``.
    """

    # Shortlist size used when neither the caller nor the config gives one.
    DEFAULT_FIRST_RANKING = 1000

    def __init__(self, df, modelpath, bert_index, sparse_index, vectorizer_path,
                 first_ranking=DEFAULT_FIRST_RANKING):
        """Build the sparse engine and remember what the dense engine needs.

        Args:
            df: DataFrame of items.
            modelpath: Name or path of the pretrained model for the dense stage.
            bert_index: Dense embedding tensor, indexed with the index values
                of the shortlisted rows (so ``df`` index values are assumed to
                be valid row positions of this tensor -- confirm).
            sparse_index: Index tensor handed to ``SparseTfIdfRanker``.
            vectorizer_path: Path of the pickled vectorizer.
            first_ranking: Default number of candidates kept by the sparse
                stage.
        """
        self.df = df
        self.ids = self.df.index.values
        self.tfidf_engine = SparseTfIdfRanker(df, sparse_index, vectorizer_path)
        self.modelpath = modelpath
        self.bert_index = bert_index
        self.first_ranking = first_ranking
        # Created on first query so construction stays cheap.
        self.bert_engine = None

    def _get_bert_engine(self):
        """Return the shared dense ranker, loading the model on first use."""
        if self.bert_engine is None:
            self.bert_engine = BertRanker(self.df, self.bert_index, self.modelpath)
        return self.bert_engine

    def run_query(self, query, topn=6, first_ranking=None):
        """Return the ``topn`` rows for ``query`` after two-stage ranking.

        Args:
            query: Free-text query.
            topn: Number of rows returned by the dense stage.
            first_ranking: Number of candidates kept by the sparse stage;
                ``None`` falls back to ``self.first_ranking``.

        Returns:
            DataFrame of the shortlisted rows re-ordered by dense score, with
            the score columns added by both stages.
        """
        if first_ranking is None:
            first_ranking = self.first_ranking
        tfidf_sorted_frame = self.tfidf_engine.run_query(query, topn=first_ranking)
        firstranking_index = self.bert_index[tfidf_sorted_frame.index.values]
        # Reuse one BertRanker instead of constructing one per query, which
        # would reload the tokenizer and model weights every time. Only the
        # candidate frame, ids and index are swapped.
        engine = self._get_bert_engine()
        engine.df = tfidf_sorted_frame
        engine.ids = tfidf_sorted_frame.index.values
        engine.index_matrix = firstranking_index
        return engine.run_query(query, topn=topn)

    @classmethod
    def from_json_config(cls, jsonfile):
        """Build a ranker from a JSON configuration file.

        Keys read: ``dataframe``, ``bert_index``, ``sparse_index``,
        ``vectorizer_path``, ``modelpath`` and, optionally, ``firstranking``
        (shortlist size; defaults to ``DEFAULT_FIRST_RANKING``).

        Raises:
            KeyError: If a required key is missing from the configuration.
        """
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(jsonfile, encoding="utf-8") as fp:
            conf = json.load(fp)

        df = pd.read_pickle(conf['dataframe'])

        bert_index = torch.load(conf['bert_index'])
        sparse_index = torch.load(conf['sparse_index'])
        vectorizer_path = conf['vectorizer_path']
        modelpath = conf['modelpath']

        ranker = cls(df, modelpath, bert_index, sparse_index, vectorizer_path)
        # Previously this value was read and then dropped. It is set after
        # construction so subclasses with the five-argument constructor keep
        # working.
        ranker.first_ranking = conf.get('firstranking', cls.DEFAULT_FIRST_RANKING)
        return ranker
|
|
|
|
|
if __name__ == '__main__':
    # Demo: build the two-stage ranker from the local config and print the
    # head of the results for a couple of sample queries.
    engine = SparseDenseMoviesRanker.from_json_config('conf.json')

    sample_queries = (
        "une histoire de pirates et de chasse au trésor",
        "une histoire de gangsters avec de l'argent",
    )
    for sample in sample_queries:
        print(sample)
        results = engine.run_query(sample)
        print(results.head())
|
|