# HTSReivewTool / app.py
# (Hugging Face Spaces upload header: joycecast, "Upload app.py", commit 77f04f8 verified)
"""
HTS Checker - Streamlit Application for HTS Tariff Auditing
Deployed on Hugging Face Spaces
"""
import streamlit as st
import pandas as pd
from io import BytesIO
import hashlib
import os
from HTS_list import (Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list,
Computer_parts_HTS_list, Auto_parts_HTS_list, Semiconductor_HTS_list)
# Allow Styler to render very large DataFrames without raising.
pd.set_option("styler.render.max_elements", 10_000_000)

# Combined set of monitored HTS codes (as strings) used for prefix matching.
_ALL_MONITORED_HTS = {
    str(code)
    for code_list in (
        Steel_primary_HTS_list, Aluminum_primary_HTS_list, Copper_primary_HTS_list,
        Computer_parts_HTS_list, Auto_parts_HTS_list, Semiconductor_HTS_list,
    )
    for code in code_list
}
def hts_in_monitored_list(primary_hts: str) -> bool:
    """Check if primary HTS prefix-matches any monitored HTS code.

    The input is normalized (dots stripped, whitespace trimmed); the shorter
    of the two codes must be a prefix of the longer one to count as a match.
    """
    candidate = str(primary_hts).replace(".", "").strip()
    for monitored in _ALL_MONITORED_HTS:
        # Compare in whichever direction keeps the shorter code as the prefix.
        if len(monitored) <= len(candidate):
            shorter, longer = monitored, candidate
        else:
            shorter, longer = candidate, monitored
        if longer.startswith(shorter):
            return True
    return False
# Path to reviewed combinations CSV file (resolved relative to this module's
# directory wherever it is read).
REVIEWED_COMBINATIONS_FILE = "Reviewed_combination.csv"
# Page configuration — must run before any other Streamlit call in the script.
st.set_page_config(
    page_title="HTS Checker - Tariff Audit Tool",
    page_icon="",
    layout="wide"
)
# =============================================================================
# Authentication
# =============================================================================
def get_app_password():
    """Return the app password from Streamlit secrets, falling back to the
    HTS_CHECKER_PASSWORD environment variable, or '' when neither is set."""
    try:
        return st.secrets["APP_PASSWORD"]
    except (KeyError, FileNotFoundError):
        # No secrets file or key present — consult the environment instead.
        return os.environ.get("HTS_CHECKER_PASSWORD", "")
def check_password():
    """Gate the app behind a password; return True once authenticated.

    When no password is configured the app is open. Otherwise a login form is
    rendered and False is returned until the correct password triggers a
    rerun with `authenticated` set in session state.
    """
    app_password = get_app_password()
    if not app_password:
        # No password configured -> open access.
        return True
    if "authenticated" not in st.session_state:
        st.session_state["authenticated"] = False
    if st.session_state["authenticated"]:
        return True
    st.markdown("## HTS Checker - Login Required")
    st.markdown("Please enter the password to access this application.")
    with st.form("login_form"):
        password_input = st.text_input("Password", type="password", key="password_input")
        submit_button = st.form_submit_button("Login")
    if submit_button:
        if password_input == app_password:
            # Persist success and rerun so the main app renders immediately.
            st.session_state.authenticated = True
            st.rerun()
        else:
            st.error("Incorrect password. Please try again.")
    return False
# Check authentication before showing main app.
# st.stop() halts this script run; the login form above remains on screen
# until a successful submit reruns the script with authenticated=True.
if not check_password():
    st.stop()
def load_single_excel(file_content):
    """Read one Excel workbook from raw bytes, forcing HTS columns to clean strings.

    The HTS columns are read as `str` to prevent pandas from coercing codes to
    floats; any residual float artifacts (".0" suffix, the literal "nan") are
    scrubbed afterwards.
    """
    hts_columns = ["Tariff", "Primary 1", "Primary 2", "Primary 3",
                   "Primary 4", "Primary 5", "Primary 6"]
    df = pd.read_excel(BytesIO(file_content), dtype={name: str for name in hts_columns})
    for name in hts_columns:
        if name in df.columns:
            cleaned = df[name].astype(str).str.replace(r'\.0$', '', regex=True)
            # Missing cells stringify to "nan"; show them as empty instead.
            df[name] = cleaned.replace('nan', '')
    return df
