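"""Gradio app for browsing audio-reconstruction HTML reports.

The reports (plus an index.csv describing them) live in the Hugging Face
dataset repo HF_DATASET_REPO. Syncing a model pulls its rows from index.csv
into a local library.csv; the UI then supports search/filtering, in-table
metadata edits, and previewing each report in an iframe (HTML is downloaded
lazily from the dataset, or read from local disk in "Local" mode).
"""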
import os, uuid, datetime, traceback
from pathlib import Path
import html as _py_html
import pandas as pd
import gradio as gr
from huggingface_hub import hf_hub_download
from urllib.parse import unquote
from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError
# ----------- HF DATASET CONFIG -----------
HF_DATASET_REPO = "akazemian/audio-html" # <-- change if needed
INDEX_FILENAME = "index.csv"
# -----------------------------------------
DB_PATH = "library.csv"
ALLOWED_EXTS = {".html"}
# Columns in DB
EXTRA_COLS = ["category", "dataset", "hf_path"]  # hf_path = hf:// URI of the file in the HF dataset
BASE_COLS = ["id","filename","path","tags","keywords","notes","uploaded_at"]
ALL_DB_COLS = BASE_COLS + EXTRA_COLS
# Columns shown in the table (don't show hf_path)
TABLE_COLS = ["id","filename","category","dataset",
"tags","keywords","notes","uploaded_at"]
# Always read index.csv from the DATASET (not the Space)
HF_INDEX_REPO_ID = HF_DATASET_REPO
HF_INDEX_REPO_TYPE = "dataset"
def _refresh_hf_path_from_index(db_row_id: str, filename: str, current_model_hint: str | None = None) -> str | None:
"""
Look up correct relpath for `filename` in index.csv. If `current_model_hint`
is given, only consider rows whose relpath starts with that model (handles shard dirs).
Persist the fixed hf_path back into library.csv for row `db_row_id`.
"""
try:
idx = _load_hf_index()
except Exception:
return None
df = idx.copy()
df["filename"] = df["filename"].astype(str)
df["relpath"] = df["relpath"].astype(str)
if current_model_hint:
model = unquote(str(current_model_hint).strip())
df = df[df["relpath"].str.startswith(f"{model}/")]
hits = df[df["filename"] == str(filename)]
if hits.empty:
return None
relpath = unquote(str(hits.iloc[0]["relpath"])) # includes shard subdir
new_hf_path = f"hf://{HF_DATASET_REPO}/{relpath}"
# persist the fix
db = _load_db()
i = db.index[db["id"] == db_row_id]
if len(i):
db.at[i[0], "hf_path"] = new_hf_path
_save_db(db)
return new_hf_path
# ---------- hf:// URI parsing ----------
def parse_hf_uri(uri: str):
    """
    hf://<user>/<repo>/<relpath...> -> (repo_id 'user/repo', relpath)
    """
    if not uri.startswith("hf://"):
        raise ValueError(f"Not an hf:// uri: {uri}")
    rest = uri[len("hf://"):]
    parts = rest.split("/", 2)
    if len(parts) < 3:
        raise ValueError(f"Bad hf:// uri: {uri}")
    user, repo, relpath = parts
    return f"{user}/{repo}", relpath
# ---------- DB helpers ----------
def _load_db() -> pd.DataFrame:
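    """Load library.csv into a DataFrame (empty if missing), coercing all expected columns to strings."""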
if os.path.exists(DB_PATH):
df = pd.read_csv(DB_PATH)
for c in ALL_DB_COLS:
if c not in df.columns:
df[c] = ""
for c in ["tags","keywords","notes","category","dataset","hf_path","path","filename","id","uploaded_at"]:
df[c] = df[c].fillna("").astype(str)
return df[ALL_DB_COLS]
return pd.DataFrame(columns=ALL_DB_COLS)
def _save_db(df: pd.DataFrame):
df.to_csv(DB_PATH, index=False)
# ---------- Table normalizer ----------
def _df_from_table_value(table_value):
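    """Normalize a Gradio table payload (DataFrame, list of dicts, or list of lists) into a DataFrame with TABLE_COLS."""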
cols = TABLE_COLS
if isinstance(table_value, pd.DataFrame):
for c in cols:
if c not in table_value.columns:
table_value[c] = ""
return table_value[cols]
if isinstance(table_value, list):
if not table_value:
return pd.DataFrame(columns=cols)
first = table_value[0]
if isinstance(first, dict):
df = pd.DataFrame(table_value)
for c in cols:
if c not in df.columns:
df[c] = ""
return df[cols]
else:
return pd.DataFrame(table_value, columns=cols)
return pd.DataFrame(columns=cols)
def _load_hf_index() -> pd.DataFrame:
"""
Download + read index.csv from the HF *dataset* repo.
Required columns: id, filename, relpath, category, dataset, tags, keywords, notes, uploaded_at
"""
local = hf_hub_download(
repo_id=HF_INDEX_REPO_ID, # = HF_DATASET_REPO
repo_type=HF_INDEX_REPO_TYPE, # = "dataset"
filename=INDEX_FILENAME,
)
# Optional: log where we loaded from (shows in Space logs)
print(f"[index] loaded from {HF_INDEX_REPO_TYPE}:{HF_INDEX_REPO_ID}/{INDEX_FILENAME} -> {local}")
df = pd.read_csv(local)
for c in ["id","filename","relpath","category","dataset","tags","keywords","notes","uploaded_at"]:
if c not in df.columns:
df[c] = ""
for c in ["id","filename","relpath","category","dataset","tags","keywords","notes","uploaded_at"]:
df[c] = df[c].fillna("").astype(str)
return df
# ---------- Sync by model (prefix inside HF dataset) ----------
def sync_model(model_name: str):
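    """
    Pull every index.csv row under `<model_name>/` into library.csv
    (updating existing filenames, adding new ones), then scope the view to that model.
    """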
raw = (model_name or "").strip()
if not raw:
return gr.Info("Please enter a model name."), None, None, None, "", ""
try:
idx = _load_hf_index() # must have columns: id,filename,relpath,category,dataset,tags,keywords,notes,uploaded_at
except Exception as e:
traceback.print_exc()
return gr.Info(f"Failed to load index from HF: {e}"), None, None, None, "", ""
# accept both '=' and '%3D' in the model folder name
decoded = unquote(raw)
rel = idx["relpath"].astype(str)
sub = idx[rel.str.startswith(f"{raw}/") | rel.str.startswith(f"{decoded}/")].copy()
if sub.empty:
return gr.Info(
f"No HTML files found for model '{raw}'. "
"Tip: if you copied from the URL, use '=' instead of '%3D'."
), None, None, None, "", ""
# make the hf_path we'll store locally
sub["hf_path"] = sub["relpath"].apply(lambda rp: f"hf://{HF_DATASET_REPO}/{rp}")
# keep only the columns we want to pull from index into library
cols_from_idx = ["filename","hf_path","category","dataset","tags","keywords","notes","uploaded_at"]
sub = sub[cols_from_idx].copy()
db = _load_db()
# 1) update existing rows matched by filename
if not db.empty:
# merge to bring in new metadata
merged = db.merge(sub, on="filename", how="left", suffixes=("", "_idx"))
# for each field, prefer index value if present; otherwise keep existing
for col in ["hf_path","category","dataset","tags","keywords","notes","uploaded_at"]:
idx_col = f"{col}_idx"
if idx_col in merged.columns:
merged[col] = merged[idx_col].where(merged[idx_col].astype(str) != "", merged[col])
merged.drop(columns=[idx_col], inplace=True)
db = merged
# 2) add truly new rows (filenames not present yet)
existing_fns = set(db["filename"].astype(str)) if not db.empty else set()
new_rows = sub[~sub["filename"].astype(str).isin(existing_fns)].copy()
if not new_rows.empty:
# build missing base columns
new_rows["id"] = [uuid.uuid4().hex[:8] for _ in range(len(new_rows))]
new_rows["path"] = "" # unknown locally in HF mode
# reorder to library schema
new_rows = new_rows[["id","filename","path","tags","keywords","notes","uploaded_at","category","dataset","hf_path"]]
db = pd.concat([db, new_rows], ignore_index=True)
_save_db(db)
# show only this model in the UI
current_model = decoded
return refresh_view("", [], "", "", current_model) + (current_model,)
# ---------- Search / filters ----------
def refresh_view(query, tag_filters, category_filter, dataset_filter, current_model):
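    """
    Build the filtered table view: scope to the active model, apply free-text search
    and tag/category/dataset filters, and return (table, tag choices, category choices,
    dataset choices, count markdown).
    """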
df = _load_db()
# Ensure required columns exist (robust against old CSVs)
for col in ["id","filename","category","dataset","tags","keywords","notes","uploaded_at","path","hf_path"]:
if col not in df.columns:
df[col] = ""
# Scope to current model prefix (HF sharded paths still start with model/)
if current_model:
model = unquote(str(current_model).strip())
hf_prefix = f"hf://{HF_DATASET_REPO}/{model}/"
mask = (
df["hf_path"].astype(str).str.startswith(hf_prefix)
| df["path"].astype(str).str.startswith(hf_prefix) # for legacy rows
| df["path"].astype(str).str.contains(f"/{model}/") # local fallback
)
df = df[mask]
# Free-text search (across common text fields)
if query:
q = str(query).lower()
mask = (
df["filename"].astype(str).str.lower().str.contains(q, na=False) |
df["tags"].astype(str).str.lower().str.contains(q, na=False) |
df["keywords"].astype(str).str.lower().str.contains(q, na=False) |
df["notes"].astype(str).str.lower().str.contains(q, na=False) |
df["category"].astype(str).str.lower().str.contains(q, na=False) |
df["dataset"].astype(str).str.lower().str.contains(q, na=False)
)
df = df[mask]
# Tag filter (AND semantics)
for t in (tag_filters or []):
t = str(t).strip()
if t:
df = df[df["tags"].astype(str).apply(
lambda s: t in [x.strip() for x in str(s).split(",") if x.strip()]
)]
# Dropdown filters (exact match)
if category_filter:
df = df[df["category"] == category_filter]
if dataset_filter:
df = df[df["dataset"] == dataset_filter]
# Build vocab choices—ALWAYS define these, even if df is empty
if not df.empty:
all_tags = sorted({t.strip()
for s in df["tags"].astype(str).tolist()
for t in s.split(",") if t.strip()})
all_cats = sorted([c for c in df["category"].astype(str).unique() if c])
all_sets = sorted([c for c in df["dataset"].astype(str).unique() if c])
else:
all_tags, all_cats, all_sets = [], [], []
# Compose the table view safely
df = df.sort_values("uploaded_at", ascending=False, na_position="last").reset_index(drop=True)
view = pd.DataFrame(columns=TABLE_COLS)
if not df.empty:
# Ensure all TABLE_COLS exist before projection
for c in TABLE_COLS:
if c not in df.columns:
df[c] = ""
view = df[TABLE_COLS].copy()
count_text = f"**Showing {len(view)} file(s)**"
return (
view,
gr.update(choices=all_tags),
gr.update(choices=[""] + all_cats, value=category_filter or ""),
gr.update(choices=[""] + all_sets, value=dataset_filter or ""),
count_text
)
# ---------- Preview ----------
def _iframe_from_html_string(raw_html: str, height_px: int = 720) -> str:
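    """Wrap raw HTML in an iframe via the srcdoc attribute (& and " must be escaped for the attribute)."""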
srcdoc = raw_html.replace("&", "&amp;").replace('"', "&quot;")
return f'<iframe style="width:100%;height:{height_px}px;border:1px solid #ddd;border-radius:8px;" srcdoc="{srcdoc}"></iframe>'
def select_row(evt: gr.SelectData, table_value, source_mode, current_model):
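    """
    Preview the clicked row: look up its record in library.csv, pick hf_path or path
    based on the Source toggle, and render the HTML (hf:// -> lazy download with a
    shard-aware retry via index.csv, http(s) -> direct iframe, otherwise local file).
    """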
try:
view = _df_from_table_value(table_value)
if view.empty:
return "<em>No rows.</em>", ""
# --- resolve row_idx robustly ---
row_idx = None
ix = getattr(evt, "index", None)
if isinstance(ix, int):
row_idx = ix
elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int):
row_idx = ix[0]
if row_idx is None:
val = getattr(evt, "value", None)
if isinstance(val, dict) and "id" in val:
hits = view.index[view["id"] == val["id"]].tolist()
if hits: row_idx = hits[0]
elif isinstance(val, list) and len(val) >= 1:
hits = view.index[view["id"] == val[0]].tolist()
if hits: row_idx = hits[0]
if row_idx is None:
row_idx = 0
if not (0 <= row_idx < len(view)):
return "<em>Invalid selection.</em>", ""
row = view.iloc[row_idx]
sel_id = row["id"]
# --- look up the full record from DB ---
db = _load_db()
rec = db[db["id"] == sel_id]
if rec.empty:
return "<em>Could not find file for this row.</em>", ""
# --- choose source: HF vs Local ---
use_hf = (str(source_mode).upper() == "HF")
path_str = rec["hf_path"].values[0] if use_hf else rec["path"].values[0]
path_str = str(path_str or "")
if not path_str:
return "<em>No path available for this source.</em>", f"📄 {row['filename']}"
# HF dataset URI → lazy download then iframe from raw HTML
if path_str.startswith("hf://"):
repo_id, relpath = parse_hf_uri(path_str)
relpath = unquote(relpath)
try:
# first attempt: whatever is saved in library.csv
local_path = hf_hub_download(
repo_id=repo_id,
repo_type="dataset",
filename=relpath,
local_files_only=False,
)
except EntryNotFoundError:
# fix relpath from index.csv, **scoped by active model** (handles shard dir)
fixed_hf_path = _refresh_hf_path_from_index(
sel_id,
row["filename"],
current_model_hint=current_model
)
if not fixed_hf_path:
msg = (
f"<em>Entry not found for <code>{_py_html.escape(relpath)}</code>, "
f"and no matching row for <code>{_py_html.escape(str(row['filename']))}</code> "
f"under the current model in <code>index.csv</code>.</em>"
)
return msg, f"📄 {row['filename']}"
repo_id, relpath = parse_hf_uri(fixed_hf_path)
relpath = unquote(relpath)
# single retry with corrected shard-aware relpath
local_path = hf_hub_download(
repo_id=repo_id,
repo_type="dataset",
filename=relpath,
local_files_only=False,
)
except RepositoryNotFoundError:
return (
"<em>Dataset repo not accessible. If it's private, set an HF token in Space Secrets "
"as <code>HF_TOKEN</code> (or <code>HUGGINGFACE_HUB_TOKEN</code>) and restart.</em>",
f"📄 {row['filename']}",
)
# success
raw_html = Path(local_path).read_text(encoding="utf-8")
iframe = _iframe_from_html_string(raw_html, height_px=720)
return iframe, f"📄 {row['filename']}"
# Direct HTTP URL (CDN)
if path_str.startswith("http"):
iframe = f'<iframe style="width:100%;height:720px;border:1px solid #ddd;border-radius:8px;" src="{_py_html.escape(path_str)}"></iframe>'
return iframe, f"📄 {row['filename']}"
# Local file fallback
p = Path(path_str)
if not p.exists():
return f"<em>File not found:</em> <code>{_py_html.escape(str(p))}</code>", f"📄 {row['filename']}"
raw_html = p.read_text(encoding="utf-8")
iframe = _iframe_from_html_string(raw_html, height_px=720)
return iframe, f"📄 {row['filename']}"
except EntryNotFoundError:
# just in case one bubbles up
return "<em>File not found in dataset even after shard fix.</em>", ""
except Exception as e:
traceback.print_exc()
return f"<pre>Failed to render (see terminal):\n{_py_html.escape(str(e))}</pre>", ""
# ---------- Save edits ----------
def save_edits(edited_table, current_model):
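    """Write the editable metadata columns from the table back into library.csv and return the refreshed view."""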
if edited_table is None or not len(edited_table):
return gr.Info("Nothing to save.")
    # the Dataframe payload may arrive as a list of rows; normalize it first
    edited_table = _df_from_table_value(edited_table).copy()
    df_db = _load_db()
editable_cols = ["category","dataset","tags","keywords","notes"]
for c in editable_cols:
edited_table[c] = edited_table[c].fillna("").astype(str)
for _, row in edited_table.iterrows():
i = df_db.index[df_db["id"] == row["id"]]
if len(i):
for c in editable_cols:
df_db.at[i[0], c] = row[c]
_save_db(df_db)
# return refreshed table only (respect current_model scope)
return refresh_view("", [], "", "", current_model)[0]
# -------------------- UI --------------------
# CSS that targets only the three buttons via elem_id
custom_css = """
/* scope styles to only these 3 components */
#sync-btn button,
#refresh-btn button,
#save-btn button,
#sync-btn .gr-button,
#refresh-btn .gr-button,
#save-btn .gr-button,
#sync-btn [role="button"],
#refresh-btn [role="button"],
#save-btn [role="button"] {
background: #f97316 !important; /* orange-500 */
border-color: #f97316 !important;
color: #fff !important;
}
/* hover/active */
#sync-btn button:hover,
#refresh-btn button:hover,
#save-btn button:hover,
#sync-btn .gr-button:hover,
#refresh-btn .gr-button:hover,
#save-btn .gr-button:hover,
#sync-btn [role="button"]:hover,
#refresh-btn [role="button"]:hover,
#save-btn [role="button"]:hover {
background: #ea580c !important; /* orange-600 */
border-color: #ea580c !important;
}
/* (optional) also set CSS vars in case theme uses them */
#sync-btn, #refresh-btn, #save-btn {
--button-primary-background-fill: #f97316;
--button-primary-background-fill-hover: #ea580c;
--button-text-color: #fff;
}
"""
with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo:
gr.Markdown("## 🎧 Audio Reconstruction Reports — sync • search • view")
current_model = gr.State("") # remembers active model prefix inside HF repo
source_mode = gr.State("HF") # default
with gr.Row():
with gr.Column(scale=1):
# Choose model & sync
gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}/<model_name>/...`")
model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192")
sync_btn = gr.Button("Sync this model", elem_id="sync-btn")
# Search & filters
gr.Markdown("---\n**Search & filter**")
query = gr.Textbox(label="Keyword search (filename/tags/notes/category/dataset)", placeholder="type to search…")
tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)")
category_filter = gr.Dropdown(choices=[], label="Category")
dataset_filter = gr.Dropdown(choices=[], label="Dataset")
# 🔽 Step 5: Source toggle (HF vs Local)
mode_radio = gr.Radio(
choices=["HF", "Local"],
value="HF",
label="Source",
info="Preview from HF dataset or local disk"
)
refresh_btn = gr.Button("Refresh", elem_id="refresh-btn")
with gr.Column(scale=2):
# Count of current view
count_md = gr.Markdown("**Showing 0 file(s)**")
gr.Markdown("**Library** (click a row to preview; edit cells and Save)")
table = gr.Dataframe(
headers=TABLE_COLS,
datatype=["str"] * len(TABLE_COLS),
interactive=True,
wrap=True,
row_count=(0, "dynamic"),
col_count=(len(TABLE_COLS), "fixed")
)
with gr.Row():
save_btn = gr.Button("Save Edits", elem_id="save-btn")
preview_label = gr.Markdown("")
preview_html = gr.HTML("")
# wiring: sync (also sets current_model)
sync_btn.click(
sync_model,
[model_in],
[table, tag_filter, category_filter, dataset_filter, count_md, current_model]
)
# wiring: refresh + live filters (respect current_model)
refresh_btn.click(
refresh_view,
[query, tag_filter, category_filter, dataset_filter, current_model],
[table, tag_filter, category_filter, dataset_filter, count_md]
)
# Trigger refresh when any filter OR source mode changes
for comp in (query, tag_filter, category_filter, dataset_filter, mode_radio):
comp.change(
refresh_view,
[query, tag_filter, category_filter, dataset_filter, current_model],
[table, tag_filter, category_filter, dataset_filter, count_md]
)
# Keep source_mode state in sync with the radio
mode_radio.change(lambda x: x, [mode_radio], [source_mode])
# Pass source_mode into select_row so it can choose hf_path vs path
table.select(select_row, [table, source_mode, current_model], [preview_html, preview_label])
save_btn.click(save_edits, [table, current_model], [table])
# initial load (no model yet)
demo.load(
refresh_view,
[query, tag_filter, category_filter, dataset_filter, current_model],
[table, tag_filter, category_filter, dataset_filter, count_md]
)
if __name__ == "__main__":
    demo.launch(share=True)  # share link only matters for local runs; add auth=... if needed