Spaces:
Running
Running
File size: 3,218 Bytes
eafbf97 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
"""Utility functions for pandas operations"""
from typing import List
import numpy as np
import pandas as pd
def apply_filters(df: pd.DataFrame, filters: dict, reset_index=False):
    """
    Keep only the rows of `df` that satisfy every entry of `filters`.

    Args:
        df (pd.DataFrame): frame to filter; it is not modified.
        filters (dict): maps a column name to either a single value (the
            row must equal it) or a list / tuple / np.ndarray / omegaconf
            ListConfig of accepted values (the row must match one of them).
        reset_index (bool): if True, renumber the result's index from 0.

    Returns:
        pd.DataFrame: a new frame holding the rows that match all filters.
            With empty `filters` this is simply a copy of `df`.
    """
    # omegaconf is only needed to recognise ListConfig values, so its absence
    # must not prevent filtering with plain Python / NumPy containers.
    collection_types = (list, tuple, np.ndarray)
    try:
        from omegaconf.listconfig import ListConfig
    except ImportError:
        pass
    else:
        collection_types += (ListConfig,)

    X = df.copy()
    masks = []
    for col, values in filters.items():
        if isinstance(values, collection_types):
            masks.append(X[col].isin(list(values)))
        else:
            masks.append(X[col] == values)

    if masks:
        # Stack the per-column masks and keep rows where all of them hold.
        keep = np.all(np.array(masks), axis=0)
        X = X[keep]
    if reset_index:
        X = X.reset_index(drop=True)
    return X
def apply_antifilters(df: pd.DataFrame, filters: dict, reset_index=False):
    """
    Drop from a copy of `df` every row that matches any entry of `filters`.

    Each key of `filters` is a column name. Its value is either a single
    value (rows equal to it are dropped) or a list / tuple / np.ndarray
    (rows whose value is one of its members are dropped). The entries are
    applied one after another on the rows that are still left.

    If `reset_index` is true, the index of the result is renumbered from 0.
    """
    remaining = df.copy()
    for column, unwanted in filters.items():
        is_collection = isinstance(unwanted, (list, tuple, np.ndarray))
        if is_collection:
            matches = remaining[column].isin(list(unwanted))
        else:
            matches = remaining[column] == unwanted
        # Keep only the rows that did NOT match this entry.
        remaining = remaining[~matches]

    return remaining.reset_index(drop=True) if reset_index else remaining
def custom_eval(x):
    """
    Turn a bracketed, comma-separated string into a list of trimmed items.

    All '[' and ']' characters are removed, the rest is split on ',' and
    surrounding whitespace is stripped from each piece, e.g. '[a, b, c]'
    becomes ['a', 'b', 'c']. Quote characters are NOT removed. Anything
    that is not a string (such as a missing value) yields ['NA'].
    """
    if not isinstance(x, str):
        return ['NA']

    without_brackets = x.replace('[', '').replace(']', '')
    return [piece.strip() for piece in without_brackets.split(',')]
def split_column_into_columns(df, column):
    """
    One-hot encode a list-like string column of `df` into `Yes`/`No` columns.

    `column` holds strings such as '[a, b]'. Each cell is parsed with
    `custom_eval`, and for every distinct item found (ignoring 'NA' and
    empty items) a column named after the item is set to 'Yes' on the rows
    that contain it and 'No' elsewhere. A summary column `any_<column>` is
    'Yes' on rows where at least one of those columns is 'Yes'.

    Args:
        df (pd.DataFrame): frame to extend; it is modified in place.
        column (str): name of the column to split. After the call it holds
            the parsed lists instead of the raw strings.

    Returns:
        pd.DataFrame: the same `df` object, with the new columns appended in
            order of first appearance of each item.
    """
    parsed = df[column].apply(custom_eval)
    df[column] = parsed

    # Collect one positional boolean mask per distinct item. Working by row
    # position (not index label) keeps this correct for non-unique indexes
    # and avoids writing into the frame one cell at a time.
    n_rows = len(df)
    membership = {}
    for position, items in enumerate(parsed):
        for item in items:
            if item in ('NA', ''):
                continue
            if item not in membership:
                membership[item] = np.zeros(n_rows, dtype=bool)
            membership[item][position] = True

    unique_values = list(membership)
    for item, mask in membership.items():
        if item in df.columns:
            # An item that shares its name with an existing column is
            # written over that column on matching rows, and only the
            # missing entries elsewhere are filled.
            df.loc[mask, item] = 'Yes'
            df[item] = df[item].fillna('No')
        else:
            df[item] = np.where(mask, 'Yes', 'No').astype(object)

    if unique_values:
        has_any = (df[unique_values] == 'Yes').any(axis=1).to_numpy()
    else:
        has_any = np.zeros(n_rows, dtype=bool)
    df[f'any_{column}'] = np.where(has_any, 'Yes', 'No').astype(object)
    return df
def custom_read_csv(path: str, columns_to_onehot: List) -> pd.DataFrame:
    """Read a CSV file and one-hot expand the requested columns.

    Args:
        path (str): location of the .csv file to load
        columns_to_onehot (List): names of the columns that are passed, in
            order, through `split_column_into_columns`

    Returns:
        pd.DataFrame: the loaded frame, including the one-hot columns
    """
    frame = pd.read_csv(path)
    for name in columns_to_onehot:
        frame = split_column_into_columns(frame, name)
    return frame
def split_df(df, test_size=0.2, *, val_size=None, random_state=42):
    """
    Split `df` into train, validation and test parts.

    The test part is split off first; the validation part is then split off
    from what remains, so `val_size` is relative to the remaining train
    part, not to the whole of `df`.

    Args:
        df: data to split, passed as-is to sklearn's `train_test_split`.
        test_size: `test_size` argument of the first split.
        val_size: `test_size` argument of the second (train/validation)
            split. Defaults to `test_size`.
        random_state: seed passed to both splits for reproducibility.

    Returns:
        tuple: `(train_df, val_df, test_df)`.
    """
    # Imported lazily so the module can be used without scikit-learn when no
    # splitting is needed.
    from sklearn.model_selection import train_test_split

    if val_size is None:
        val_size = test_size

    # split the dataframe into train and test sets
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=random_state
    )
    # split the train set into train and validation sets
    train_df, val_df = train_test_split(
        train_df, test_size=val_size, random_state=random_state
    )
    return train_df, val_df, test_df
|