@st.cache_data
def load_and_validate_excel(file_contents_list, file_names_list, _cache_key):
    """Load every uploaded Excel file and concatenate into one DataFrame.

    `_cache_key` exists only to key Streamlit's cache on the uploaded file
    set; the leading underscore keeps st.cache_data from hashing it twice.
    """
    frames = [load_single_excel(content) for content in file_contents_list]
    return pd.concat(frames, ignore_index=True)
def load_reviewed_combinations():
    """Load reviewed (HTS, DESCRIPTION-uppercased) combinations from CSV.

    Tries several encodings in order; returns a set of
    (hts, description_upper) tuples, or an empty set when the file is
    missing or unreadable (a warning is shown in the latter case).
    """
    reviewed_set = set()
    csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
    if not os.path.exists(csv_path):
        return reviewed_set
    df = None
    for encoding in ("utf-8", "cp1252", "latin-1", "utf-8-sig"):
        try:
            df = pd.read_csv(csv_path, dtype=str, encoding=encoding)
            break
        except UnicodeDecodeError:
            continue
        except Exception as e:
            st.warning(f"Could not load reviewed combinations: {e}")
            return reviewed_set
    if df is None:
        # BUGFIX: previously this case fell through silently, returning an
        # empty set with no indication the reviewed list failed to load.
        st.warning("Could not decode reviewed combinations CSV with any supported encoding")
        return reviewed_set
    if "HTS" in df.columns and "Description" in df.columns:
        for _, row in df.iterrows():
            hts = str(row["HTS"]).strip() if pd.notna(row["HTS"]) else ""
            desc = str(row["Description"]).strip().upper() if pd.notna(row["Description"]) else ""
            if hts and desc:
                reviewed_set.add((hts, desc))
    return reviewed_set
def is_combination_reviewed(hts, description, reviewed_set):
    """Return True when the normalized (HTS, DESCRIPTION) pair is in reviewed_set.

    Normalization mirrors load_reviewed_combinations: strip whitespace,
    uppercase the description, and treat NaN/None as empty strings.
    """
    normalized_hts = "" if pd.isna(hts) else str(hts).strip()
    normalized_desc = "" if pd.isna(description) else str(description).strip().upper()
    return (normalized_hts, normalized_desc) in reviewed_set
def run_simple_validation(df, reviewed_combinations):
    """Build a results table flagging combinations absent from the reviewed CSV.

    Each entry gets a Status: PASS when its primary HTS is not monitored at
    all, or when the (HTS, uppercased description) pair has been reviewed;
    FLAG otherwise.
    """
    secondary_cols = ["Primary 1", "Primary 2", "Primary 3",
                      "Primary 4", "Primary 5", "Primary 6"]
    records = []
    for idx, row in df.iterrows():
        entry_number = str(row.get("Entry Number", f"Row_{idx}"))
        description = str(row.get("Description", ""))
        primary_hts = str(row.get("Tariff", "")).strip()

        # Collect non-empty secondary HTS codes, skipping "nan" artifacts.
        extras = []
        for col in secondary_cols:
            if col in row and row[col] is not None:
                value = str(row[col]).strip()
                if value and value.lower() != "nan":
                    extras.append(value)

        desc_upper = description.strip().upper()
        if not hts_in_monitored_list(primary_hts):
            status = "PASS"   # code is not on any monitored list
        elif (primary_hts, desc_upper) in reviewed_combinations:
            status = "PASS"   # monitored, but already reviewed
        else:
            status = "FLAG"   # monitored and unreviewed

        shown_desc = description if len(description) <= 100 else description[:100] + "..."
        records.append({
            "Entry Number": entry_number,
            "Description": shown_desc,
            "Full Description": description,
            "Primary HTS": primary_hts,
            "Additional HTS": ", ".join(extras),
            "Status": status,
        })
    return pd.DataFrame(records)
# Initialize session state
# export_cache: list of row dicts queued for Excel export, kept across reruns.
if "export_cache" not in st.session_state:
    st.session_state.export_cache = []
# validation_results: results DataFrame for the currently filtered upload.
if "validation_results" not in st.session_state:
    st.session_state.validation_results = None
# original_df: the combined, unfiltered upload.
if "original_df" not in st.session_state:
    st.session_state.original_df = None
def color_status(val):
    """Return a CSS background for a Status cell: green PASS, pale-yellow FLAG."""
    styles = {
        "PASS": "background-color: #90EE90",
        "FLAG": "background-color: #FFFFE0",
    }
    return styles.get(val, "")
