#
# Pyserini: Reproducible IR research with sparse and dense representations
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
from typing import List, Optional

from scipy.sparse import csr_matrix
from sklearn.preprocessing import normalize
from tqdm import tqdm

from pyserini import index, search
from pyserini.analysis import Analyzer, get_lucene_analyzer


class Vectorizer:
    """Base class for vectorizers implemented on top of Pyserini.

    Parameters
    ----------
    lucene_index_path : str
        Path to the Lucene index folder.
    min_df : int
        Minimum acceptable document frequency; terms whose document frequency
        does not exceed this threshold are excluded from the vocabulary.
    verbose : bool
        Whether to print out debugging information.
    """

    def __init__(self, lucene_index_path: str, min_df: int = 1, verbose: bool = False):
        self.min_df: int = min_df
        self.verbose: bool = verbose
        self.index_reader = index.IndexReader(lucene_index_path)
        self.searcher = search.LuceneSearcher(lucene_index_path)
        self.num_docs: int = self.searcher.num_docs
        self.stats = self.index_reader.stats()
        self.analyzer = Analyzer(get_lucene_analyzer())

        # Build the vocabulary from terms whose document frequency exceeds min_df.
        self.vocabulary_ = set()
        for term in self.index_reader.terms():
            if term.df > self.min_df:
                self.vocabulary_.add(term.term)
        self.vocabulary_ = sorted(self.vocabulary_)

        # Build the term-to-column-index mapping.
        self.term_to_index = {}
        for i, term in enumerate(self.vocabulary_):
            self.term_to_index[term] = i
        self.vocabulary_size = len(self.vocabulary_)

        if self.verbose:
            print(f'Found {self.vocabulary_size} terms with min_df={self.min_df}')

    def get_query_vector(self, query: str) -> csr_matrix:
        """Get the term-count vector of an analyzed query.

        Parameters
        ----------
        query : str
            The query text to analyze.

        Returns
        -------
        csr_matrix
            A 1 x vocabulary_size sparse matrix of term counts; duplicate
            query terms are summed by the csr_matrix constructor.
        """
        matrix_row, matrix_col, matrix_data = [], [], []
        tokens = self.analyzer.analyze(query)
        for term in tokens:
            # Dict lookup avoids a linear scan of the sorted vocabulary list.
            if term in self.term_to_index:
                matrix_row.append(0)
                matrix_col.append(self.term_to_index[term])
                matrix_data.append(1)
        vectors = csr_matrix((matrix_data, (matrix_row, matrix_col)), shape=(1, self.vocabulary_size))
        return vectors
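
# A minimal usage sketch for the base class (the index path and query below
# are placeholders, not values shipped with Pyserini): the query is analyzed
# with the same Lucene analyzer used at indexing time, and the result is a
# 1 x vocabulary_size sparse count vector that is compatible, column for
# column, with the matrices produced by get_vectors() in the subclasses below.
#
#   vectorizer = Vectorizer('indexes/sample-lucene-index', min_df=5)
#   query_vec = vectorizer.get_query_vector('information retrieval')
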
class TfidfVectorizer(Vectorizer):
    """Wrapper class for a tf-idf vectorizer implemented on top of Pyserini.

    Parameters
    ----------
    lucene_index_path : str
        Path to the Lucene index folder.
    min_df : int
        Minimum acceptable document frequency.
    verbose : bool
        Whether to print out debugging information.
    """

    def __init__(self, lucene_index_path: str, min_df: int = 1, verbose: bool = False):
        super().__init__(lucene_index_path, min_df, verbose)

        # Pre-compute the inverse document frequency of every indexed term.
        self.idf_ = {}
        for term in self.index_reader.terms():
            self.idf_[term.term] = math.log(self.num_docs / term.df)

    def get_vectors(self, docids: List[str], norm: Optional[str] = 'l2') -> csr_matrix:
        """Get the tf-idf vectors given a list of docids.

        Parameters
        ----------
        docids : List[str]
            The ids of the documents to vectorize.
        norm : Optional[str]
            The norm ('l1', 'l2', or 'max') used to normalize the rows of the
            sparse matrix, or None to skip normalization.

        Returns
        -------
        csr_matrix
            Sparse matrix representation of the tf-idf vectors.
        """
        matrix_row, matrix_col, matrix_data = [], [], []
        num_docs = len(docids)

        for doc_index, doc_id in enumerate(tqdm(docids)):
            # Term frequencies for this document; skip documents without a stored vector.
            tf = self.index_reader.get_document_vector(doc_id)
            if tf is None:
                continue

            # Filter out terms that fell below the min_df threshold.
            tf = {t: tf[t] for t in tf if t in self.term_to_index}

            # Accumulate the COO triples for the sparse matrix.
            for term in tf:
                tfidf = tf[term] * self.idf_[term]
                matrix_row.append(doc_index)
                matrix_col.append(self.term_to_index[term])
                matrix_data.append(tfidf)

        vectors = csr_matrix((matrix_data, (matrix_row, matrix_col)), shape=(num_docs, self.vocabulary_size))

        if norm:
            return normalize(vectors, norm=norm)
        return vectors


class BM25Vectorizer(Vectorizer):
    """Wrapper class for a BM25 vectorizer implemented on top of Pyserini.

    Parameters
    ----------
    lucene_index_path : str
        Path to the Lucene index folder.
    min_df : int
        Minimum acceptable document frequency.
    verbose : bool
        Whether to print out debugging information.
    """

    def __init__(self, lucene_index_path: str, min_df: int = 1, verbose: bool = False):
        super().__init__(lucene_index_path, min_df, verbose)

    def get_vectors(self, docids: List[str], norm: Optional[str] = 'l2') -> csr_matrix:
        """Get the BM25 vectors given a list of docids.

        Parameters
        ----------
        docids : List[str]
            The ids of the documents to vectorize.
        norm : Optional[str]
            The norm ('l1', 'l2', or 'max') used to normalize the rows of the
            sparse matrix, or None to skip normalization.

        Returns
        -------
        csr_matrix
            Sparse matrix representation of the BM25 vectors.
        """
        matrix_row, matrix_col, matrix_data = [], [], []
        num_docs = len(docids)

        for doc_index, doc_id in enumerate(tqdm(docids)):
            # Term frequencies for this document; skip documents without a stored vector.
            tf = self.index_reader.get_document_vector(doc_id)
            if tf is None:
                continue

            # Filter out terms that fell below the min_df threshold.
            tf = {t: tf[t] for t in tf if t in self.term_to_index}

            # Accumulate the COO triples for the sparse matrix; the stored terms
            # are already analyzed, so no analyzer is applied to them here.
            for term in tf:
                bm25_weight = self.index_reader.compute_bm25_term_weight(doc_id, term, analyzer=None)
                matrix_row.append(doc_index)
                matrix_col.append(self.term_to_index[term])
                matrix_data.append(bm25_weight)

        vectors = csr_matrix((matrix_data, (matrix_row, matrix_col)), shape=(num_docs, self.vocabulary_size))

        if norm:
            return normalize(vectors, norm=norm)
        return vectors
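
# A minimal end-to-end sketch, guarded so it only runs when this module is
# executed directly. The index path and document ids below are assumptions
# for illustration, not values shipped with Pyserini; substitute any Lucene
# index and ids it actually contains.
if __name__ == '__main__':
    vectorizer = BM25Vectorizer('indexes/sample-lucene-index', min_df=5, verbose=True)

    doc_ids = ['doc1', 'doc2']  # hypothetical ids; replace with real ones
    doc_vectors = vectorizer.get_vectors(doc_ids, norm='l2')

    # With both sides l2-normalized, the dot product is the cosine similarity.
    query_vector = normalize(vectorizer.get_query_vector('information retrieval'), norm='l2')
    scores = doc_vectors.dot(query_vector.T).toarray()
    for doc_id, score in zip(doc_ids, scores[:, 0]):
        print(f'{doc_id}: {score:.4f}')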