File size: 7,268 Bytes
f43fd7e
 
 
 
 
 
 
 
 
 
 
ecb5372
f43fd7e
 
 
 
 
 
 
 
 
ecb5372
f43fd7e
ecb5372
f43fd7e
 
ecb5372
f43fd7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7056566
f43fd7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7056566
f43fd7e
 
 
 
 
 
 
 
bf1894e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f43fd7e
 
 
 
bf1894e
 
 
f43fd7e
 
 
bf1894e
f43fd7e
 
 
bf1894e
f43fd7e
 
 
 
 
 
 
 
bf1894e
f43fd7e
 
 
 
 
 
 
bf1894e
f43fd7e
 
 
 
 
 
 
ecb5372
f43fd7e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os

import gradio as gr
import pandas as pd
from sklearn.impute import KNNImputer

def get_clean_columns(df):
    """Normalise the column labels of *df* in place and return the same frame.

    Non-breaking spaces (``\\xa0``) in each label are replaced with ordinary
    spaces, then leading/trailing whitespace is stripped.
    """
    without_nbsp = df.columns.str.replace('\xa0', ' ', regex=True)
    df.columns = without_nbsp.str.strip()
    return df

def get_imputable_columns(file_obj):
    """List the numeric columns of an uploaded CSV that contain missing values.

    Returns a pair of Gradio updates: the first sets the checkbox choices
    (all pre-selected), the second sets the text and visibility of the
    warning box. Any failure while reading or inspecting the file is
    reported through the warning box rather than raised.
    """
    if file_obj is None:
        return gr.update(choices=[], value=[]), gr.update(value="", visible=False)

    try:
        frame = get_clean_columns(pd.read_csv(file_obj))

        numeric_names = frame.select_dtypes(include=['number']).columns.tolist()
        numeric_part = frame[numeric_names]
        has_missing = numeric_part.isnull().any()
        candidates = numeric_part.columns[has_missing].tolist()

        if candidates:
            checkbox_update = gr.update(choices=candidates, value=candidates)
            warning_update = gr.update(value="", visible=False)
        else:
            checkbox_update = gr.update(choices=[], value=[])
            warning_update = gr.update(value="⚠️ No numeric columns with missing values found.", visible=True)
        return checkbox_update, warning_update

    except Exception as e:
        return gr.update(choices=[], value=[]), gr.update(value=f"⚠️ Could not read file: {e}", visible=True)

def _column_stats(series, metric, column):
    """Return one row of the before/after statistics table describing *series*."""
    return {
        'Metric': metric,
        'Column': column,
        'Mean': series.mean(),
        'Median': series.median(),
        'Std Dev': series.std(),
        'Min': series.min(),
        'Max': series.max(),
    }


def _knn_fill_missing(frame, n_neighbors):
    """Return *frame* with NaNs filled by KNN on z-scored columns; observed cells are kept as-is."""
    means = frame.mean()
    stds = frame.std()
    # A constant column (std == 0) or a column with a single observed value
    # (std is NaN) would become entirely NaN after the division below, and
    # KNNImputer drops all-NaN features, which breaks the column mapping when
    # the result is wrapped back into a DataFrame. Use a unit scale instead.
    stds = stds.where(stds > 0, 1.0)

    standardized = (frame - means) / stds
    imputer = KNNImputer(n_neighbors=int(n_neighbors))
    imputed_std = pd.DataFrame(
        imputer.fit_transform(standardized),
        columns=frame.columns,
        index=frame.index,
    )
    imputed = imputed_std * stds + means

    # Only originally-missing cells take imputed values, so observed data is
    # not perturbed by the standardise/de-standardise floating-point round trip.
    return frame.fillna(imputed)