def format_hts(hts_value):
    """Render an HTS value as a string, trimming a trailing float-artifact '.0'.

    Falsy inputs (None, empty string, 0) become the empty string.
    """
    if not hts_value:
        return ""
    text = str(hts_value)
    return text[:-2] if text.endswith(".0") else text
def export_to_excel(df, results_df=None):
    """Serialize df to an in-memory Excel workbook and return the BytesIO buffer.

    With results_df supplied, a Status column is attached positionally (only
    when row counts match) and the sheet is named "Audit Results"; otherwise
    the plain frame is written to a sheet named "Export".
    """
    buffer = BytesIO()
    with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
        if results_df is None:
            df.to_excel(writer, sheet_name="Export", index=False)
        else:
            export_df = df.copy()
            # Positional alignment via .values — only safe when lengths agree.
            if len(results_df) == len(export_df):
                export_df["Status"] = results_df["Status"].values
            export_df.to_excel(writer, sheet_name="Audit Results", index=False)
    buffer.seek(0)
    return buffer
# Main app
st.title("HTS Checker - Tariff Audit Tool")
st.markdown("Flag HTS + Description combinations not found in reviewed list")
# Create tabs — tab2b sits between Validation Results and Export Selection.
tab1, tab2, tab2b, tab3 = st.tabs([
    "Upload & Filter",
    "Validation Results",
    "Unique Combinations",
    "Export Selection",
])
# Tab 1: Upload & Filter
with tab1:
    st.header("Upload Excel Files")
    uploaded_files = st.file_uploader(
        "Upload entry report Excel files (multiple allowed)",
        type=["xlsx", "xls"],
        accept_multiple_files=True,
        help="Upload one or more customizable entry reports from NetCHB."
    )
    if uploaded_files:
        try:
            # Read raw bytes once, then rewind so the widgets can re-read on rerun.
            file_contents = [f.read() for f in uploaded_files]
            file_names = [f.name for f in uploaded_files]
            for f in uploaded_files:
                f.seek(0)
            # Cache key derived from the sorted name list — order-insensitive.
            cache_key = hashlib.md5(",".join(sorted(file_names)).encode()).hexdigest()
            df = load_and_validate_excel(file_contents, file_names, cache_key)
            st.session_state.original_df = df
            if len(uploaded_files) > 1:
                st.success(f"Loaded {len(df)} rows from {len(uploaded_files)} files")
            else:
                st.success(f"Loaded {len(df)} rows")
            with st.expander("Column Mapping"):
                st.markdown("""
**Expected Columns:**
- Column E: `Description` - Product description
- Column F: `Tariff` - 10-digit Primary HTS code
- Columns I-N: `Primary 1-6` - Additional HTS codes
""")
                st.write("**Detected columns:**", df.columns.tolist())
            st.subheader("Filter Options")
            col1, col2 = st.columns(2)
            with col1:
                hts_filter = st.text_input(
                    "Filter by Primary HTS (partial match)",
                    placeholder="e.g., 7301 or 730120",
                )
            with col2:
                desc_exclude = st.text_input(
                    "Exclude by description keyword",
                    placeholder="e.g., polyester",
                )
            filtered_df = df.copy()
            if hts_filter:
                # Fall back to column position F/E when named headers are absent.
                tariff_col = "Tariff" if "Tariff" in df.columns else df.columns[5]
                filtered_df = filtered_df[
                    filtered_df[tariff_col].astype(str).str.contains(hts_filter, na=False)
                ]
            if desc_exclude:
                desc_col = "Description" if "Description" in df.columns else df.columns[4]
                filtered_df = filtered_df[
                    ~filtered_df[desc_col].astype(str).str.lower().str.contains(
                        desc_exclude.lower(), na=False
                    )
                ]
            st.write(f"**{len(filtered_df)} of {len(df)} entries after filters**")
            if len(filtered_df) > 0:
                file_names_key = ",".join(sorted(file_names))
                # Reuse cached full-file validation when the same file set is loaded.
                validation_done = (
                    "cached_full_results" in st.session_state and
                    st.session_state.get("cached_file_names") == file_names_key
                )
                if validation_done:
                    full_results_df = st.session_state.cached_full_results
                    # Positional slicing works because the combined df uses a
                    # default RangeIndex (concat with ignore_index=True).
                    filtered_indices = filtered_df.index.tolist()
                    filtered_results_df = full_results_df.iloc[filtered_indices].copy()
                    st.session_state.validation_results = filtered_results_df
                    st.session_state.filtered_df = filtered_df
                    st.success(f"Validated {len(filtered_df)} entries. Go to 'Validation Results' tab to review.")
                else:
                    st.session_state.filtered_df = filtered_df
                    if st.button("Validate", type="primary"):
                        with st.spinner("Validating entries..."):
                            # Validate the FULL dataset once, cache it, then
                            # slice per the current filter.
                            reviewed_combinations = load_reviewed_combinations()
                            full_results_df = run_simple_validation(df, reviewed_combinations)
                            st.session_state.cached_full_results = full_results_df
                            st.session_state.cached_file_names = file_names_key
                            filtered_indices = filtered_df.index.tolist()
                            filtered_results_df = full_results_df.iloc[filtered_indices].copy()
                            st.session_state.validation_results = filtered_results_df
                        st.rerun()
        except Exception as e:
            st.error(f"Error loading file: {str(e)}")
