| |
| """Generate and push per-dataset metadata stubs to the ``EEGDash`` HF org. |
| |
| Lives inside the Space on purpose: the Space already vendors |
| ``dataset_summary.csv`` and hits the same live EEGDash API that |
| ``docs/source/conf.py`` uses. No rehosting of EEG data — each repo is a |
| Markdown card + a small ``eegdash.json`` pointer. |
| |
| The field-priority rules mirror ``_build_dataset_context`` in the docs |
| Sphinx config: CSV row wins when it has a value, otherwise fall back to |
| the API response. That keeps the eegdash.org dataset pages and the HF |
| stubs in lock-step — edit the CSV (or the API), both re-render the same |
| way. |
| |
| Usage:: |
| |
| # Dry-run: write one stub README to /tmp/stub_preview/ |
| python scripts/push_metadata_stubs.py --dataset ds002718 --dry-run |
| |
| # Push a single stub |
| python scripts/push_metadata_stubs.py --dataset ds002718 |
| |
| # Push every row in the CSV, skipping repos that already exist |
| python scripts/push_metadata_stubs.py --all --skip-existing |
| |
| # Sample 10 for a smoke test |
| python scripts/push_metadata_stubs.py --all --limit 10 |
| |
| Requires ``huggingface-cli login`` (or ``HF_TOKEN`` env var) when pushing. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import ast |
| import concurrent.futures |
| import json |
| import logging |
| import os |
| import sys |
| import tempfile |
| import threading |
| import time |
| import urllib.error |
| import urllib.request |
| from pathlib import Path |
| from typing import Any, Iterable |
|
|
| import pandas as pd |
|
|
# Filesystem layout: the script sits one directory below ROOT, and the
# vendored summary CSV is read from ROOT itself.
ROOT = Path(__file__).resolve().parents[1]
CSV_PATH = ROOT.joinpath("dataset_summary.csv")

# Hugging Face organisation that owns the stub repos and the catalog Space.
HF_ORG = "EEGDash"
CATALOG_SPACE = "https://huggingface.co/spaces/" + HF_ORG + "/catalog"

# EEGDash endpoints used for API lookups and for links in the generated cards.
EEGDASH_API = "https://data.eegdash.org/api/eegdash"
EEGDASH_URL = "https://eegdash.org"
GITHUB_URL = "https://github.com/eegdash/EEGDash"

logger = logging.getLogger("push_metadata_stubs")
|
|
|
|
| |
| |
| |
| |
|
|
|
|
def _clean_value(value: Any) -> str:
    """Return ``value`` as a stripped string, or ``""`` for placeholders.

    ``None``, blank text and the case-insensitive markers ``nan``, ``none``,
    ``null``, ``n/a``, ``—`` and ``-`` all collapse to the empty string.
    """
    text = "" if value is None else str(value).strip()
    placeholders = ("nan", "none", "null", "n/a", "—", "-")
    if not text or text.lower() in placeholders:
        return ""
    return text
|
|
|
|
def _normalize_list(value: Any) -> list[str]:
    """Coerce ``value`` into a list of non-empty, stripped strings.

    * Falsy input (``None``, ``""``, ``[]``, ``0``) gives ``[]``.
    * A list or tuple is stringified element by element; blanks are dropped.
    * A string that looks like a list literal (``"['a', 'b']"``) is parsed
      with :func:`ast.literal_eval` and handled like a list. If parsing
      fails the text is kept as a single entry.
    * Any other string gives a one-element list, or ``[]`` when it holds
      only whitespace.
    * Any other object is stringified into a one-element list.
    """

    def _compact(items: Iterable[Any]) -> list[str]:
        # Stringify, strip, and drop entries that end up empty.
        stripped = (str(item).strip() for item in items)
        return [text for text in stripped if text]

    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return _compact(value)
    if isinstance(value, str):
        cleaned = value.strip()
        if cleaned.startswith("[") and cleaned.endswith("]"):
            try:
                parsed = ast.literal_eval(cleaned)
            except (ValueError, SyntaxError, TypeError):
                # TypeError covers literals such as "[{[]: 1}]" (unhashable key).
                parsed = None
            if isinstance(parsed, (list, tuple)):
                return _compact(parsed)
        # Whitespace-only text must not produce [""].
        return _compact([cleaned])
    return _compact([value])
|
|
|
|
def _format_hours(cell: Any) -> str:
    """Format a duration cell as hours with one decimal and thousands commas.

    Placeholder or empty cells give ``""``. Text that is not a number is
    returned unchanged (after cleaning).
    """
    text = _clean_value(cell)
    if text:
        try:
            return f"{float(text):,.1f}"
        except ValueError:
            pass
    return text
|
|
|
|
def _format_stat_counts(cell: Any) -> str:
    """Render a ``[{val, count}, ...]`` cell as ``"val (×count), ..."``.

    The cell is cleaned, then decoded as JSON, falling back to a Python
    literal. Text that decodes as neither is returned as-is. Anything that
    is not a non-empty list gives ``""``. Entries that are not dicts, or that
    have no ``val``, are skipped. Whole-number floats are shown as integers,
    and the ``(×count)`` suffix is omitted when the count is missing, empty
    or zero.
    """
    raw = _clean_value(cell)
    if not raw:
        return ""
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        try:
            decoded = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            return raw
    if not (isinstance(decoded, list) and decoded):
        return ""

    def _label(entry: Any) -> str | None:
        # None means "skip this entry"; an empty string is still kept.
        if not isinstance(entry, dict):
            return None
        value = entry.get("val")
        if value is None:
            return None
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        occurrences = entry.get("count")
        if occurrences in (None, "", 0):
            return str(value)
        return f"{value} (×{occurrences})"

    labels = (_label(entry) for entry in decoded)
    return ", ".join(label for label in labels if label is not None)
|
|
|
|
| |
| |
| |
|
|
|
|
def _fetch_api_summary(dataset_id: str, timeout: float = 10.0) -> dict[str, Any]:
    """Fetch the EEGDash API summary for ``dataset_id``, best effort.

    The id is tried as given, then in one alternate spelling: lower-cased
    for ``ds…`` ids, or with the ``EEG2025r`` prefix re-cased for
    ``eeg2025r…`` ids. Each distinct spelling is requested at most once.

    Parameters
    ----------
    dataset_id:
        Dataset identifier to look up.
    timeout:
        Per-request timeout in seconds, passed to ``urlopen``.

    Returns
    -------
    dict
        The ``data`` object of the first response whose ``success`` flag is
        truthy. ``{}`` when every attempt fails or the payload does not have
        the expected shape. Network and decoding errors are logged at DEBUG
        level and never raised.
    """
    from http.client import HTTPException
    from urllib.parse import quote

    lowered = dataset_id.lower()
    candidates = [dataset_id]
    if dataset_id.startswith("ds"):
        candidates.append(lowered)
    elif lowered.startswith("eeg2025r"):
        candidates.append(f"EEG2025r{lowered.replace('eeg2025r', '')}")

    # dict.fromkeys removes the duplicate spelling while keeping the order,
    # so an already-lowercase id is not requested twice after a failure.
    for vid in dict.fromkeys(candidates):
        url = f"{EEGDASH_API}/datasets/summary/{quote(vid, safe='')}"
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                payload = json.loads(resp.read().decode("utf-8"))
        except (OSError, ValueError, HTTPException) as exc:
            # OSError covers URLError/HTTPError and socket timeouts;
            # ValueError covers JSON and UTF-8 decoding failures.
            logger.debug("API %s failed: %s", vid, exc)
            continue
        if isinstance(payload, dict) and payload.get("success"):
            summary = payload.get("data")
            return summary if isinstance(summary, dict) else {}
    return {}
|
|
|
|
| |
| |
| |
|
|
|
|
def _parse_canonical_names(cell: Any) -> list[str]:
    """Extract canonical alias names from a CSV cell.

    The cell is expected to hold a JSON array (or a Python list/tuple
    literal) of names. Each entry is stringified and stripped, and only
    entries that are valid Python identifiers are kept. Empty, unparsable
    or non-sequence cells give ``[]``.

    Intended to match ``eegdash.dataset.registry._parse_canonical_names``.
    """
    raw = _clean_value(cell)
    if not raw:
        return []
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        try:
            decoded = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            return []
    if not isinstance(decoded, (list, tuple)):
        return []
    candidates = (str(entry).strip() for entry in decoded)
    return [name for name in candidates if name and name.isidentifier()]
|
|
|
|
def _build_context(row: pd.Series) -> dict[str, Any]:
    """Merge one CSV row with the live API summary into a render context.

    For fields present in both sources the CSV value wins when it is
    non-empty, otherwise the API value is used. The API summary is fetched
    with ``_fetch_api_summary``; a failed lookup yields ``{}`` and the
    context is built from the CSV alone.

    Parameters
    ----------
    row:
        One row of ``dataset_summary.csv``; the ``dataset`` column supplies
        the dataset id, which is lower-cased.

    Returns
    -------
    dict
        String-valued fields (``""`` when unknown) plus list-valued fields
        (``authors``, ``tasks_list``, ``canonical_names``, ...), consumed by
        ``_render_readme`` and ``_render_pointer``.
    """
    dataset_id = _clean_value(row.get("dataset")).lower()
    api = _fetch_api_summary(dataset_id)

    def from_csv(column: str) -> str:
        return _clean_value(row.get(column))

    def from_api(key: str) -> str:
        return _clean_value(api.get(key))

    def section(key: str) -> dict[str, Any]:
        # Nested API objects; anything that is not a dict is treated as absent.
        value = api.get(key)
        return value if isinstance(value, dict) else {}

    def whole(text: str) -> str:
        # A count that arrives as a float ("18.0") is shown as "18".
        try:
            number = float(text)
        except ValueError:
            return text
        return str(int(number)) if number.is_integer() else text

    def pick(row_key: str, api_key: str = "") -> str:
        # Count fields: a CSV value of zero means "unknown", so fall back.
        value = whole(from_csv(row_key))
        if value and value != "0":
            return value
        return whole(from_api(api_key)) if api_key else ""

    def bare_doi(raw: str) -> str:
        # Reduce "doi:10.x/y" or "https://doi.org/10.x/y" to "10.x/y" so the
        # rendered link is not prefixed with the resolver twice.
        doi = raw
        if doi.lower().startswith("doi:"):
            doi = doi[4:].strip()
        for prefix in (
            "https://doi.org/",
            "http://doi.org/",
            "https://dx.doi.org/",
            "http://dx.doi.org/",
        ):
            if doi.lower().startswith(prefix):
                doi = doi[len(prefix):].strip()
                break
        return doi

    title = from_csv("dataset_title") or _clean_value(
        api.get("computed_title") or api.get("name")
    )

    timestamps = section("timestamps")
    created = timestamps.get("dataset_created_at") or ""
    year = created[:4] if isinstance(created, str) and len(created) >= 4 else ""

    canonical_names = _parse_canonical_names(row.get("canonical_name"))
    if not canonical_names:
        api_names = api.get("canonical_name")
        if isinstance(api_names, list):
            canonical_names = [
                name.strip()
                for name in api_names
                if isinstance(name, str) and name.strip().isidentifier()
            ]

    # Duration: the CSV stores hours, the API stores seconds.
    duration_hours = _format_hours(row.get("duration_hours_total"))
    if not duration_hours:
        seconds = from_api("total_duration_s")
        if seconds:
            try:
                duration_hours = f"{float(seconds) / 3600:,.1f}"
            except ValueError:
                duration_hours = ""

    demographics = section("demographics")
    storage = section("storage")
    external = section("external_links")
    api_tags = section("tags")

    return {
        "dataset_id": dataset_id,
        "title": title or dataset_id,
        "author_year": from_csv("author_year"),
        "canonical_names": canonical_names,
        "authors": _normalize_list(api.get("authors")),
        "senior_author": from_api("senior_author"),
        "contact_info": _normalize_list(api.get("contact_info")),
        "contributing_labs": _normalize_list(api.get("contributing_labs")),
        "year": year,
        "license": from_csv("license") or from_api("license") or "Unknown",
        "doi": bare_doi(from_csv("doi") or from_api("dataset_doi")),
        "paper_doi": bare_doi(from_api("associated_paper_doi")),
        "source": from_csv("source") or "OpenNeuro",
        "openneuro_url": f"https://openneuro.org/datasets/{dataset_id}",
        "nemar_url": f"https://nemar.org/dataexplorer/detail?dataset_id={dataset_id}",
        "source_url": from_api("source_url")
        or _clean_value(external.get("source_url")),
        "osf_url": _clean_value(external.get("osf_url")),
        "github_url": _clean_value(external.get("github_url")),
        "record_modality": from_csv("record_modality"),
        "modality_exp": from_csv("modality of exp")
        or _clean_value(api_tags.get("modality")),
        "type_exp": from_csv("type of exp") or _clean_value(api_tags.get("type")),
        "pathology": from_csv("Type Subject")
        or _clean_value(api_tags.get("pathology")),
        "tasks_list": _normalize_list(api.get("tasks")),
        "n_subjects": pick("n_subjects", "n_subjects")
        or whole(_clean_value(demographics.get("subjects_count"))),
        "n_records": pick("n_records", "total_files"),
        "n_tasks": pick("n_tasks", "n_tasks"),
        "n_channels": _format_stat_counts(row.get("nchans_set"))
        or _format_stat_counts(api.get("nchans_counts")),
        "sampling_freqs": _format_stat_counts(row.get("sampling_freqs"))
        or _format_stat_counts(api.get("sfreq_counts")),
        "size": from_csv("size"),
        "size_bytes": from_api("size_bytes"),
        "duration_hours_total": duration_hours,
        "bids_version": from_api("bids_version"),
        "age_min": _clean_value(demographics.get("age_min")),
        "age_max": _clean_value(demographics.get("age_max")),
        "age_mean": _clean_value(demographics.get("age_mean")),
        "sessions": _normalize_list(api.get("sessions")),
        "study_design": from_api("study_design"),
        "study_domain": from_api("study_domain"),
        "experimental_modalities": _normalize_list(api.get("experimental_modalities")),
        "datatypes": _normalize_list(api.get("datatypes")),
        "funding": _normalize_list(api.get("funding")),
        "references": _normalize_list(api.get("references")),
        "how_to_acknowledge": from_api("how_to_acknowledge"),
        "readme": from_api("readme"),
        "nemar_citations": from_api("nemar_citation_count")
        or from_csv("nemar_citation_count"),
        "storage_backend": _clean_value(storage.get("backend")),
        "storage_base": _clean_value(storage.get("base")),
        "digested_at": _clean_value(timestamps.get("digested_at")),
        "stats_computed_at": from_api("stats_computed_at"),
    }
|
|
|
|
| |
| |
| |
|
|
|
|
# Normalised license spelling -> identifier written to the card's
# ``license:`` field.
HF_LICENSE_MAP = {
    "cc0": "cc0-1.0",
    "cc0-1.0": "cc0-1.0",
    "cc-by-4.0": "cc-by-4.0",
    "cc-by-sa-4.0": "cc-by-sa-4.0",
    "cc-by-nc-4.0": "cc-by-nc-4.0",
    "cc-by-nc-sa-4.0": "cc-by-nc-sa-4.0",
    "mit": "mit",
    "apache-2.0": "apache-2.0",
    "bsd-3-clause": "bsd-3-clause",
}


def _hf_license(raw: str) -> str:
    """Map a free-text license string to an ``HF_LICENSE_MAP`` identifier.

    The text is lower-cased and ``_`` / spaces become ``-``. A key matches
    only when it appears as a whole token, i.e. it is not directly preceded
    or followed by a letter or digit. This keeps the short ``mit`` key from
    matching inside words such as "permitted" or "limited".

    Returns ``"other"`` when nothing matches, including for empty input.
    """
    norm = (raw or "").lower().replace("_", "-").replace(" ", "-").strip("-")
    for key, slug in HF_LICENSE_MAP.items():
        start = norm.find(key)
        while start != -1:
            end = start + len(key)
            left_ok = start == 0 or not norm[start - 1].isalnum()
            right_ok = end == len(norm) or not norm[end].isalnum()
            if left_ok and right_ok:
                return slug
            start = norm.find(key, start + 1)
    return "other"
|
|
|
|
def _size_category(n_records: str) -> str:
    """Bucket a record count into a Hugging Face ``size_categories`` label.

    Accepts an integer-like value, or a float-like string such as ``"18.0"``
    (truncated to an integer). Returns ``"unknown"`` when the value cannot be
    read as a number.
    """
    try:
        n = int(n_records)
    except (TypeError, ValueError):
        try:
            n = int(float(n_records))
        except (TypeError, ValueError, OverflowError):
            return "unknown"

    # (exclusive upper bound, label), ascending.
    buckets = (
        (1_000, "n<1K"),
        (10_000, "1K<n<10K"),
        (100_000, "10K<n<100K"),
        (1_000_000, "100K<n<1M"),
        (10_000_000, "1M<n<10M"),
        (100_000_000, "10M<n<100M"),
        (1_000_000_000, "100M<n<1B"),
        (10_000_000_000, "1B<n<10B"),
        (100_000_000_000, "10B<n<100B"),
        (1_000_000_000_000, "100B<n<1T"),
    )
    for upper, label in buckets:
        if n < upper:
            return label
    return "n>1T"
|
|
|
|
def _escape_yaml(s: str) -> str:
    """Return ``s`` as a double-quoted YAML scalar on a single line.

    Backslashes and double quotes are escaped. Newlines, tabs and other
    control characters are written as escape sequences (``\\n``, ``\\t``,
    ``\\uXXXX``), so a multi-line value cannot spill out of the frontmatter
    line. Non-ASCII text is kept as-is.

    Uses JSON string encoding, whose escapes are also valid inside a YAML
    double-quoted scalar.
    """
    return json.dumps(str(s), ensure_ascii=False)
|
|
|
|
def _sanitize_upstream_readme(text: str) -> str:
    """Defuse bare ``---`` lines in an upstream README.

    Any line that is exactly ``---`` once surrounding whitespace is removed
    gets replaced by ``***``, so it cannot be read as a YAML frontmatter
    delimiter. All other lines are kept verbatim. Lines are re-joined with
    ``\\n`` and the result is stripped of leading and trailing whitespace.
    """
    defused = (
        "***" if line.strip() == "---" else line for line in text.splitlines()
    )
    return "\n".join(defused).strip()
|
|
|
|
def _readme_yaml_scalar(text: str) -> str:
    """Return ``text`` bare when it is a safe YAML plain word, else quoted.

    Words YAML may read as booleans or null (``on``, ``off``, ``yes``, ...),
    values not starting with a letter (``1``), and values with characters
    outside ``[alnum-_.]`` are quoted so they stay strings.
    """
    reserved = {"y", "n", "yes", "no", "on", "off", "true", "false", "null", "~"}
    is_plain = (
        text[:1].isalpha()
        and text.lower() not in reserved
        and all(ch.isalnum() or ch in "-_." for ch in text)
    )
    return text if is_plain else _escape_yaml(text)


def _readme_frontmatter(ctx: dict[str, Any]) -> str:
    """Build the YAML frontmatter block (title, license, tags, size, authors)."""
    modality = ctx["record_modality"].lower()
    tags = [
        modality if modality in {"eeg", "meg", "ieeg"} else "eeg",
        "neuroscience",
        "eegdash",
        "brain-computer-interface",
        "pytorch",
    ]
    # Same slug rule for both experiment tags (spaces and slashes -> dashes).
    for key in ("modality_exp", "type_exp"):
        if ctx[key]:
            tags.append(ctx[key].lower().replace(" ", "-").replace("/", "-"))
    pathology = ctx["pathology"].lower()
    if pathology and pathology not in {"unknown", "healthy"}:
        tags.append(pathology.replace(" ", "-").replace("/", "-"))
    for task in ctx["tasks_list"][:5]:
        slug = task.lower().replace("_", "-").replace(" ", "-")
        if slug:
            tags.append(slug)
    tags = list(dict.fromkeys(tags))  # de-duplicate, keep first occurrence

    lines = [
        "---",
        f"pretty_name: {_escape_yaml(ctx['title'] or ctx['dataset_id'])}",
        f"license: {_hf_license(ctx['license'])}",
        "tags:",
    ]
    lines.extend(f" - {_readme_yaml_scalar(tag)}" for tag in tags)
    lines.append("size_categories:")
    lines.append(f" - {_size_category(ctx['n_records'])}")
    if ctx["record_modality"]:
        lines.append("task_categories:")
        lines.append(" - other")
    if ctx["authors"]:
        lines.append("authors:")
        lines.extend(f" - {_escape_yaml(a)}" for a in ctx["authors"][:12])
    lines.append("---")
    return "\n".join(lines)


def _readme_hero(ctx: dict[str, Any]) -> str:
    """Build the title, dataset id, attribution and canonical-alias lines."""
    bits = [
        f"# {ctx['title'] or ctx['dataset_id']}",
        f"**Dataset ID:** `{ctx['dataset_id']}`",
    ]
    attribution = ctx["author_year"]
    if not attribution and ctx["authors"]:
        attribution = ctx["authors"][0]
        if len(ctx["authors"]) > 1:
            attribution += " et al."
        if ctx["year"]:
            attribution += f" ({ctx['year']})"
    if attribution:
        bits.append(f"_{attribution}_")
    if ctx["canonical_names"]:
        joined = " · ".join(f"`{n}`" for n in ctx["canonical_names"])
        bits.append(f"**Canonical aliases:** {joined}")
    return "\n\n".join(bits)


def _readme_tldr(ctx: dict[str, Any]) -> str:
    """Build the one-line "At a glance" blockquote, or ``""`` if empty."""
    bits = []
    if ctx["record_modality"]:
        bits.append(ctx["record_modality"].upper())
    if ctx["modality_exp"] and ctx["type_exp"]:
        bits.append(f"{ctx['modality_exp']} {ctx['type_exp'].lower()}")
    elif ctx["modality_exp"]:
        bits.append(ctx["modality_exp"])
    if ctx["pathology"]:
        bits.append(ctx["pathology"].lower())
    if ctx["n_subjects"]:
        bits.append(f"{ctx['n_subjects']} subjects")
    if ctx["n_records"]:
        bits.append(f"{ctx['n_records']} recordings")
    if ctx["license"]:
        bits.append(ctx["license"])
    if not bits:
        return ""
    return "> **At a glance:** " + " · ".join(bits)


def _readme_load_block(ctx: dict[str, Any]) -> str:
    """Build the "Load this dataset" section with the code snippets."""
    aliases_hint = ""
    if ctx["canonical_names"]:
        a0 = ctx["canonical_names"][0]
        aliases_hint = (
            "\nYou can also load it by canonical alias — these are registered "
            "classes in `eegdash.dataset`:\n\n"
            "```python\n"
            f"from eegdash.dataset import {a0}\n"
            f'ds = {a0}(cache_dir="./cache")\n'
            "```\n"
        )
    return f"""## Load this dataset

