# Copyright 2021 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import numpy as np import os import pandas as pd import plotly.graph_objects as go import powerlaw from os.path import join as pjoin import utils from scipy.stats import ks_2samp from scipy.stats import zipf as zipf_lib # treating inf values as NaN as well pd.set_option("use_inf_as_na", True) logs = utils.prepare_logging(__file__) class Zipf: def __init__(self, vocab_counts_df, count_str="count", proportion_str="prop"): self.vocab_counts_df = vocab_counts_df # Strings used in the input dictionary self.cnt_str = count_str self.prop_str = proportion_str self.alpha = None self.xmin = None self.xmax = None self.p = None self.ks_distance = None self.observed_counts = None self.word_counts_unique = None self.word_ranks_unique = None if self.vocab_counts_df is not None: self.observed_counts = self.vocab_counts_df[self.cnt_str].values self.word_counts_unique = list(set(self.observed_counts)) self.word_ranks_unique = list( np.arange(1, len(self.word_counts_unique) + 1)) self.zipf_dict = {"xmin": None, "xmax": None, "alpha": None, "ks_distance": None, "p-value": None, "word_ranks_unique": self.word_ranks_unique, "word_counts_unique": self.word_counts_unique} self.fit = None self.predicted_counts = None def load(self, zipf_dict): self.zipf_dict = zipf_dict self.xmin = zipf_dict["xmin"] self.xmax = zipf_dict["xmax"] self.alpha = zipf_dict["alpha"] self.ks_distance = zipf_dict["ks_distance"] self.p = zipf_dict["p-value"] self.word_ranks_unique = zipf_dict["word_ranks_unique"] self.word_counts_unique = zipf_dict["word_counts_unique"] def get_zipf_dict(self): zipf_dict = {"xmin": int(self.xmin), "xmax": int(self.xmax), "alpha": float(self.alpha), "ks_distance": float(self.ks_distance), "p-value": float(self.ks_test.pvalue), "word_counts_unique": [int(count) for count in self.word_counts_unique], "word_ranks_unique": [int(rank) for rank in self.word_ranks_unique]} return zipf_dict def calc_fit(self): """ Uses the powerlaw package to fit the observed frequencies to a zipfian distribution. We use the KS-distance to fit, as that seems more appropriate that MLE. """ logs.info("Fitting based on input vocab counts.") self._make_rank_column() # Note another method for determining alpha might be defined by # (Newman, 2005): alpha = 1 + n * sum(ln( xi / xmin )) ^ -1 self.fit = powerlaw.Fit(self.observed_counts, fit_method="KS", discrete=True) # This should probably be a pmf (not pdf); using discrete=True above. # original_data=False uses only the fitted data (within xmin and xmax). # pdf_bin_edges: The portion of the data within the bin. # observed_pdf: The probability density function (normalized histogram) # of the data. pdf_bin_edges, observed_pdf = self.fit.pdf(original_data=False) # See the 'Distribution' class described here for info: # https://pythonhosted.org/powerlaw/#powerlaw.Fit.pdf theoretical_distro = self.fit.power_law # The probability density function (normalized histogram) of the # theoretical distribution. predicted_pdf = theoretical_distro.pdf() self._set_fit_vars(observed_pdf, predicted_pdf, theoretical_distro) def _set_fit_vars(self, observed_pdf, predicted_pdf, theoretical_distro): # !!!! CRITICAL VALUE FOR ZIPF !!!! self.alpha = theoretical_distro.alpha # Exclusive xmin: The optimal xmin *beyond which* the scaling regime of # the power law fits best. self.xmin = int(theoretical_distro.xmin) self.xmax = theoretical_distro.xmax # Can be None if there isn't an xmax returned; # this handles that. self._set_xmax() self.ks_distance = theoretical_distro.KS() self.ks_test = ks_2samp(observed_pdf, predicted_pdf) self.p = self.ks_test[1] logs.info("KS test:") logs.info(self.ks_test) self.predicted_counts = self._calc_zipf_counts() def _make_rank_column(self): # TODO: These proportions may have already been calculated. prop_denom = float(sum(self.vocab_counts_df[self.cnt_str])) count_prop = self.vocab_counts_df[self.cnt_str] / prop_denom self.vocab_counts_df[self.prop_str] = count_prop rank_column = self.vocab_counts_df[self.cnt_str].rank( method="dense", numeric_only=True, ascending=False ) self.vocab_counts_df["rank"] = rank_column.astype("int64") def _calc_zipf_counts(self): """ The fit is based on an optimal xmin (minimum rank) Let's use this to make count estimates for the zipf fit, by multiplying the fitted pmf value by the sum of counts above xmin. :return: array of count values following the fitted pmf. """ logs.info("Getting predicted counts.") if not self.alpha: logs.warning("Have not yet fit -- need the alpha value.") logs.warning("Fitting now...") self.calc_fit() logs.info(self.word_counts_unique) logs.info(self.xmin) logs.info(self.xmax) # The subset of words that fit word_counts_fit_unique = self.word_counts_unique[ self.xmin + 1: self.xmax] pmf_mass = float(sum(word_counts_fit_unique)) zipf_counts = np.array( [self._estimate_count(rank, pmf_mass) for rank in self.word_ranks_unique] ) return zipf_counts def _estimate_count(self, rank, pmf_mass): return int(round(zipf_lib.pmf(rank, self.alpha) * pmf_mass)) def _set_xmax(self): """ xmax is usually None, so we add some handling to set it as the maximum rank in the dataset. :param xmax: :return: """ if self.xmax is not None: self.xmax = int(xmax) elif self.word_counts_unique: self.xmax = int(len(self.word_counts_unique)) elif self.word_ranks_unique: self.xmax = int(len(self.word_ranks_unique)) # TODO: This might fit better in its own file handling class? def get_zipf_fids(cache_path): zipf_cache_dir = pjoin(cache_path, "zipf") os.makedirs(zipf_cache_dir, exist_ok=True) # Zipf cache files zipf_fid = pjoin(zipf_cache_dir, "zipf_basic_stats.json") zipf_fig_fid = pjoin(zipf_cache_dir, "zipf_fig.json") zipf_fig_html_fid = pjoin(zipf_cache_dir, "zipf_fig.html") return zipf_fid, zipf_fig_fid, zipf_fig_html_fid def make_unique_rank_word_list(z): """ Function to help with the figure, creating strings for the hovertext. """ ranked_words = {} word_counts = z.word_counts_unique word_ranks = z.word_ranks_unique for count, rank in zip(word_counts, word_ranks): z.vocab_counts_df[z.vocab_counts_df[z.cnt_str] == count]["rank"] = rank ranked_words[rank] = ",".join( z.vocab_counts_df[ z.vocab_counts_df[z.cnt_str] == count].index.astype(str) ) # Use the hovertext kw argument for hover text ranked_words_list = [wrds for rank, wrds in sorted(ranked_words.items())] return ranked_words_list def make_zipf_fig(z): xmin = z.xmin word_ranks_unique = z.word_ranks_unique observed_counts = z.observed_counts zipf_counts = z.predicted_counts # "] #self.calc_zipf_counts() ranked_words_list = make_unique_rank_word_list(z) layout = go.Layout(xaxis=dict(range=[0, 100])) fig = go.Figure( data=[ go.Bar( x=word_ranks_unique, y=observed_counts, hovertext=ranked_words_list, name="Word Rank Frequency", ) ], layout=layout, ) fig.add_trace( go.Scatter( x=word_ranks_unique[xmin: len(word_ranks_unique)], y=zipf_counts[xmin: len(word_ranks_unique)], hovertext=ranked_words_list[xmin: len(word_ranks_unique)], line=go.scatter.Line(color="crimson", width=3), name="Zipf Predicted Frequency", ) ) # Customize aspect # fig.update_traces(marker_color='limegreen', # marker_line_width=1.5, opacity=0.6) fig.update_layout( title_text="Word Counts, Observed and Predicted by Zipf") fig.update_layout(xaxis_title="Word Rank") fig.update_layout(yaxis_title="Frequency") fig.update_layout( legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.10)) return fig