# Tab 2: Validation Results
# Shows PASS/FLAG metrics for the current validation run, interactive
# filtering, and buttons that queue rows into the export cache.
with tab2:
    st.header("Validation Results")
    if st.session_state.validation_results is None:
        st.info("Upload a file and run validation first.")
    else:
        results_df = st.session_state.validation_results.copy()
        # Summary metrics.
        col1, col2 = st.columns(2)
        with col1:
            pass_count = len(results_df[results_df["Status"] == "PASS"])
            st.metric("PASS", pass_count)
        with col2:
            flag_count = len(results_df[results_df["Status"] == "FLAG"])
            st.metric("FLAG", flag_count)
        st.subheader("Filter Results")
        status_filter = st.multiselect(
            "Filter by Status",
            options=["PASS", "FLAG"],
            default=["FLAG"]
        )
        display_df = results_df.copy()
        if status_filter:
            display_df = display_df[display_df["Status"].isin(status_filter)]
        st.write(f"**Showing {len(display_df)} results**")
        if len(display_df) > 0:
            st.markdown("**Interactive Filters:**")
            filter_col1, filter_col2 = st.columns(2)
            with filter_col1:
                hts_search = st.text_input(
                    "Filter by Primary HTS",
                    placeholder="e.g., 7301 or 8302",
                    key="results_hts_filter"
                )
            with filter_col2:
                desc_search = st.text_input(
                    "Filter by Description",
                    placeholder="e.g., steel, aluminum",
                    key="results_desc_filter"
                )
            interactive_df = display_df.copy()
            if hts_search:
                interactive_df = interactive_df[
                    interactive_df["Primary HTS"].astype(str).str.contains(hts_search, case=False, na=False)
                ]
            if desc_search:
                interactive_df = interactive_df[
                    interactive_df["Description"].astype(str).str.contains(desc_search, case=False, na=False)
                ]
            st.write(f"**Filtered: {len(interactive_df)} of {len(display_df)} results**")
            st.session_state.interactive_filtered_df = interactive_df
            display_columns = ["Entry Number", "Description", "Primary HTS", "Additional HTS", "Status"]
            # NOTE(review): Styler.applymap is deprecated in pandas >= 2.1 in
            # favor of Styler.map; kept for compatibility with older pandas.
            styled_df = interactive_df[display_columns].style.applymap(
                color_status, subset=["Status"]
            )
            st.dataframe(styled_df, use_container_width=True, height=400)
            st.subheader("Add to Export Cache")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Add ALL Filtered to Cache", type="primary"):
                    # BUGFIX(perf): the dedupe key list was rebuilt inside the
                    # row loop (O(n^2) per click). Build a set once and keep it
                    # updated as rows are appended.
                    existing_keys = {
                        (d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                        for d in st.session_state.export_cache
                    }
                    added_count = 0
                    for _, row in interactive_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            existing_keys.add(key)
                            added_count += 1
                    st.success(f"Added {added_count} entries to cache ({len(st.session_state.export_cache)} total)")
            with col2:
                if st.button("Add FLAG Only to Cache"):
                    flag_df = interactive_df[interactive_df["Status"] == "FLAG"]
                    # Same single-pass dedupe as above.
                    existing_keys = {
                        (d.get("Entry Number", ""), d.get("Primary HTS", ""), d.get("Description", ""))
                        for d in st.session_state.export_cache
                    }
                    added_count = 0
                    for _, row in flag_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Entry Number", ""), row_dict.get("Primary HTS", ""), row_dict.get("Description", ""))
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            existing_keys.add(key)
                            added_count += 1
                    st.success(f"Added {added_count} FLAG entries to cache")
            st.info(f"Current cache: {len(st.session_state.export_cache)} entries. Go to 'Export Selection' tab to download.")
# Tab 2b: Unique Combinations
# Collapses duplicate (Primary HTS, Full Description) pairs so each pair is
# reviewed only once, optionally hiding pairs already on the reviewed list.
with tab2b:
    st.header("Unique HTS + Description Combinations")
    st.markdown("View unique combinations to avoid reviewing duplicates")
    if st.session_state.validation_results is None:
        st.info("Upload a file and run validation first.")
    else:
        results_df = st.session_state.validation_results.copy()
        reviewed_combinations = load_reviewed_combinations()
        reviewed_count = len(reviewed_combinations)
        st.subheader("Filter Options")
        filter_reviewed = st.checkbox(
            f"Hide reviewed combinations ({reviewed_count} in list)",
            value=True,
            key="filter_reviewed_combinations",
        )
        if reviewed_count > 0:
            with st.expander(f"View {reviewed_count} reviewed combinations"):
                csv_path = os.path.join(os.path.dirname(__file__), REVIEWED_COMBINATIONS_FILE)
                st.caption(f"File: {csv_path}")
                try:
                    # Same encoding fallback chain as load_reviewed_combinations.
                    reviewed_df = None
                    for enc in ["utf-8", "cp1252", "latin-1", "utf-8-sig"]:
                        try:
                            reviewed_df = pd.read_csv(csv_path, dtype=str, encoding=enc)
                            break
                        except UnicodeDecodeError:
                            continue
                    if reviewed_df is not None:
                        st.dataframe(reviewed_df, use_container_width=True, height=200)
                    else:
                        st.error("Could not decode CSV file with any supported encoding")
                except Exception as e:
                    st.error(f"Error loading file: {e}")
        else:
            st.info(f"No reviewed combinations found. Add HTS,Description rows to '{REVIEWED_COMBINATIONS_FILE}' to filter them out.")
        unique_status_filter = st.multiselect(
            "Filter by Status",
            options=["PASS", "FLAG"],
            default=["FLAG"],
            key="unique_status_filter"
        )
        filtered_df = results_df.copy()
        if unique_status_filter:
            filtered_df = filtered_df[filtered_df["Status"].isin(unique_status_filter)]
        if len(filtered_df) > 0:
            # Collapse duplicates; Count = number of entries with the pair.
            unique_df = filtered_df.groupby(
                ["Primary HTS", "Full Description"], as_index=False
            ).agg({
                "Entry Number": "count",
                "Additional HTS": "first",
                "Status": "first",
            }).rename(columns={"Entry Number": "Count"})
            unique_df = unique_df.sort_values("Count", ascending=False).reset_index(drop=True)
            if filter_reviewed and reviewed_count > 0:
                unique_df["_is_reviewed"] = unique_df.apply(
                    lambda row: is_combination_reviewed(
                        row["Primary HTS"],
                        row["Full Description"],
                        reviewed_combinations
                    ),
                    axis=1
                )
                reviewed_in_data = unique_df["_is_reviewed"].sum()
                unique_df = unique_df[~unique_df["_is_reviewed"]].drop(columns=["_is_reviewed"])
                unique_df = unique_df.reset_index(drop=True)
            unique_df.index = unique_df.index + 1  # display as 1-based
            unique_df["Description"] = unique_df["Full Description"].apply(
                lambda x: x[:80] + "..." if len(str(x)) > 80 else x
            )
            if filter_reviewed and reviewed_count > 0:
                st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries, {reviewed_in_data} reviewed combinations hidden)")
            else:
                st.write(f"**{len(unique_df)} unique combinations** (from {len(filtered_df)} total entries)")
            st.markdown("**Search Filters:**")
            ucol1, ucol2 = st.columns(2)
            with ucol1:
                unique_hts_search = st.text_input(
                    "Filter by Primary HTS",
                    placeholder="e.g., 7301 or 8302",
                    key="unique_hts_search"
                )
            with ucol2:
                unique_desc_search = st.text_input(
                    "Filter by Description",
                    placeholder="e.g., steel, aluminum",
                    key="unique_desc_search"
                )
            display_unique_df = unique_df.copy()
            if unique_hts_search:
                display_unique_df = display_unique_df[
                    display_unique_df["Primary HTS"].astype(str).str.contains(unique_hts_search, case=False, na=False)
                ]
            if unique_desc_search:
                display_unique_df = display_unique_df[
                    display_unique_df["Description"].astype(str).str.contains(unique_desc_search, case=False, na=False)
                ]
            display_unique_df = display_unique_df.reset_index(drop=True)
            display_unique_df.index = display_unique_df.index + 1
            st.write(f"**Showing {len(display_unique_df)} unique combinations**")
            display_cols = ["Primary HTS", "Description", "Additional HTS", "Status", "Count"]
            # NOTE(review): Styler.applymap deprecated in pandas >= 2.1 (use .map).
            styled_unique = display_unique_df[display_cols].style.applymap(
                color_status, subset=["Status"]
            )
            st.dataframe(styled_unique, use_container_width=True, height=400)
            st.subheader("Add Unique Combinations to Cache")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Add ALL Unique to Cache", type="primary", key="add_all_unique"):
                    # BUGFIX(perf): build the dedupe key set once (was rebuilt
                    # per row, O(n^2) per click).
                    existing_keys = {
                        (d.get("Primary HTS", ""), d.get("Full Description", ""))
                        for d in st.session_state.export_cache
                    }
                    added_count = 0
                    for _, row in display_unique_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            existing_keys.add(key)
                            added_count += 1
                    st.success(f"Added {added_count} unique combinations to cache")
            with col2:
                if st.button("Add FLAG Unique to Cache", key="add_flag_unique"):
                    flag_df = display_unique_df[display_unique_df["Status"] == "FLAG"]
                    existing_keys = {
                        (d.get("Primary HTS", ""), d.get("Full Description", ""))
                        for d in st.session_state.export_cache
                    }
                    added_count = 0
                    for _, row in flag_df.iterrows():
                        row_dict = row.to_dict()
                        key = (row_dict.get("Primary HTS", ""), row_dict.get("Full Description", ""))
                        if key not in existing_keys:
                            st.session_state.export_cache.append(row_dict)
                            existing_keys.add(key)
                            added_count += 1
                    st.success(f"Added {added_count} FLAG combinations to cache")
            st.info(f"Current cache: {len(st.session_state.export_cache)} entries")
        else:
            st.info("No results matching the selected filters.")
