File size: 4,269 Bytes
d6585f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#
# Pyserini: Reproducible IR research with sparse and dense representations
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import enum
import importlib
import os
import uuid
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from typing import List


class ClassifierType(enum.Enum):
    """Classifier families supported by the pseudo-relevance re-ranker."""

    # Logistic regression (backed by sklearn's LogisticRegression).
    LR = 'lr'
    # Support vector machine (backed by sklearn's SVC with a linear kernel).
    SVM = 'svm'


class FusionMethod(enum.Enum):
    """Strategies for combining scores when two classifiers are used."""

    # Arithmetic mean of the classifiers' interpolated scores.
    AVG = 'avg'

class PseudoRelevanceClassifierReranker:
    """Rerank a retrieved document list using pseudo-relevance feedback.

    The top ``r`` documents of the initial ranking are treated as positive
    training examples and the bottom ``n`` documents as negatives.  A
    classifier is trained on those vectors, its probability scores are
    min-max normalized and linearly interpolated with the (normalized)
    original search scores, and the list is re-sorted.  When two classifier
    types are supplied, their interpolated scores are fused by averaging
    (FusionMethod.AVG).
    """

    def __init__(self, lucene_index: str, vectorizer_class: str, clf_type: List[ClassifierType], r=10, n=100, alpha=0.5):
        """
        Parameters
        ----------
        lucene_index : str
            Path to the Lucene index handed to the vectorizer.
        vectorizer_class : str
            Name of a class in ``pyserini.vectorizer`` used to embed documents.
        clf_type : List[ClassifierType]
            One or two classifier types; two are fused with FusionMethod.AVG.
        r : int
            Number of top-ranked documents assumed relevant (positives).
        n : int
            Number of bottom-ranked documents assumed non-relevant (negatives).
        alpha : float
            Interpolation weight: alpha * classifier score + (1 - alpha) * search score.

        Raises
        ------
        ValueError
            If ``clf_type`` does not contain exactly one or two entries.
        """
        # Validate before the (potentially expensive) vectorizer construction.
        # Also rejects an empty list, which previously slipped through and
        # failed obscurely inside rerank().
        if not 1 <= len(clf_type) <= 2:
            raise ValueError('Re-ranker takes one or two classifiers')

        self.r = r
        self.n = n
        self.alpha = alpha
        self.clf_type = clf_type

        # Resolve the vectorizer class by name from pyserini.vectorizer.
        module = importlib.import_module("pyserini.vectorizer")
        VectorizerClass = getattr(module, vectorizer_class)
        self.vectorizer = VectorizerClass(lucene_index, min_df=5)

    def _set_classifier(self, clf_type: ClassifierType):
        """Instantiate the scikit-learn estimator for *clf_type* on ``self.clf``."""
        if clf_type == ClassifierType.LR:
            self.clf = LogisticRegression(random_state=42)
        elif clf_type == ClassifierType.SVM:
            # probability=True is required so predict_proba works below.
            self.clf = SVC(kernel='linear', probability=True, random_state=42)
        else:
            raise ValueError("Invalid classifier type")

    def _get_prf_vectors(self, doc_ids: List[str]):
        """Build pseudo-relevance training data and test vectors.

        Top ``r`` docs are labeled 1 (relevant), bottom ``n`` docs 0.
        Returns (train_vecs, train_labels, test_vecs).
        """
        train_docs = doc_ids[:self.r] + doc_ids[-self.n:]
        # Label counts must match the slices actually taken: with fewer than
        # r (or n) documents the fixed [1]*r + [0]*n would be misaligned.
        num_pos = min(self.r, len(doc_ids))
        num_neg = min(self.n, len(doc_ids))
        train_labels = [1] * num_pos + [0] * num_neg

        train_vecs = self.vectorizer.get_vectors(train_docs)
        test_vecs = self.vectorizer.get_vectors(doc_ids)

        return train_vecs, train_labels, test_vecs

    def _rerank_with_classifier(self, doc_ids: List[str], search_scores: List[float]):
        """Rerank once with the classifier currently set on ``self.clf``.

        Returns (scores, doc_ids) sorted in decreasing score order.
        """
        train_vecs, train_labels, test_vecs = self._get_prf_vectors(doc_ids)

        # Train on the pseudo-labels, then score every candidate document.
        self.clf.fit(train_vecs, train_labels)
        pred = self.clf.predict_proba(test_vecs)
        # Column 1 is the probability of the positive (relevant) class.
        classifier_scores = self._normalize([p[1] for p in pred])
        search_scores = self._normalize(search_scores)

        # Linear interpolation between classifier and retrieval scores.
        interpolated_scores = [a * self.alpha + b * (1 - self.alpha)
                               for a, b in zip(classifier_scores, search_scores)]

        return self._sort_dual_list(interpolated_scores, doc_ids)

    def rerank(self, doc_ids: List[str], search_scores: List[float]):
        """Rerank *doc_ids* given their *search_scores*.

        Returns (scores, doc_ids).  With one classifier the result is sorted
        in decreasing score order; with two classifiers the per-document
        scores of both runs are averaged (FusionMethod.AVG) and returned in
        first-seen document order.
        """
        # Single classifier: one train/score/sort pass.
        if len(self.clf_type) == 1:
            self._set_classifier(self.clf_type[0])
            return self._rerank_with_classifier(doc_ids, search_scores)

        # Two classifiers, fused with FusionMethod.AVG.
        doc_score_dict = {}
        for clf in self.clf_type:
            self._set_classifier(clf)
            clf_scores, clf_doc_ids = self._rerank_with_classifier(doc_ids, search_scores)

            for score, doc_id in zip(clf_scores, clf_doc_ids):
                # A list (not a set) so identical scores from the two
                # classifiers are both kept.
                doc_score_dict.setdefault(doc_id, []).append(score)

        r_scores, r_doc_ids = [], []
        for doc_id, scores in doc_score_dict.items():
            r_doc_ids.append(doc_id)
            r_scores.append(sum(scores) / len(scores))

        return r_scores, r_doc_ids

    def _normalize(self, scores: List[float]):
        """Min-max normalize *scores* into [0, 1].

        Returns [] for empty input and all zeros when every score is equal
        (the previous version raised ZeroDivisionError / ValueError there).
        """
        if not scores:
            return []

        low = min(scores)
        high = max(scores)
        width = high - low
        if width == 0:
            # All scores identical: there is no spread to normalize over.
            return [0.0] * len(scores)

        return [(s - low) / width for s in scores]

    def _sort_dual_list(self, list1, list2):
        """Sort both lists in decreasing order, keyed on list1's values."""
        pairs = sorted(zip(list1, list2), reverse=True)
        sorted1 = [a for a, _ in pairs]
        sorted2 = [b for _, b in pairs]
        return sorted1, sorted2