import logging
from pathlib import Path

import numpy as np
import pandas as pd
import powerlaw
import streamlit as st
from scipy.stats import ks_2samp
from scipy.stats import zipf as zipf_lib

from .dataset_utils import CNT, PROP

# Treat inf values as NaN in pandas operations.
# (Note: "use_inf_as_na" is deprecated in recent pandas versions.)
pd.set_option("use_inf_as_na", True)

logs = logging.getLogger(__name__)
logs.setLevel(logging.INFO)
logs.propagate = False

if not logs.handlers:
    Path("./log_files").mkdir(exist_ok=True)

    # INFO and above is written to the log file.
    file = logging.FileHandler("./log_files/zipf.log")
    fileformat = logging.Formatter("%(asctime)s:%(message)s")
    file.setLevel(logging.INFO)
    file.setFormatter(fileformat)

    # WARNING and above is also echoed to the console.
    stream = logging.StreamHandler()
    streamformat = logging.Formatter("[data_measurements_tool] %(message)s")
    stream.setLevel(logging.WARNING)
    stream.setFormatter(streamformat)

    logs.addHandler(file)
    logs.addHandler(stream)
|
|
class Zipf:
    """Fits the word counts of a dataset to a zipfian (power-law)
    distribution and stores the fitted parameters and fit statistics."""

    def __init__(self, vocab_counts_df=pd.DataFrame()):
        self.vocab_counts_df = vocab_counts_df
        self.alpha = None
        self.xmin = None
        self.xmax = None
        self.fit = None
        self.ranked_words = {}
        self.uniq_counts = []
        self.uniq_ranks = []
        self.uniq_fit_counts = None
        self.term_df = None
        self.pvalue = None
        self.ks_test = None
        self.distance = None
        self.predicted_zipf_counts = None
        if not self.vocab_counts_df.empty:
            logs.info("Fitting based on input vocab counts.")
            self.calc_fit(vocab_counts_df)
            logs.info("Getting predicted counts.")
            self.predicted_zipf_counts = self.calc_zipf_counts(vocab_counts_df)
|
    def load(self, zipf_dict):
        """Loads previously computed fit results from a dict."""
        self.set_xmin(zipf_dict["xmin"])
        self.set_xmax(zipf_dict["xmax"])
        self.set_alpha(zipf_dict["alpha"])
        self.set_ks_distance(zipf_dict["ks_distance"])
        self.set_p(zipf_dict["p-value"])
        self.set_unique_ranks(zipf_dict["uniq_ranks"])
        self.set_unique_counts(zipf_dict["uniq_counts"])
|
    def calc_fit(self, vocab_counts_df):
        """
        Uses the powerlaw package to fit the observed frequencies to a
        zipfian distribution. We fit using the KS distance, as that seems
        more appropriate than MLE.
        :param vocab_counts_df: DataFrame of vocabulary counts, with the
            count of each word in the CNT column.
        :return: None; sets the fit attributes on self.
        """
        self.vocab_counts_df = vocab_counts_df

        vocab_counts_df[PROP] = vocab_counts_df[CNT] / float(sum(vocab_counts_df[CNT]))
        rank_column = vocab_counts_df[CNT].rank(
            method="dense", numeric_only=True, ascending=False
        )
        vocab_counts_df["rank"] = rank_column.astype("int64")
        observed_counts = vocab_counts_df[CNT].values

        # Fit the observed counts to a discrete power law, minimizing the
        # Kolmogorov-Smirnov distance rather than maximizing likelihood.
        self.fit = powerlaw.Fit(observed_counts, fit_method="KS", discrete=True)

        # The probability density of the observed counts above xmin, using
        # the fit's binning rather than the original full data.
        pdf_bin_edges, observed_pdf = self.fit.pdf(original_data=False)

        # The theoretical power-law distribution found by the fit.
        theoretical_distro = self.fit.power_law
        predicted_pdf = theoretical_distro.pdf()

        self.alpha = theoretical_distro.alpha
        self.xmin = theoretical_distro.xmin
        self.xmax = theoretical_distro.xmax
        self.distance = theoretical_distro.KS()
        # Compare observed and predicted densities with a two-sample KS test.
        self.ks_test = ks_2samp(observed_pdf, predicted_pdf)
        self.pvalue = self.ks_test[1]
        logs.info("KS test:")
        logs.info(self.ks_test)
|
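    # For reference, a minimal standalone sketch of the same kind of fit,
    # using hypothetical counts (not part of this module's pipeline):
    #
    #     import powerlaw
    #     counts = [500, 250, 125, 60, 30, 15, 8, 4, 2, 1]
    #     fit = powerlaw.Fit(counts, fit_method="KS", discrete=True)
    #     print(fit.power_law.alpha, fit.power_law.xmin, fit.power_law.KS())
    #
    # alpha is the fitted power-law exponent, xmin is the smallest count
    # included in the fit, and KS() is the Kolmogorov-Smirnov distance
    # between the fitted and empirical distributions above xmin.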
    def set_xmax(self, xmax):
        """
        xmax is usually None, so we add some handling to set it as the
        maximum rank in the dataset.
        :param xmax: the xmax value to set, or a falsy value to fall back
            to the number of unique counts (equivalently, unique ranks).
        :return:
        """
        if xmax:
            self.xmax = int(xmax)
        elif self.uniq_counts:
            self.xmax = int(len(self.uniq_counts))
        elif self.uniq_ranks:
            self.xmax = int(len(self.uniq_ranks))

    def get_xmax(self):
        """
        Returns xmax, first computing the fallback value if it is not set.
        :return: xmax as an int, or None if no fallback is available.
        """
        if not self.xmax:
            self.set_xmax(self.xmax)
        return self.xmax
|
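    # For example (hypothetical values): if the saved results store
    # xmax=None and the dataset has 1200 unique count values, get_xmax()
    # falls back to 1200, the number of ranked points available.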
    def set_p(self, p):
        # The p-value is a float in [0, 1]; casting it to int would
        # truncate it to 0, so store it as a float on self.pvalue,
        # matching the attribute set in __init__ and calc_fit.
        self.pvalue = float(p)

    def get_p(self):
        return float(self.pvalue)
|
    def set_xmin(self, xmin):
        self.xmin = xmin

    def get_xmin(self):
        if self.xmin:
            return int(self.xmin)
        return self.xmin

    def set_alpha(self, alpha):
        self.alpha = float(alpha)

    def get_alpha(self):
        return float(self.alpha)

    def set_ks_distance(self, distance):
        self.distance = float(distance)

    def get_ks_distance(self):
        return self.distance
|
    def calc_zipf_counts(self, vocab_counts_df):
        """
        The fit is based on an optimal xmin (minimum rank).
        Let's use this to make count estimates for the zipf fit,
        by multiplying the fitted pmf value by the sum of counts above xmin.
        :param vocab_counts_df: DataFrame of vocabulary counts, with the
            count of each word in the CNT column.
        :return: array of count values following the fitted pmf.
        """
        counts = vocab_counts_df[CNT]
        self.uniq_counts = list(pd.unique(counts))
        self.uniq_ranks = list(np.arange(1, len(self.uniq_counts) + 1))
        logs.info(self.uniq_counts)
        logs.info(self.xmin)
        logs.info(self.xmax)

        # The counts the fit was based on: the unique counts between the
        # fitted xmin and xmax. Their sum is the probability mass that the
        # pmf is scaled by to turn probabilities back into counts.
        xmin = self.get_xmin()
        xmax = self.get_xmax()
        self.uniq_fit_counts = self.uniq_counts[xmin + 1 : xmax]
        pmf_mass = float(sum(self.uniq_fit_counts))
        zipf_counts = np.array(
            [self.estimate_count(rank, pmf_mass) for rank in self.uniq_ranks]
        )
        return zipf_counts
|
    def estimate_count(self, rank, pmf_mass):
        return int(round(zipf_lib.pmf(rank, self.alpha) * pmf_mass))
|
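    # For intuition, a worked sketch of a single estimate, using
    # hypothetical alpha and pmf_mass values:
    #
    #     from scipy.stats import zipf as zipf_lib
    #     alpha, pmf_mass = 2.0, 10000.0
    #     zipf_lib.pmf(1, alpha) * pmf_mass  # ~6079 predicted at rank 1
    #     zipf_lib.pmf(2, alpha) * pmf_mass  # ~1520 predicted at rank 2
    #
    # scipy's zipf pmf is k**-alpha / zeta(alpha), so predicted counts
    # fall off as a power of the rank.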
    def set_unique_ranks(self, ranks):
        self.uniq_ranks = ranks

    def get_unique_ranks(self):
        return self.uniq_ranks

    def get_unique_fit_counts(self):
        return self.uniq_fit_counts

    def set_unique_counts(self, counts):
        self.uniq_counts = counts

    def get_unique_counts(self):
        return self.uniq_counts

    def set_axes(self, unique_counts, unique_ranks):
        self.uniq_counts = unique_counts
        self.uniq_ranks = unique_ranks
|
    def fit_others(self, fit):
        st.markdown(
            "_Checking the log-likelihood ratio to see if the data is "
            "better explained by other well-behaved distributions..._"
        )
        # distribution_compare returns (R, p); R < 0 means the second
        # distribution explains the data better than the power law.
        better_distro = False
        trunc = fit.distribution_compare("power_law", "truncated_power_law")
        if trunc[0] < 0:
            st.markdown("Seems a truncated power law is a better fit.")
            better_distro = True

        lognormal = fit.distribution_compare("power_law", "lognormal")
        if lognormal[0] < 0:
            st.markdown("Seems a lognormal distribution is a better fit.")
            st.markdown("But don't panic -- that happens sometimes with language.")
            better_distro = True

        exponential = fit.distribution_compare("power_law", "exponential")
        if exponential[0] < 0:
            st.markdown("Seems an exponential distribution is a better fit. Panic.")
            better_distro = True

        if not better_distro:
            st.markdown("\nSeems your data is best fit by a power law. Celebrate!!")
|
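# For reference, how each comparison above reads, with a hypothetical fit:
#
#     R, p = fit.distribution_compare("power_law", "lognormal")
#     # R < 0: the lognormal explains the data better; R > 0: the power
#     # law does. p is the significance of the ratio's sign.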
|
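# A minimal usage sketch, assuming a vocab counts DataFrame whose CNT
# column holds per-word frequencies, as produced elsewhere in this tool:
#
#     vocab_counts_df = pd.DataFrame({CNT: [500, 250, 125, 60, 30, 15, 8]})
#     zipf = Zipf(vocab_counts_df)
#     print(zipf.get_alpha(), zipf.get_ks_distance())
#     print(zipf.predicted_zipf_counts)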