| |
| import streamlit as st |
| import pandas as pd |
| import numpy as np |
| import jellyfish |
| import io |
| import uuid |
| from io import BytesIO |
|
|
| from st_link_analysis import st_link_analysis, NodeStyle, EdgeStyle |
|
|
| |
| try: |
| import networkx as nx |
| HAS_NETWORKX = True |
| except ImportError: |
| HAS_NETWORKX = False |
|
|
| |
| |
| |
# --- Constants ---------------------------------------------------------------
DEFAULT_NODE_LABEL = "Record"   # label assigned to every graph node
DEFAULT_REL_TYPE = "SIMILAR"    # relationship type used for similarity edges
DEFAULT_THRESHOLD = 0.80        # default similarity cutoff for the slider
MAX_REDLINE_PREVIEW = 10        # max pairs shown in the red-line diff view

# --- Page configuration ------------------------------------------------------
st.set_page_config(
    page_title="CSV ER & Network Graph",
    layout="wide",
    initial_sidebar_state="expanded",
)
st.title("Entity Resolution on CSV (Network Graph)")
|
|
| |
| |
| |
# --- Sidebar: file upload ----------------------------------------------------
# NOTE(review): the original emoji in these strings were mojibake-mangled;
# restored to plausible glyphs — confirm against the intended UI copy.
st.sidebar.header("Upload CSV for Entity Resolution")
uploaded_file = st.sidebar.file_uploader(
    "📁 Choose a CSV file",
    type=["csv"],
    help="Drag and drop your CSV file here or click to browse",
    accept_multiple_files=False,
    key="csv_uploader",
)

# Show upload hints only while no file has been selected yet.
if uploaded_file is None:
    st.sidebar.info("📄 **Drag & drop** your CSV file above or click to browse")
    st.sidebar.markdown("**Supported formats:** `.csv` files")
    st.sidebar.markdown("**Max size:** 200MB")
