NLP1 / app.py
VagoX1's picture
Update app.py
59d0e3b verified
from __future__ import annotations
from dataclasses import dataclass
import pickle
import os
from typing import Iterable, Callable, List, Dict, Optional, Type, TypeVar
from nlp4web_codebase.ir.data_loaders.dm import Document
from collections import Counter
import tqdm
import re
import nltk
nltk.download("stopwords", quiet=True)
from nltk.corpus import stopwords as nltk_stopwords
LANGUAGE = "english"
word_splitter = re.compile(r"(?u)\b\w\w+\b").findall
stopwords = set(nltk_stopwords.words(LANGUAGE))
def word_splitting(text: str) -> List[str]:
return word_splitter(text.lower())
def lemmatization(words: List[str]) -> List[str]:
return words # We ignore lemmatization here for simplicity
def simple_tokenize(text: str) -> List[str]:
words = word_splitting(text)
tokenized = list(filter(lambda w: w not in stopwords, words))
tokenized = lemmatization(tokenized)
return tokenized
T = TypeVar("T", bound="InvertedIndex")
@dataclass
class PostingList:
term: str # The term
docid_postings: List[int] # docid_postings[i] means the docid (int) of the i-th associated posting
tweight_postings: List[float] # tweight_postings[i] means the term weight (float) of the i-th associated posting
@dataclass
class InvertedIndex:
posting_lists: List[PostingList] # docid -> posting_list
vocab: Dict[str, int]
cid2docid: Dict[str, int] # collection_id -> docid
collection_ids: List[str] # docid -> collection_id
doc_texts: Optional[List[str]] = None # docid -> document text
def save(self, output_dir: str) -> None:
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "index.pkl"), "wb") as f:
pickle.dump(self, f)
@classmethod
def from_saved(cls: Type[T], saved_dir: str) -> T:
index = cls(
posting_lists=[], vocab={}, cid2docid={}, collection_ids=[], doc_texts=None
)
with open(os.path.join(saved_dir, "index.pkl"), "rb") as f:
index = pickle.load(f)
return index
# The output of the counting function:
@dataclass
class Counting:
posting_lists: List[PostingList]
vocab: Dict[str, int]
cid2docid: Dict[str, int]
collection_ids: List[str]
dfs: List[int] # tid -> df
dls: List[int] # docid -> doc length
avgdl: float
nterms: int
doc_texts: Optional[List[str]] = None
def run_counting(
documents: Iterable[Document],
tokenize_fn: Callable[[str], List[str]] = simple_tokenize,
store_raw: bool = True, # store the document text in doc_texts
ndocs: Optional[int] = None,
show_progress_bar: bool = True,
) -> Counting:
"""Counting TFs, DFs, doc_lengths, etc."""
posting_lists: List[PostingList] = []
vocab: Dict[str, int] = {}
cid2docid: Dict[str, int] = {}
collection_ids: List[str] = []
dfs: List[int] = [] # tid -> df
dls: List[int] = [] # docid -> doc length
nterms: int = 0
doc_texts: Optional[List[str]] = []
for doc in tqdm.tqdm(
documents,
desc="Counting",
total=ndocs,
disable=not show_progress_bar,
):
if doc.collection_id in cid2docid:
continue
collection_ids.append(doc.collection_id)
docid = cid2docid.setdefault(doc.collection_id, len(cid2docid))
toks = tokenize_fn(doc.text)
tok2tf = Counter(toks)
dls.append(sum(tok2tf.values()))
for tok, tf in tok2tf.items():
nterms += tf
tid = vocab.get(tok, None)
if tid is None:
posting_lists.append(
PostingList(term=tok, docid_postings=[], tweight_postings=[])
)
tid = vocab.setdefault(tok, len(vocab))
posting_lists[tid].docid_postings.append(docid)
posting_lists[tid].tweight_postings.append(tf)
if tid < len(dfs):
dfs[tid] += 1
else:
dfs.append(0)
if store_raw:
doc_texts.append(doc.text)
else:
doc_texts = None
return Counting(
posting_lists=posting_lists,
vocab=vocab,
cid2docid=cid2docid,
collection_ids=collection_ids,
dfs=dfs,
dls=dls,
avgdl=sum(dls) / len(dls),
nterms=nterms,
doc_texts=doc_texts,
)
from nlp4web_codebase.ir.data_loaders.sciq import load_sciq
sciq = load_sciq()
counting = run_counting(documents=iter(sciq.corpus), ndocs=len(sciq.corpus))
from dataclasses import asdict, dataclass
import math
import os
from typing import Iterable, List, Optional, Type
import tqdm
from nlp4web_codebase.ir.data_loaders.dm import Document
@dataclass
class BM25Index(InvertedIndex):
@staticmethod
def tokenize(text: str) -> List[str]:
return simple_tokenize(text)
@staticmethod
def cache_term_weights(
posting_lists: List[PostingList],
total_docs: int,
avgdl: float,
dfs: List[int],
dls: List[int],
k1: float,
b: float,
) -> None:
"""Compute term weights and caching"""
N = total_docs
for tid, posting_list in enumerate(
tqdm.tqdm(posting_lists, desc="Regularizing TFs")
):
idf = BM25Index.calc_idf(df=dfs[tid], N=N)
for i in range(len(posting_list.docid_postings)):
docid = posting_list.docid_postings[i]
tf = posting_list.tweight_postings[i]
dl = dls[docid]
regularized_tf = BM25Index.calc_regularized_tf(
tf=tf, dl=dl, avgdl=avgdl, k1=k1, b=b
)
posting_list.tweight_postings[i] = regularized_tf * idf
@staticmethod
def calc_regularized_tf(
tf: int, dl: float, avgdl: float, k1: float, b: float
) -> float:
return tf / (tf + k1 * (1 - b + b * dl / avgdl))
@staticmethod
def calc_idf(df: int, N: int):
return math.log(1 + (N - df + 0.5) / (df + 0.5))
@classmethod
def build_from_documents(
cls: Type[BM25Index],
documents: Iterable[Document],
store_raw: bool = True,
output_dir: Optional[str] = None,
ndocs: Optional[int] = None,
show_progress_bar: bool = True,
k1: float = 0.9,
b: float = 0.4,
) -> BM25Index:
# Counting TFs, DFs, doc_lengths, etc.:
counting = run_counting(
documents=documents,
tokenize_fn=BM25Index.tokenize,
store_raw=store_raw,
ndocs=ndocs,
show_progress_bar=show_progress_bar,
)
# Compute term weights and caching:
posting_lists = counting.posting_lists
total_docs = len(counting.cid2docid)
BM25Index.cache_term_weights(
posting_lists=posting_lists,
total_docs=total_docs,
avgdl=counting.avgdl,
dfs=counting.dfs,
dls=counting.dls,
k1=k1,
b=b,
)
# Assembly and save:
index = BM25Index(
posting_lists=posting_lists,
vocab=counting.vocab,
cid2docid=counting.cid2docid,
collection_ids=counting.collection_ids,
doc_texts=counting.doc_texts,
)
return index
bm25_index = BM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True,
)
bm25_index.save("output/bm25_index")
from nlp4web_codebase.ir.models import BaseRetriever
from typing import Type
from abc import abstractmethod
class BaseInvertedIndexRetriever(BaseRetriever):
@property
@abstractmethod
def index_class(self) -> Type[InvertedIndex]:
pass
def __init__(self, index_dir: str) -> None:
self.index = self.index_class.from_saved(index_dir)
def get_term_weights(self, query: str, cid: str) -> Dict[str, float]:
toks = self.index.tokenize(query)
target_docid = self.index.cid2docid[cid]
term_weights = {}
for tok in toks:
if tok not in self.index.vocab:
continue
tid = self.index.vocab[tok]
posting_list = self.index.posting_lists[tid]
for docid, tweight in zip(
posting_list.docid_postings, posting_list.tweight_postings
):
if docid == target_docid:
term_weights[tok] = tweight
break
return term_weights
def score(self, query: str, cid: str) -> float:
return sum(self.get_term_weights(query=query, cid=cid).values())
def retrieve(self, query: str, topk: int = 10) -> Dict[str, float]:
toks = self.index.tokenize(query)
docid2score: Dict[int, float] = {}
for tok in toks:
if tok not in self.index.vocab:
continue
tid = self.index.vocab[tok]
posting_list = self.index.posting_lists[tid]
for docid, tweight in zip(
posting_list.docid_postings, posting_list.tweight_postings
):
docid2score.setdefault(docid, 0)
docid2score[docid] += tweight
docid2score = dict(
sorted(docid2score.items(), key=lambda pair: pair[1], reverse=True)[:topk]
)
return {
self.index.collection_ids[docid]: score
for docid, score in docid2score.items()
}
class BM25Retriever(BaseInvertedIndexRetriever):
@property
def index_class(self) -> Type[BM25Index]:
return BM25Index
from nlp4web_codebase.ir.data_loaders import Split
import pytrec_eval
import numpy as np
def evaluate_map(rankings: Dict[str, Dict[str, float]], split=Split.dev) -> float:
metric = "map_cut_10"
qrels = sciq.get_qrels_dict(split)
evaluator = pytrec_eval.RelevanceEvaluator(sciq.get_qrels_dict(split), (metric,))
qps = evaluator.evaluate(rankings)
return float(np.mean([qp[metric] for qp in qps.values()]))
# Loading dataset:
from nlp4web_codebase.ir.data_loaders.sciq import load_sciq
sciq = load_sciq()
counting = run_counting(documents=iter(sciq.corpus), ndocs=len(sciq.corpus))
# Building BM25 index and save:
bm25_index = BM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True
)
bm25_index.save("output/bm25_index")
plots_b: Dict[str, List[float]] = {
"X": [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
"Y": []
}
plots_k1: Dict[str, List[float]] = {
"X": [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
"Y": []
}
## YOUR_CODE_STARTS_HERE
# Step 1: Tune b (with fixed k1=0.9)
for b_val in plots_b["X"]:
bm25_index = BM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True,
k1=0.9, # Fix k1
b=b_val
)
bm25_index.save("output/bm25_index")
bm25_retriever = BM25Retriever(index_dir="output/bm25_index")
rankings = {}
for query in sciq.get_split_queries(Split.dev):
ranking = bm25_retriever.retrieve(query=query.text)
rankings[query.query_id] = ranking
map_score = evaluate_map(rankings)
plots_b["Y"].append(map_score)
# Step 2: Tune k1 (with the best b from step 1)
best_b = plots_b["X"][np.argmax(plots_b["Y"])] # Get best b
for k1_val in plots_k1["X"]:
bm25_index = BM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True,
k1=k1_val,
b=best_b # Use best b
)
bm25_index.save("output/bm25_index")
bm25_retriever = BM25Retriever(index_dir="output/bm25_index")
rankings = {}
for query in sciq.get_split_queries(Split.dev):
ranking = bm25_retriever.retrieve(query=query.text)
rankings[query.query_id] = ranking
map_score = evaluate_map(rankings)
plots_k1["Y"].append(map_score)
best_b = plots_b["X"][np.argmax(plots_b["Y"])]
best_k1 = plots_k1["X"][np.argmax(plots_k1["Y"])]
from scipy.sparse._csc import csc_matrix
@dataclass
class CSCInvertedIndex:
posting_lists_matrix: csc_matrix # docid -> posting_list
vocab: Dict[str, int]
cid2docid: Dict[str, int] # collection_id -> docid
collection_ids: List[str] # docid -> collection_id
doc_texts: Optional[List[str]] = None # docid -> document text
def save(self, output_dir: str) -> None:
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "index.pkl"), "wb") as f:
pickle.dump(self, f)
@classmethod
def from_saved(cls: Type[T], saved_dir: str) -> T:
index = cls(
posting_lists_matrix=None, vocab={}, cid2docid={}, collection_ids=[], doc_texts=None
)
with open(os.path.join(saved_dir, "index.pkl"), "rb") as f:
index = pickle.load(f)
return index
@dataclass
class CSCBM25Index(CSCInvertedIndex):
@staticmethod
def tokenize(text: str) -> List[str]:
return simple_tokenize(text)
@staticmethod
def cache_term_weights(
posting_lists: List[PostingList],
total_docs: int,
avgdl: float,
dfs: List[int],
dls: List[int],
k1: float,
b: float,
) -> csc_matrix:
"""Compute term weights and caching"""
data = []
indices = []
indptr = [0]
N = total_docs
for tid, posting_list in enumerate(tqdm.tqdm(posting_lists, desc="Regularizing TFs")):
idf = CSCBM25Index.calc_idf(df=dfs[tid], N=N)
for i in range(len(posting_list.docid_postings)):
docid = posting_list.docid_postings[i]
tf = posting_list.tweight_postings[i]
dl = dls[docid]
regularized_tf = CSCBM25Index.calc_regularized_tf(
tf=tf, dl=dl, avgdl=avgdl, k1=k1, b=b
)
weight = regularized_tf * idf
if weight > 1e-6: # Use a small threshold to filter near-zero values #This is the fix
data.append(weight)
indices.append(docid)
indptr.append(len(data))
# Ensure data types are compatible with csc_matrix
data = np.array(data, dtype=np.float64) # or np.float32 if appropriate
indices = np.array(indices, dtype=np.int32) # or np.int64 if appropriate
indptr = np.array(indptr, dtype=np.int32) # or np.int64 if appropriate
return csc_matrix((data, indices, indptr), shape=(total_docs, len(posting_lists)))
## YOUR_CODE_ENDS_HERE
@staticmethod
def calc_regularized_tf(
tf: int, dl: float, avgdl: float, k1: float, b: float
) -> float:
return tf / (tf + k1 * (1 - b + b * dl / avgdl))
@staticmethod
def calc_idf(df: int, N: int):
return math.log(1 + (N - df + 0.5) / (df + 0.5))
@classmethod
def build_from_documents(
cls: Type[CSCBM25Index],
documents: Iterable[Document],
store_raw: bool = True,
output_dir: Optional[str] = None,
ndocs: Optional[int] = None,
show_progress_bar: bool = True,
k1: float = 0.9,
b: float = 0.4,
) -> CSCBM25Index:
# Counting TFs, DFs, doc_lengths, etc.:
counting = run_counting(
documents=documents,
tokenize_fn=CSCBM25Index.tokenize,
store_raw=store_raw,
ndocs=ndocs,
show_progress_bar=show_progress_bar,
)
# Compute term weights and caching:
posting_lists = counting.posting_lists
total_docs = len(counting.cid2docid)
posting_lists_matrix = CSCBM25Index.cache_term_weights(
posting_lists=posting_lists,
total_docs=total_docs,
avgdl=counting.avgdl,
dfs=counting.dfs,
dls=counting.dls,
k1=k1,
b=b,
)
# Assembly and save:
index = CSCBM25Index(
posting_lists_matrix=posting_lists_matrix,
vocab=counting.vocab,
cid2docid=counting.cid2docid,
collection_ids=counting.collection_ids,
doc_texts=counting.doc_texts,
)
return index
csc_bm25_index = CSCBM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True,
k1=best_k1,
b=best_b
)
csc_bm25_index.save("output/csc_bm25_index")
class BaseCSCInvertedIndexRetriever(BaseRetriever):
@property
@abstractmethod
def index_class(self) -> Type[CSCInvertedIndex]:
pass
def __init__(self, index_dir: str) -> None:
self.index = self.index_class.from_saved(index_dir)
def get_term_weights(self, query: str, cid: str) -> Dict[str, float]:
## YOUR_CODE_STARTS_HERE
toks = self.index.tokenize(query)
target_docid = self.index.cid2docid[cid]
term_weights = {}
for tok in toks:
if tok not in self.index.vocab:
continue
tid = self.index.vocab[tok]
# Get weights for the target document from the CSC matrix
weights_for_doc = self.index.posting_lists_matrix.getcol(tid).toarray()[:, 0]
term_weights[tok] = weights_for_doc[target_docid]
return term_weights
## YOUR_CODE_ENDS_HERE
def score(self, query: str, cid: str) -> float:
return sum(self.get_term_weights(query=query, cid=cid).values())
def retrieve(self, query: str, topk: int = 10) -> Dict[str, float]:
## YOUR_CODE_STARTS_HERE
ranking: Dict[str, float] = {}
toks = self.index.tokenize(query)
docid2score: Dict[int, float] = {}
for tok in toks:
if tok not in self.index.vocab:
continue
tid = self.index.vocab[tok]
tid2documents = self.index.posting_lists_matrix.getcol(tid)
for docid, tweight in zip(tid2documents.indices, tid2documents.data):
docid2score.setdefault(docid, 0)
docid2score[docid] += tweight
docid2score = dict(
sorted(docid2score.items(), key=lambda pair: pair[1], reverse=True)[:topk]
)
ranking = {
self.index.collection_ids[docid]: score
for docid, score in docid2score.items()
}
return ranking
## YOUR_CODE_ENDS_HERE
class CSCBM25Retriever(BaseCSCInvertedIndexRetriever):
@property
def index_class(self) -> Type[CSCBM25Index]:
return CSCBM25Index
import gradio as gr
from typing import TypedDict
class Hit(TypedDict):
cid: str
score: float
text: str
demo: Optional[gr.Interface] = None # Assign your gradio demo to this variable
return_type = List[Hit]
## YOUR_CODE_STARTS_HERE
def search(query: str) -> List[Hit]:
bm25_index = BM25Index.build_from_documents(
documents=iter(sciq.corpus),
ndocs=12160,
show_progress_bar=True, # Keep progress bar
k1=best_k1, # Use tuned k1
b=best_b # Use best b
)
bm25_index.save("output/bm25_index")
bm25_retriever = BM25Retriever(index_dir="output/bm25_index")
ranking = bm25_retriever.retrieve(query=query)
hits = []
for cid, score in ranking.items():
doc = next((doc for doc in sciq.corpus if doc.collection_id == cid), None)
if doc:
hits.append({"cid": cid, "score": score, "text": doc.text})
return hits
demo = gr.Interface(
fn=search,
inputs=gr.Textbox(lines=2, placeholder="Enter your query here..."),
outputs=gr.JSON(label="Search Results"),
title="SciQ Search Engine",
description="Enter a query to search the SciQ dataset using BM25.",
)
## YOUR_CODE_ENDS_HERE
demo.launch(share=True)