# Copyright 2023 Dmitry Ustalov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Gradio application that turns pairwise comparisons into a ranking."""

__author__ = 'Dmitry Ustalov'
__license__ = 'Apache 2.0'

import typing

import gradio as gr
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
from plotly.graph_objects import Figure


def visualize(df_pairwise: pd.DataFrame) -> Figure:
    """Render a pairwise matrix as an annotated heatmap.

    Args:
        df_pairwise: Frame to plot; rows are labelled as winners and columns
            as losers, and every cell is printed with two decimals.

    Returns:
        The Plotly figure with the heatmap.
    """
    fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
    fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')
    # Hover text is HTML-like, so the three fields are separated with <br>;
    # a raw newline inside the literal is not valid Python.
    fig.update_traces(hovertemplate='Winner: %{y}<br>Loser: %{x}<br>Fraction of Wins: %{z}')
    return fig


# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py
def aggregate(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
              seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate item scores from win and tie counts by fixed-point iteration.

    The model assumes that item ``i`` beats item ``j`` with probability
    ``pi_i / (pi_i + pi_j + 2 * v * sqrt(pi_i * pi_j))``, where ``v`` is a
    shared tie parameter that is estimated together with the scores.

    Args:
        wins: Square matrix; ``wins[i, j]`` is how often ``i`` beat ``j``.
        ties: Square matrix of the same shape with the tie counts; the
            ``v`` update halves its total, i.e. it expects every tie to be
            present in both ``ties[i, j]`` and ``ties[j, i]``.
        seed: Seed of the random generator used for the initial guess.
        tolerance: Relative and absolute tolerance of the convergence check,
            also used as the replacement for undefined (NaN) estimates.
        limit: Maximum number of iterations.

    Returns:
        One non-normalized score per item, in the order of the matrix rows.
    """
    assert wins.shape == ties.shape, 'wins and ties shapes are different'

    rng = np.random.default_rng(seed)
    pi, v = rng.random(wins.shape[0]), rng.random()

    # Loop-invariant weight of every pair: a tie counts as half a win.
    wins_ties_half = wins + ties / 2

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        pi_rows = pi[:, np.newaxis]
        sqrt_product = np.sqrt(pi_rows * pi)

        shared = pi_rows + pi + 2 * v * sqrt_product
        v_numerator = np.sum(ties * (pi_rows + pi) / shared) / 2
        v_denominator = np.sum(wins * 2 * sqrt_product / shared)

        # v is a scalar, so there is nothing to gain from copy=False here.
        v = np.nan_to_num(v_numerator / v_denominator, nan=tolerance)

        pi_old = pi

        # Same denominator as in the v update, refreshed with the new v.
        # It must be 2 * v * sqrt(...), not 2 + v * sqrt(...).
        shared = pi_rows + pi + 2 * v * sqrt_product

        # Row i, summed over opponents j: comparisons that i won or tied.
        pi_numerator = np.sum(wins_ties_half * (pi + v * sqrt_product) / shared, axis=1)

        # Column j, summed over opponents i: comparisons that j lost or tied.
        # The likelihood gradient yields sqrt(pi_opponent / pi_item) here,
        # which also keeps the update invariant to rescaling of the scores.
        pi_denominator = np.sum(wins_ties_half * (1 + v * np.sqrt(pi_rows / pi)) / shared, axis=0)

        pi = pi_numerator / pi_denominator
        pi = np.nan_to_num(pi, copy=False, nan=tolerance)

        # Compare on the bounded scale pi / (pi + 1); stop at the limit anyway.
        converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
                                rtol=tolerance, atol=tolerance) or (iterations >= limit)

    return pi


def handler(file: typing.Optional[typing.IO[bytes]], seed: int) -> typing.Tuple[pd.DataFrame, Figure]:
    """Rank the items of an uploaded comparison file.

    Args:
        file: Uploaded file object; the CSV is read from ``file.name``.
        seed: Seed forwarded to ``aggregate``.

    Returns:
        A frame with the ``item``, ``score`` and ``rank`` columns sorted from
        the best item to the worst, and the heatmap of pairwise chances.

    Raises:
        gr.Error: If no file was uploaded, the file cannot be parsed, a
            required column is missing, or ``winner`` has an unknown value.
    """
    if file is None:
        raise gr.Error('File must be uploaded')

    try:
        df = pd.read_csv(file.name, dtype=str)
    except ValueError as e:
        raise gr.Error(f'Parsing error: {e}') from e

    if not pd.Series(['left', 'right', 'winner']).isin(df.columns).all():
        raise gr.Error('Columns must exist: left, right, winner')

    if not df['winner'].isin(pd.Series(['left', 'right', 'tie'])).all():
        raise gr.Error('Allowed winner values: left, right, tie')

    # Build a new frame instead of dropping rows in place on a column slice.
    df = df[['left', 'right', 'winner']].dropna(axis='rows')

    index = pd.Index(np.unique(df[['left', 'right']].values), name='item')

    left = index.get_indexer(df['left'])
    right = index.get_indexer(df['right'])

    left_won = (df['winner'] == 'left').to_numpy()
    right_won = (df['winner'] == 'right').to_numpy()
    tied = (df['winner'] == 'tie').to_numpy()

    wins = np.zeros((len(index), len(index)), dtype=np.int64)
    ties = np.zeros((len(index), len(index)), dtype=np.int64)

    # wins[winner, loser]: a comparison won by the right item must be
    # credited to the right item, not to the left one.
    np.add.at(wins, (left[left_won], right[left_won]), 1)
    np.add.at(wins, (right[right_won], left[right_won]), 1)

    # A tie belongs to both items, hence the symmetrization.
    np.add.at(ties, (left[tied], right[tied]), 1)
    ties = ties + ties.T

    scores = aggregate(wins, ties, seed=seed)

    df_result = pd.DataFrame(data={'score': scores}, index=index)
    df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)

    # np.NINF is gone in NumPy 2.0; -np.inf is the portable spelling.
    df_result = df_result.fillna(-np.inf)
    df_result = df_result.sort_values(by=['rank', 'score'], ascending=[True, False])
    df_result = df_result.reset_index()

    # Chance of the row item winning against the column item.
    df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
                               index=index, columns=index)

    # Show the heatmap in the ranking order.
    df_pairwise = df_pairwise.reindex(index=df_result['item'], columns=df_result['item'])

    fig = visualize(df_pairwise)

    return df_result, fig


def main() -> None:
    """Build the Gradio interface and launch it."""
    iface = gr.Interface(
        fn=handler,
        inputs=[
            gr.File(
                value='example.csv',
                file_types=['.tsv', '.csv'],
                label='Comparisons'
            ),
            gr.Number(
                label='Seed',
                precision=0
            )
        ],
        outputs=[
            gr.Dataframe(
                headers=['item', 'score', 'rank'],
                label='Ranking'
            ),
            gr.Plot(
                label='Pairwise Chances of Winning the Comparison'
            )
        ],
        title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
        description='''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.

As an input, it expects a comma-separated (CSV) file with a header containing the following columns:

- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item

Possible values for `winner` are `left`, `right`, or `tie`. The provided example might be a good starting point.

As the output, this tool provides a table with items, their estimated scores, and ranks.
''',
        article='''
This tool attempts to implement the tie-aware ranking aggregation algorithm as described in
[Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html).
''',
        allow_flagging='never'
    )

    iface.launch()


if __name__ == '__main__':
    main()