# Copyright 2023 Dmitry Ustalov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = 'Dmitry Ustalov'
__license__ = 'Apache 2.0'

from typing import IO, Tuple, List, cast, Dict

import gradio as gr
import networkx as nx
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
from plotly.graph_objects import Figure


def visualize(df_pairwise: pd.DataFrame) -> Figure:
    """Draw the pairwise matrix as an annotated heatmap.

    Args:
        df_pairwise: Square frame whose rows are labelled as winners and whose
            columns are labelled as losers on the resulting figure.

    Returns:
        The Plotly figure with two-decimal cell annotations.
    """
    fig = px.imshow(df_pairwise, color_continuous_scale='RdBu', text_auto='.2f')
    fig.update_layout(xaxis_title='Loser', yaxis_title='Winner', xaxis_side='top')
    # Plotly hover labels are HTML-like, so lines are separated with <br>.
    fig.update_traces(hovertemplate='Winner: %{y}<br>Loser: %{x}<br>Fraction of Wins: %{z}')

    return fig


# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-bradley_terry-py
def bradley_terry(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
                  seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate Bradley-Terry scores by fixed-point iteration.

    Args:
        wins: Square matrix; ``wins[i, j]`` is how many times item i beat item j.
        ties: Square matrix of tie counts; a tie contributes half a win to each cell.
        seed: Unused; accepted so that all algorithms share one call signature.
        tolerance: Stop when the Euclidean norm of the score change drops below this.
        limit: Maximum number of iterations.

    Returns:
        Scores normalised to sum to one, in the item order of the input matrices.
    """
    M = wins + .5 * ties

    T = M.T + M  # total (weighted) number of comparisons per pair
    active = T > 0  # only pairs that were actually compared contribute

    w = M.sum(axis=1)  # weighted number of wins per item

    Z = np.zeros_like(M, dtype=float)

    p = np.ones(M.shape[0])
    p_new = p.copy()

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        P = np.broadcast_to(p, M.shape)

        Z[active] = T[active] / (P[active] + P.T[active])

        p_new[:] = w
        p_new /= Z.sum(axis=0)
        p_new /= p_new.sum()

        converged = bool(np.linalg.norm(p_new - p) < tolerance) or (iterations >= limit)

        p[:] = p_new

    return p


def pagerank(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
             seed: int = 0, tolerance: float = 10e-6, limit: int = 100) -> npt.NDArray[np.float64]:
    """Score items with PageRank on the comparison graph.

    Args:
        wins: Square matrix; ``wins[i, j]`` is how many times item i beat item j.
        ties: Square matrix of tie counts; a tie counts as half an edge each way.
        seed: Unused; accepted so that all algorithms share one call signature.
        tolerance: Convergence tolerance passed to NetworkX.
        limit: Maximum number of power iterations passed to NetworkX.

    Returns:
        PageRank values in the item order of the input matrices.
    """
    # PageRank passes a node's rank along its outgoing edges, so every edge must
    # point from the loser to the winner; hence the transpose of the win counts.
    A = (wins + .5 * ties).T

    G = nx.from_numpy_array(A, create_using=nx.DiGraph)

    ranks: Dict[int, float] = nx.algorithms.pagerank(G, max_iter=limit, tol=tolerance)

    return np.array([ranks[i] for i in range(len(G))])


# https://gist.github.com/dustalov/41678b70c40ba5a55430fa5e77b121d9#file-newman-py
def newman(wins: npt.NDArray[np.int64], ties: npt.NDArray[np.int64],
           seed: int = 0, tolerance: float = 10e-6, limit: int = 20) -> npt.NDArray[np.float64]:
    """Estimate tie-aware scores with Newman's iterative algorithm.

    Args:
        wins: Square matrix; ``wins[i, j]`` is how many times item i beat item j.
        ties: Square matrix of tie counts.
        seed: Seed for the random initialisation of the scores and the tie parameter.
        tolerance: Tolerance of the convergence check; also the value substituted
            for NaN results of the updates.
        limit: Maximum number of iterations.

    Returns:
        Unnormalised scores in the item order of the input matrices.
    """
    rng = np.random.default_rng(seed)

    pi, v = rng.random(wins.shape[0]), rng.random()

    win_tie_half = wins + ties / 2

    converged, iterations = False, 0

    while not converged:
        iterations += 1

        pair_sum = pi[:, np.newaxis] + pi
        sqrt_outer = np.sqrt(pi[:, np.newaxis] * pi)
        denominator = pair_sum + 2 * v * sqrt_outer

        v_numerator = np.sum(ties * pair_sum / denominator) / 2
        v_denominator = np.sum(wins * 2 * sqrt_outer / denominator)

        v = v_numerator / v_denominator
        v = np.nan_to_num(v, copy=False, nan=tolerance)

        pi_old = pi.copy()

        # Same denominator as in the tie-parameter update, re-evaluated with the
        # new v. The previous code added 2 here instead of multiplying by it.
        denominator = pair_sum + 2 * v * sqrt_outer

        pi_numerator = np.sum(win_tie_half * (pi + v * sqrt_outer) / denominator, axis=1)

        # NOTE(review): differentiating the same denominator suggests this factor
        # should be 1 + v * sqrt(pi_i / pi_j) rather than the product; confirm
        # against the referenced paper before changing it.
        pi_denominator = np.sum(win_tie_half * (1 + v * sqrt_outer) / denominator, axis=0)

        pi = pi_numerator / pi_denominator
        pi = np.nan_to_num(pi, copy=False, nan=tolerance)

        converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1),
                                rtol=tolerance, atol=tolerance) or (iterations >= limit)

    return pi


ALGORITHMS = {
    'Bradley-Terry (1952)': bradley_terry,
    'PageRank (1998)': pagerank,
    'Newman (2023)': newman,
}


def _count_pairs(rows: pd.Series, columns: pd.Series, index: pd.Index) -> npt.NDArray[np.int64]:
    """Count (row item, column item) occurrences into a square matrix ordered by index."""
    counts = np.zeros((len(index), len(index)), dtype=np.int64)
    # np.add.at accumulates repeated positions, which fancy-index assignment would not.
    np.add.at(counts, (index.get_indexer(rows), index.get_indexer(columns)), 1)
    return counts


def handler(file: IO[bytes], algorithm: str, seed: int) -> Tuple[pd.DataFrame, Figure]:
    """Rank the items of an uploaded comparison file.

    Args:
        file: Uploaded file: either an object exposing the path as ``.name`` or
            the path itself. Files ending in ``.tsv`` are read as tab-separated,
            everything else as comma-separated.
        algorithm: A key of ``ALGORITHMS``.
        seed: Random seed forwarded to the ranking algorithm.

    Returns:
        A table with columns ``item``, ``score``, ``pairs`` and ``rank`` sorted
        best-first, and a heatmap of ``score_i / (score_i + score_j)``.

    Raises:
        gr.Error: If no file was given, the algorithm is unknown, the file cannot
            be parsed, required columns are missing, no complete comparison
            remains, or ``winner`` holds an unexpected value.
    """
    if file is None:
        raise gr.Error('File must be uploaded')

    if algorithm not in ALGORITHMS:
        raise gr.Error(f'Unknown algorithm: {algorithm}')

    # Accept both a file wrapper with a .name attribute and a bare path.
    path = getattr(file, 'name', file)
    separator = '\t' if str(path).lower().endswith('.tsv') else ','

    try:
        df = pd.read_csv(path, sep=separator, dtype=str)
    except ValueError as e:
        raise gr.Error(f'Parsing error: {e}') from e

    if not {'left', 'right', 'winner'}.issubset(df.columns):
        raise gr.Error('Columns must exist: left, right, winner')

    # Drop incomplete rows first so that a missing winner is skipped rather than
    # reported as an invalid value; dropna() returns a new frame, not a view.
    df = df[['left', 'right', 'winner']].dropna(axis='rows')

    if df.empty:
        raise gr.Error('File must contain at least one comparison')

    if not df['winner'].isin(['left', 'right', 'tie']).all():
        raise gr.Error('Allowed winner values: left, right, tie')

    index = pd.Index(np.unique(df[['left', 'right']].values), name='item')

    # Orient every decisive comparison as (winner, loser) so that
    # wins[i, j] really is the number of times item i beat item j.
    decisive = df[df['winner'] != 'tie']
    left_won = decisive['winner'] == 'left'
    winners = decisive['left'].where(left_won, decisive['right'])
    losers = decisive['right'].where(left_won, decisive['left'])
    wins = _count_pairs(winners, losers, index)

    tied = df[df['winner'] == 'tie']
    ties = _count_pairs(tied['left'], tied['right'], index)
    ties = ties + ties.T  # a tie is symmetric

    assert wins.shape == ties.shape, 'wins and ties shapes are different'

    scores = ALGORITHMS[algorithm](wins, ties, seed=seed)

    # Number of comparisons each item took part in, on either side.
    pairs = pd.concat([df['left'], df['right']]).value_counts().reindex(index, fill_value=0)

    df_result = pd.DataFrame(data={'score': scores, 'pairs': pairs.to_numpy()}, index=index)
    df_result['rank'] = df_result['score'].rank(na_option='bottom', ascending=False).astype(int)
    df_result = df_result.fillna(-np.inf)
    df_result = df_result.sort_values(by=['rank', 'score'], ascending=[True, False]).reset_index()

    df_pairwise = pd.DataFrame(data=scores[:, np.newaxis] / (scores + scores[:, np.newaxis]),
                               index=index, columns=index)
    df_pairwise = df_pairwise.reindex(index=df_result['item'], columns=df_result['item'])

    fig = visualize(df_pairwise)

    return df_result, fig


def main() -> None:
    """Build the Gradio interface around ``handler`` and launch it."""
    iface = gr.Interface(
        fn=handler,
        inputs=[
            gr.File(
                file_types=['.tsv', '.csv'],
                label='Comparisons'
            ),
            gr.Dropdown(
                choices=list(ALGORITHMS),
                value='Bradley-Terry (1952)',
                label='Algorithm'
            ),
            gr.Number(
                label='Seed',
                precision=0
            )
        ],
        outputs=[
            gr.Dataframe(
                headers=['item', 'score', 'pairs', 'rank'],
                label='Ranking'
            ),
            gr.Plot(
                label='Pairwise Chances of Winning the Comparison'
            )
        ],
        examples=[
            ['food.csv', 'Bradley-Terry (1952)', 0],
            ['food.csv', 'PageRank (1998)', 0],
            ['food.csv', 'Newman (2023)', 0]
        ],
        title='Pair2Rank: Turn Your Side-by-Side Comparisons into Ranking!',
        description='''
This easy-to-use tool transforms pairwise comparisons (aka side-by-side) to a meaningful ranking of items.

As an input, it expects a comma-separated (CSV) file with a header containing the following columns:

- `left`: the first compared item
- `right`: the second compared item
- `winner`: the label indicating the winning item

Possible values for `winner` are `left`, `right`, or `tie`. The provided example might be a good starting point.

As the output, this tool provides a table with items, their estimated scores, and ranks.
''',
        article='''
This tool attempts to implement the tie-aware ranking aggregation algorithm as described in [Efficient Computation of Rankings from Pairwise Comparisons](https://www.jmlr.org/papers/v24/22-1086.html).
''',
        allow_flagging='never'
    )

    iface.launch()


if __name__ == '__main__':
    main()