Spaces:
Runtime error
Runtime error
# | |
# Pyserini: Python interface to the Anserini IR toolkit built on Lucene | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
import json | |
import os | |
from dataclasses import dataclass | |
from typing import Dict, List | |
import nmslib | |
import numpy as np | |
from scipy.sparse import csr_matrix, vstack | |
class SearchResult: | |
docid: str | |
score: float | |
class NmslibSearcher: | |
"""Simple Searcher for vector representation | |
""" | |
def __init__(self, index_dir: str, ef_search: int = 1000, is_sparse=False): | |
self.is_sparse = is_sparse | |
self.index, self.docids, self.token2id, self.metadata = self._load_index(index_dir, self.is_sparse) | |
self.index.setQueryTimeParams({'efSearch': ef_search}) | |
self.dimension = len(self.token2id) if self.is_sparse else None | |
def search(self, query, k: int = 10) -> List[SearchResult]: | |
"""Search the collection. | |
Parameters | |
---------- | |
query : query vector | |
k : int | |
Number of hits to return. | |
threads : int | |
Maximum number of threads to use for intra-query search. | |
Returns | |
------- | |
List[SearchResult] | |
List of search results. | |
""" | |
if self.is_sparse: | |
query = self._token_dict_to_sparse_vector(query) | |
else: | |
query = np.array([query]) | |
indexes, scores = self.index.knnQueryBatch(query, k=k, num_threads=1)[0] | |
return [SearchResult(self.docids[idx], -score) | |
for score, idx in zip(scores, indexes) if idx != -1] | |
def batch_search(self, queries, q_ids: List[str], k: int = 10, threads: int = 1) \ | |
-> Dict[str, List[SearchResult]]: | |
""" | |
Parameters | |
---------- | |
queries : vectors | |
q_ids : List[str] | |
List of corresponding query ids. | |
k : int | |
Number of hits to return. | |
threads : int | |
Maximum number of threads to use. | |
Returns | |
------- | |
Dict[str, List[SearchResult]] | |
Dictionary holding the search results, with the query ids as keys and the corresponding lists of search | |
results as the values. | |
""" | |
if self.is_sparse: | |
queries = [self._token_dict_to_sparse_vector(query) for query in queries] | |
queries = vstack(queries) | |
else: | |
queries = np.array(queries) | |
I, D = zip(*self.index.knnQueryBatch(queries, k=k, num_threads=threads)) | |
return {key: [SearchResult(self.docids[idx], -score) | |
for score, idx in zip(distances, indexes) if idx != -1] | |
for key, distances, indexes in zip(q_ids, D, I)} | |
def _load_index(self, index_dir: str, is_sparse: bool): | |
if is_sparse: | |
index = nmslib.init(method='hnsw', space='negdotprod_sparse', data_type=nmslib.DataType.SPARSE_VECTOR) | |
else: | |
index = nmslib.init(method='hnsw', space='negdotprod', data_type=nmslib.DataType.DENSE_VECTOR) | |
index_path = os.path.join(index_dir, 'index.bin') | |
docid_path = os.path.join(index_dir, 'docid') | |
tokens_path = os.path.join(index_dir, 'tokens') | |
metadata_path = os.path.join(index_dir, 'meta') | |
index.loadIndex(index_path, load_data=True) | |
docids = self._load_docids(docid_path) | |
token2id = self._load_tokens(tokens_path) | |
metadata = self._load_metadata(metadata_path) | |
return index, docids, token2id, metadata | |
def _token_dict_to_sparse_vector(self, token_dict): | |
matrix_row, matrix_col, matrix_data = [], [], [] | |
tokens = token_dict.keys() | |
col = [] | |
data = [] | |
for tok in tokens: | |
if tok in self.token2id: | |
col.append(self.token2id[tok]) | |
data.append(token_dict[tok]) | |
matrix_row.extend([0] * len(col)) | |
matrix_col.extend(col) | |
matrix_data.extend(data) | |
vector = csr_matrix((matrix_data, (matrix_row, matrix_col)), shape=(1, self.dimension)) | |
return vector | |
def _load_docids(docid_path: str) -> List[str]: | |
docids = [line.rstrip() for line in open(docid_path, 'r').readlines()] | |
return docids | |
def _load_tokens(tokens_path: str): | |
if not os.path.exists(tokens_path): | |
return None | |
tokens = [line.rstrip() for line in open(tokens_path, 'r').readlines()] | |
return dict(zip(tokens, range(len(tokens)))) | |
def _load_metadata(metadata_path): | |
if not os.path.exists(metadata_path): | |
return None | |
meta = json.load(open(metadata_path)) | |
return meta | |