# MVP build for “Data Curation Workbench” (Hugging Face Space)
## 0) MVP Goal & Scope
**Goal:** Let a signed‑in user upload **D₀** (or reference a Hub dataset), pick a **model** + **metrics**, choose candidate datasets **{D₁…Dₙ}**, launch **small‑scale fine‑tunes/evals** as detached **Jobs**, and view:
* per‑run metrics (loss / F1 / Exact‑Match),
* a **scaling‑law** plot, and
* a table ranking which Dₖ helps the most.

All artifacts are saved to a results dataset or Space storage.
**Out of scope (for MVP):**
* Multi‑GPU distributed training, multi‑task mixing UI, complex hyperparam sweeps.
* Non‑text tasks.
---
## 1) Repository Layout
Create these files/folders:
```
.
├─ README.md
├─ PLAN.md                 # this file
├─ app.py                  # Gradio UI + Job submission + status polling
├─ requirements.txt
├─ catalog/
│  └─ candidates.json      # curated {D₁…Dₙ}
├─ utils/
│  ├─ hub.py               # upload to Hub, results repo helpers
│  ├─ data.py              # dataset loading/mixing/helpers
│  └─ plotting.py          # scaling plot helper
└─ jobs/
   ├─ run_experiment.py    # orchestrates one D₀ ⊕ Dₖ experiment (multi sizes)
   ├─ train.py             # PEFT/QLoRA SFT
   ├─ eval.py              # metrics (loss/F1/Exact-Match)
   └─ scaling.py           # fit & predict scaling law
```
---
## 2) Configuration & Env
**Space Settings → Secrets/Variables (already done in step 2; listed here for reference):**
* `SERVICE_HF_TOKEN` (secret, write‑scoped; used to create/push results datasets)
* `RESULTS_REPO` (optional, like `your-org/curation-results`; if absent, create on first run)
* `HF_HOME=/data/.huggingface` (variable) **if** Persistent Storage is enabled
* `PERSIST_DIR=/data` (variable) **if** Persistent Storage is enabled
**NOTE: RESULTS_REPO is absent now; Persistent Storage is NOT enabled yet.**
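Because both are optional, the code should read them with safe fallbacks. A minimal sketch (the fallback paths are assumptions, not fixed choices):
```python
import os

# Hypothetical defaults for when the optional settings above are not configured.
PERSIST_DIR = os.getenv("PERSIST_DIR", "/tmp/curation")   # no Persistent Storage -> ephemeral /tmp
RESULTS_REPO = os.getenv("RESULTS_REPO", "")              # empty -> create a results repo on first run
HF_HOME = os.getenv("HF_HOME", os.path.join(PERSIST_DIR, ".huggingface"))
```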
**Runtime assumptions:**
* Space uses **Gradio SDK**.
* Jobs will request a **GPU flavor** (e.g., `a10g-small`) for training; UI itself can run on CPU.
**Currently the Space Hardware is ZeroGPU.**
---
## 3) Dependencies
`requirements.txt`
```
gradio>=5
huggingface_hub>=0.25
datasets>=2.20
transformers>=4.44
peft>=0.13
trl>=0.9
evaluate>=0.4
scikit-learn>=1.5
numpy>=1.26
pandas>=2.2
matplotlib>=3.8
```
---
## 4) Candidate Datasets Catalog
`catalog/candidates.json` (minimal starter; adjust to your domain)
```json
[
  {
    "id": "glue/sst2",
    "task": "classification",
    "license": "open",
    "size_hint": "67k",
    "columns": {"text": "sentence", "label": "label"},
    "labels": ["negative", "positive"]
  },
  {
    "id": "ag_news",
    "task": "classification",
    "license": "cc-by-3.0",
    "size_hint": "120k",
    "columns": {"text": "text", "label": "label"},
    "labels": ["World", "Sports", "Business", "Sci/Tech"]
  },
  {
    "id": "squad",
    "task": "qa",
    "license": "cc-by-sa-4.0",
    "size_hint": "100k",
    "columns": {"question": "question", "context": "context", "answers": "answers"}
  }
]
```
> For the MVP, support **classification** and **extractive QA**. The `columns` mapping lets us normalize heterogeneous datasets without a complex UI.
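A minimal sketch of how that mapping could drive normalization (the helper name and split handling are illustrative; the real logic belongs in `utils/data.py`):
```python
from datasets import load_dataset

def normalize_classification(dataset_id, columns, split="train"):
    """Hypothetical helper: rename source columns to the canonical {"text", "label"} schema."""
    # Ids like "glue/sst2" may need to be split into (path, config) before calling load_dataset.
    ds = load_dataset(dataset_id, split=split)
    # `columns` maps canonical name -> source column, e.g. {"text": "sentence", "label": "label"}.
    rename = {src: canon for canon, src in columns.items() if src != canon}
    if rename:
        ds = ds.rename_columns(rename)
    return ds.select_columns(["text", "label"])
```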
---
## 5) UI — `app.py` (Gradio)
### 5.1 Features
* **LoginButton** (OAuth) → captures `gr.OAuthProfile` and `gr.OAuthToken`.
* **D₀ input**: either upload files (`.jsonl/.csv/.parquet/.zip`) or provide a **Hub dataset id**.
* **Model** dropdown: start with `meta-llama/Llama-3.1-8B-Instruct`.
* **Task** selector (classification or QA). (MVP: single task per run.)
* **Benchmark/test set**: upload small test data or provide Hub split.
* **Metrics** checkboxes: `loss`, `f1`, `exact_match` (show `exact_match` only for QA).
* **Candidate datasets**: multiselect from `candidates.json`.
* **Run experiments** button: submits **one Job per selected Dₖ**.
* **Jobs table**: ID, Dₖ, status, logs link, artifacts link.
* **Results view**: scaling plot + ranked table when jobs finish.
### 5.2 Implementation Sketch
* Parse OAuth token; we’ll prefer the user token for **reading gated models**, but use `SERVICE_HF_TOKEN` for **writing** artifacts.
* If user **uploads D₀**, compress if needed and push to a **private dataset repo** via `utils/hub.ensure_uploaded_dataset(...)`.
* Submit a **Job** per Dₖ with:
* command: `python jobs/run_experiment.py --model ... --d0 ... --dk ... --task ... --metrics ... --results_repo ...`
* `flavor="a10g-small"` (configurable)
* `timeout` (e.g., 7200 seconds)
* `env`: `HF_TOKEN` (read), `SERVICE_HF_TOKEN` (write), plus `RESULTS_REPO` if set.
* Store job metadata in a `gr.State` list; start a **poller** (every ~10–15s) to refresh status via `huggingface_hub.inspect_job(...)`.
* When a job completes, show a link to its artifacts (scaling plot, metrics JSON) and update the results table.
**Acceptance criteria**
* Launching a run queues N jobs (N = number of selected Dₖ).
* Status column transitions through “queued/running/completed/failed”.
* Clicking an artifacts link opens an image/json from results repo (or Space storage).
---
## 6) Hub Utilities — `utils/hub.py`
### Functions to implement
* `ensure_uploaded_dataset(upload_files, d0_dataset_id, user_token) -> str`
* If `d0_dataset_id` is provided, return it.
* Else create a **private dataset repo** under your org (e.g., `your-org/curation-upload-<uuid>`), upload files/folder, and return repo id.
* `ensure_results_repo(service_token, results_repo_env) -> str`
* If `RESULTS_REPO` is set, ensure it exists; else create `your-org/curation-results`.
* `push_artifacts(repo_id, local_dir, subdir) -> None`
* Upload a local folder (e.g., `artifacts/<job-id>/...`) to `repo_id/subdir`.
**Acceptance criteria**
* Uploading a small CSV/JSONL creates a private dataset and returns a valid repo id.
* Pushing artifacts creates/updates files in the results repo with versioned commits.
---
## 7) Data Helpers — `utils/data.py`
### Responsibilities
* Load D₀ and Dₖ from the Hub (and optional **test set**).
* Normalize columns using the `columns` mapping from `candidates.json` or a provided override.
* Build **mixtures** of D₀ ⊕ Dₖ at multiple sizes (e.g., `{10k, 20k, 40k}` examples).
* For **classification**: expect `{"text": str, "label": int}` after normalization.
For **QA**: expect `{"question": str, "context": str, "answers": {"text":[...], "answer_start":[...]}}`.
### API
```python
def load_dataset_normalized(repo_or_id, task, columns_map=None, split="train"):
    """Return a datasets.Dataset with normalized columns for the given task."""
    ...

def build_mixtures(d0_ds, dk_ds, sizes=[10_000, 20_000, 40_000], d0_ratio=0.5, seed=42):
    """Return dict: size -> datasets.Dataset of mixed examples (shuffled, repeat/trim as needed)."""
    ...

def load_benchmark(repo_or_id_or_path, task, split="validation"):
    """Return a small test set normalized for the chosen task."""
    ...
```
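A possible implementation of `build_mixtures`, assuming both inputs already follow the normalized schema and that repeating then trimming is acceptable when a source is small:
```python
from datasets import concatenate_datasets

def build_mixtures(d0_ds, dk_ds, sizes=[10_000, 20_000, 40_000], d0_ratio=0.5, seed=42):
    """Sketch: mix d0_ratio of D0 with (1 - d0_ratio) of Dk at each requested size."""
    d0_src, dk_src = d0_ds.shuffle(seed=seed), dk_ds.shuffle(seed=seed)
    mixtures = {}
    for size in sizes:
        n0 = int(size * d0_ratio)
        nk = size - n0
        # Repeat each source enough times to cover its share, then trim to the exact count.
        d0_part = concatenate_datasets([d0_src] * (n0 // len(d0_src) + 1)).select(range(n0))
        dk_part = concatenate_datasets([dk_src] * (nk // len(dk_src) + 1)).select(range(nk))
        mixtures[size] = concatenate_datasets([d0_part, dk_part]).shuffle(seed=seed)
    return mixtures
```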
**Acceptance criteria**
* Given a known dataset id, `load_dataset_normalized(...)` returns columns as specified.
* `build_mixtures(...)` returns ≥2 sizes with the requested counts.
---
## 8) Plotting Helper — `utils/plotting.py`
### API
```python
def plot_scaling(sizes, y_values, y_label, out_path):
    """Save a simple matplotlib PNG (log-x) with points + fitted curve if provided."""
    ...
```
* Use matplotlib; one figure per plot; do not enforce custom colors/styles.
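A minimal sketch of the helper under those constraints; the optional `fitted` argument (for drawing a fitted curve) is an assumption on top of the signature above:
```python
import matplotlib
matplotlib.use("Agg")  # Jobs have no display; render off-screen
import matplotlib.pyplot as plt

def plot_scaling(sizes, y_values, y_label, out_path, fitted=None):
    fig, ax = plt.subplots()
    ax.scatter(sizes, y_values, label="measured")
    if fitted is not None:            # assumed optional fitted curve, same length as sizes
        ax.plot(sizes, fitted, label="fit")
    ax.set_xscale("log")
    ax.set_xlabel("mixture size (examples)")
    ax.set_ylabel(y_label)
    ax.legend()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
```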
**Acceptance criteria**
* Calling `plot_scaling(...)` produces a PNG saved to `out_path` without errors.
---
## 9) Training — `jobs/train.py` (PEFT/QLoRA SFT)
**NOTE: Currently the Space Hardware is ZeroGPU. For testing purposes, training can be run with extremely small models.**
### Responsibilities
* Load model + tokenizer (e.g., `meta-llama/Llama-3.1-8B-Instruct`).
* Apply LoRA (or QLoRA).
* Tokenize dataset and run short SFT.
### API (sketch)
```python
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments
from peft import LoraConfig, get_peft_model
from trl import SFTTrainer

def train_peft(model_id, train_ds, output_dir, max_steps=500, lr=2e-4, lora_r=8):
    tok = AutoTokenizer.from_pretrained(model_id, use_fast=True)
    if tok.pad_token is None:          # Llama tokenizers ship without a pad token
        tok.pad_token = tok.eos_token
    base = AutoModelForCausalLM.from_pretrained(model_id)
    peft_cfg = LoraConfig(r=lora_r, lora_alpha=16, lora_dropout=0.05, task_type="CAUSAL_LM")
    model = get_peft_model(base, peft_cfg)

    def format_example(ex):
        # classification: concatenate prompt; QA: question + context formatting
        # MVP: simple "<s>[INST] ... [/INST]" style or plain text target
        return {"text": ex["text"]}  # adjust per task

    train_ds = train_ds.map(format_example)

    # Tokenization & SFTTrainer; keep it simple for MVP
    # (newer trl releases move dataset_text_field into SFTConfig; adjust if pinned higher)
    tr_args = TrainingArguments(output_dir=output_dir, per_device_train_batch_size=4,
                                gradient_accumulation_steps=4, learning_rate=lr,
                                max_steps=max_steps, logging_steps=50, save_strategy="no")
    trainer = SFTTrainer(model=model, tokenizer=tok, train_dataset=train_ds,
                         dataset_text_field="text", args=tr_args)
    trainer.train()
    # Save adapter only
    trainer.save_model(output_dir)
    return output_dir
```
**Acceptance criteria**
* On a tiny dataset (few hundred samples), training completes and saves an adapter folder.
---
## 10) Evaluation — `jobs/eval.py`
### Responsibilities
* Run evaluation for the selected task using the fine‑tuned adapter.
* For **classification**: compute `loss` (optional) and `f1`.
* For **QA**: compute `exact_match` (and `f1` if you want both).
### API (sketch)
```python
import evaluate
import numpy as np

def eval_classification(model_id_or_path, test_ds):
    # Use pipeline or model.generate + simple argmax classifier (MVP)
    # Better: a small classification head; MVP keeps it simple.
    f1 = evaluate.load("f1")
    preds, refs = ..., ...  # see the generation sketch below
    return {"f1": f1.compute(predictions=preds, references=refs)["f1"]}

def eval_qa(model_id_or_path, test_ds):
    exact = evaluate.load("exact_match")
    # MVP: heuristic span matching if using generative outputs;
    # or reuse baseline SQuAD eval if test_ds has 'answers'.
    preds, refs = ..., ...  # predicted answer strings vs. gold answer strings
    em = exact.compute(predictions=preds, references=refs)["exact_match"]
    return {"exact_match": em}
```
> **Note:** For MVP, inference can be slow. Keep test sets **small** (e.g., 500–1,000 examples) and batch where possible.
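For the classification path, a hedged sketch of producing `preds`/`refs` by greedy generation. The prompt template, the label‑matching heuristic, and the `label_names` argument are assumptions, and it presumes the adapter directory loads directly via `from_pretrained` (PEFT installed) and contains tokenizer files:
```python
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def predict_labels(model_path, test_ds, label_names, max_new_tokens=4):
    """Hypothetical helper: map greedy generations back to label indices."""
    tok = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForCausalLM.from_pretrained(model_path)
    model.eval()
    preds, refs = [], []
    for ex in test_ds:
        prompt = f"Text: {ex['text']}\nLabel:"          # assumed prompt template
        inputs = tok(prompt, return_tensors="pt")
        with torch.no_grad():
            out = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)
        gen = tok.decode(out[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True).lower()
        # Pick the first label name that appears in the generation; fall back to class 0.
        preds.append(next((i for i, name in enumerate(label_names) if name.lower() in gen), 0))
        refs.append(ex["label"])
    return preds, refs
```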
**Acceptance criteria**
* For a toy dataset, returns a metrics dict with expected keys.
---
## 11) Scaling Law — `jobs/scaling.py`
### Responsibilities
* Fit a simple power‑law over points `(size → metric)`.
* For “higher‑is‑better” metrics, convert to a pseudo‑loss (e.g., `1 - score`) during fitting if desired.
* Produce a **prediction** at a user‑defined large‑scale target (e.g., `N* = 200k` examples).
### API (sketch)
```python
import numpy as np

def fit_powerlaw(sizes, scores, higher_is_better=True):
    sizes = np.asarray(sizes, float)
    y = np.asarray(scores, float)
    if higher_is_better:
        # Fit to (1 - score) ~ b * N^{-alpha}
        z = np.log(np.maximum(1e-9, 1 - y))
    else:
        # Direct loss scaling
        z = np.log(np.maximum(1e-9, y))
    x = np.log(sizes)
    k, c = np.polyfit(x, z, 1)  # z ≈ k*log N + c
    alpha = -k
    b = np.exp(c)
    return {"alpha": float(alpha), "b": float(b)}

def predict_powerlaw(size, fit_params, higher_is_better=True):
    alpha, b = fit_params["alpha"], fit_params["b"]
    if higher_is_better:
        loss_hat = b * (size ** (-alpha))
        return float(1 - loss_hat)
    return float(b * (size ** (-alpha)))
```
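Example usage with illustrative (made‑up) F1 points:
```python
# Three (size, F1) points and an extrapolation to the 200k target.
fit = fit_powerlaw([10_000, 20_000, 40_000], [0.71, 0.74, 0.76], higher_is_better=True)
predicted_f1 = predict_powerlaw(200_000, fit, higher_is_better=True)
print(fit, predicted_f1)
```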
**Acceptance criteria**
* Given ≥2 points (prefer 3+), returns fit parameters and a plausible prediction.
* Combined with `utils/plotting.plot_scaling(...)`, writes a PNG with points + curve.
---
## 12) Experiment Orchestrator — `jobs/run_experiment.py`
### Responsibilities
* Parse CLI args: `--model`, `--task`, `--d0`, `--dk`, `--metrics ...`, `--sizes 10000 20000`, `--target_size 200000`, `--results_repo <id>`, `--job_id <uuid>`.
* Create working dirs: `artifacts/<job_id>/`.
* Load datasets (D₀, Dₖ), build mixtures for requested sizes.
* For each size:
1. run short **train** (adapter saved under `artifacts/<job_id>/adapters/size-<N>`),
2. run **eval** on the benchmark set → collect metrics.
* Fit **scaling** across sizes; produce:
* `metrics.json` (per‑size metrics, fit params, predicted large‑scale performance),
* `scaling.png` (plot).
* Push `artifacts/<job_id>/` to `results_repo` under `experiments/<job_id>/...` using `utils/hub.push_artifacts(...)` (per‑user prefixes like `experiments/<user>/...` are a post‑MVP item; see section 17).
* Print a final JSON line to stdout with the artifacts path (UI can parse logs if needed).
### CLI Skeleton
```python
import argparse, json, os, uuid
from utils import hub, data, plotting
from jobs import train, eval as evalm, scaling

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--task", choices=["classification", "qa"], required=True)
    ap.add_argument("--d0", required=True)
    ap.add_argument("--dk", required=True)
    ap.add_argument("--metrics", nargs="+", default=["f1"])
    ap.add_argument("--sizes", nargs="+", type=int, default=[10000, 20000, 40000])
    ap.add_argument("--target_size", type=int, default=200000)
    ap.add_argument("--results_repo", default=os.getenv("RESULTS_REPO", ""))
    ap.add_argument("--job_id", default=str(uuid.uuid4()))
    args = ap.parse_args()

    # Setup dirs
    out_dir = os.path.abspath(os.path.join("artifacts", args.job_id))
    os.makedirs(out_dir, exist_ok=True)

    # Load datasets
    d0 = data.load_dataset_normalized(args.d0, args.task)
    dk = data.load_dataset_normalized(args.dk, args.task)
    test = data.load_benchmark(args.d0, args.task, split="validation")  # MVP: reuse D₀ val if none provided

    # Build mixtures & run train/eval
    per_size = []
    for N in args.sizes:
        mix = data.build_mixtures(d0, dk, sizes=[N])[N]
        adapter_dir = os.path.join(out_dir, f"adapter_size_{N}")
        train.train_peft(args.model, mix, adapter_dir, max_steps=300)  # MVP: few steps
        metrics = {}
        if args.task == "classification":
            metrics.update(evalm.eval_classification(adapter_dir, test))
        else:
            metrics.update(evalm.eval_qa(adapter_dir, test))
        per_size.append({"size": N, "metrics": metrics})

    # Fit scaling on the primary metric
    key = "exact_match" if args.task == "qa" else "f1"
    sizes = [r["size"] for r in per_size]
    scores = [r["metrics"][key] for r in per_size]
    fit = scaling.fit_powerlaw(sizes, scores, higher_is_better=True)
    pred = scaling.predict_powerlaw(args.target_size, fit, higher_is_better=True)

    # Write artifacts
    mpath = os.path.join(out_dir, "metrics.json")
    with open(mpath, "w") as f:
        json.dump({"runs": per_size, "fit": fit,
                   "prediction": {"target_size": args.target_size, key: pred}}, f, indent=2)
    plotting.plot_scaling(sizes, scores, key, os.path.join(out_dir, "scaling.png"))

    # Push artifacts
    repo_id = hub.ensure_results_repo(os.getenv("SERVICE_HF_TOKEN"), args.results_repo)
    hub.push_artifacts(repo_id, out_dir, subdir=f"experiments/{args.job_id}")
    print(json.dumps({"status": "ok", "artifacts_repo": repo_id, "path": f"experiments/{args.job_id}"}))

if __name__ == "__main__":
    main()
```
**Acceptance criteria**
* Running with tiny toy inputs creates `artifacts/<job_id>/` + pushes to results repo.
* `metrics.json` and `scaling.png` exist and look sensible.
---
## 13) Job Submission from UI — `app.py` (continued)
### Core actions
* **Submit**: for each selected Dₖ → call `huggingface_hub.run_job(...)` with:
* `image`: CUDA‑capable (e.g., `pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel`)
* `command`: `["python","jobs/run_experiment.py", "--model", model_id, "--task", task, "--d0", d0_repo, "--dk", dk_id, "--metrics", *metrics, "--sizes", *sizes, "--target_size", str(target_size), "--results_repo", results_repo_or_empty]`
* `flavor`: `"a10g-small"`
* `timeout`: e.g., `7200` (seconds)
* `env`: `{"HF_TOKEN": user_token or SERVICE_HF_TOKEN, "SERVICE_HF_TOKEN": SERVICE_HF_TOKEN, "RESULTS_REPO": RESULTS_REPO}`
* **Poll**: keep a dict `{job_id: {dk, status, url, artifacts}}`; update via `inspect_job(job_id)`; for `completed`, set artifacts link to `hf://<results_repo>/experiments/<job_id>/`.
**Acceptance criteria**
* Submitting 2 Dₖ creates 2 jobs; both progress independently; artifacts link works.
---
## 14) Guardrails & Licensing
* **Gated models**: probe download with `hf_hub_download(model_id, filename="README.md", token=user_token)` to confirm access; if 401/403, show a clear message asking the user to accept the license on the model card (see the sketch after this list).
* **Dataset licensing**: surface the `license` field from `candidates.json` next to each Dₖ; later fetch from Hub.
* **Uploads**: warn users that uploaded D₀ will be stored in a **private dataset** (repo id shown in UI); provide a “Delete my upload” note linking to the repo.
* **Resource limits**: cap sizes (`sizes=[5_000, 10_000]` for MVP), cap number of concurrent jobs per user (client‑side only for MVP).
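A minimal sketch of the gated‑model probe; catching the broad `HfHubHTTPError` keeps it tolerant of `huggingface_hub` version differences in error classes:
```python
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

def check_model_access(model_id, user_token):
    """Return (ok, message) telling the UI whether the user can read the model."""
    try:
        hf_hub_download(model_id, filename="README.md", token=user_token)
        return True, ""
    except HfHubHTTPError as e:
        code = getattr(getattr(e, "response", None), "status_code", None)
        if code in (401, 403):
            return False, f"Please accept the license on the {model_id} model card, then retry."
        return False, f"Could not reach {model_id}: {e}"
```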
---
## 15) Testing
### Local (CPU) sanity checks
* Use a very small subset (e.g., 200 examples) and `max_steps=10` to verify the end‑to‑end loop without a GPU.
* Mock `run_job(...)` (optional) to test UI job table logic.
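A minimal sketch of such a mock, returning objects with the `id`/`url`/`status` attributes the UI reads (local testing only):
```python
import uuid
from types import SimpleNamespace

def fake_run_job(**kwargs):
    """Stand-in for huggingface_hub.run_job; never contacts the Hub."""
    job_id = uuid.uuid4().hex[:8]
    return SimpleNamespace(id=job_id, url=f"https://huggingface.co/jobs/{job_id}")

def fake_inspect_job(job_id):
    """Stand-in for inspect_job; always reports 'completed'."""
    return SimpleNamespace(status="completed")
```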
### Space integration
* Create a private test Space results repo (e.g., `your-org/curation-results-test`).
* Submit a single Dₖ job and verify:
* `artifacts/` created,
* `metrics.json` contains per‑size metrics and prediction,
* `scaling.png` renders,
* artifacts are uploaded and visible from the UI link (a quick programmatic check is sketched below).
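A quick programmatic check of the pushed layout (repo id and paths as defined in section 12):
```python
import os
from huggingface_hub import HfApi

def check_artifacts(results_repo, job_id):
    """Assert that the orchestrator pushed metrics.json and scaling.png for this job."""
    api = HfApi(token=os.getenv("SERVICE_HF_TOKEN"))
    files = api.list_repo_files(results_repo, repo_type="dataset")
    prefix = f"experiments/{job_id}/"
    assert prefix + "metrics.json" in files, "metrics.json missing"
    assert prefix + "scaling.png" in files, "scaling.png missing"
```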
---
## 16) Definition of Done (DoD)
* A signed‑in user can:
1. Provide **D₀** (upload or Hub id),
2. Choose **model**, **task**, **metrics**, and ≥1 **Dₖ**,
3. Click **Run** and see a job per Dₖ with live status,
4. Open **artifacts** (plot + metrics),
5. See a **ranked table** of Dₖ by the chosen primary metric,
6. (Optional) Download `metrics.json`.
* All long work executes as **Jobs** (no HTTP timeouts).
* Artifacts persist in a results dataset or Space storage.
---
## 17) Nice‑to‑Have (post‑MVP)
* **Column mapping UI**: let users map their D₀ columns to `text/label` or `question/context/answers`.
* **Seed sweeps** and confidence intervals on scaling fit.
* **Hardware selector** and budget estimator.
* **vLLM/TGI** inference for faster eval.
* **Per‑user “My Experiments”** page (prefix `experiments/<username>/...`).
---
## 18) Task Checklist (assignable to your agent)
**A. Scaffolding**
* [ ] Add `requirements.txt`; ensure importable on the Space.
* [ ] Create folders: `catalog/`, `utils/`, `jobs/`.
**B. Catalog**
* [ ] Fill `catalog/candidates.json` (3–6 datasets), including `columns` mapping.
**C. Hub utilities (`utils/hub.py`)**
* [ ] `ensure_uploaded_dataset(...)`
* [ ] `ensure_results_repo(...)`
* [ ] `push_artifacts(...)`
**D. Data helpers (`utils/data.py`)**
* [ ] `load_dataset_normalized(...)` for classification + QA
* [ ] `build_mixtures(...)`
* [ ] `load_benchmark(...)`
**E. Plotting (`utils/plotting.py`)**
* [ ] `plot_scaling(...)`
**F. Jobs**
* [ ] `jobs/train.py` (PEFT SFT)
* [ ] `jobs/eval.py` (classification + QA)
* [ ] `jobs/scaling.py` (fit + predict)
* [ ] `jobs/run_experiment.py` (glue the above, produce artifacts, push)
**G. UI (`app.py`)**
* [ ] Build form (inputs, selectors, candidates list)
* [ ] Submit one job per Dₖ via `run_job(...)`
* [ ] Poll job status & render jobs table
* [ ] Artifacts viewer: link to results repo path
* [ ] Basic error messages (license issues, upload failures)
**H. Tests**
* [ ] Local micro‑run (CPU) with tiny sizes
* [ ] Space run on GPU flavor with one Dₖ
* [ ] Verify artifacts + plot + ranking table
---
## 19) Code Snippets to Start Implementation
### `app.py` — minimal UI skeleton (submit + poll)
```python
import os, json, time, gradio as gr
from huggingface_hub import run_job, inspect_job
from utils.hub import ensure_uploaded_dataset, ensure_results_repo

CANDIDATES = json.load(open("catalog/candidates.json"))

def submit(d0_files, d0_id, task, model, metrics, dk_list, sizes, target_size,
           profile: gr.OAuthProfile | None, oauth: gr.OAuthToken | None):
    user_token = getattr(oauth, "token", None)
    d0_repo = ensure_uploaded_dataset(d0_files, d0_id, user_token=user_token)
    results_repo = ensure_results_repo(os.getenv("SERVICE_HF_TOKEN"), os.getenv("RESULTS_REPO", ""))
    jobs = []
    for dk in dk_list:
        cmd = ["python", "jobs/run_experiment.py",
               "--model", model, "--task", task, "--d0", d0_repo, "--dk", dk,
               "--metrics", *metrics, "--sizes", *[str(s) for s in sizes],
               "--target_size", str(int(target_size)),  # gr.Number returns a float
               "--results_repo", results_repo]
        job = run_job(
            image="pytorch/pytorch:2.6.0-cuda12.4-cudnn9-devel",
            command=cmd,
            flavor="a10g-small",
            timeout=7200,
            env={"HF_TOKEN": user_token or os.getenv("SERVICE_HF_TOKEN"),
                 "SERVICE_HF_TOKEN": os.getenv("SERVICE_HF_TOKEN"),
                 "RESULTS_REPO": results_repo},
        )
        jobs.append({"id": job.id, "dk": dk, "url": job.url, "status": "queued", "artifacts": ""})
    return jobs

def poll(jobs_state):
    updated = []
    for j in jobs_state:
        info = inspect_job(j["id"])
        st = info.status  # "queued"/"running"/"completed"/"failed"
        art = j.get("artifacts", "")
        # Heuristic: artifacts live in RESULTS_REPO/experiments/<job_id> (set by run_experiment.py)
        if st == "completed" and not art:
            art = f"{os.getenv('RESULTS_REPO', '(repo)')}/experiments/{j['id']}"
        updated.append({**j, "status": st, "artifacts": art})
    return updated

with gr.Blocks() as demo:
    prof = gr.LoginButton()
    with gr.Row():
        d0_files = gr.UploadButton("Upload D₀ (.csv/.jsonl/.zip)", file_count="multiple")
        d0_id = gr.Textbox(label="or Hub dataset id (user/dataset)")
    task = gr.Radio(choices=["classification", "qa"], value="classification", label="Task")
    model = gr.Dropdown(choices=["meta-llama/Llama-3.1-8B-Instruct"], label="Model")
    metrics = gr.CheckboxGroup(choices=["loss", "f1", "exact_match"], value=["f1"], label="Metrics")
    dk = gr.CheckboxGroup(choices=[c["id"] for c in CANDIDATES], label="Candidate datasets")
    sizes = gr.CheckboxGroup(choices=[5000, 10000, 20000], value=[5000, 10000], label="Mixture sizes")
    target_size = gr.Number(value=200000, label="Target size for prediction")
    run_btn = gr.Button("Run experiments")
    jobs_state = gr.State([])
    jobs_table = gr.Dataframe(headers=["id", "dk", "status", "url", "artifacts"],
                              datatype=["str", "str", "str", "str", "str"])

    # OAuthProfile/OAuthToken are injected by Gradio from the typed parameters; do not list them as inputs.
    run_btn.click(fn=submit,
                  inputs=[d0_files, d0_id, task, model, metrics, dk, sizes, target_size],
                  outputs=jobs_state)
    gr.Button("Refresh status").click(fn=poll, inputs=jobs_state, outputs=jobs_state)

    def render_table(jobs):  # render as simple rows
        rows = [[j["id"], j["dk"], j["status"], j["url"], j["artifacts"]] for j in jobs]
        return rows

    jobs_state.change(fn=render_table, inputs=jobs_state, outputs=jobs_table)
    gr.Markdown("Open artifacts in the results repo once jobs complete.")

demo.queue().launch()
```
### `utils/hub.py` — upload & results
```python
import os, uuid, tempfile, shutil
from huggingface_hub import create_repo, upload_folder

def ensure_uploaded_dataset(upload_files, d0_dataset_id, user_token=None):
    if d0_dataset_id:
        return d0_dataset_id
    if not upload_files:  # nothing uploaded
        raise ValueError("Please upload D₀ or provide a Hub dataset id.")
    repo_id = f"{os.getenv('HF_ORG', 'your-org')}/curation-upload-{uuid.uuid4().hex[:8]}"
    create_repo(repo_id, repo_type="dataset", private=True, exist_ok=True,
                token=os.getenv("SERVICE_HF_TOKEN"))
    with tempfile.TemporaryDirectory() as tmp:
        # Gradio returns a list of temp files; copy them into one folder
        for f in upload_files:
            src = f.name if hasattr(f, "name") else f
            shutil.copyfile(src, os.path.join(tmp, os.path.basename(src)))
        upload_folder(folder_path=tmp, repo_id=repo_id, repo_type="dataset",
                      token=os.getenv("SERVICE_HF_TOKEN"))
    return repo_id

def ensure_results_repo(service_token, results_repo_env):
    if results_repo_env:
        parts = results_repo_env.split("/")
        if len(parts) == 2:  # looks like org/name; make sure it exists
            create_repo(results_repo_env, repo_type="dataset", private=True, exist_ok=True,
                        token=service_token)
        return results_repo_env
    repo_id = f"{os.getenv('HF_ORG', 'your-org')}/curation-results"
    create_repo(repo_id, repo_type="dataset", private=True, exist_ok=True, token=service_token)
    return repo_id

def push_artifacts(repo_id, local_dir, subdir=""):
    path_in_repo = subdir.strip("/")
    upload_folder(folder_path=local_dir, repo_id=repo_id, repo_type="dataset",
                  path_in_repo=path_in_repo if path_in_repo else None,
                  token=os.getenv("SERVICE_HF_TOKEN"))
```