File size: 4,931 Bytes
7860c2e
c83b398
 
7860c2e
 
 
7c8002a
c83b398
7860c2e
 
 
 
 
 
 
 
 
 
c83b398
 
 
 
 
 
 
7860c2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c83b398
7860c2e
962e6f5
 
 
 
7860c2e
 
 
 
c83b398
962e6f5
c83b398
7860c2e
 
c83b398
 
7860c2e
c83b398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
962e6f5
 
c83b398
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import zipfile
from io import BytesIO

import pandas as pd
import streamlit as st

# === Functions ===


def find_header_row(df, keyword="Dist_Name", max_rows=20):
    """Return the position of the first row that contains *keyword*.

    Each cell of the scanned rows is converted to text, stripped and
    lower-cased, and the row matches as soon as one cell contains the
    lower-cased keyword as a substring.

    Args:
        df: DataFrame read without a header row.
        keyword: Text to look for (case-insensitive substring match).
        max_rows: Maximum number of leading rows to scan. Defaults to 20,
            the limit that used to be hard-coded.

    Returns:
        The 0-based positional index of the first matching row.

    Raises:
        ValueError: If none of the scanned rows contains the keyword.
    """
    # Lower-case the keyword once instead of once per cell.
    needle = keyword.lower()
    for i in range(min(max_rows, len(df))):
        cells = df.iloc[i].astype(str).str.strip().str.lower()
        if any(needle in str(cell) for cell in cells):
            return i
    raise ValueError(f"No row with '{keyword}' found.")


def read_sheet_fallback(file_bytes, sheet):
    """Read one worksheet of an in-memory workbook without a header row.

    The buffer is rewound before reading, so the same buffer can be passed
    in again for another sheet.

    Args:
        file_bytes: Seekable binary buffer holding the workbook.
        sheet: Name of the worksheet to read.

    Returns:
        A DataFrame whose columns are positional (``header=None``).
    """
    file_bytes.seek(0)
    raw_frame = pd.read_excel(
        file_bytes,
        sheet_name=sheet,
        header=None,
        engine="calamine",
    )
    return raw_frame


def load_clean_df(file_bytes, sheet):
    """Load a sheet, promote its header row and normalise every cell to text.

    The header row is located with ``find_header_row``; rows up to and
    including it are discarded. Column labels are converted to stripped
    strings with non-breaking spaces replaced by regular spaces. Data cells
    are converted to stripped strings, except missing cells, which stay
    missing.

    Args:
        file_bytes: Seekable binary buffer holding the workbook.
        sheet: Name of the worksheet to load.

    Returns:
        A DataFrame of stripped strings (and missing values) whose columns
        are the cleaned header labels.

    Raises:
        ValueError: Propagated from ``find_header_row`` when no header row
            is found.
    """
    df_raw = read_sheet_fallback(file_bytes, sheet)
    header_row = find_header_row(df_raw)

    # Everything up to and including the header row is not data.
    df = df_raw.iloc[header_row + 1 :].copy()
    df.columns = [
        str(label).strip().replace("\xa0", " ") for label in df_raw.iloc[header_row]
    ]

    def _clean_cell(value):
        # A blanket astype(str) would turn missing cells into the literal
        # text "nan", which defeats any later isna()/notna() check.
        if pd.isna(value):
            return value
        return str(value).strip()

    return df.apply(lambda col: col.map(_clean_cell))


def detect_dist_col(columns):
    """Pick the first column label that mentions both "dist" and "name".

    The match is case-insensitive and order-independent: any label whose
    lower-cased text contains both substrings qualifies.

    Args:
        columns: Iterable of string column labels.

    Returns:
        The first qualifying label, unchanged.

    Raises:
        ValueError: If no label qualifies.
    """
    not_found = object()
    candidates = (
        label
        for label in columns
        if "dist" in label.lower() and "name" in label.lower()
    )
    match = next(candidates, not_found)
    if match is not_found:
        raise ValueError("Dist_Name column not found.")
    return match


# === Streamlit interface ===

