#####################################################
### DOCUMENT PROCESSOR [MERGER]
#####################################################
# Jonathan Wang
# ABOUT:
# This project creates an app to chat with PDFs.
# This is the MERGER
# which defines how two lists with scores
# should be merged together into one list.
# (Useful for fusing things like keywords or textnodes)
#####################################################
## TODOS:
# We're looping through A/B more than necessary.
#####################################################
## IMPORTS:
from __future__ import annotations
from typing import TYPE_CHECKING, Sequence, TypeVar
import numpy as np
if TYPE_CHECKING:
    from numpy.typing import NDArray
#####################################################
## CODE:
GenericType = TypeVar("GenericType")
### TODO(Jonathan Wang): Implement Maximum Marginal Relevance (MMR)
# https://en.wikipedia.org/wiki/Maximum_marginal_relevance
# def mmr(documents, query, scores, lambda_param=0.5):
#     """
#     Calculate Maximum Marginal Relevance (MMR) for a list of documents.
#
#     Parameters:
#         documents (list of np.array): List of document vectors (assumed normalized,
#             so a dot product acts as cosine similarity).
#         query (np.array): Query vector. (Currently unused: relevance comes from
#             the precomputed scores instead.)
#         scores (list of float): Relevance scores for each document.
#         lambda_param (float): Trade-off parameter between relevance and diversity.
#
#     Returns:
#         list of int: Indices of selected documents, in order of selection.
#     """
#     selected = []
#     remaining = list(range(len(documents)))
#     while remaining:
#         if not selected:
#             # Seed with the document that has the highest relevance score.
#             idx = int(np.argmax(scores))
#         else:
#             # Calculate MMR for the remaining documents.
#             mmr_scores = []
#             for i in remaining:
#                 relevance = scores[i]
#                 diversity = max(np.dot(documents[i], documents[j]) for j in selected)
#                 mmr_scores.append(lambda_param * relevance - (1 - lambda_param) * diversity)
#             idx = remaining[int(np.argmax(mmr_scores))]
#         selected.append(idx)
#         remaining.remove(idx)
#     return selected
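# For reference, the standard MMR selection rule (Carbonell & Goldstein, 1998) is:
#     MMR = argmax_{d in R \ S} [ lambda * Sim1(d, q) - (1 - lambda) * max_{s in S} Sim2(d, s) ]
# where R is the candidate set, S the already-selected set, and q the query.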
def _merge_on_scores(
    a_list: Sequence[GenericType],
    b_list: Sequence[GenericType],
    a_scores_input: Sequence[float | np.float64 | None],
    b_scores_input: Sequence[float | np.float64 | None],
    use_distribution: bool = True,
    a_weight: float = 0.5,
    top_k: int = 5,
) -> Sequence[GenericType]:
"""
Given two lists of elements with scores, fuse them together using "Distribution-Based Score Fusion".
Elements which have high scores in both lists are given even higher ranking here.
Inputs:
a_list: list of elements for A
a_scores: list of scores for each element in A. Assume higher is better. Share the same index.
b_list: list of elements for B
b_scores: list of scores for each element in B. Assume higher is better. Share the same index.
use_distribution: Whether to fuse using Min-Max Scaling (FALSE) or Distribution Based Score Fusion (TRUE)
Outputs:
List: List of elements that passed the merge.
"""
    # Guard Clauses
    if ((len(a_list) != len(a_scores_input)) or (len(b_list) != len(b_scores_input))):
        msg = (
            f"""_merge_on_scores: Differing number of elements and scores!
            a_list: {a_list}
            a_scores: {a_scores_input}
            b_list: {b_list}
            b_scores: {b_scores_input}
            """
        )
        raise ValueError(msg)
    if (a_weight > 1 or a_weight < 0):
        msg = "_merge_on_scores: weight for the A list should be between 0 and 1."
        raise ValueError(msg)
    if (top_k < 0):
        # TODO(Jonathan Wang): also validate top_k against the number of unique
        # elements in the two lists. That needs a nice way to count unique elements
        # which are potentially unhashable AND unorderable; the n^2 solution with
        # two lists and (if not in x) works, but it's a bit annoying.
        msg = "_merge_on_scores: top_k must be non-negative."
        raise ValueError(msg)
    # 0. Filter out score-less elements and convert the scores to numpy arrays.
    # NOTE: When using a SubQuestionQueryEngine, the subanswers are saved as
    # NodesWithScores, but their score is None. We want to filter these out,
    # so we get citations when the two texts are very similar.
    a_list = [element for element, score in zip(a_list, a_scores_input) if score is not None]
    b_list = [element for element, score in zip(b_list, b_scores_input) if score is not None]
    a_scores: NDArray[np.float64] = np.array([s for s in a_scores_input if s is not None], dtype=np.float64)
    b_scores: NDArray[np.float64] = np.array([s for s in b_scores_input if s is not None], dtype=np.float64)
    # 1. Calculate the mean of each score distribution.
    a_mean = np.nanmean(a_scores)  # np.nan if empty
    b_mean = np.nanmean(b_scores)
    # 2. Calculate the standard deviations.
    a_stdev = np.nanstd(a_scores)
    b_stdev = np.nanstd(b_scores)
    # 3. Get minimum and maximum bands as 3 standard deviations from the mean
    # (alternatively, use the actual min and max for min-max scaling).
    a_min = a_mean - 3 * a_stdev if use_distribution else np.nanmin(a_scores)
    a_max = a_mean + 3 * a_stdev if use_distribution else np.nanmax(a_scores)
    b_min = b_mean - 3 * b_stdev if use_distribution else np.nanmin(b_scores)
    b_max = b_mean + 3 * b_stdev if use_distribution else np.nanmax(b_scores)
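    # Worked example (illustrative values, not project data): for scores
    # [0.2, 0.5, 0.8], mean = 0.5 and population std ≈ 0.245, so the DBSF band
    # is roughly [0.5 - 3*0.245, 0.5 + 3*0.245] = [-0.235, 1.235], whereas
    # min-max scaling would use the observed band [0.2, 0.8].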
    # 4. Rescale the score distributions onto [0, 1] over their bands.
    if (a_max > a_min):
        a_scores = (a_scores - a_min) / (a_max - a_min)
    if (b_max > b_min):
        b_scores = (b_scores - b_min) / (b_max - b_min)
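    # Continuing the example above: a raw score of 0.5 in the band
    # [-0.235, 1.235] rescales to (0.5 + 0.235) / (1.235 + 0.235) = 0.5.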
    # 5. Fuse the scores together.
    fused_pairs: list[tuple[GenericType, float]] = []
    for index, element in enumerate(a_list):
        a_score = a_scores[index]
        if (element in b_list):
            # In both A and B: fuse the two scores.
            b_score = b_scores[b_list.index(element)]
            fused_score = a_weight * a_score + (1 - a_weight) * b_score
            fused_pairs.append((element, fused_score))
        else:
            # Only in A.
            fused_pairs.append((element, a_weight * a_score))
    for index, element in enumerate(b_list):
        if (element not in a_list):
            # Only in B.
            b_score = b_scores[index]
            fused_pairs.append((element, (1 - a_weight) * b_score))
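    # e.g., with a_weight = 0.5, an element with rescaled scores a = 0.9 and
    # b = 0.7 fuses to 0.5 * 0.9 + 0.5 * 0.7 = 0.8 (illustrative values).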
    # 6. Sort by fused score, best first, and keep at most top_k elements.
    fused_pairs = sorted(fused_pairs, key=lambda item: item[1], reverse=True)
    output_list = [item[0] for item in fused_pairs]
    return output_list[:top_k]
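#####################################################
## USAGE SKETCH:
# A minimal, illustrative example of _merge_on_scores. The keyword lists and
# scores below are made-up values, not project data.
if __name__ == "__main__":
    keywords_a = ["solar", "battery", "grid"]
    keywords_b = ["battery", "grid", "inverter"]
    merged = _merge_on_scores(
        keywords_a,
        keywords_b,
        a_scores_input=[0.9, 0.7, 0.4],
        b_scores_input=[0.8, 0.6, 0.5],
        use_distribution=True,
        a_weight=0.5,
        top_k=3,
    )
    # "battery" and "grid" appear in both lists, so their fused scores rank
    # them above the single-list keywords.
    print(merged)  # ['battery', 'grid', 'solar']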