# Copyright 2023 Dmitry Ustalov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Gradio application that turns pairwise (side-by-side) comparisons into a ranking of items."""
# Module metadata.
__author__ = 'Dmitry Ustalov'
__license__ = 'Apache 2.0'
import typing
import gradio as gr
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
from plotly.graph_objects import Figure
def visualize(df_pairwise: pd.DataFrame) -> Figure:
    """Draw a square pairwise matrix as an annotated heatmap.

    Rows are labelled as winners and columns as losers; every cell shows its
    value with two decimals.

    Args:
        df_pairwise: Matrix of pairwise values; its index is used for the
            Y axis and its columns for the X axis.

    Returns:
        The Plotly figure with the heatmap.
    """
    fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
    fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')
    # Plotly hover templates are HTML-like: lines are separated by <br>, and
    # the whole template has to be one single-line string literal.
    fig.update_traces(hovertemplate='Winner: %{y}<br>Loser: %{x}<br>Fraction of Wins: %{z}')
    return fig
# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py
def aggregate(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
              seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate item scores from pairwise wins and ties.

    Iterates the tie-aware updates from "Efficient Computation of Rankings
    from Pairwise Comparisons": a tie parameter ``v`` and the score vector
    ``pi`` are re-estimated in turn until the scores stop changing or the
    iteration limit is reached. All scores are updated simultaneously from
    the previous iteration's values.

    Args:
        wins: Square matrix; ``wins[i, j]`` is how many times item ``i`` beat
            item ``j``.
        ties: Square matrix of the same shape; ``ties[i, j]`` is how many
            times items ``i`` and ``j`` tied (callers pass it symmetric).
        seed: Seed of the random generator that draws the initial scores and
            the initial tie parameter.
        tolerance: Relative and absolute tolerance of the convergence check;
            also the value substituted for NaN results of 0/0 divisions.
        limit: Maximum number of iterations; at least one is always run.

    Returns:
        One non-normalized score per item; larger means stronger.

    Raises:
        AssertionError: If ``wins`` and ``ties`` have different shapes.
    """
    assert wins.shape == ties.shape, 'wins and ties shapes are different'

    rng = np.random.default_rng(seed)
    pi, v = rng.random(wins.shape[0]), rng.random()

    # A win counts fully and a tie counts as half a win for each side.
    weights = wins + ties / 2

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        # Element [i, j] of these matrices refers to the pair (i, j).
        pair_sum = pi[:, np.newaxis] + pi
        pair_root = np.sqrt(pi[:, np.newaxis] * pi)
        pair_total = pair_sum + 2 * v * pair_root

        v_numerator = np.sum(ties * pair_sum / pair_total) / 2
        v_denominator = np.sum(wins * 2 * pair_root / pair_total)

        v = v_numerator / v_denominator
        # 0/0 (e.g. no comparisons at all) yields NaN; fall back to tolerance.
        v = np.nan_to_num(v, copy=False, nan=tolerance)

        pi_old = pi

        # The score update uses the same pair denominator as the tie update,
        # pi_i + pi_j + 2 * v * sqrt(pi_i * pi_j), evaluated with the new v.
        pair_total = pair_sum + 2 * v * pair_root

        # Row i: sum over opponents j of the outcomes in favour of i.
        pi_numerator = np.sum(weights * (pi + v * pair_root) / pair_total, axis=1)

        # Column i: sum over opponents j of the outcomes in favour of j,
        # weighted by 1 + v * sqrt(pi_j / pi_i).
        pi_denominator = np.sum(
            weights * (1 + v * np.sqrt(pi[:, np.newaxis] / pi)) / pair_total,
            axis=0
        )

        pi = pi_numerator / pi_denominator
        pi = np.nan_to_num(pi, copy=False, nan=tolerance)

        # Scores are compared after squashing into [0, 1) so that very large
        # values do not dominate the check.
        converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
                                rtol=tolerance, atol=tolerance) or (iterations >= limit)

    return pi
def handler(file: typing.IO[bytes], seed: int) -> typing.Tuple[pd.DataFrame, Figure]:
    """Rank the items of an uploaded comparison file.

    The file is read as CSV with the columns ``left``, ``right`` and
    ``winner``; ``winner`` must be ``left``, ``right`` or ``tie``. Rows with
    missing values in these columns are dropped after validation.

    Args:
        file: Uploaded file object; only its ``name`` (a path) is used.
        seed: Seed forwarded to ``aggregate``.

    Returns:
        A table with the columns ``item``, ``score`` and ``rank`` sorted from
        best to worst, and a heatmap of ``score_i / (score_i + score_j)``
        with items in the same order.

    Raises:
        gr.Error: If no file is given, it cannot be parsed, a required column
            is missing, or ``winner`` holds an unexpected value.
    """
    if file is None:
        raise gr.Error('File must be uploaded')

    try:
        df = pd.read_csv(file.name, dtype=str)
    except ValueError as e:
        raise gr.Error(f'Parsing error: {e}') from e

    if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all():
        raise gr.Error('Columns must exist: left, right, winner')

    if not df['winner'].isin(pd.Series(['left', 'right', 'tie'])).all():
        raise gr.Error('Allowed winner values: left, right, tie')

    # Chained instead of in-place so that no slice of the parsed frame is mutated.
    df = df[['left', 'right', 'winner']].dropna(axis='rows')

    index = pd.Index(np.unique(df[['left', 'right']].values), name='item')

    # Translate item labels into matrix positions.
    position = {item: i for i, item in enumerate(index)}
    left_pos = np.array([position[item] for item in df['left']], dtype=np.intp)
    right_pos = np.array([position[item] for item in df['right']], dtype=np.intp)

    left_won = (df['winner'] == 'left').to_numpy()
    right_won = (df['winner'] == 'right').to_numpy()
    tied = (df['winner'] == 'tie').to_numpy()

    size = len(index)

    # wins[i, j] counts how often item i beat item j, so a comparison won by
    # the right item is credited to the right item, not to the left one.
    # np.add.at accumulates correctly when the same pair occurs many times.
    wins = np.zeros((size, size), dtype=np.int64)
    np.add.at(wins, (left_pos[left_won], right_pos[left_won]), 1)
    np.add.at(wins, (right_pos[right_won], left_pos[right_won]), 1)

    # A tie belongs to both items, hence the symmetrization.
    ties = np.zeros((size, size), dtype=np.int64)
    np.add.at(ties, (left_pos[tied], right_pos[tied]), 1)
    ties = ties + ties.T

    scores = aggregate(wins, ties, seed=seed)

    df_result = pd.DataFrame(data={'score': scores}, index=index)
    df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)
    # -np.inf instead of np.NINF: the alias no longer exists in NumPy 2.
    df_result = df_result.fillna(-np.inf)
    df_result = df_result.sort_values(by=['rank', 'score'], ascending=[True, False])
    df_result = df_result.reset_index()

    df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
                               index=index, columns=index)
    # Order the heatmap rows and columns like the ranking table.
    df_pairwise = df_pairwise.reindex(index=df_result['item'], columns=df_result['item'])

    fig = visualize(df_pairwise)

    return df_result, fig
def main() -> None:
    """Build the Gradio interface around ``handler`` and launch it."""
    # Input widgets: the comparison file and the random seed.
    comparisons = gr.File(
        value='example.csv',
        file_types=['.tsv', '.csv'],
        label='Comparisons'
    )
    seed = gr.Number(label='Seed', precision=0)

    # Output widgets: the ranking table and the pairwise heatmap.
    ranking = gr.Dataframe(headers=['item', 'score', 'rank'], label='Ranking')
    chances = gr.Plot(label='Pairwise Chances of Winning the Comparison')

    # Markdown shown above the widgets.
    description = '''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.
As an input, it expects a comma-separated (CSV) file with a header containing the following columns:
- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item
Possible values for `winner` are `left`, `right`, or `tie`.
The provided example might be a good starting point.
As the output, this tool provides a table with items, their estimated scores, and ranks.
'''

    # Markdown shown below the widgets.
    article = '''
This tool attempts to implement the tie-aware ranking aggregation algorithm as described in
[Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html).
'''

    app = gr.Interface(
        fn=handler,
        inputs=[comparisons, seed],
        outputs=[ranking, chances],
        title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
        description=description,
        article=article,
        allow_flagging='never'
    )
    app.launch()
# Start the web interface only when the file is run as a script.
if __name__ == '__main__':
    main()