MVP build for “Data Curation Workbench” (Hugging Face Space)

0) MVP Goal & Scope

Goal: Let a signed‑in user upload D₀ (or reference a Hub dataset), pick a model + metrics, choose candidate datasets {D₁…Dₙ}, launch small‑scale fine‑tunes/evals as detached Jobs, and view:

  • per‑run metrics (loss / F1 / Exact‑Match),
  • a scaling‑law plot,
  • a table ranking which Dₖ helps the most, and
  • all artifacts saved to a results dataset or Space storage.

Out of scope (for MVP):

  • Multi‑GPU distributed training, multi‑task mixing UI, complex hyperparam sweeps.
  • Non‑text tasks.

1) Repository Layout

Create these files/folders:

.
├─ README.md
├─ PLAN.md                        # this file
├─ app.py                         # Gradio UI + Job submission + status polling
├─ requirements.txt
├─ catalog/
│  └─ candidates.json             # curated {D₁…Dₙ}
├─ utils/
│  ├─ hub.py                      # upload to Hub, results repo helpers
│  ├─ data.py                     # dataset loading/mixing/helpers
│  └─ plotting.py                 # scaling plot helper
└─ jobs/
   ├─ run_experiment.py           # orchestrates one D₀ ⊕ Dₖ experiment (multi sizes)
   ├─ train.py                    # PEFT/QLoRA SFT
   ├─ eval.py                     # metrics (loss/F1/Exact-Match)
   └─ scaling.py                  # fit & predict scaling law

2) Configuration & Env

Space Settings → Secrets/Variables (already done for step 2; listed here for reference):

  • SERVICE_HF_TOKEN (secret, write‑scoped; used to create/push results datasets)
  • RESULTS_REPO (optional, like your-org/curation-results; if absent, create on first run)
  • HF_HOME=/data/.huggingface (variable) if Persistent Storage is enabled
  • PERSIST_DIR=/data (variable) if Persistent Storage is enabled

NOTE: RESULTS_REPO is absent now; Persistent Storage is NOT enabled yet.

Runtime assumptions:

  • Space uses Gradio SDK.
  • Jobs will request a GPU flavor (e.g., a10g-small) for training; UI itself can run on CPU.

Currently the Space Hardware is ZeroGPU.
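
A minimal sketch of how app.py might read this configuration, assuming the variable names above; defaults reflect the current state (no RESULTS_REPO, Persistent Storage not enabled). JOB_FLAVOR is a hypothetical extra knob, not one of the variables listed above.

import os

SERVICE_HF_TOKEN = os.getenv("SERVICE_HF_TOKEN")            # write-scoped secret for pushing artifacts
RESULTS_REPO = os.getenv("RESULTS_REPO", "")                # empty -> created on first run
PERSIST_DIR = os.getenv("PERSIST_DIR", "/tmp/curation")     # becomes /data once Persistent Storage is enabled
JOB_FLAVOR = os.getenv("JOB_FLAVOR", "a10g-small")          # hypothetical override for the Jobs GPU flavor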


3) Dependencies

requirements.txt

gradio>=5
huggingface_hub>=0.25
datasets>=2.20
transformers>=4.44
peft>=0.13
trl>=0.9
evaluate>=0.4
scikit-learn>=1.5
numpy>=1.26
pandas>=2.2
matplotlib>=3.8

4) Candidate Datasets Catalog

catalog/candidates.json (minimal starter; adjust to your domain)

[
  {
    "id": "glue/sst2",
    "task": "classification",
    "license": "open",
    "size_hint": "67k",
    "columns": {"text": "sentence", "label": "label"},
    "labels": ["negative","positive"]
  },
  {
    "id": "ag_news",
    "task": "classification",
    "license": "cc-by-3.0",
    "size_hint": "120k",
    "columns": {"text": "text", "label": "label"},
    "labels": ["World","Sports","Business","Sci/Tech"]
  },
  {
    "id": "squad",
    "task": "qa",
    "license": "cc-by-sa-4.0",
    "size_hint": "100k",
    "columns": {"question": "question", "context": "context", "answers": "answers"}
  }
]

For MVP, support classification and extractive QA. The columns mapping lets us normalize heterogeneous datasets without a complex UI.
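
A minimal sketch of how the columns mapping could be applied (the helper name is illustrative, not yet part of utils/data.py): rename the source columns to the canonical names and drop everything else.

from datasets import load_dataset

def normalize_with_mapping(dataset_id, columns_map, split="train"):
    # columns_map maps canonical name -> source column, e.g. {"text": "sentence", "label": "label"}
    ds = load_dataset(dataset_id, split=split)
    ds = ds.rename_columns({src: dst for dst, src in columns_map.items() if src != dst})
    return ds.remove_columns([c for c in ds.column_names if c not in columns_map])

# Example with the ag_news entry above (rename is a no-op, extra columns are dropped):
# normalize_with_mapping("ag_news", {"text": "text", "label": "label"})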


5) UI — app.py (Gradio)

5.1 Features

  • LoginButton (OAuth) → captures gr.OAuthProfile and gr.OAuthToken.
  • D₀ input: either upload files (.jsonl/.csv/.parquet/.zip) or provide a Hub dataset id.
  • Model dropdown: start with meta-llama/Llama-3.1-8B-Instruct.
  • Task selector (classification or QA). (MVP: single task per run.)
  • Benchmark/test set: upload small test data or provide Hub split.
  • Metrics checkboxes: loss, f1, exact_match (show exact_match only for QA).
  • Candidate datasets: multiselect from candidates.json.
  • Run experiments button: submits one Job per selected Dₖ.
  • Jobs table: ID, Dₖ, status, logs link, artifacts link.
  • Results view: scaling plot + ranked table when jobs finish.

5.2 Implementation Sketch

  • Parse OAuth token; we’ll prefer the user token for reading gated models, but use SERVICE_HF_TOKEN for writing artifacts.

  • If user uploads D₀, compress if needed and push to a private dataset repo via utils/hub.ensure_uploaded_dataset(...).

  • Submit a Job per Dₖ with:

    • command: python jobs/run_experiment.py --model ... --d0 ... --dk ... --task ... --metrics ... --results_repo ...
    • flavor="a10g-small" (configurable)
    • timeout (e.g., 7200 seconds)
    • env: HF_TOKEN (read), SERVICE_HF_TOKEN (write), plus RESULTS_REPO if set.
  • Store job metadata in a gr.State list; start a poller (every ~10–15s) to refresh status via huggingface_hub.inspect_job(...).

  • When a job completes, show a link to its artifacts (scaling plot, metrics JSON) and update the results table.

Acceptance criteria

  • Launching a run queues N jobs (N = number of selected Dₖ).
  • Status column transitions through “queued/running/completed/failed”.
  • Clicking an artifacts link opens an image/json from results repo (or Space storage).

6) Hub Utilities — utils/hub.py

Functions to implement

  • ensure_uploaded_dataset(upload_files, d0_dataset_id, user_token) -> str

    • If d0_dataset_id is provided, return it.
    • Else create a private dataset repo under your org (e.g., your-org/curation-upload-<uuid>), upload files/folder, and return repo id.
  • ensure_results_repo(service_token, results_repo_env) -> str

    • If RESULTS_REPO is set, ensure it exists; else create your-org/curation-results.
  • push_artifacts(repo_id, local_dir, subdir) -> None

    • Upload a local folder (e.g., artifacts/<job-id>/...) to repo_id/subdir.

Acceptance criteria

  • Uploading a small CSV/JSONL creates a private dataset and returns a valid repo id.
  • Pushing artifacts creates/updates files in the results repo with versioned commits.

7) Data Helpers — utils/data.py

Responsibilities

  • Load D₀ and Dₖ from the Hub (and optional test set).
  • Normalize columns using the columns mapping from candidates.json or a provided override.
  • Build mixtures of D₀ ⊕ Dₖ at multiple sizes (e.g., {10k, 20k, 40k} examples).
  • For classification: expect {"text": str, "label": int} after normalization. For QA: expect {"question": str, "context": str, "answers": {"text":[...], "answer_start":[...]}}.

API

def load_dataset_normalized(repo_or_id, task, columns_map=None, split="train"):
    """Return a datasets.Dataset with normalized columns for the given task."""
    ...

def build_mixtures(d0_ds, dk_ds, sizes=[10_000, 20_000, 40_000], d0_ratio=0.5, seed=42):
    """Return dict: size -> datasets.Dataset of mixed examples (shuffled, repeat/trim as needed)."""

def load_benchmark(repo_or_id_or_path, task, split="validation"):
    """Return a small test set normalized for the chosen task."""

Acceptance criteria

  • Given a known dataset id, load_dataset_normalized(...) returns columns as specified.
  • build_mixtures(...) returns ≥2 sizes with the requested counts.
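
A minimal sketch of build_mixtures under the API above, assuming both inputs are already normalized datasets.Dataset objects (trim-only; repeating undersized datasets, as hinted in the docstring, is left out):

from datasets import concatenate_datasets

def build_mixtures(d0_ds, dk_ds, sizes=(10_000, 20_000, 40_000), d0_ratio=0.5, seed=42):
    mixtures = {}
    for size in sizes:
        n0 = min(int(size * d0_ratio), len(d0_ds))   # trim if D0 is smaller than requested
        nk = min(size - n0, len(dk_ds))              # fill the remainder from Dk
        part0 = d0_ds.shuffle(seed=seed).select(range(n0))
        partk = dk_ds.shuffle(seed=seed).select(range(nk))
        mixtures[size] = concatenate_datasets([part0, partk]).shuffle(seed=seed)
    return mixtures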

8) Plotting Helper — utils/plotting.py

API

def plot_scaling(sizes, y_values, y_label, out_path):
    """Save a simple matplotlib PNG (log-x) with points + fitted curve if provided."""
  • Use matplotlib; one figure per plot; do not enforce custom colors/styles.
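
A minimal sketch of plot_scaling. The optional fit argument (a {"alpha": ..., "b": ...} dict from jobs/scaling.py) is an assumption beyond the signature above and assumes a higher-is-better metric.

import matplotlib
matplotlib.use("Agg")          # headless rendering inside Jobs / Spaces
import matplotlib.pyplot as plt
import numpy as np

def plot_scaling(sizes, y_values, y_label, out_path, fit=None):
    fig, ax = plt.subplots()
    ax.scatter(sizes, y_values, label="measured")
    if fit is not None:
        # Draw the fitted curve, extrapolated one decade past the largest measured size.
        xs = np.logspace(np.log10(min(sizes)), np.log10(max(sizes) * 10), 50)
        ax.plot(xs, 1 - fit["b"] * xs ** (-fit["alpha"]), "--", label="power-law fit")
    ax.set_xscale("log")
    ax.set_xlabel("mixture size (examples)")
    ax.set_ylabel(y_label)
    ax.legend()
    fig.savefig(out_path, dpi=150, bbox_inches="tight")
    plt.close(fig)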

Acceptance criteria

  • Calling plot_scaling(...) produces a PNG saved to out_path without errors.

9) Training — jobs/train.py (PEFT/QLoRA SFT)

NOTE: Currently the Space Hardware is ZeroGPU. For testing purposes, the training step can be replaced with an extremely small model.

Responsibilities

  • Load model + tokenizer (e.g., meta-llama/Llama-3.1-8B-Instruct).
  • Apply LoRA (or QLoRA).
  • Tokenize dataset and run short SFT.

API (sketch)

from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments
from peft import LoraConfig, get_peft_model
from trl import SFTTrainer

def train_peft(model_id, train_ds, output_dir, max_steps=500, lr=2e-4, lora_r=8):
    tok = AutoTokenizer.from_pretrained(model_id, use_fast=True)
    base = AutoModelForCausalLM.from_pretrained(model_id)
    peft_cfg = LoraConfig(r=lora_r, lora_alpha=16, lora_dropout=0.05, task_type="CAUSAL_LM")
    model = get_peft_model(base, peft_cfg)

    def format_example(ex):
        # classification: concatenate prompt; QA: question + context formatting
        # MVP: simple "<s>[INST] ... [/INST]" style or plain text target
        return {"text": ex["text"]}  # adjust per task

    # Tokenization & SFTTrainer; keep it simple for MVP
    tr_args = TrainingArguments(output_dir=output_dir, per_device_train_batch_size=4,
                                gradient_accumulation_steps=4, learning_rate=lr,
                                max_steps=max_steps, logging_steps=50, save_strategy="no")
    # NOTE: recent trl releases move `dataset_text_field` into SFTConfig and rename
    # `tokenizer` to `processing_class`; adjust to the installed trl version.
    trainer = SFTTrainer(model=model, tokenizer=tok, train_dataset=train_ds,
                         dataset_text_field="text", args=tr_args)
    trainer.train()
    # Save adapter only
    trainer.save_model(output_dir)
    return output_dir

Acceptance criteria

  • On a tiny dataset (few hundred samples), training completes and saves an adapter folder.

10) Evaluation — jobs/eval.py

Responsibilities

  • Run evaluation for the selected task using the fine‑tuned adapter.
  • For classification: compute loss (optional) and f1.
  • For QA: compute exact_match (and f1 if you want both).

API (sketch)

import evaluate
import numpy as np

def eval_classification(model_id_or_path, test_ds):
    # Use pipeline or model.generate + simple argmax classifier (MVP)
    # Better: a small classification head; MVP keeps it simple.
    f1 = evaluate.load("f1")
    preds, refs = ..., ...
    return {"f1": f1.compute(predictions=preds, references=refs)["f1"]}

def eval_qa(model_id_or_path, test_ds):
    exact = evaluate.load("exact_match")
    # MVP: heuristic span matching if using generative outputs;
    # or reuse baseline SQuAD eval if test_ds has 'answers'.
    preds, refs = ..., ...  # predicted vs. reference answer strings, one pair per example
    em = exact.compute(predictions=preds, references=refs)["exact_match"]
    return {"exact_match": em}

Note: For MVP, inference can be slow. Keep test sets small (e.g., 500–1,000 examples) and batch where possible.
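
As one possible MVP implementation of the classification path, a minimal sketch that generates a short completion per example and maps it back to a label by substring match. It assumes the saved adapter directory can be loaded directly by the pipeline, which typically works when peft is installed and the adapter config records its base model; the helper name and prompt format are illustrative.

import evaluate
from transformers import pipeline

def eval_classification_sketch(adapter_or_model_path, test_ds, label_names):
    gen = pipeline("text-generation", model=adapter_or_model_path)
    preds, refs = [], []
    for ex in test_ds:
        out = gen(f"Classify the text.\nText: {ex['text']}\nLabel:",
                  max_new_tokens=8, return_full_text=False)[0]["generated_text"]
        # Heuristic mapping: first label name found in the completion, else label 0.
        preds.append(next((i for i, name in enumerate(label_names)
                           if name.lower() in out.lower()), 0))
        refs.append(ex["label"])
    f1 = evaluate.load("f1")
    return {"f1": f1.compute(predictions=preds, references=refs, average="macro")["f1"]}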

Acceptance criteria

  • For a toy dataset, returns a metrics dict with expected keys.

11) Scaling Law — jobs/scaling.py

Responsibilities

  • Fit a simple power‑law over points (size → metric).
  • For “higher‑is‑better” metrics, convert to a pseudo‑loss (e.g., 1 - score) during fitting if desired.
  • Produce a prediction at a user‑defined large‑scale target (e.g., N* = 200k examples).

API (sketch)

import numpy as np

def fit_powerlaw(sizes, scores, higher_is_better=True):
    sizes = np.asarray(sizes, float)
    y = np.asarray(scores, float)
    if higher_is_better:
        # Fit to (1 - score) ~ b * N^{-alpha}
        z = np.log(np.maximum(1e-9, 1 - y))
    else:
        # Direct loss scaling
        z = np.log(np.maximum(1e-9, y))
    x = np.log(sizes)
    k, c = np.polyfit(x, z, 1)         # z ≈ k*log N + c
    alpha = -k; b = np.exp(c)
    return {"alpha": float(alpha), "b": float(b)}

def predict_powerlaw(size, fit_params, higher_is_better=True):
    alpha, b = fit_params["alpha"], fit_params["b"]
    if higher_is_better:
        loss_hat = b * (size ** (-alpha))
        return float(1 - loss_hat)
    return float(b * (size ** (-alpha)))
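
A quick sanity check of the fit on synthetic points that follow an exact power law (values chosen so the recovered parameters are easy to verify):

# Three F1-like points where (1 - score) decays as 0.5 * N^-0.3
sizes = [10_000, 20_000, 40_000]
scores = [1 - 0.5 * n ** -0.3 for n in sizes]

fit = fit_powerlaw(sizes, scores, higher_is_better=True)
# fit ≈ {"alpha": 0.3, "b": 0.5}
print(predict_powerlaw(200_000, fit, higher_is_better=True))  # extrapolated F1 at N* = 200k (≈ 0.987)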

Acceptance criteria

  • Given ≥2 points (prefer 3+), returns fit parameters and a plausible prediction.
  • Combined with utils/plotting.plot_scaling(...), writes a PNG with points + curve.

12) Experiment Orchestrator — jobs/run_experiment.py

Responsibilities

  • Parse CLI args: --model, --task, --d0, --dk, --metrics ..., --sizes 10000 20000, --target_size 200000, --results_repo <id>, --job_id <uuid>.

  • Create working dirs: artifacts/<job_id>/.

  • Load datasets (D₀, Dₖ), build mixtures for requested sizes.

  • For each size:

    1. run short train (adapter saved under artifacts/<job_id>/adapters/size-<N>),
    2. run eval on the benchmark set → collect metrics.
  • Fit scaling across sizes; produce:

    • metrics.json (per‑size metrics, fit params, predicted large‑scale performance),
    • scaling.png (plot).
  • Push artifacts/<job_id>/ to results_repo under experiments/<job_id>/... using utils/hub.push_artifacts(...).

  • Print a final JSON line to stdout with the artifacts path (UI can parse logs if needed).

CLI Skeleton

import argparse, json, os, sys, uuid

# Put the repo root on sys.path so `python jobs/run_experiment.py` can import the sibling packages.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import hub, data, plotting
from jobs import train, eval as evalm, scaling

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--task", choices=["classification","qa"], required=True)
    ap.add_argument("--d0", required=True)
    ap.add_argument("--dk", required=True)
    ap.add_argument("--metrics", nargs="+", default=["f1"])
    ap.add_argument("--sizes", nargs="+", type=int, default=[10000, 20000, 40000])
    ap.add_argument("--target_size", type=int, default=200000)
    ap.add_argument("--results_repo", default=os.getenv("RESULTS_REPO",""))
    ap.add_argument("--job_id", default=str(uuid.uuid4()))
    args = ap.parse_args()

    # Setup dirs
    out_dir = os.path.abspath(os.path.join("artifacts", args.job_id))
    os.makedirs(out_dir, exist_ok=True)

    # Load datasets
    d0 = data.load_dataset_normalized(args.d0, args.task)
    dk = data.load_dataset_normalized(args.dk, args.task)
    test = data.load_benchmark(args.d0, args.task, split="validation")  # MVP: reuse D₀ val if none provided

    # Build mixtures & run train/eval
    per_size = []
    for N in args.sizes:
        mix = data.build_mixtures(d0, dk, sizes=[N])[N]
        adapter_dir = os.path.join(out_dir, f"adapter_size_{N}")
        train.train_peft(args.model, mix, adapter_dir, max_steps=300)  # MVP: few steps
        metrics = {}
        if args.task == "classification":
            metrics.update(evalm.eval_classification(adapter_dir, test))
        else:
            metrics.update(evalm.eval_qa(adapter_dir, test))
        per_size.append({"size": N, "metrics": metrics})

    # Fit scaling on the primary metric
    key = "exact_match" if args.task == "qa" else "f1"
    sizes = [r["size"] for r in per_size]
    scores = [r["metrics"][key] for r in per_size]
    fit = scaling.fit_powerlaw(sizes, scores, higher_is_better=True)
    pred = scaling.predict_powerlaw(args.target_size, fit, higher_is_better=True)

    # Write artifacts
    mpath = os.path.join(out_dir, "metrics.json")
    with open(mpath, "w") as f:
        json.dump({"runs": per_size, "fit": fit, "prediction": { "target_size": args.target_size, key: pred }}, f, indent=2)

    plotting.plot_scaling(sizes, scores, key, os.path.join(out_dir, "scaling.png"))

    # Push artifacts
    repo_id = hub.ensure_results_repo(os.getenv("SERVICE_HF_TOKEN"), args.results_repo)
    hub.push_artifacts(repo_id, out_dir, subdir=f"experiments/{args.job_id}")

    print(json.dumps({"status":"ok","artifacts_repo": repo_id, "path": f"experiments/{args.job_id}"}))

if __name__ == "__main__":
    main()

Acceptance criteria

  • Running with tiny toy inputs creates artifacts/<job_id>/ + pushes to results repo.
  • metrics.json and scaling.png exist and look sensible.

13) Job Submission from UI — app.py (continued)

Core actions

  • Submit: for each selected Dₖ → call huggingface_hub.run_job(...) with:

    • image: CUDA‑capable (e.g., pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel)
    • command: ["python","jobs/run_experiment.py", "--model", model_id, "--task", task, "--d0", d0_repo, "--dk", dk_id, "--metrics", *metrics, "--sizes", *sizes, "--target_size", str(target_size), "--results_repo", results_repo_or_empty]
    • flavor: "a10g-small"
    • timeout: e.g., 7200 (seconds)
    • env: {"HF_TOKEN": user_token or SERVICE_HF_TOKEN, "SERVICE_HF_TOKEN": SERVICE_HF_TOKEN, "RESULTS_REPO": RESULTS_REPO}
  • Poll: keep a dict {job_id: {dk, status, url, artifacts}}; update via inspect_job(job_id); for completed, set artifacts link to hf://<results_repo>/experiments/<job_id>/.

Acceptance criteria

  • Submitting 2 Dₖ creates 2 jobs; both progress independently; artifacts link works.

14) Guardrails & Licensing

  • Gated models: probe download with hf_hub_download(model_id, filename="README.md", token=user_token) to confirm access; if 401/403, show a clear message to accept the license on the model card.
  • Dataset licensing: surface the license field from candidates.json next to each Dₖ; later fetch from Hub.
  • Uploads: warn users that uploaded D₀ will be stored in a private dataset (repo id shown in UI); provide a “Delete my upload” note linking to the repo.
  • Resource limits: cap sizes (sizes=[5_000, 10_000] for MVP), cap number of concurrent jobs per user (client‑side only for MVP).
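
A minimal sketch of the gated-model probe from the first bullet above; the helper name and the choice of exceptions caught are assumptions.

from huggingface_hub import hf_hub_download
from huggingface_hub.utils import GatedRepoError, HfHubHTTPError

def check_model_access(model_id, user_token=None):
    """Return None if the token can read the model, else a message to surface in the UI."""
    try:
        hf_hub_download(model_id, filename="README.md", token=user_token)
        return None
    except GatedRepoError:
        return f"'{model_id}' is gated: please accept the license on its model card and retry."
    except HfHubHTTPError as e:
        return f"Could not access '{model_id}': {e}"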

15) Testing

Local (CPU) sanity checks

  • Use a very small subset (e.g., 200 examples) and max_steps=10 to verify the end‑to‑end loop without a GPU.
  • Mock run_job(...) (optional) to test UI job table logic.
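
A minimal sketch of such a mock, assuming only the fields the UI reads (id, url, status) need to be faked:

from types import SimpleNamespace

def fake_run_job(**kwargs):
    # Stand-in for huggingface_hub.run_job: returns an object with the fields app.py reads.
    return SimpleNamespace(id="fake-job-1", url="https://example.invalid/jobs/fake-job-1")

def fake_inspect_job(job_id):
    return SimpleNamespace(status="completed")

# In a pytest test, patch the names imported by app.py, e.g.:
#   monkeypatch.setattr("app.run_job", fake_run_job)
#   monkeypatch.setattr("app.inspect_job", fake_inspect_job)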

Space integration

  • Create a private test results repo for the Space (e.g., your-org/curation-results-test).

  • Submit a single Dₖ job and verify:

    • artifacts/ created,
    • metrics.json contains per‑size metrics and prediction,
    • scaling.png renders,
    • artifacts are uploaded and visible from the UI link.

16) Definition of Done (DoD)

  • A signed‑in user can:

    1. Provide D₀ (upload or Hub id),
    2. Choose model, task, metrics, and ≥1 Dₖ,
    3. Click Run and see a job per Dₖ with live status,
    4. Open artifacts (plot + metrics),
    5. See a ranked table of Dₖ by the chosen primary metric,
    6. (Optional) Download metrics.json.
  • All long work executes as Jobs (no HTTP timeouts).

  • Artifacts persist in a results dataset or Space storage.


17) Nice‑to‑Have (post‑MVP)

  • Column mapping UI: let users map their D₀ columns to text/label or question/context/answers.
  • Seed sweeps and confidence intervals on scaling fit.
  • Hardware selector and budget estimator.
  • vLLM/TGI inference for faster eval.
  • Per‑user “My Experiments” page (prefix experiments/<username>/...).

18) Task Checklist (assignable to your agent)

A. Scaffolding

  • Add requirements.txt; ensure importable on the Space.
  • Create folders: catalog/, utils/, jobs/.

B. Catalog

  • Fill catalog/candidates.json (3–6 datasets), including columns mapping.

C. Hub utilities (utils/hub.py)

  • ensure_uploaded_dataset(...)
  • ensure_results_repo(...)
  • push_artifacts(...)

D. Data helpers (utils/data.py)

  • load_dataset_normalized(...) for classification + QA
  • build_mixtures(...)
  • load_benchmark(...)

E. Plotting (utils/plotting.py)

  • plot_scaling(...)

F. Jobs

  • jobs/train.py (PEFT SFT)
  • jobs/eval.py (classification + QA)
  • jobs/scaling.py (fit + predict)
  • jobs/run_experiment.py (glue the above, produce artifacts, push)

G. UI (app.py)

  • Build form (inputs, selectors, candidates list)
  • Submit one job per Dₖ via run_job(...)
  • Poll job status & render jobs table
  • Artifacts viewer: link to results repo path
  • Basic error messages (license issues, upload failures)

H. Tests

  • Local micro‑run (CPU) with tiny sizes
  • Space run on GPU flavor with one Dₖ
  • Verify artifacts + plot + ranking table

19) Code Snippets to Start Implementation

app.py — minimal UI skeleton (submit + poll)

import os, json, time, uuid, gradio as gr
from huggingface_hub import run_job, inspect_job
from utils.hub import ensure_uploaded_dataset, ensure_results_repo

CANDIDATES = json.load(open("catalog/candidates.json"))

def submit(d0_files, d0_id, task, model, metrics, dk_list, sizes, target_size,
           profile: gr.OAuthProfile | None, oauth: gr.OAuthToken | None):
    user_token = getattr(oauth, "token", None)
    d0_repo = ensure_uploaded_dataset(d0_files, d0_id, user_token=user_token)
    results_repo = ensure_results_repo(os.getenv("SERVICE_HF_TOKEN"), os.getenv("RESULTS_REPO",""))
    jobs = []
    for dk in dk_list:
        exp_id = uuid.uuid4().hex[:8]  # passed as --job_id so the artifacts path is known up front
        cmd = ["python","jobs/run_experiment.py",
               "--model", model, "--task", task, "--d0", d0_repo, "--dk", dk,
               "--metrics", *metrics, "--sizes", *[str(s) for s in sizes],
               "--target_size", str(target_size), "--results_repo", results_repo,
               "--job_id", exp_id]
        job = run_job(
            image="pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel",
            command=cmd,
            flavor="a10g-small",
            timeout=7200,
            env={"HF_TOKEN": user_token or os.getenv("SERVICE_HF_TOKEN"),
                 "SERVICE_HF_TOKEN": os.getenv("SERVICE_HF_TOKEN"),
                 "RESULTS_REPO": results_repo},
        )
        jobs.append({"id": job.id, "dk": dk, "url": job.url, "status": "queued", "artifacts": ""})
    return jobs

def poll(jobs_state):
    updated = []
    for j in jobs_state:
        info = inspect_job(j["id"])
        # JobInfo.status may be a plain string or a nested object with a .stage field,
        # depending on the huggingface_hub version; normalize to a lowercase string.
        st = str(getattr(info.status, "stage", info.status)).lower()
        art = j.get("artifacts","")
        # Artifacts live in RESULTS_REPO/experiments/<exp_id> (passed to run_experiment.py as --job_id)
        if st == "completed" and not art:
            art = f"{os.getenv('RESULTS_REPO','(repo)')}/experiments/{j.get('exp_id', j['id'])}"
        updated.append({**j, "status": st, "artifacts": art})
    return updated

with gr.Blocks() as demo:
    prof = gr.LoginButton()
    with gr.Row():
        d0_files = gr.UploadButton("Upload D₀ (.csv/.jsonl/.zip)", file_count="multiple")
        d0_id = gr.Textbox(label="or Hub dataset id (user/dataset)")
    task = gr.Radio(choices=["classification","qa"], value="classification", label="Task")
    model = gr.Dropdown(choices=["meta-llama/Llama-3.1-8B-Instruct"], label="Model")
    metrics = gr.CheckboxGroup(choices=["loss","f1","exact_match"], value=["f1"], label="Metrics")
    dk = gr.CheckboxGroup(choices=[c["id"] for c in CANDIDATES], label="Candidate datasets")
    sizes = gr.CheckboxGroup(choices=[5000,10000,20000], value=[5000,10000], label="Mixture sizes")
    target_size = gr.Number(value=200000, label="Target size for prediction")
    run_btn = gr.Button("Run experiments")

    jobs_state = gr.State([])
    jobs_table = gr.Dataframe(headers=["id","dk","status","url","artifacts"], datatype=["str","str","str","str","str"])

    # gr.OAuthProfile / gr.OAuthToken are injected automatically from submit()'s type hints
    # and must not be listed in `inputs`.
    run_btn.click(fn=submit,
                  inputs=[d0_files, d0_id, task, model, metrics, dk, sizes, target_size],
                  outputs=jobs_state)

    gr.Button("Refresh status").click(fn=poll, inputs=jobs_state, outputs=jobs_state)

    def render_table(jobs):  # render as simple rows
        rows = [[j["id"], j["dk"], j["status"], j["url"], j["artifacts"]] for j in jobs]
        return rows
    jobs_state.change(fn=render_table, inputs=jobs_state, outputs=jobs_table)

    gr.Markdown("Open artifacts in the results repo once jobs complete.")

demo.queue().launch()

utils/hub.py — upload & results

import os, uuid, tempfile, shutil
from huggingface_hub import HfApi, create_repo, upload_file, upload_folder

def ensure_uploaded_dataset(upload_files, d0_dataset_id, user_token=None):
    if d0_dataset_id:
        return d0_dataset_id
    if not upload_files:  # nothing uploaded
        raise ValueError("Please upload D₀ or provide a Hub dataset id.")
    api = HfApi(token=os.getenv("SERVICE_HF_TOKEN"))
    repo_id = f"{os.getenv('HF_ORG','your-org')}/curation-upload-{uuid.uuid4().hex[:8]}"
    create_repo(repo_id, repo_type="dataset", private=True, exist_ok=True, token=os.getenv("SERVICE_HF_TOKEN"))

    with tempfile.TemporaryDirectory() as tmp:
        # Gradio returns a list of tempfiles; copy them into a folder
        for f in upload_files:
            dst = os.path.join(tmp, os.path.basename(getattr(f,"name", "file")))
            shutil.copyfile(f.name if hasattr(f,"name") else f, dst)
        upload_folder(folder_path=tmp, repo_id=repo_id, repo_type="dataset", token=os.getenv("SERVICE_HF_TOKEN"))
    return repo_id

def ensure_results_repo(service_token, results_repo_env):
    api = HfApi(token=service_token)
    if results_repo_env:
        parts = results_repo_env.split("/")
        if len(parts) == 2:
            create_repo(results_repo_env, repo_type="dataset", private=True, exist_ok=True, token=service_token)
            return results_repo_env
    repo_id = f"{os.getenv('HF_ORG','your-org')}/curation-results"
    create_repo(repo_id, repo_type="dataset", private=True, exist_ok=True, token=service_token)
    return repo_id

def push_artifacts(repo_id, local_dir, subdir=""):
    path_in_repo = subdir.strip("/")
    upload_folder(folder_path=local_dir, repo_id=repo_id, repo_type="dataset",
                  path_in_repo=path_in_repo if path_in_repo else None,
                  token=os.getenv("SERVICE_HF_TOKEN"))