Spaces:
Runtime error
Runtime error
# | |
# Pyserini: Reproducible IR research with sparse and dense representations | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
import enum | |
import importlib | |
import os | |
import uuid | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.svm import SVC | |
from typing import List | |
class ClassifierType(enum.Enum): | |
LR = 'lr' | |
SVM = 'svm' | |
class FusionMethod(enum.Enum): | |
AVG = 'avg' | |
class PseudoRelevanceClassifierReranker: | |
def __init__(self, lucene_index: str, vectorizer_class: str, clf_type: List[ClassifierType], r=10, n=100, alpha=0.5): | |
self.r = r | |
self.n = n | |
self.alpha = alpha | |
self.clf_type = clf_type | |
# get vectorizer | |
module = importlib.import_module("pyserini.vectorizer") | |
VectorizerClass = getattr(module, vectorizer_class) | |
self.vectorizer = VectorizerClass(lucene_index, min_df=5) | |
if len(clf_type) > 2: | |
raise Exception('Re-ranker takes at most two classifiers') | |
def _set_classifier(self, clf_type: ClassifierType): | |
if clf_type == ClassifierType.LR: | |
self.clf = LogisticRegression(random_state=42) | |
elif clf_type == ClassifierType.SVM: | |
self.clf = SVC(kernel='linear', probability=True, random_state=42) | |
else: | |
raise Exception("Invalid classifier type") | |
def _get_prf_vectors(self, doc_ids: List[str]): | |
train_docs = doc_ids[:self.r] + doc_ids[-self.n:] | |
train_labels = [1] * self.r + [0] * self.n | |
train_vecs = self.vectorizer.get_vectors(train_docs) | |
test_vecs = self.vectorizer.get_vectors(doc_ids) | |
return train_vecs, train_labels, test_vecs | |
def _rerank_with_classifier(self, doc_ids: List[str], search_scores: List[float]): | |
train_vecs, train_labels, test_vecs = self._get_prf_vectors(doc_ids) | |
# classification | |
self.clf.fit(train_vecs, train_labels) | |
pred = self.clf.predict_proba(test_vecs) | |
classifier_scores = self._normalize([p[1] for p in pred]) | |
search_scores = self._normalize(search_scores) | |
# interpolation | |
interpolated_scores = [a * self.alpha + b * (1-self.alpha) for a, b in zip(classifier_scores, search_scores)] | |
return self._sort_dual_list(interpolated_scores, doc_ids) | |
def rerank(self, doc_ids: List[str], search_scores: List[float]): | |
# one classifier | |
if len(self.clf_type) == 1: | |
self._set_classifier(self.clf_type[0]) | |
return self._rerank_with_classifier(doc_ids, search_scores) | |
# two classifier with FusionMethod.AVG | |
doc_score_dict = {} | |
for i in range(2): | |
self._set_classifier(self.clf_type[i]) | |
i_scores, i_doc_ids = self._rerank_with_classifier(doc_ids, search_scores) | |
for score, doc_id in zip(i_scores, i_doc_ids): | |
if doc_id not in doc_score_dict: | |
doc_score_dict[doc_id] = set() | |
doc_score_dict[doc_id].add(score) | |
r_scores, r_doc_ids = [], [] | |
for doc_id, score in doc_score_dict.items(): | |
avg = sum(score) / len(score) | |
r_doc_ids.append(doc_id) | |
r_scores.append(avg) | |
return r_scores, r_doc_ids | |
def _normalize(self, scores: List[float]): | |
low = min(scores) | |
high = max(scores) | |
width = high - low | |
return [(s-low)/width for s in scores] | |
# sort both list in decreasing order by using the list1 to compare | |
def _sort_dual_list(self, list1, list2): | |
zipped_lists = zip(list1, list2) | |
sorted_pairs = sorted(zipped_lists) | |
tuples = zip(*sorted_pairs) | |
list1, list2 = [list(tuple) for tuple in tuples] | |
list1.reverse() | |
list2.reverse() | |
return list1, list2 | |