Spaces:
Sleeping
Sleeping
File size: 7,578 Bytes
9fa4f5e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
import re
import pandas as pd
import numpy as np
from time import perf_counter
import time
# Constants
EMPTY_THRESHOLD = 0.5
LOW_COUNT_THRESHOLD = 2
VALID_DATA_THRESHOLD = 0.5
def print_dataframe_info(df, step=""):
num_columns = len(df.columns)
num_rows = len(df)
num_cells = num_columns * num_rows
print(f"{step}Dataframe info:")
print(f" Number of columns: {num_columns}")
print(f" Number of rows: {num_rows}")
print(f" Total number of cells: {num_cells}")
def check_and_normalize_column_headers(df):
print("Checking and normalizing column headers...")
df.columns = df.columns.str.lower().str.replace(' ', '_')
df.columns = [re.sub(r'[^0-9a-zA-Z_]', '', col) for col in df.columns]
print("Column names have been normalized.")
return df
def remove_empty_columns(df, threshold=EMPTY_THRESHOLD):
print(f"Removing columns with less than {threshold * 100}% valid data...")
return df.dropna(axis=1, thresh=int(threshold * len(df)))
def remove_empty_rows(df, threshold=EMPTY_THRESHOLD):
print(f"Removing rows with less than {threshold * 100}% valid data...")
return df.dropna(thresh=int(threshold * len(df.columns)))
def drop_rows_with_nas(df, threshold=VALID_DATA_THRESHOLD):
print(f"Dropping rows with NAs for columns with more than {threshold * 100}% valid data...")
valid_columns = df.columns[df.notna().mean() > threshold]
return df.dropna(subset=valid_columns)
def check_typos(df, column_name, threshold=2, top_n=100):
if df[column_name].dtype != 'object':
print(f"Skipping typo check for column {column_name} as it is not a string type.")
return None
print(f"Checking for typos in column: {column_name}")
try:
value_counts = df[column_name].value_counts()
top_values = value_counts.head(top_n).index.tolist()
def find_similar_strings(value):
if pd.isna(value):
return []
return [tv for tv in top_values if value != tv and levenshtein_distance(value, tv) <= threshold]
df['possible_typos'] = df[column_name].apply(find_similar_strings)
typos_df = df[df['possible_typos'].apply(len) > 0][[column_name, 'possible_typos']]
typo_count = len(typos_df)
if typo_count > 0:
print(f"Potential typos found in column {column_name}: {typo_count}")
print(typos_df.head(10))
return typos_df
else:
print(f"No potential typos found in column {column_name}")
return None
except Exception as e:
print(f"Unexpected error in check_typos for column {column_name}: {str(e)}")
return None
def levenshtein_distance(s1, s2):
if len(s1) < len(s2):
return levenshtein_distance(s2, s1)
if len(s2) == 0:
return len(s1)
previous_row = range(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
def transform_string_column(df, column_name):
print(f"Transforming string column: {column_name}")
df[column_name] = df[column_name].str.lower()
df[column_name] = df[column_name].str.strip()
df[column_name] = df[column_name].str.replace(r'\s+', ' ', regex=True)
df[column_name] = df[column_name].str.replace(r'[^a-zA-Z0-9\s/:.-]', '', regex=True)
return df
def clean_column(df, column_name):
print(f"Cleaning column: {column_name}")
start_time = perf_counter()
if df[column_name].dtype == 'object':
typos_df = check_typos(df, column_name)
if typos_df is not None and len(typos_df) > 0:
print(f"Detailed typos for column {column_name}:")
print(typos_df)
df = transform_string_column(df, column_name)
elif pd.api.types.is_numeric_dtype(df[column_name]):
df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
end_time = perf_counter()
print(f"Time taken to clean {column_name}: {end_time - start_time:.6f} seconds")
return df
def remove_outliers(df, column):
print(f"Removing outliers from column: {column}")
q1 = df[column].quantile(0.25)
q3 = df[column].quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
return df[(df[column] >= lower_bound) & (df[column] <= upper_bound)]
def calculate_nonconforming_cells(df):
return df.isna().sum().to_dict()
def get_numeric_columns(df):
return df.select_dtypes(include=[np.number]).columns.tolist()
def remove_duplicates_from_primary_key(df, primary_key_column):
print(f"Removing duplicates based on primary key column: {primary_key_column}")
return df.drop_duplicates(subset=[primary_key_column])
def clean_data(df, primary_key_column, progress):
start_time = time.time()
process_times = {}
print("Starting data validation and cleaning...")
print_dataframe_info(df, "Initial - ")
nonconforming_cells_before = calculate_nonconforming_cells(df)
progress(0.1, desc="Normalizing column headers")
step_start_time = time.time()
df = check_and_normalize_column_headers(df)
process_times['Normalize headers'] = time.time() - step_start_time
progress(0.2, desc="Removing empty columns")
step_start_time = time.time()
df = remove_empty_columns(df)
print('2) count of valid rows:', len(df))
process_times['Remove empty columns'] = time.time() - step_start_time
progress(0.3, desc="Removing empty rows")
step_start_time = time.time()
df = remove_empty_rows(df)
print('3) count of valid rows:', len(df))
process_times['Remove empty rows'] = time.time() - step_start_time
progress(0.4, desc="Dropping rows with NAs")
step_start_time = time.time()
df = drop_rows_with_nas(df)
print('4) count of valid rows:', len(df))
process_times['Drop rows with NAs'] = time.time() - step_start_time
column_cleaning_times = {}
total_columns = len(df.columns)
for index, column in enumerate(df.columns):
progress(0.5 + (0.2 * (index / total_columns)), desc=f"Cleaning column: {column}")
column_start_time = time.time()
df = clean_column(df, column)
print('5) count of valid rows:', len(df))
column_cleaning_times[f"Clean column: {column}"] = time.time() - column_start_time
process_times.update(column_cleaning_times)
progress(0.7, desc="Removing outliers")
step_start_time = time.time()
numeric_columns = get_numeric_columns(df)
numeric_columns = [col for col in numeric_columns if col != primary_key_column]
for column in numeric_columns:
df = remove_outliers(df, column)
print('6) count of valid rows:', len(df))
process_times['Remove outliers'] = time.time() - step_start_time
progress(0.8, desc="Removing duplicates from primary key")
step_start_time = time.time()
df = remove_duplicates_from_primary_key(df, primary_key_column)
print('7) count of valid rows:', len(df))
print("Cleaning process completed.")
print_dataframe_info(df, "Final - ")
return df, nonconforming_cells_before, process_times |