# Uploaded via upload-large-folder tool (commit 517cbd2, verified) by JustinTX.
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple

import pandas as pd

from utils import Trie
class Algorithm:
    """Base class for DataFrame reordering algorithms.

    Subclasses implement :meth:`reorder`; the static helpers below score and
    transform DataFrames (prefix-hit evaluation, column merging, column stats).
    """

    def __init__(self, df: Optional[pd.DataFrame] = None):
        # PEP 484: a default of None means the annotation must be Optional.
        # The frame is stored as-is; subclasses decide how (or whether) to use it.
        self.df = df

    def reorder(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a reordered version of *df*. Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Subclasses should implement this!")
@staticmethod
def evaluate_df_prefix_hit_cnt(self, df: pd.DataFrame) -> int:
"""
Function to evaluate the prefix hit count of a DataFrame
"""
def max_overlap(trie, row_string):
return trie.longest_common_prefix(row_string)
trie = Trie()
total_prefix_hit_count = 0
def process_row(index, row):
row_string = "".join(row.astype(str).values) # No spaces between columns
row_prefix_hit_count = max_overlap(trie, row_string)
trie.insert(row_string)
return row_prefix_hit_count
with ThreadPoolExecutor() as executor:
results = executor.map(process_row, df.index, [row for _, row in df.iterrows()])
total_prefix_hit_count = sum(results)
return total_prefix_hit_count
@staticmethod
def evaluate_cell_hit_cnt(df: pd.DataFrame) -> int:
"""
Function to evaluate the prefix hit count of a DataFrame based on exact cell matching.
For a cell to be a hit, all previous cells in the row must also be hits.
"""
total_prefix_hit_count = 0
seen_rows = set() # Cache of fully processed rows
def process_row(index, row):
nonlocal seen_rows
prefix_hit_count = 0
current_row_cache = []
for col_value in row:
# Check if adding this cell matches exactly with prior cache
current_row_cache.append(col_value)
if tuple(current_row_cache) in seen_rows:
prefix_hit_count += 1
else:
break # Stop counting hits if any cell isn't in the cache
seen_rows.add(tuple(row)) # Add the fully processed row to cache
return prefix_hit_count
# Process each row sequentially (row-to-row comparison for hits)
for _, row in df.iterrows():
total_prefix_hit_count += process_row(_, row)
return total_prefix_hit_count
@staticmethod
def get_groups_values(df: pd.DataFrame):
"""
Function to get the value counts of a DataFrame
"""
if df.empty:
return {}
value_counts = df.stack().value_counts()
if value_counts.empty:
return {}
return value_counts
@staticmethod
def calculate_length(value):
val = 0
if isinstance(value, bool):
val = 4 # length of 'True' or 'False'
elif isinstance(value, (int, float)):
val = len(str(value))
elif isinstance(value, str):
val = len(value)
else:
val = 0
return val**2
@staticmethod
def drop_col(df: pd.DataFrame, col):
return df.drop(columns=[col])
@staticmethod
def drop_rows(df: pd.DataFrame, rows):
return df.drop(index=rows)
@staticmethod
def merging_columns(df: pd.DataFrame, col_names: List[str], delimiter: str = "_", prepended: bool = False) -> pd.DataFrame:
if not all(col in df.columns for col in col_names):
raise ValueError("Column names not found in DataFrame")
# before merging, check that each column to be merged has the same number of unique values
if len(set(df[col_names].nunique())) != 1:
raise ValueError(f"Columns to be merged {col_names}, do not have the same number of unique values: {df.nunique().sort_values()}")
merged_names = delimiter.join(col_names)
if prepended:
df[merged_names] = df[col_names].apply(
lambda x: merged_names + ": " + delimiter.join([val.split(": ", 1)[1] for col, val in zip(col_names, x)]), axis=1
)
else:
df[merged_names] = df[col_names].apply(lambda x: "".join([f"{val}" for val in x]), axis=1)
df = df.drop(columns=col_names)
return df
@staticmethod
def calculate_col_stats(df: pd.DataFrame, enable_index=False):
num_rows = len(df)
column_stats = []
for col in df.columns:
if col == "original_index":
continue
num_groups = df[col].nunique()
if df[col].dtype == "object" or df[col].dtype == "string":
avg_length = df[col].astype(str).str.len().mean()
elif df[col].dtype == "bool":
avg_length = 4 # Assuming 'True' or 'False' as average length
elif df[col].dtype in ["int64", "float64"]:
avg_length = df[col].astype(str).str.len().mean()
else:
avg_length = 0
avg_length = avg_length**2
if num_groups == 0:
score = 0
else:
# Average size per group: number of rows in each group
avg_size_per_group = num_rows / num_groups
# score = avg_size_per_group * avg_length
score = avg_length * (avg_size_per_group - 1)
if num_rows == num_groups: # no sharing at all
score = 0
column_stats.append((col, num_groups, avg_length, score))
# original_index all distinct values, so give lowest score
if enable_index and "original_index" in df.columns:
column_stats.append(("original_index", len(df), 0, 0))
# Sort the columns based on the score
column_stats.sort(key=lambda x: x[3], reverse=True)
return num_rows, column_stats