|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__author__ = 'Dmitry Ustalov' |
|
__license__ = 'Apache 2.0' |
|
|
|
from typing import IO, Tuple, List, cast, Dict |
|
|
|
import gradio as gr |
|
import networkx as nx |
|
import numpy as np |
|
import numpy.typing as npt |
|
import pandas as pd |
|
import plotly.express as px |
|
from plotly.graph_objects import Figure |
|
|
|
|
|
def visualize(df_pairwise: pd.DataFrame) -> Figure:
    """Draw the pairwise matrix as an annotated heatmap.

    Args:
        df_pairwise: Square table whose rows are labelled as winners and whose
            columns are labelled as losers; each cell is shown with two decimals.

    Returns:
        The Plotly figure with axis titles and a custom hover template applied.
    """
    hover_text = 'Winner: %{y}<br>Loser: %{x}<br>Fraction of Wins: %{z}<extra></extra>'

    # Plotly's update_* methods return the figure itself, so the calls can be chained.
    return (
        px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
        .update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')
        .update_traces(hovertemplate=hover_text)
    )
|
|
|
|
|
|
|
def bradley_terry(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
                  seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate Bradley-Terry scores by fixed-point iteration.

    A tie is counted as half a win for each of the two items.

    Args:
        wins: Square matrix; entry ``[i, j]`` is the number of wins of ``i`` over ``j``.
        ties: Square matrix of tie counts between ``i`` and ``j``.
        seed: Not used by this algorithm (the start point is all ones).
        tolerance: Stop once the Euclidean norm of the score change is below this.
        limit: Maximum number of iterations; at least one iteration always runs.

    Returns:
        Scores normalised to sum to one, one per item.
    """
    outcomes = wins + .5 * ties

    # Total (tie-weighted) number of comparisons between every pair of items.
    totals = outcomes.T + outcomes
    compared = totals > 0

    win_totals = outcomes.sum(axis=1)

    # Entries for pairs that were never compared stay at zero throughout.
    weights = np.zeros_like(outcomes, dtype=float)

    scores = np.ones(outcomes.shape[0])
    iteration = 0

    while True:
        iteration += 1

        # pairwise_sum[i, j] = scores[i] + scores[j]
        pairwise_sum = scores[np.newaxis, :] + scores[:, np.newaxis]
        weights[compared] = totals[compared] / pairwise_sum[compared]

        updated = win_totals / weights.sum(axis=0)
        updated /= updated.sum()

        change = np.linalg.norm(updated - scores)
        scores = updated

        if change < tolerance or iteration >= limit:
            return scores
|
|
|
|
|
def pagerank(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
             seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
    """Score items with weighted PageRank over the comparison graph.

    A tie is counted as half a win for each of the two items.

    Args:
        wins: Square matrix; entry ``[i, j]`` is the number of wins of ``i`` over ``j``.
        ties: Square matrix of tie counts between ``i`` and ``j``.
        seed: Not used by this algorithm.
        tolerance: Convergence tolerance forwarded to NetworkX as ``tol``.
        limit: Maximum number of power iterations, forwarded as ``max_iter``.

    Returns:
        PageRank scores, one per item, in matrix order.
    """
    A = wins + .5 * ties

    # PageRank passes score along outgoing edges, so a node is rewarded for its
    # incoming edges. A[i, j] holds the wins of i over j; building the graph from
    # A directly would therefore reward losers. Transposing makes every edge point
    # from the loser to the winner.
    G = nx.from_numpy_array(A.T, create_using=nx.DiGraph)

    ranks: Dict[int, float] = nx.pagerank(G, max_iter=limit, tol=tolerance)

    # Nodes are labelled 0..n-1 in matrix order; read the scores back in that order.
    return np.array([ranks[node] for node in range(len(G))])
|
|
|
|
|
|
|
def newman(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
           seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate tie-aware scores with Newman's iterative algorithm.

    The model is: ``i`` beats ``j`` with probability ``pi_i / D_ij`` and they tie
    with probability ``2 v sqrt(pi_i pi_j) / D_ij``, where
    ``D_ij = pi_i + pi_j + 2 v sqrt(pi_i pi_j)``. Each iteration first updates the
    tie parameter ``v`` and then all scores at once from the previous scores.

    Args:
        wins: Square matrix; entry ``[i, j]`` is the number of wins of ``i`` over ``j``.
        ties: Square symmetric matrix of tie counts between ``i`` and ``j``.
        seed: Seed of the random generator used for the starting point.
        tolerance: Convergence tolerance on ``pi / (pi + 1)``; also substituted
            for any NaN produced by a zero-by-zero division.
        limit: Maximum number of iterations; at least one iteration always runs.

    Returns:
        The (unnormalised) scores, one per item.
    """
    rng = np.random.default_rng(seed)

    # Keep this draw order (scores first, then the tie parameter) so that a given
    # seed keeps producing the same starting point.
    pi, v = rng.random(wins.shape[0]), rng.random()

    # Every tie counts as half a win for both items; this does not change per iteration.
    win_tie_half = wins + ties / 2

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        pi_col = pi[:, np.newaxis]
        pair_sum = pi_col + pi              # [i, j] = pi_i + pi_j
        pair_sqrt = np.sqrt(pi_col * pi)    # [i, j] = sqrt(pi_i * pi_j)

        common = pair_sum + 2 * v * pair_sqrt

        v_numerator = np.sum(ties * pair_sum / common) / 2
        v_denominator = np.sum(wins * 2 * pair_sqrt / common)

        v = np.nan_to_num(v_numerator / v_denominator, nan=tolerance)

        pi_old = pi

        # Same denominator D_ij as above, now with the updated v. It must be
        # 2 * v * sqrt(...): the previous "2 + v * sqrt(...)" was a typo that
        # distorted the update even when there are no ties at all.
        common = pair_sum + 2 * v * pair_sqrt

        # ratio[j, i] = sqrt(pi_j / pi_i), which is what differentiating D_ij with
        # respect to pi_i yields. Columns whose score is exactly zero get a ratio
        # of zero instead of a division by zero, so such a score stays at zero.
        ratio = np.sqrt(np.divide(pi_col, pi, out=np.zeros_like(pair_sum), where=pi > 0))

        # Row sums: item i as the winner against every j.
        pi_numerator = np.sum(win_tie_half * (pi + v * pair_sqrt) / common, axis=1)

        # Column sums: item i as the loser against every j.
        pi_denominator = np.sum(win_tie_half * (1 + v * ratio) / common, axis=0)

        pi = np.nan_to_num(pi_numerator / pi_denominator, nan=tolerance)

        converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
                                rtol=tolerance, atol=tolerance) or (iterations >= limit)

    return pi
