import os, uuid, datetime, traceback
from pathlib import Path
import html as _py_html
from urllib.parse import unquote

import pandas as pd
import gradio as gr
from huggingface_hub import hf_hub_download, login
from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError

# ----------- HF DATASET CONFIG -----------
HF_DATASET_REPO = "akazemian/audio-html"  # dataset holding the HTMLs + index.csv (change if needed)
INDEX_FILENAME = "index.csv"
# -----------------------------------------

DB_PATH = "library.csv"
ALLOWED_EXTS = {".html"}

# Columns in the local library DB (hf_path records where the file lives on the Hub)
EXTRA_COLS = ["category", "dataset", "hf_path"]
BASE_COLS = ["id", "filename", "path", "tags", "keywords", "notes", "uploaded_at"]
ALL_DB_COLS = BASE_COLS + EXTRA_COLS

# Columns shown in the table (hf_path stays hidden)
TABLE_COLS = ["id", "filename", "category", "dataset",
              "tags", "keywords", "notes", "uploaded_at"]

# Always read index.csv from the DATASET (not the Space)
HF_INDEX_REPO_ID = HF_DATASET_REPO
HF_INDEX_REPO_TYPE = "dataset"

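# Optional auth for a private dataset repo: a minimal sketch, assuming the token
# is stored in the HF_TOKEN (or HUGGINGFACE_HUB_TOKEN) Space secret, as the error
# message in select_row suggests. hf_hub_download also reads the env token on its
# own, so the explicit login() is just belt and braces.
_hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
if _hf_token:
    login(token=_hf_token)
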
def _refresh_hf_path_from_index(db_row_id: str, filename: str, current_model_hint: str | None = None) -> str | None:
    """
    Look up the correct relpath for `filename` in index.csv. If `current_model_hint`
    is given, only consider rows whose relpath starts with that model (handles shard dirs).
    Persist the fixed hf_path back into library.csv for row `db_row_id`.
    """
    try:
        idx = _load_hf_index()
    except Exception:
        return None

    df = idx.copy()
    df["filename"] = df["filename"].astype(str)
    df["relpath"] = df["relpath"].astype(str)

    if current_model_hint:
        model = unquote(str(current_model_hint).strip())
        df = df[df["relpath"].str.startswith(f"{model}/")]

    hits = df[df["filename"] == str(filename)]
    if hits.empty:
        return None

    relpath = unquote(str(hits.iloc[0]["relpath"]))  # includes shard subdir
    new_hf_path = f"hf://{HF_DATASET_REPO}/{relpath}"

    # Persist the fix
    db = _load_db()
    i = db.index[db["id"] == db_row_id]
    if len(i):
        db.at[i[0], "hf_path"] = new_hf_path
        _save_db(db)
    return new_hf_path

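# Illustrative failure mode this helper repairs (paths hypothetical): library.csv
# may still hold hf://akazemian/audio-html/WavCochV8192/clip.html after the files
# were re-sharded on the Hub into WavCochV8192/shard_03/clip.html; re-resolving
# through index.csv rewrites the stale hf_path so the next preview succeeds.
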
def parse_hf_uri(uri: str):
    """
    hf://<user>/<repo>/<relpath...> -> (repo_id 'user/repo', relpath)
    """
    if not uri.startswith("hf://"):
        raise ValueError(f"Bad hf:// uri: {uri}")
    rest = uri[len("hf://"):]
    parts = rest.split("/", 2)
    if len(parts) < 3:
        raise ValueError(f"Bad hf:// uri: {uri}")
    user, repo, relpath = parts[0], parts[1], parts[2]
    return f"{user}/{repo}", relpath

# ---------- DB helpers ----------
def _load_db() -> pd.DataFrame:
    if os.path.exists(DB_PATH):
        df = pd.read_csv(DB_PATH)
        for c in ALL_DB_COLS:
            if c not in df.columns:
                df[c] = ""
        for c in ALL_DB_COLS:
            df[c] = df[c].fillna("").astype(str)
        return df[ALL_DB_COLS]
    return pd.DataFrame(columns=ALL_DB_COLS)

def _save_db(df: pd.DataFrame):
    df.to_csv(DB_PATH, index=False)

# ---------- Table normalizer ----------
def _df_from_table_value(table_value):
    cols = TABLE_COLS
    if isinstance(table_value, pd.DataFrame):
        df = table_value.copy()  # don't mutate the component's value in place
        for c in cols:
            if c not in df.columns:
                df[c] = ""
        return df[cols]
    if isinstance(table_value, list):
        if not table_value:
            return pd.DataFrame(columns=cols)
        first = table_value[0]
        if isinstance(first, dict):
            df = pd.DataFrame(table_value)
            for c in cols:
                if c not in df.columns:
                    df[c] = ""
            return df[cols]
        return pd.DataFrame(table_value, columns=cols)
    return pd.DataFrame(columns=cols)

# ---------- Load HF index ----------
def _load_hf_index() -> pd.DataFrame:
    """
    Download + read index.csv from the HF *dataset* repo.
    Required columns: id, filename, relpath, category, dataset, tags, keywords, notes, uploaded_at
    """
    local = hf_hub_download(
        repo_id=HF_INDEX_REPO_ID,      # = HF_DATASET_REPO
        repo_type=HF_INDEX_REPO_TYPE,  # = "dataset"
        filename=INDEX_FILENAME,
    )
    # Optional: log where we loaded from (shows in Space logs)
    print(f"[index] loaded from {HF_INDEX_REPO_TYPE}:{HF_INDEX_REPO_ID}/{INDEX_FILENAME} -> {local}")

    df = pd.read_csv(local)
    for c in ["id", "filename", "relpath", "category", "dataset", "tags", "keywords", "notes", "uploaded_at"]:
        if c not in df.columns:
            df[c] = ""
        df[c] = df[c].fillna("").astype(str)
    return df

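# Shape of index.csv (values purely illustrative; missing columns are back-filled
# with empty strings above):
#   id,filename,relpath,category,dataset,tags,keywords,notes,uploaded_at
#   a1b2c3d4,clip.html,WavCochV8192/shard_00/clip.html,...
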
# ---------- Sync by model (prefix inside HF dataset) ----------
def sync_model(model_name: str):
    raw = (model_name or "").strip()
    if not raw:
        gr.Info("Please enter a model name.")
        return gr.update(), gr.update(), gr.update(), gr.update(), "", ""
    try:
        idx = _load_hf_index()  # columns: id,filename,relpath,category,dataset,tags,keywords,notes,uploaded_at
    except Exception as e:
        traceback.print_exc()
        gr.Info(f"Failed to load index from HF: {e}")
        return gr.update(), gr.update(), gr.update(), gr.update(), "", ""

    # Accept both '=' and '%3D' in the model folder name
    decoded = unquote(raw)
    rel = idx["relpath"].astype(str)
    sub = idx[rel.str.startswith(f"{raw}/") | rel.str.startswith(f"{decoded}/")].copy()
    if sub.empty:
        gr.Info(
            f"No HTML files found for model '{raw}'. "
            "Tip: if you copied from the URL, use '=' instead of '%3D'."
        )
        return gr.update(), gr.update(), gr.update(), gr.update(), "", ""

    # Build the hf_path we'll store locally
    sub["hf_path"] = sub["relpath"].apply(lambda rp: f"hf://{HF_DATASET_REPO}/{rp}")

    # Keep only the columns we want to pull from the index into the library
    cols_from_idx = ["filename", "hf_path", "category", "dataset", "tags", "keywords", "notes", "uploaded_at"]
    sub = sub[cols_from_idx].copy()

    db = _load_db()

    # 1) Update existing rows matched by filename
    if not db.empty:
        # Merge to bring in new metadata
        merged = db.merge(sub, on="filename", how="left", suffixes=("", "_idx"))
        # For each field, prefer the index value if present; otherwise keep the existing one
        for col in ["hf_path", "category", "dataset", "tags", "keywords", "notes", "uploaded_at"]:
            idx_col = f"{col}_idx"
            if idx_col in merged.columns:
                # NaN (no match in the index) must not clobber existing values
                idx_vals = merged[idx_col].fillna("").astype(str)
                merged[col] = idx_vals.where(idx_vals != "", merged[col])
                merged.drop(columns=[idx_col], inplace=True)
        db = merged

    # 2) Add truly new rows (filenames not present yet)
    existing_fns = set(db["filename"].astype(str)) if not db.empty else set()
    new_rows = sub[~sub["filename"].astype(str).isin(existing_fns)].copy()
    if not new_rows.empty:
        # Build the missing base columns
        new_rows["id"] = [uuid.uuid4().hex[:8] for _ in range(len(new_rows))]
        new_rows["path"] = ""  # unknown locally in HF mode
        # Reorder to the library schema
        new_rows = new_rows[["id", "filename", "path", "tags", "keywords", "notes", "uploaded_at", "category", "dataset", "hf_path"]]
        db = pd.concat([db, new_rows], ignore_index=True)

    _save_db(db)

    # Show only this model in the UI
    current_model = decoded
    return refresh_view("", [], "", "", current_model) + (current_model,)

# ---------- Search / filters ----------
def refresh_view(query, tag_filters, category_filter, dataset_filter, current_model):
    df = _load_db()

    # Ensure required columns exist (robust against old CSVs)
    for col in ["id", "filename", "category", "dataset", "tags", "keywords", "notes", "uploaded_at", "path", "hf_path"]:
        if col not in df.columns:
            df[col] = ""

    # Scope to the current model prefix (HF sharded paths still start with model/)
    if current_model:
        model = unquote(str(current_model).strip())
        hf_prefix = f"hf://{HF_DATASET_REPO}/{model}/"
        mask = (
            df["hf_path"].astype(str).str.startswith(hf_prefix)
            | df["path"].astype(str).str.startswith(hf_prefix)                 # legacy rows
            | df["path"].astype(str).str.contains(f"/{model}/", regex=False)  # local fallback
        )
        df = df[mask]

    # Free-text search across the common text fields (plain substring, not regex)
    if query:
        q = str(query).lower()
        mask = (
            df["filename"].astype(str).str.lower().str.contains(q, regex=False, na=False)
            | df["tags"].astype(str).str.lower().str.contains(q, regex=False, na=False)
            | df["keywords"].astype(str).str.lower().str.contains(q, regex=False, na=False)
            | df["notes"].astype(str).str.lower().str.contains(q, regex=False, na=False)
            | df["category"].astype(str).str.lower().str.contains(q, regex=False, na=False)
            | df["dataset"].astype(str).str.lower().str.contains(q, regex=False, na=False)
        )
        df = df[mask]

    # Tag filter (AND semantics)
    for t in (tag_filters or []):
        t = str(t).strip()
        if t:
            df = df[df["tags"].astype(str).apply(
                lambda s: t in [x.strip() for x in str(s).split(",") if x.strip()]
            )]

    # Dropdown filters (exact match)
    if category_filter:
        df = df[df["category"] == category_filter]
    if dataset_filter:
        df = df[df["dataset"] == dataset_filter]

    # Build vocab choices; ALWAYS define these, even if df is empty
    if not df.empty:
        all_tags = sorted({t.strip()
                           for s in df["tags"].astype(str).tolist()
                           for t in s.split(",") if t.strip()})
        all_cats = sorted([c for c in df["category"].astype(str).unique() if c])
        all_sets = sorted([c for c in df["dataset"].astype(str).unique() if c])
    else:
        all_tags, all_cats, all_sets = [], [], []

    # Compose the table view safely
    df = df.sort_values("uploaded_at", ascending=False, na_position="last").reset_index(drop=True)
    view = pd.DataFrame(columns=TABLE_COLS)
    if not df.empty:
        # Ensure all TABLE_COLS exist before projection
        for c in TABLE_COLS:
            if c not in df.columns:
                df[c] = ""
        view = df[TABLE_COLS].copy()

    count_text = f"**Showing {len(view)} file(s)**"
    return (
        view,
        gr.update(choices=all_tags),
        gr.update(choices=[""] + all_cats, value=category_filter or ""),
        gr.update(choices=[""] + all_sets, value=dataset_filter or ""),
        count_text,
    )

# ---------- Preview ----------
def _iframe_from_html_string(raw_html: str, height_px: int = 720) -> str:
    srcdoc = raw_html.replace("&", "&amp;").replace('"', "&quot;")
    return f'<iframe style="width:100%;height:{height_px}px;border:1px solid #ddd;border-radius:8px;" srcdoc="{srcdoc}"></iframe>'

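# Note on escaping order: '&' is replaced before '"' so the ampersands inside the
# freshly inserted &quot; entities are not themselves re-escaped; the browser then
# decodes the srcdoc attribute back to the original HTML inside the iframe.
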
def select_row(evt: gr.SelectData, table_value, source_mode, current_model):
    try:
        view = _df_from_table_value(table_value)
        if view.empty:
            return "<em>No rows.</em>", ""

        # --- Resolve row_idx robustly ---
        row_idx = None
        ix = getattr(evt, "index", None)
        if isinstance(ix, int):
            row_idx = ix
        elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int):
            row_idx = ix[0]
        if row_idx is None:
            val = getattr(evt, "value", None)
            if isinstance(val, dict) and "id" in val:
                hits = view.index[view["id"] == val["id"]].tolist()
                if hits:
                    row_idx = hits[0]
            elif isinstance(val, list) and len(val) >= 1:
                hits = view.index[view["id"] == val[0]].tolist()
                if hits:
                    row_idx = hits[0]
        if row_idx is None:
            row_idx = 0
        if not (0 <= row_idx < len(view)):
            return "<em>Invalid selection.</em>", ""

        row = view.iloc[row_idx]
        sel_id = row["id"]

        # --- Look up the full record in the DB ---
        db = _load_db()
        rec = db[db["id"] == sel_id]
        if rec.empty:
            return "<em>Could not find file for this row.</em>", ""

        # --- Choose source: HF vs Local ---
        use_hf = (str(source_mode).upper() == "HF")
        path_str = rec["hf_path"].values[0] if use_hf else rec["path"].values[0]
        path_str = str(path_str or "")
        if not path_str:
            return "<em>No path available for this source.</em>", f"📄 {row['filename']}"
        # HF dataset URI → lazy download, then iframe from the raw HTML
        if path_str.startswith("hf://"):
            repo_id, relpath = parse_hf_uri(path_str)
            relpath = unquote(relpath)
            try:
                # First attempt: whatever is saved in library.csv
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    repo_type="dataset",
                    filename=relpath,
                    local_files_only=False,
                )
            except EntryNotFoundError:
                # Fix relpath from index.csv, scoped by the active model (handles shard dirs)
                fixed_hf_path = _refresh_hf_path_from_index(
                    sel_id,
                    row["filename"],
                    current_model_hint=current_model,
                )
                if not fixed_hf_path:
                    msg = (
                        f"<em>Entry not found for <code>{_py_html.escape(relpath)}</code>, "
                        f"and no matching row for <code>{_py_html.escape(str(row['filename']))}</code> "
                        f"under the current model in <code>index.csv</code>.</em>"
                    )
                    return msg, f"📄 {row['filename']}"
                repo_id, relpath = parse_hf_uri(fixed_hf_path)
                relpath = unquote(relpath)
                # Single retry with the corrected, shard-aware relpath
                local_path = hf_hub_download(
                    repo_id=repo_id,
                    repo_type="dataset",
                    filename=relpath,
                    local_files_only=False,
                )
            except RepositoryNotFoundError:
                return (
                    "<em>Dataset repo not accessible. If it's private, set an HF token in Space Secrets "
                    "as <code>HF_TOKEN</code> (or <code>HUGGINGFACE_HUB_TOKEN</code>) and restart.</em>",
                    f"📄 {row['filename']}",
                )

            # Success
            raw_html = Path(local_path).read_text(encoding="utf-8")
            return _iframe_from_html_string(raw_html, height_px=720), f"📄 {row['filename']}"

        # Direct HTTP URL (CDN)
        if path_str.startswith("http"):
            iframe = f'<iframe style="width:100%;height:720px;border:1px solid #ddd;border-radius:8px;" src="{_py_html.escape(path_str)}"></iframe>'
            return iframe, f"📄 {row['filename']}"

        # Local file fallback
        p = Path(path_str)
        if not p.exists():
            return f"<em>File not found:</em> <code>{_py_html.escape(str(p))}</code>", f"📄 {row['filename']}"
        raw_html = p.read_text(encoding="utf-8")
        return _iframe_from_html_string(raw_html, height_px=720), f"📄 {row['filename']}"

    except EntryNotFoundError:
        # In case one bubbles up from the retry
        return "<em>File not found in dataset even after shard fix.</em>", ""
    except Exception as e:
        traceback.print_exc()
        return f"<pre>Failed to render (see terminal):\n{_py_html.escape(str(e))}</pre>", ""

# ---------- Save edits ----------
def save_edits(edited_table, current_model):
    edited = _df_from_table_value(edited_table)
    if edited.empty:
        gr.Info("Nothing to save.")
        return gr.update()

    df_db = _load_db()
    editable_cols = ["category", "dataset", "tags", "keywords", "notes"]
    for c in editable_cols:
        edited[c] = edited[c].fillna("").astype(str)

    for _, row in edited.iterrows():
        i = df_db.index[df_db["id"] == row["id"]]
        if len(i):
            for c in editable_cols:
                df_db.at[i[0], c] = row[c]
    _save_db(df_db)

    # Return the refreshed table only (respect current_model scope)
    return refresh_view("", [], "", "", current_model)[0]

# -------------------- UI --------------------
# CSS that targets only the three buttons via elem_id
custom_css = """
/* scope styles to only these 3 components */
#sync-btn button,
#refresh-btn button,
#save-btn button,
#sync-btn .gr-button,
#refresh-btn .gr-button,
#save-btn .gr-button,
#sync-btn [role="button"],
#refresh-btn [role="button"],
#save-btn [role="button"] {
  background: #f97316 !important;   /* orange-500 */
  border-color: #f97316 !important;
  color: #fff !important;
}
/* hover/active */
#sync-btn button:hover,
#refresh-btn button:hover,
#save-btn button:hover,
#sync-btn .gr-button:hover,
#refresh-btn .gr-button:hover,
#save-btn .gr-button:hover,
#sync-btn [role="button"]:hover,
#refresh-btn [role="button"]:hover,
#save-btn [role="button"]:hover {
  background: #ea580c !important;   /* orange-600 */
  border-color: #ea580c !important;
}
/* (optional) also set CSS vars in case the theme uses them */
#sync-btn, #refresh-btn, #save-btn {
  --button-primary-background-fill: #f97316;
  --button-primary-background-fill-hover: #ea580c;
  --button-text-color: #fff;
}
"""

with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo:
    gr.Markdown("## 🎧 Audio Reconstruction Reports — sync • search • view")

    current_model = gr.State("")   # remembers the active model prefix inside the HF repo
    source_mode = gr.State("HF")   # default source

    with gr.Row():
        with gr.Column(scale=1):
            # Choose model & sync
            gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}/<model_name>/...`")
            model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192")
            sync_btn = gr.Button("Sync this model", elem_id="sync-btn")

            # Search & filters
            gr.Markdown("---\n**Search & filter**")
            query = gr.Textbox(label="Keyword search (filename/tags/notes/category/dataset)", placeholder="type to search…")
            tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)")
            category_filter = gr.Dropdown(choices=[], label="Category")
            dataset_filter = gr.Dropdown(choices=[], label="Dataset")

            # Source toggle (HF vs Local)
            mode_radio = gr.Radio(
                choices=["HF", "Local"],
                value="HF",
                label="Source",
                info="Preview from the HF dataset or local disk",
            )
            refresh_btn = gr.Button("Refresh", elem_id="refresh-btn")

        with gr.Column(scale=2):
            # Count of the current view
            count_md = gr.Markdown("**Showing 0 file(s)**")
            gr.Markdown("**Library** (click a row to preview; edit cells and Save)")
            table = gr.Dataframe(
                headers=TABLE_COLS,
                datatype=["str"] * len(TABLE_COLS),
                interactive=True,
                wrap=True,
                row_count=(0, "dynamic"),
                col_count=(len(TABLE_COLS), "fixed"),
            )
            with gr.Row():
                save_btn = gr.Button("Save Edits", elem_id="save-btn")
            preview_label = gr.Markdown("")
            preview_html = gr.HTML("")

    # Wiring: sync (also sets current_model)
    sync_btn.click(
        sync_model,
        [model_in],
        [table, tag_filter, category_filter, dataset_filter, count_md, current_model],
    )

    # Wiring: refresh + live filters (respect current_model)
    refresh_btn.click(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md],
    )

    # Trigger a refresh when any filter OR the source mode changes
    for comp in (query, tag_filter, category_filter, dataset_filter, mode_radio):
        comp.change(
            refresh_view,
            [query, tag_filter, category_filter, dataset_filter, current_model],
            [table, tag_filter, category_filter, dataset_filter, count_md],
        )

    # Keep the source_mode state in sync with the radio
    mode_radio.change(lambda x: x, [mode_radio], [source_mode])

    # Pass source_mode into select_row so it can choose hf_path vs path
    table.select(select_row, [table, source_mode, current_model], [preview_html, preview_label])

    save_btn.click(save_edits, [table, current_model], [table])

    # Initial load (no model yet)
    demo.load(
        refresh_view,
        [query, tag_filter, category_filter, dataset_filter, current_model],
        [table, tag_filter, category_filter, dataset_filter, count_md],
    )

if __name__ == "__main__":
    demo.launch(share=True)  # auth optional