|
|
| |
# Initialize the session-state slots this app relies on so later code can
# read them unconditionally.
for _state_key in ("uploaded_data_df", "last_uploaded_file"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None
|
|
| |
# --- Sidebar: load the uploaded file (with fallback read strategies) ---------
# Defect fixed: several success messages contained mojibake that split the
# f-string literal across lines (a syntax error); strings repaired.
if uploaded_file is not None:
    # Identify the upload by name+size so a rerun with the same file is not
    # re-parsed, but a different file is.
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"

    if 'current_file_id' not in st.session_state:
        st.session_state.current_file_id = None

    if st.session_state.current_file_id != file_id:
        st.sidebar.info("🔄 Processing uploaded file...")

        df = None
        success_method = None

        # Method 1: let pandas read the UploadedFile object directly.
        try:
            df = pd.read_csv(uploaded_file)
            success_method = "Direct Read"
        except Exception as e1:
            st.sidebar.warning(f"Method 1 failed: {str(e1)[:50]}...")

            # Method 2: rewind and parse the raw bytes through BytesIO.
            try:
                uploaded_file.seek(0)
                bytes_data = uploaded_file.getvalue()
                df = pd.read_csv(BytesIO(bytes_data))
                success_method = "Bytes Read"
            except Exception as e2:
                st.sidebar.warning(f"Method 2 failed: {str(e2)[:50]}...")

                # Method 3: decode to text first, then parse via StringIO.
                try:
                    uploaded_file.seek(0)
                    string_data = uploaded_file.getvalue().decode("utf-8")
                    df = pd.read_csv(io.StringIO(string_data))
                    success_method = "String Read"
                except Exception as e3:
                    st.sidebar.error(f"Method 3 failed: {str(e3)[:50]}...")

                    # Method 4: raw read(), forcing UTF-8 decoding if bytes.
                    try:
                        uploaded_file.seek(0)
                        raw_data = uploaded_file.read()
                        if isinstance(raw_data, bytes):
                            string_data = raw_data.decode('utf-8')
                        else:
                            string_data = raw_data
                        df = pd.read_csv(io.StringIO(string_data))
                        success_method = "Force UTF-8"
                    except Exception as e4:
                        st.sidebar.error(f"All methods failed. Last error: {str(e4)}")

        if df is not None:
            # Cache the parsed frame so later reruns skip re-parsing.
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            st.session_state.current_file_id = file_id

            st.sidebar.success("✅ **File loaded successfully!**")
            st.sidebar.success(
                f"📄 **{uploaded_file.name}** ({len(df)} rows, {len(df.columns)} columns)"
            )
            st.sidebar.info(f"🔧 Used method: {success_method}")
        else:
            st.sidebar.error("❌ **All reading methods failed!**")
            st.sidebar.info("💡 Try the manual processing button below or paste your data")
            st.session_state.uploaded_data_df = None
    else:
        # Same file as last run: just report what is already cached.
        if st.session_state.uploaded_data_df is not None:
            st.sidebar.success(f"✅ **File already loaded:** {uploaded_file.name}")
            st.sidebar.info(
                f"📊 {len(st.session_state.uploaded_data_df)} rows, "
                f"{len(st.session_state.uploaded_data_df.columns)} columns"
            )
|
|
| |
# --- Sidebar: manual fallback when automatic loading failed ------------------
if uploaded_file is not None and st.session_state.uploaded_data_df is None:
    if st.sidebar.button("🔄 Process Uploaded File"):
        try:
            # Rewind first: the earlier failed read attempts may have left
            # the buffer positioned at EOF, which would make this read empty.
            uploaded_file.seek(0)
            df = pd.read_csv(uploaded_file)
            st.session_state.uploaded_data_df = df
            st.session_state.last_uploaded_file = uploaded_file.name
            st.sidebar.success("✅ File processed manually!")
            st.rerun()
        except Exception as e:
            st.sidebar.error(f"❌ Error: {str(e)}")
|
|
| |
# --- Sidebar: current-data status and reset ----------------------------------
if st.session_state.uploaded_data_df is not None:
    st.sidebar.write(f"**Current Data:** {len(st.session_state.uploaded_data_df)} rows loaded")
else:
    st.sidebar.write("**Current Data:** None")

if st.sidebar.button("🗑️ Clear All Data"):
    st.session_state.uploaded_data_df = None
    st.session_state.last_uploaded_file = None
    # Also forget the processed-file id; otherwise re-selecting the same file
    # would be treated as "already processed" and never reloaded.
    st.session_state.current_file_id = None
    st.sidebar.success("Data cleared!")
|
|
| |
# --- Sidebar: paste CSV text as an alternative input -------------------------
st.sidebar.markdown("---")
st.sidebar.markdown("**Alternative: Paste CSV data:**")
csv_text = st.sidebar.text_area(
    "Paste your CSV data here:",
    height=100,
    placeholder=(
        "first_name,last_name,email_address,phone_number\n"
        "John,Smith,john@email.com,555-0123\n"
        "Jane,Doe,jane@email.com,555-0456"
    ),
)

if st.sidebar.button("🔄 Process Pasted Data") and csv_text.strip():
    try:
        from io import StringIO
        df = pd.read_csv(StringIO(csv_text))
        st.session_state.uploaded_data_df = df
        st.session_state.last_uploaded_file = "pasted_data"
        st.sidebar.success(
            f"✅ **Pasted data loaded!** ({len(df)} rows, {len(df.columns)} columns)"
        )
    except Exception as e:
        st.sidebar.error(f"❌ **Error parsing CSV:** {str(e)}")
        st.sidebar.info("💡 Make sure your data is in proper CSV format with headers")
|
|
| |
# --- Sidebar: built-in sample dataset ----------------------------------------
st.sidebar.markdown("---")
st.sidebar.markdown("**Or use sample data:**")
if st.sidebar.button("Use Sample Data"):
    # Small demo set containing deliberate near-duplicates.
    sample_rows = {
        'first_name': ['John', 'Jon', 'Jane', 'Jain', 'Mike', 'Michael'],
        'last_name': ['Smith', 'Smith', 'Doe', 'Doe', 'Johnson', 'Johnson'],
        'email_address': [
            'john.smith@email.com', 'j.smith@gmail.com',
            'jane.doe@company.com', 'jdoe@company.com',
            'mike.j@work.com', 'michael.johnson@work.com',
        ],
        'phone_number': ['555-0123', '555-0123', '555-0456', '(555) 456-0000', '555-0789', '5550789'],
    }
    st.session_state.uploaded_data_df = pd.DataFrame(sample_rows)
    st.session_state.last_uploaded_file = "sample_data"
    st.sidebar.success("Sample data loaded!")
|
|
# --- Sidebar: matching controls ----------------------------------------------
similarity_threshold = st.sidebar.slider(
    "Similarity Threshold",
    min_value=0.0,
    max_value=1.0,
    value=DEFAULT_THRESHOLD,
    step=0.01,
)

st.sidebar.header("Similarity Columns")

# Comma-separated list of columns to compare; blank entries are dropped.
default_cols = "first_name,last_name,email_address,phone_number"
similarity_cols_raw = st.sidebar.text_input(
    "Columns to compare (comma-separated):",
    value=default_cols,
)
similarity_cols = [c.strip() for c in similarity_cols_raw.split(",") if c.strip()]

show_redlining = st.sidebar.checkbox("Show red-lined differences for top pairs", value=True)

# Defaults used before any entity-resolution run has produced a graph.
df = None
elements = {"nodes": [], "edges": []}
|
|
|
|
| |
| |
| |
def jaro_winkler_score(str1, str2):
    """Return the Jaro-Winkler similarity of two strings."""
    # Coalesce None to "" so jellyfish always receives real strings.
    left = str1 or ""
    right = str2 or ""
    return jellyfish.jaro_winkler_similarity(left, right)
|
|
def overall_similarity(row1, row2, cols):
    """
    Average Jaro-Winkler similarity across the given columns.

    Columns where either side is missing are skipped rather than compared.
    Returns 0.0 when no column could be compared.

    Fix: missing CSV cells arrive as NaN, and str(NaN) is the literal text
    "nan" — the old code therefore scored two rows that both lacked a value
    as highly similar. NaN/None values are now treated as missing.
    """
    scores = []
    for col in cols:
        raw1 = row1.get(col, "")
        raw2 = row2.get(col, "")
        # pd.isna covers NaN/None from pandas rows; guard against array-like
        # results by only trusting a plain bool.
        if pd.isna(raw1) is True or pd.isna(raw2) is True:
            continue
        val1 = str(raw1).lower()
        val2 = str(raw2).lower()
        if val1 == "" or val2 == "":
            # One side empty: skip so a missing value neither helps nor hurts.
            continue
        scores.append(jaro_winkler_score(val1, val2))
    if not scores:
        return 0.0
    return sum(scores) / len(scores)
|
|
def redline_text(str1, str2):
    """
    Produce an HTML "red-line" of *str1* against *str2*.

    Segments of str1 that match str2 are emitted as-is (HTML-escaped);
    differing/removed segments are wrapped in a red <span>. A pure insertion
    in str2 is marked with a red underscore. The returned fragment is meant
    for st.markdown(..., unsafe_allow_html=True).

    Fixes vs. the original:
    - Uses difflib alignment instead of positional char-by-char comparison,
      so a single inserted character no longer flags every following
      character as different.
    - HTML-escapes user data before embedding it in markup.
    """
    import difflib
    import html

    out = []
    for op, i1, i2, _j1, _j2 in difflib.SequenceMatcher(None, str1, str2).get_opcodes():
        segment = html.escape(str1[i1:i2])
        if op == "equal":
            out.append(segment)
        else:
            # Empty segment means str2 inserted text here; show a placeholder.
            out.append(f"<span style='color:red'>{segment or '_'}</span>")
    return "".join(out)
|
|
def find_connected_components_manual(nodes, edges):
    """
    Find connected components without NetworkX.

    Parameters
    ----------
    nodes : iterable of node ids.
    edges : list of {"data": {"source": ..., "target": ...}} dicts whose
            endpoints must appear in *nodes*.

    Returns
    -------
    list of sets, one set of node ids per connected component.

    Fix: the original used recursive DFS, which raises RecursionError on a
    single component longer than Python's recursion limit (~1000 nodes).
    This version uses an explicit stack.
    """
    # Build an undirected adjacency list.
    adj_list = {node: set() for node in nodes}
    for edge in edges:
        source = edge["data"]["source"]
        target = edge["data"]["target"]
        adj_list[source].add(target)
        adj_list[target].add(source)

    visited = set()
    components = []

    for start in adj_list:
        if start in visited:
            continue
        component = set()
        stack = [start]
        while stack:
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            component.add(node)
            # Only push unvisited neighbors to keep the stack small.
            stack.extend(adj_list[node] - visited)
        components.append(component)

    return components
|
|
|
|
| |
| |
| |
| |
# =============================================================================
# Main area: preview, entity resolution, graph, communities, export
# =============================================================================
# Fixes vs. the original: node ids, the community-id mapping, and the
# red-lining lookups now all use POSITIONAL indices (iloc), where the
# original mixed index labels (iterrows / .loc) with positions and would
# misalign on any non-default index; user-typed similarity columns that do
# not exist in the data are skipped instead of raising KeyError; mojibake
# emoji strings repaired.
if st.session_state.uploaded_data_df is not None:
    st.markdown("### Preview of Data")
    df = st.session_state.uploaded_data_df
    st.dataframe(df.head(1000))  # cap the preview; the full frame may be large
    st.info(f"📊 Dataset contains {len(df)} rows and {len(df.columns)} columns")

    if st.button("Run Entity Resolution"):
        # ---- Build graph nodes (one per row) --------------------------------
        nodes = []
        for i in range(len(df)):
            row = df.iloc[i]
            node_data = row.to_dict()
            node_data["id"] = str(i)
            node_data["label"] = DEFAULT_NODE_LABEL

            # Human-readable caption: "first last", falling back to row number.
            first_name = row.get("first_name", "")
            last_name = row.get("last_name", "")
            short_label = f"{first_name} {last_name}".strip()
            if not short_label:
                short_label = f"Row-{i}"
            node_data["name"] = short_label
            nodes.append({"data": node_data})

        # ---- Build similarity edges (all pairs) -----------------------------
        # O(n^2) pairwise comparison — fine for small CSVs; larger data needs
        # blocking (see the enterprise-scale note at the bottom of the page).
        edges = []
        for i in range(len(df)):
            for j in range(i + 1, len(df)):
                sim = overall_similarity(df.iloc[i], df.iloc[j], similarity_cols)
                if sim >= similarity_threshold:
                    edge_data = {
                        "id": f"edge_{i}_{j}",
                        "source": str(i),
                        "target": str(j),
                        "label": DEFAULT_REL_TYPE,
                        "similarity": round(sim, 3),
                    }
                    edges.append({"data": edge_data})

        elements = {"nodes": nodes, "edges": edges}
        st.success("Entity Resolution complete! Network graph built.")

        # ---- Render the network graph ---------------------------------------
        st.markdown("### Network Graph")
        node_labels = set(node["data"]["label"] for node in elements["nodes"])
        rel_labels = set(edge["data"]["label"] for edge in elements["edges"])

        default_colors = ["#2A629A", "#FF7F3E", "#C0C0C0", "#008000", "#800080"]
        node_styles = []
        for i, label in enumerate(sorted(node_labels)):
            color = default_colors[i % len(default_colors)]  # cycle palette
            node_styles.append(NodeStyle(label=label, color=color, caption="name"))

        edge_styles = []
        for rel in sorted(rel_labels):
            edge_styles.append(EdgeStyle(rel, caption="similarity", directed=False))

        st_link_analysis(
            elements,
            layout="cose",
            node_styles=node_styles,
            edge_styles=edge_styles,
        )

        # ---- Community detection (connected components) ---------------------
        st.markdown("### Community Detection Results")

        if HAS_NETWORKX:
            G = nx.Graph()
            for node in elements["nodes"]:
                G.add_node(node["data"]["id"])
            for edge in elements["edges"]:
                G.add_edge(edge["data"]["source"], edge["data"]["target"])
            communities = list(nx.connected_components(G))
        else:
            st.info(
                "NetworkX not found. Using manual connected components "
                "algorithm. Install NetworkX for better performance: "
                "`pip install networkx`"
            )
            node_ids = [node["data"]["id"] for node in elements["nodes"]]
            communities = find_connected_components_manual(node_ids, elements["edges"])

        # Tag every node with a per-run UUID identifying its community.
        node_to_community = {}
        community_uuids = {}
        for i, community in enumerate(communities):
            community_uuid = str(uuid.uuid4())
            community_uuids[i] = community_uuid
            for node_id in community:
                node_to_community[node_id] = community_uuid

        df_with_communities = df.copy()
        # Look up by position to match the positional node ids above; the
        # fallback UUID should never fire since every row became a node.
        df_with_communities['community_id'] = [
            node_to_community.get(str(pos), str(uuid.uuid4()))
            for pos in range(len(df_with_communities))
        ]

        st.write(f"**Found {len(communities)} communities:**")
        for i, community in enumerate(communities):
            st.write(f"- Community {i+1}: {len(community)} records (UUID: {community_uuids[i]})")

        st.markdown("#### Results with Community IDs")
        st.dataframe(df_with_communities)

        # ---- Canonical view: one representative row per community -----------
        st.markdown("#### Canonical View (Representative Records by Community)")

        canonical_records = []
        for i, community in enumerate(communities):
            community_uuid = community_uuids[i]
            community_rows = df_with_communities[
                df_with_communities['community_id'] == community_uuid
            ]

            if len(community_rows) > 0:
                # The first row of the community acts as the representative.
                representative = community_rows.iloc[0].copy()
                representative['community_size'] = len(community_rows)
                representative['community_members'] = f"{len(community_rows)} records"

                # Summarize how much the compared columns vary in the cluster.
                if len(community_rows) > 1:
                    variations = []
                    for col in similarity_cols:
                        if col in community_rows.columns:
                            unique_vals = community_rows[col].dropna().unique()
                            if len(unique_vals) > 1:
                                variations.append(f"{col}: {len(unique_vals)} variants")
                    representative['variations'] = "; ".join(variations) if variations else "No variations"
                else:
                    representative['variations'] = "Single record"

                canonical_records.append(representative)

        if canonical_records:
            canonical_df = pd.DataFrame(canonical_records)

            # Column ordering: metadata first, then compared columns, then the rest.
            display_cols = ['community_id', 'community_members', 'variations']
            for col in similarity_cols:
                if col in canonical_df.columns:
                    display_cols.append(col)
            for col in canonical_df.columns:
                if col not in display_cols and col not in ['community_size']:
                    display_cols.append(col)
            display_cols = [col for col in display_cols if col in canonical_df.columns]
            canonical_display = canonical_df[display_cols]

            st.dataframe(canonical_display)
            st.info(
                f"📋 Showing {len(canonical_display)} canonical records "
                f"representing {len(communities)} communities"
            )

            canonical_csv_buffer = io.StringIO()
            canonical_display.to_csv(canonical_csv_buffer, index=False)
            st.download_button(
                label="📥 Download Canonical View as CSV",
                data=canonical_csv_buffer.getvalue(),
                file_name="canonical_entity_resolution.csv",
                mime="text/csv",
                key="canonical_download",
            )
        else:
            st.warning("No canonical records to display")

        # ---- Full results export --------------------------------------------
        st.markdown("#### Export Results")
        csv_buffer = io.StringIO()
        df_with_communities.to_csv(csv_buffer, index=False)
        st.download_button(
            label="📥 Download Results as CSV",
            data=csv_buffer.getvalue(),
            file_name="entity_resolution_results.csv",
            mime="text/csv",
        )

        # ---- Red-lined differences for the closest non-identical pairs ------
        if show_redlining and len(edges) > 0:
            st.markdown("### Top Similar Pairs (Red-Lined Differences)")

            # Exact duplicates (similarity 1.0) carry no visible difference.
            filtered_edges = [
                edge for edge in edges if edge["data"]["similarity"] < 1.0
            ]
            sorted_edges = sorted(
                filtered_edges, key=lambda e: e["data"]["similarity"], reverse=True
            )
            top_edges = sorted_edges[:MAX_REDLINE_PREVIEW]

            if not top_edges:
                st.info("No slightly different pairs found; all matches are exact or none meet the threshold.")
            else:
                for edge_item in top_edges:
                    s_idx = int(edge_item["data"]["source"])
                    t_idx = int(edge_item["data"]["target"])
                    sim_val = edge_item["data"]["similarity"]
                    st.markdown(f"**Pair:** Row {s_idx} ↔ Row {t_idx}, **similarity**={sim_val}")

                    mismatch_cols = []
                    for col in similarity_cols:
                        if col not in df.columns:
                            continue  # user-typed column may not exist in the data
                        val1 = str(df.iloc[s_idx][col])
                        val2 = str(df.iloc[t_idx][col])
                        if val1.lower() != val2.lower():
                            mismatch_cols.append((col, val1, val2))

                    if mismatch_cols:
                        st.write("Differences in the following columns:")
                        for col_name, str1, str2 in mismatch_cols:
                            redlined = redline_text(str1, str2)
                            st.markdown(f" **{col_name}:** {redlined}", unsafe_allow_html=True)
                    else:
                        st.write("No differences in the compared columns.")

                    st.markdown("---")

    # ---- Footer: scaling guidance -------------------------------------------
    st.markdown("---")
    st.markdown("### 🚀 Enterprise Scale Solutions")

    if not HAS_NETWORKX:
        st.warning("""
    **Missing NetworkX Dependency**

    For better performance, install NetworkX:
    ```bash
    pip install networkx
    ```
    """)

    st.info("""
    **Need help with larger scale deployments?**

    If you need to persist UUIDs from run to run, handle larger datasets, or require more sophisticated
    entity resolution capabilities, you may need an enterprise-scale solution. Consider:

    - **Database Integration**: Store community IDs in a persistent database
    - **Incremental Processing**: Handle new data without re-processing everything
    - **Advanced Blocking**: Use more sophisticated blocking strategies for large datasets
    - **Distributed Computing**: Scale across multiple machines for very large datasets
    - **Custom ML Models**: Train domain-specific models for better accuracy

    Contact **Eastridge Analytics** for guidance on enterprise implementations.
    """)

else:
    st.info("Please upload a CSV file in the sidebar to begin.")
|
|