This repo is a **pointer**. The raw EEG data lives at its canonical source
(OpenNeuro / NEMAR); [EEGDash](https://github.com/eegdash/EEGDash) streams it
on demand and returns a PyTorch / braindecode dataset.

```python
# pip install eegdash
from eegdash import EEGDashDataset

ds = EEGDashDataset(dataset="{ctx['dataset_id']}", cache_dir="./cache")
print(len(ds), "recordings")
```
{aliases_hint}
If the dataset has been mirrored to the HF Hub in braindecode's Zarr layout,
you can also pull it directly:

```python
from braindecode.datasets import BaseConcatDataset
ds = BaseConcatDataset.pull_from_hub("{HF_ORG}/{ctx['dataset_id']}")
```
"""


def _readme_meta_table(ctx: dict[str, Any]) -> str:
    """Build the two-column metadata table, skipping rows with no value."""
    age_parts = []
    if ctx["age_min"] and ctx["age_max"]:
        age_parts.append(f"{ctx['age_min']}–{ctx['age_max']} yrs")
    if ctx["age_mean"]:
        try:
            age_parts.append(f"mean {float(ctx['age_mean']):.1f}")
        except ValueError:
            age_parts.append(f"mean {ctx['age_mean']}")
    age_str = ", ".join(age_parts)

    rows = [
        ("Subjects", ctx["n_subjects"]),
        ("Age range", age_str),
        ("Recordings", ctx["n_records"]),
        ("Tasks (count)", ctx["n_tasks"]),
        ("Sessions", str(len(ctx["sessions"])) if ctx["sessions"] else ""),
        ("Channels", ctx["n_channels"]),
        ("Sampling rate (Hz)", ctx["sampling_freqs"]),
        ("Total duration (h)", ctx["duration_hours_total"]),
        ("Size on disk", ctx["size"]),
        ("Recording type", ctx["record_modality"].upper() if ctx["record_modality"] else ""),
        ("Experimental modality", ctx["modality_exp"]),
        ("Paradigm type", ctx["type_exp"]),
        ("Population", ctx["pathology"]),
        ("Study design", ctx["study_design"]),
        ("Study domain", ctx["study_domain"]),
        ("BIDS version", ctx["bids_version"]),
        ("Source", ctx["source"]),
        ("License", ctx["license"]),
        ("NEMAR citations", ctx["nemar_citations"]),
    ]

    def cell(value: Any) -> str:
        # A raw "|" or a newline inside a cell would break the table row.
        flat = str(value).replace("\r\n", " ").replace("\n", " ")
        return flat.replace("|", "\\|")

    md_rows = "\n".join(
        f"| **{k}** | {cell(v)} |" for k, v in rows if str(v or "").strip()
    )
    return "## Dataset metadata\n\n| | |\n|---|---|\n" + md_rows


def _readme_people(ctx: dict[str, Any]) -> str:
    """Build the authors / contributing labs / contact section, or ``""``."""
    lines = []
    if ctx["authors"]:
        lines.append("### Authors")
        senior = ctx["senior_author"].strip()
        for author in ctx["authors"]:
            marker = " _(senior)_" if author.strip() == senior else ""
            lines.append(f"- {author}{marker}")
    if ctx["contributing_labs"]:
        lines.append("\n### Contributing labs")
        lines.extend(f"- {lab}" for lab in ctx["contributing_labs"])
    if ctx["contact_info"]:
        lines.append("\n### Contact")
        lines.extend(f"- {c}" for c in ctx["contact_info"])
    if not lines:
        return ""
    return "## People\n\n" + "\n".join(lines)


def _readme_citation(ctx: dict[str, Any]) -> str:
    """Build the "How to cite" and/or "References" section, or ``""``."""
    block = ""
    if ctx["how_to_acknowledge"]:
        quoted = "\n".join(
            f"> {ln}" for ln in ctx["how_to_acknowledge"].strip().splitlines()
        )
        block = (
            "## How to cite\n\n"
            "Please follow the upstream dataset's citation policy:\n\n" + quoted
        )
    if ctx["references"]:
        # References nest under "How to cite" when both are present.
        block += "\n\n### References\n\n" if block else "## References\n\n"
        block += "\n".join(f"- {r}" for r in ctx["references"])
    return block


def _readme_links(ctx: dict[str, Any]) -> str:
    """Build the "Links" section (DOIs, source pages, catalog, docs, code)."""
    links = []
    if ctx["doi"]:
        links.append(f"- **DOI:** [{ctx['doi']}](https://doi.org/{ctx['doi']})")
    if ctx["paper_doi"]:
        links.append(
            f"- **Associated paper:** [{ctx['paper_doi']}]"
            f"(https://doi.org/{ctx['paper_doi']})"
        )
    source = ctx["source"].lower()
    if source == "openneuro":
        links.append(f"- **OpenNeuro:** [{ctx['dataset_id']}]({ctx['openneuro_url']})")
    if source == "nemar":
        links.append(f"- **NEMAR:** [{ctx['dataset_id']}]({ctx['nemar_url']})")
    if ctx["source_url"] and ctx["source_url"] not in (
        ctx["openneuro_url"],
        ctx["nemar_url"],
    ):
        links.append(f"- **Source:** <{ctx['source_url']}>")
    if ctx["osf_url"]:
        links.append(f"- **OSF:** <{ctx['osf_url']}>")
    if ctx["github_url"]:
        links.append(f"- **GitHub:** <{ctx['github_url']}>")
    links.append(f"- **Browse 700+ datasets:** [EEGDash catalog]({CATALOG_SPACE})")
    links.append(f"- **Docs:** <{EEGDASH_URL}>")
    links.append(f"- **Code:** <{GITHUB_URL}>")
    return "## Links\n\n" + "\n".join(links)


def _readme_provenance(ctx: dict[str, Any]) -> str:
    """Build the storage / size / timestamp section, or ``""`` if empty."""
    lines = []
    if ctx["storage_backend"] and ctx["storage_base"]:
        lines.append(
            f"- **Backend:** `{ctx['storage_backend']}` — "
            f"`{ctx['storage_base']}`"
        )
    elif ctx["storage_backend"]:
        lines.append(f"- **Backend:** `{ctx['storage_backend']}`")
    if ctx["size_bytes"]:
        try:
            exact = f"{int(float(ctx['size_bytes'])):,} bytes"
        except (ValueError, OverflowError):
            exact = ""
        if exact:
            # Add the human-readable size only when there is one, so the
            # line never ends in "()".
            if ctx["size"]:
                exact += f" ({ctx['size']})"
            lines.append(f"- **Exact size:** {exact}")
    if ctx["digested_at"]:
        lines.append(f"- **Ingested:** {ctx['digested_at'][:10]}")
    if ctx["stats_computed_at"]:
        lines.append(f"- **Stats computed:** {ctx['stats_computed_at'][:10]}")
    if not lines:
        return ""
    return "## Provenance\n\n" + "\n".join(lines)


def _render_readme(ctx: dict[str, Any]) -> str:
    """Render the full dataset card (``README.md``) for one context.

    Sections are emitted in a fixed order: YAML frontmatter, title block,
    at-a-glance line, loading instructions, metadata table, tasks, upstream
    README, people, funding, citation, links, provenance and footer. Empty
    sections are dropped, and the rest are separated by blank lines.

    Parameters
    ----------
    ctx:
        Context dict as produced by ``_build_context``.

    Returns
    -------
    str
        The Markdown document, ending with a single newline.
    """
    tasks_block = ""
    if ctx["tasks_list"]:
        items = "\n".join(f"- `{t}`" for t in ctx["tasks_list"])
        tasks_block = f"## Tasks\n\n{items}\n"

    upstream_block = ""
    if ctx["readme"]:
        body = _sanitize_upstream_readme(ctx["readme"])
        upstream_block = (
            "## Upstream README\n\n"
            "_Verbatim from the dataset's authors — the canonical "
            "description._\n\n"
            f"{body}\n"
        )

    funding_block = ""
    if ctx["funding"]:
        items = "\n".join(f"- {f}" for f in ctx["funding"])
        funding_block = f"## Funding\n\n{items}"

    footer = (
        "---\n\n"
        "_Auto-generated from "
        f"[dataset_summary.csv]({GITHUB_URL}/blob/main/eegdash/dataset/dataset_summary.csv) "
        f"and the [EEGDash API]({EEGDASH_API}/datasets/summary/{ctx['dataset_id']}). "
        "Do not edit this file by hand — update the upstream source and "
        "re-run `scripts/push_metadata_stubs.py`._"
    )

    sections = [
        _readme_frontmatter(ctx),
        _readme_hero(ctx),
        _readme_tldr(ctx),
        _readme_load_block(ctx),
        _readme_meta_table(ctx),
        tasks_block,
        upstream_block,
        _readme_people(ctx),
        funding_block,
        _readme_citation(ctx),
        _readme_links(ctx),
        _readme_provenance(ctx),
        footer,
    ]
    return "\n\n".join(s for s in sections if s).strip() + "\n"
|
|
|
|
def _render_pointer(ctx: dict[str, Any]) -> str:
    """Render the machine-readable ``eegdash.json`` pointer for one dataset.

    ``source_url`` is the explicit upstream URL when the context has one.
    Otherwise it falls back to the landing page matching ``ctx["source"]``:
    the NEMAR page for NEMAR datasets, the OpenNeuro page for everything
    else.

    Returns the JSON document (2-space indent, non-ASCII kept) with a
    trailing newline.
    """
    if ctx["source"].lower() == "nemar":
        fallbacks = (ctx["nemar_url"], ctx["openneuro_url"])
    else:
        fallbacks = (ctx["openneuro_url"], ctx["nemar_url"])
    source_url = ctx["source_url"] or fallbacks[0] or fallbacks[1]

    pointer = {
        "dataset_id": ctx["dataset_id"],
        "title": ctx["title"],
        "source": ctx["source"],
        "source_url": source_url,
        "doi": ctx["doi"],
        "license": ctx["license"],
        "loader": {
            "library": "eegdash",
            "class": "EEGDashDataset",
            "kwargs": {"dataset": ctx["dataset_id"]},
        },
        "catalog": CATALOG_SPACE,
        "generated_by": "huggingface-space/scripts/push_metadata_stubs.py",
    }
    return json.dumps(pointer, indent=2, ensure_ascii=False) + "\n"
|
|
|
|
| |
| |
| |
|
|
|
|
def _iter_slugs(df: pd.DataFrame, args: argparse.Namespace) -> Iterable[pd.Series]:
    """Yield the CSV rows selected by the command-line arguments.

    With ``--dataset``, yields every row whose ``dataset`` value matches one
    of the requested slugs, compared case-insensitively. With ``--all``,
    yields every row, or only the first ``--limit`` rows when a limit is set.
    ``--dataset`` takes precedence over ``--all``.

    Raises ``SystemExit`` (on first iteration) when neither option was given.
    """
    if args.dataset:
        requested = {name.lower() for name in args.dataset}
        for _, record in df.iterrows():
            if str(record["dataset"]).lower() in requested:
                yield record
    elif args.all:
        frame = df.head(args.limit) if args.limit else df
        for _, record in frame.iterrows():
            yield record
    else:
        raise SystemExit("Pass --dataset <slug> [...] or --all")
|
|
|
|
def _push_one(ctx: dict[str, Any], args: argparse.Namespace) -> str:
    """Create (if needed) and populate the stub repo for one dataset.

    The dataset repo ``<HF_ORG>/<dataset_id>`` is created with
    ``exist_ok=True``. ``README.md`` and ``eegdash.json`` are then rendered
    into a temporary directory and uploaded as one folder.

    Uses ``args.token`` for authentication and ``args.private`` for repo
    visibility. Returns the repo id.
    """
    from huggingface_hub import HfApi

    repo_id = f"{HF_ORG}/{ctx['dataset_id']}"
    client = HfApi(token=args.token)
    client.create_repo(
        repo_id=repo_id,
        repo_type="dataset",
        exist_ok=True,
        private=args.private,
    )
    with tempfile.TemporaryDirectory() as staging:
        staging_dir = Path(staging)
        (staging_dir / "README.md").write_text(_render_readme(ctx), encoding="utf-8")
        (staging_dir / "eegdash.json").write_text(
            _render_pointer(ctx), encoding="utf-8"
        )
        # Upload while the temporary directory still exists.
        client.upload_folder(
            repo_id=repo_id,
            folder_path=staging,
            repo_type="dataset",
            commit_message=f"Metadata stub for {ctx['dataset_id']}",
        )
    return repo_id
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: select rows, then preview or push stubs.

    Parameters
    ----------
    argv:
        Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns
    -------
    int
        ``0`` on success (including dry-run), ``1`` if any push failed.

    Raises
    ------
    SystemExit
        When neither ``--dataset`` nor ``--all`` is given, or when no CSV row
        matches the selection.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--dataset", nargs="+", help="One or more slugs.")
    parser.add_argument("--all", action="store_true", help="Every row in the CSV.")
    parser.add_argument("--limit", type=int, default=0, help="Cap --all to N rows.")
    parser.add_argument("--skip-existing", action="store_true")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help=(
            "Write README + pointer previews (first 3 selected rows at most) "
            "to --dry-run-out, no push."
        ),
    )
    parser.add_argument("--dry-run-out", type=Path, default=Path("/tmp/stub_preview"))
    parser.add_argument("--private", action="store_true")
    parser.add_argument("--token", default=os.environ.get("HF_TOKEN"))
    parser.add_argument(
        "--workers",
        type=int,
        default=1,
        help="Parallel pushes (IO-bound — 8-16 is safe; higher risks rate-limits).",
    )
    parser.add_argument("-v", "--verbose", action="count", default=0)
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s — %(message)s",
    )

    df = pd.read_csv(CSV_PATH)
    rows = list(_iter_slugs(df, args))
    if not rows:
        raise SystemExit("No rows matched the given slugs.")

    if args.dry_run:
        args.dry_run_out.mkdir(parents=True, exist_ok=True)
        for r in rows[:3]:
            ctx = _build_context(r)
            (args.dry_run_out / f"{ctx['dataset_id']}_README.md").write_text(
                _render_readme(ctx), encoding="utf-8"
            )
            (args.dry_run_out / f"{ctx['dataset_id']}_eegdash.json").write_text(
                _render_pointer(ctx), encoding="utf-8"
            )
            logger.info("Wrote dry-run preview for %s", ctx["dataset_id"])
        logger.info("Dry-run output: %s", args.dry_run_out)
        return 0

    existing: set[str] = set()
    if args.skip_existing:
        from huggingface_hub import HfApi

        # List with the same token used for pushing, so private repos in the
        # org are visible; lower-case to match the slugs compared below.
        existing = {
            info.id.split("/", 1)[-1].lower()
            for info in HfApi(token=args.token).list_datasets(
                author=HF_ORG, limit=2000
            )
        }

    pending = []
    for r in rows:
        slug = str(r["dataset"]).lower()
        if slug in existing:
            logger.info("skipping %s (exists)", slug)
        else:
            pending.append(r)
    skipped = len(rows) - len(pending)

    failed: list[tuple[str, str]] = []
    done = 0

    def _one(r: pd.Series) -> tuple[str, Exception | None]:
        # Worker: never raises, so one bad dataset cannot abort the batch.
        slug = str(r["dataset"]).lower()
        try:
            ctx = _build_context(r)
            _push_one(ctx, args)
            return slug, None
        except Exception as exc:
            return slug, exc

    def _record(slug: str, err: Exception | None) -> None:
        # Always called from the main thread, so no lock is needed.
        nonlocal done
        if err is None:
            done += 1
            logger.info("pushed EEGDash/%s (%d/%d)", slug, done, len(pending))
        else:
            logger.exception("failed %s", slug, exc_info=err)
            failed.append((slug, str(err)))

    if args.workers and args.workers > 1:
        logger.info(
            "parallel push: %d workers, %d pending", args.workers, len(pending)
        )
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as pool:
            futures = [pool.submit(_one, r) for r in pending]
            for fut in concurrent.futures.as_completed(futures):
                _record(*fut.result())
    else:
        for r in pending:
            _record(*_one(r))
            # Short pause between sequential pushes.
            time.sleep(0.15)

    if failed:
        logger.error("%d failures:", len(failed))
        for slug, err in failed:
            logger.error(" %s — %s", slug, err)
        return 1
    logger.info("done — %d stubs processed (%d skipped)", done, skipped)
    return 0
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|