def _diff_sheet(df_old, df_new, old_label, new_label):
    """Return one row per changed cell between two dumps of the same sheet.

    Rows are matched on the Dist_Name column of each frame and only the
    Dist_Name values present in both frames are compared. Columns are
    compared by label: only labels present in both frames are considered,
    repeated labels keep their first occurrence, and the "File_Name" column
    is skipped. Two cells that are both missing are treated as equal.

    Args:
        df_old: Cleaned sheet from the old dump.
        df_new: Cleaned sheet from the new dump.
        old_label: Column heading for the old values in the result.
        new_label: Column heading for the new values in the result.

    Returns:
        A DataFrame with columns "Dist_Name", "Parameter", *old_label* and
        *new_label*; empty when nothing differs.

    Raises:
        ValueError: If no Dist_Name column is found, or if a shared
            Dist_Name occurs more than once in either dump.
    """
    key_old = detect_dist_col(df_old.columns)
    key_new = detect_dist_col(df_new.columns)

    df_old = df_old[df_old[key_old].notna()].set_index(key_old)
    df_new = df_new[df_new[key_new].notna()].set_index(key_new)

    # A repeated header label cannot be reported unambiguously by name, so
    # only its first occurrence takes part in the comparison.
    df_old = df_old.loc[:, ~df_old.columns.duplicated()]
    df_new = df_new.loc[:, ~df_new.columns.duplicated()]

    shared_rows = df_old.index.intersection(df_new.index)
    # Element-wise comparison needs identical column labels in identical
    # order on both sides, so restrict both frames to the shared columns.
    shared_cols = [
        col
        for col in df_old.columns
        if col in df_new.columns and str(col).strip().lower() != "file_name"
    ]

    old_common = df_old.loc[shared_rows, shared_cols]
    new_common = df_new.loc[shared_rows, shared_cols]

    if old_common.index.has_duplicates or new_common.index.has_duplicates:
        raise ValueError(
            "duplicate Dist_Name values found; rows cannot be matched one-to-one"
        )

    differs = (old_common != new_common) & ~(
        old_common.isna() & new_common.isna()
    )

    # nonzero() yields positions in row-major order: row by row, and within
    # a row in column order.
    row_pos, col_pos = differs.to_numpy().nonzero()
    return pd.DataFrame(
        {
            "Dist_Name": differs.index.to_numpy()[row_pos],
            "Parameter": differs.columns.to_numpy()[col_pos],
            old_label: old_common.to_numpy()[row_pos, col_pos],
            new_label: new_common.to_numpy()[row_pos, col_pos],
        }
    )


st.title("📊 Dump Compare Tool")
st.markdown(
    ":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]"
)

old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")

sheet_list_input = st.text_input(
    "Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL"
)

if st.button("Run Comparison", type="primary", use_container_width=True):
    # dict.fromkeys drops repeated sheet names while keeping the typed order.
    sheet_names = list(
        dict.fromkeys(s.strip() for s in sheet_list_input.split(",") if s.strip())
    )

    if old_file is None or new_file is None or not sheet_names:
        st.warning("Please upload both files and provide at least one sheet name.")
    else:
        # getvalue() returns the whole upload regardless of the current
        # stream position. The buffers are rewound by the reader itself.
        old_bytes = BytesIO(old_file.getvalue())
        new_bytes = BytesIO(new_file.getvalue())

        old_label = os.path.basename(old_file.name)
        new_label = os.path.basename(new_file.name)
        if old_label == new_label:
            # Identical file names would collapse into a single result column.
            old_label = f"{old_label} (old)"
            new_label = f"{new_label} (new)"

        logs = []
        total = 0
        all_results = {}

        for sheet in sheet_names:
            # One failing sheet must not abort the others, so any error is
            # recorded in the log and the loop continues.
            try:
                df_old = load_clean_df(old_bytes, sheet)
                df_new = load_clean_df(new_bytes, sheet)
                df_changes = _diff_sheet(df_old, df_new, old_label, new_label)
            except Exception as e:
                logs.append(f"❌ Error in '{sheet}': {e}")
                continue

            if not df_changes.empty:
                all_results[sheet] = df_changes
                logs.append(f"{len(df_changes)} changes in '{sheet}'")
                total += len(df_changes)
            else:
                logs.append(f"No changes in '{sheet}'")

        st.success(f"✅ Comparison completed. Total changes: {total}")
        for log in logs:
            st.write(log)

        if all_results:
            # One workbook per sheet, bundled into a single in-memory zip.
            output_buffer = BytesIO()
            with zipfile.ZipFile(output_buffer, mode="w") as zf:
                for sheet, df in all_results.items():
                    file_buffer = BytesIO()
                    df.to_excel(file_buffer, index=False)
                    zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue())

            st.download_button(
                "Download Results (.zip)",
                data=output_buffer.getvalue(),
                file_name="differences.zip",
                mime="application/zip",
                type="primary",
                on_click="ignore",
            )