|
|
|
|
|
# Ranking algorithms by display label. Every entry is called the same way:
# (wins, ties, seed=...) -> one score per item.
ALGORITHMS = {
    'Bradley-Terry (1952)': bradley_terry,
    'PageRank (1998)': pagerank,
    'Newman (2023)': newman,
}
|
|
|
|
|
def _count_outcomes(rows: npt.NDArray[np.intp], cols: npt.NDArray[np.intp],
                    size: int) -> npt.NDArray[np.int64]:
    """Count how often each (row, column) position pair occurs in a size x size matrix."""
    counts = np.zeros((size, size), dtype=np.int64)
    # Unlike fancy-index assignment, add.at accumulates repeated positions.
    np.add.at(counts, (rows, cols), 1)
    return counts


def handler(file: IO[bytes], algorithm: str, seed: int) -> Tuple[pd.DataFrame, Figure]:
    """Rank the items of an uploaded comparison file and plot pairwise win chances.

    Args:
        file: Uploaded file; its ``name`` attribute is used as the path to read.
            A plain path string is accepted as well.
        algorithm: Key of ``ALGORITHMS`` selecting the ranking method.
        seed: Seed forwarded to the ranking method.

    Returns:
        A table with the columns ``item``, ``score``, ``pairs`` and ``rank``,
        ordered from best to worst, and a heatmap whose cell ``[i, j]`` is
        ``score_i / (score_i + score_j)``.

    Raises:
        gr.Error: If no file was given, the algorithm is unknown, the file cannot
            be parsed, a required column is missing, or ``winner`` holds a value
            other than ``left``, ``right`` or ``tie``.
    """
    if file is None:
        raise gr.Error('File must be uploaded')

    if algorithm not in ALGORITHMS:
        raise gr.Error(f'Unknown algorithm: {algorithm}')

    # NOTE(review): some Gradio versions pass a path string rather than a file
    # wrapper; accept both. Confirm against the pinned Gradio version.
    path = str(getattr(file, 'name', file))

    # The upload widget accepts .tsv as well as .csv, so choose the delimiter by extension.
    separator = '\t' if path.lower().endswith('.tsv') else ','

    try:
        df = pd.read_csv(path, dtype=str, sep=separator)
    except ValueError as e:
        raise gr.Error(f'Parsing error: {e}') from e

    if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all():
        raise gr.Error('Columns must exist: left, right, winner')

    if not df['winner'].isin(['left', 'right', 'tie']).all():
        raise gr.Error('Allowed winner values: left, right, tie')

    # dropna() returns a new frame, so the parsed frame is never modified through a slice.
    df = df[['left', 'right', 'winner']].dropna()

    index = pd.Index(np.unique(df[['left', 'right']].values), name='item')
    size = len(index)

    # Position of every row's left/right item within the sorted item index.
    left = index.get_indexer(df['left'])
    right = index.get_indexer(df['right'])

    left_won = (df['winner'] == 'left').to_numpy()
    right_won = (df['winner'] == 'right').to_numpy()
    tied = (df['winner'] == 'tie').to_numpy()

    # wins[i, j] is the number of wins of i over j, so a row won by the right item
    # must be credited to the right item (row) against the left item (column).
    wins = (_count_outcomes(left[left_won], right[left_won], size)
            + _count_outcomes(right[right_won], left[right_won], size))

    # A tie belongs to both items equally: make the matrix symmetric.
    ties = _count_outcomes(left[tied], right[tied], size)
    ties = ties + ties.T

    scores = ALGORITHMS[algorithm](wins, ties, seed=seed)

    df_result = pd.DataFrame(data={'score': scores}, index=index)

    # Number of comparisons each item took part in, on either side. Counting by
    # position keeps items that only ever appear on one side (adding two
    # differently indexed count Series would give NaN for them).
    df_result['pairs'] = np.bincount(left, minlength=size) + np.bincount(right, minlength=size)

    # Rank before replacing NaN scores so that they are placed at the bottom.
    df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)

    # -np.inf instead of np.NINF: the alias was removed in NumPy 2.0.
    df_result = df_result.fillna(-np.inf)
    df_result = df_result.sort_values(by=['rank', 'score'], ascending=[True, False]).reset_index()

    df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
                               index=index, columns=index)

    # Show the heatmap in ranking order.
    df_pairwise = df_pairwise.reindex(index=df_result['item'], columns=df_result['item'])

    fig = visualize(df_pairwise)

    return df_result, fig