# Tab 3: Export Selection
# Downloads either the manually queued cache or the full filtered results.
with tab3:
    st.header("Export Selection")
    if len(st.session_state.export_cache) == 0:
        st.info("No entries in export cache. Select entries from Validation Results tab.")
    else:
        st.write(f"**{len(st.session_state.export_cache)} entries in cache**")
        cache_df = pd.DataFrame(st.session_state.export_cache)
        st.dataframe(cache_df, use_container_width=True)
        col1, col2 = st.columns(2)
        with col1:
            if st.button("Clear Cache"):
                st.session_state.export_cache = []
                st.success("Cache cleared!")
                st.rerun()
        with col2:
            if st.button("Export Cache to Excel"):
                excel_data = export_to_excel(cache_df)
                st.download_button(
                    label="Download Excel (Cache Only)",
                    data=excel_data,
                    file_name="hts_audit_cache.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
    st.subheader("Export Full Results")
    if st.session_state.validation_results is not None and st.session_state.original_df is not None:
        results_df = st.session_state.validation_results.copy()
        export_status = st.multiselect(
            "Export entries with status:",
            options=["PASS", "FLAG"],
            default=["FLAG"],
            key="export_status_filter"
        )
        if export_status:
            filtered_results = results_df[results_df["Status"].isin(export_status)]
            filtered_indices = filtered_results.index.tolist()
            # BUGFIX: filtered_indices are index LABELS carried over from the
            # original DataFrame, not positions within filtered_df — selecting
            # with .iloc could pick the wrong rows or raise IndexError once a
            # filter removed rows. Use label-based .loc; on original_df
            # (default RangeIndex) labels and positions coincide, so that
            # branch is unchanged in behavior.
            if hasattr(st.session_state, "filtered_df"):
                export_original = st.session_state.filtered_df.loc[filtered_indices].copy()
            else:
                export_original = st.session_state.original_df.loc[filtered_indices].copy()
            st.write(f"**{len(filtered_results)} entries will be exported**")
            if st.button("Generate Full Export", type="primary"):
                excel_data = export_to_excel(export_original, filtered_results)
                st.download_button(
                    label="Download Full Excel Report",
                    data=excel_data,
                    file_name="hts_audit_full_report.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
    else:
        st.info("Run validation first to enable full export.")
# Footer
st.markdown("---")
st.markdown("HTS Checker v2.0 - Tariff Audit Tool")