def impute_and_analyze_data(input_file, selected_columns, n_neighbors=5):
    """Impute missing values in the selected numeric columns with KNN.

    The selected columns are z-scored, imputed with ``KNNImputer`` and scaled
    back; the completed table is written to ``imputed_data_<original name>``
    in the current working directory.

    Args:
        input_file: Path of the uploaded CSV file.
        selected_columns: Column names chosen for imputation. Names that are
            missing from the file or not numeric are ignored.
        n_neighbors: Number of neighbours passed to ``KNNImputer``.

    Returns:
        A 4-tuple ``(output_path, stats_df, replacement_df, warning_update)``:
        the path of the written CSV, the before/after statistics table, the
        per-column replacement summary, and a Gradio update for the warning
        box.

    Raises:
        gr.Error: If no file or no column is supplied, if none of the
            selected columns is a numeric column of the file, or if reading
            or processing the file fails.
    """
    if input_file is None:
        raise gr.Error("Please upload a CSV file.")
    if not selected_columns:
        raise gr.Error("Please select at least one column for imputation.")

    try:
        df = get_clean_columns(pd.read_csv(input_file))

        valid_selected_columns = [
            col for col in selected_columns
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col])
        ]

        if not valid_selected_columns:
            raise gr.Error("No valid numeric columns selected for imputation.")

        original_nan_counts = {
            col: df[col].isnull().sum()
            for col in valid_selected_columns
        }
        stats_data = [
            _column_stats(df[col], 'Before Imputation', col)
            for col in valid_selected_columns
        ]
        replacements_data = []

        # Columns with no observed value at all give KNN nothing to work with.
        df_for_imputation = df[valid_selected_columns].dropna(axis=1, how='all')
        imputation_target_cols = df_for_imputation.columns.tolist()

        output_path = f"imputed_data_{os.path.basename(input_file)}"

        if not imputation_target_cols:
            df.to_csv(output_path, index=False)
            return (
                output_path,
                pd.DataFrame(stats_data),
                pd.DataFrame(replacements_data),
                gr.update(value="⚠️ No columns eligible for imputation after preprocessing.", visible=True),
            )

        df_filled = _knn_fill_missing(df_for_imputation, n_neighbors)

        df_final = df.copy()
        for col in imputation_target_cols:
            df_final[col] = df_filled[col]

            original_nans = original_nan_counts.get(col, 0)
            replacements = original_nans - df_final[col].isnull().sum()
            if replacements < 0:
                # Defensive fallback kept from the original logic.
                replacements = original_nans

            replacements_data.append({
                'Column': col,
                'Original NaNs': original_nans,
                'Replacements Made': replacements
            })
            stats_data.append(_column_stats(df_final[col], 'After Imputation', col))

        df_final.to_csv(output_path, index=False)

        stats_df = pd.DataFrame(stats_data)
        rounded_cols = ["Mean", "Median", "Std Dev"]
        stats_df[rounded_cols] = stats_df[rounded_cols].round(4)
        # Descending on 'Metric' puts "Before Imputation" ahead of
        # "After Imputation" within each column.
        stats_df = stats_df.sort_values(by=['Column', 'Metric'], ascending=[True, False])
        replacement_df = pd.DataFrame(replacements_data)

        return output_path, stats_df, replacement_df, gr.update(value="", visible=False)

    except gr.Error:
        # Already a user-facing message; do not re-wrap it as "unexpected".
        raise
    except pd.errors.EmptyDataError as e:
        raise gr.Error("The uploaded file is empty.") from e
    except pd.errors.ParserError as e:
        raise gr.Error("Could not parse the CSV file. Please ensure it's a valid CSV.") from e
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred: {e}") from e


# Build the RIS citation record for this app and write it to citation.ris
# so the interface can offer it as a download.
ris_content = "\n".join([
    "TY  - COMP",
    "T1  - Missing value analysis with KNN Impute",
    "AU  - Mat Roni, S.",
    "PY  - 2025",
    "VL  - 1.0",
    "PB  - Hugging Face",
    "UR  - https://huggingface.co/spaces/pvaluedotone/knn_impute",
    "ER  -",
    "",  # trailing newline after the end-of-record tag
])

with open("citation.ris", "w") as citation_file:
    citation_file.write(ris_content)
    
# --- Gradio Interface ---
# Components are created in display order inside the Blocks context, so the
# statement order below is also the page layout.
with gr.Blocks() as app:
    # Header: title, short instructions, citation text and the citation.ris
    # file written above, offered as a read-only download.
    gr.Markdown("# Missing value analysis with KNN Imputer")
    gr.Markdown("Upload your CSV dataset, select columns for imputation, and view statistics and imputation details.")
    gr.Markdown("**Citation:** Mat Roni, S. (2025). *Missing value analysis with KNN Impute* (version 1.0) [software]. [https://huggingface.co/spaces/pvaluedotone/knn_impute](https://huggingface.co/spaces/pvaluedotone/knn_impute)")
    gr.File(value="citation.ris", label="Download citation (.ris)", interactive=False)


    # Inputs: the CSV upload and the neighbour count (1-10, default 5).
    with gr.Row():
        file_input = gr.File(label="Upload CSV file (.csv)", type="filepath")
        n_neighbors_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Number of neighbours (n_neighbors)", interactive=True)

    # Hidden until one of the handlers returns an update that makes it visible.
    warning_box = gr.Markdown("", visible=False)

    # Starts empty; populated by get_imputable_columns after an upload.
    column_checkboxes = gr.CheckboxGroup(label="Select columns for impute", choices=[], value=[])

    # Whenever the uploaded file changes, refresh the selectable columns and
    # the warning box.
    file_input.change(
        fn=get_imputable_columns,
        inputs=file_input,
        outputs=[column_checkboxes, warning_box],
        queue=False
    )

    impute_button = gr.Button("Run imputation")

    # Outputs: imputed CSV download, replacement summary, statistics table.
    with gr.Row():
        download_output = gr.File(label="Download imputed CSV")

    with gr.Row():
        replacement_display = gr.DataFrame(label="Replacement summary")
    with gr.Row():
        stats_display = gr.DataFrame(label="Before/after imputation statistics")

    # The outputs list follows the order of impute_and_analyze_data's return
    # tuple: (output_path, stats_df, replacement_df, warning update).
    impute_button.click(
        fn=impute_and_analyze_data,
        inputs=[file_input, column_checkboxes, n_neighbors_slider],
        outputs=[download_output, stats_display, replacement_display, warning_box]
    )

# NOTE: there is no `if __name__ == "__main__"` guard, so the app is launched
# whenever this module is executed or imported.
app.launch(debug=True)