|
|
|
|
|
def main() -> None:
    """Build the Gradio interface around ``handler`` and launch it."""
    iface = gr.Interface(
        fn=handler,
        inputs=[
            gr.File(
                file_types=['.tsv', '.csv'],
                label='Comparisons'
            ),
            gr.Dropdown(
                # Pass the labels as a real list rather than casting the dict itself.
                choices=list(ALGORITHMS),
                value='Bradley-Terry (1952)',
                label='Algorithm'
            ),
            gr.Number(
                label='Seed',
                precision=0
            )
        ],
        outputs=[
            gr.Dataframe(
                headers=['item', 'score', 'pairs', 'rank'],
                label='Ranking'
            ),
            gr.Plot(
                label='Pairwise Chances of Winning the Comparison'
            )
        ],
        examples=[
            ['food.csv', 'Bradley-Terry (1952)', 0],
            ['food.csv', 'PageRank (1998)', 0],
            ['food.csv', 'Newman (2023)', 0]
        ],
        title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
        description='''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.

As an input, it expects a comma-separated (CSV) file with a header containing the following columns:

- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item

Possible values for `winner` are `left`, `right`, or `tie`.
The provided example might be a good starting point.

As the output, this tool provides a table with items, their estimated scores, and ranks.
''',
        article='''
This tool attempts to implement the tie-aware ranking aggregation algorithm as described in
[Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html).
''',
        allow_flagging='never'
    )

    iface.launch()


# Start the web app only when this file is run as a script.
if __name__ == '__main__':
    main()
|
|