import os
import json
import base64
import argparse
from pathlib import Path

try:
    from datasets import load_dataset
except ImportError:
    # `datasets` is not installed: install the pinned release together with
    # soundfile, librosa and huggingface_hub using the running interpreter's
    # own pip, then retry the import.
    import subprocess
    import sys

    pip_install = [
        sys.executable, "-m", "pip", "install",
        "datasets==2.19.1", "soundfile", "librosa", "huggingface_hub",
    ]
    subprocess.check_call(pip_install)
    from datasets import load_dataset
|
|
# Status glyphs for log lines, written as escapes so the source stays pure
# ASCII and cannot be damaged by a wrong-codec round trip.
# NOTE(review): these glyphs were mis-encoded in the previous revision. The
# warning sign is recoverable from the surviving bytes; the other three are
# inferred from partial bytes and the message context -- confirm.
_GLYPH_WARNING = "\u26a0\ufe0f"  # WARNING SIGN + VARIATION SELECTOR-16
_GLYPH_SUCCESS = "\u2705"  # WHITE HEAVY CHECK MARK
_GLYPH_FAILURE = "\u274c"  # CROSS MARK
_GLYPH_DONE = "\U0001f389"  # PARTY POPPER

# Row keys that may carry the transcript, in lookup priority order.
_TRANSCRIPT_KEYS = ("sentence", "transcription", "text", "raw_transcription")


def _transcript_of(row):
    """Return the value of the first transcript key present in *row*, else ""."""
    for key in _TRANSCRIPT_KEYS:
        if key in row:
            return row[key]
    return ""


def _wav_base64(samples, sampling_rate):
    """Write *samples* to an in-memory WAV file and return it base64-encoded.

    ``soundfile`` is imported lazily so that a missing package is reported as
    a per-source failure by ``main`` instead of aborting the whole run.
    """
    from io import BytesIO

    import soundfile as sf

    buffer = BytesIO()
    sf.write(buffer, samples, sampling_rate, format="WAV")
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def _iter_samples(source, token):
    """Yield up to ``source["limit"]`` sample dicts streamed from *source*.

    Rows without an audio array are skipped and do not count towards the
    limit. Iteration stops as soon as the limit is reached, so no extra row
    is pulled from the stream.
    """
    # NOTE(review): trust_remote_code=True lets the dataset repository's
    # loading script run locally. Keep it restricted to the fixed repository
    # paths listed in main().
    dataset = load_dataset(
        source["path"],
        source["config"],
        split=source["split"],
        streaming=True,
        token=token,
        trust_remote_code=True,
    )

    limit = source["limit"]
    if limit <= 0:
        return

    produced = 0
    for row in dataset:
        audio = row["audio"] if "audio" in row else None
        if audio is None or "array" not in audio:
            continue
        samples = audio["array"]
        if samples is None:
            continue

        yield {
            "source": source["name"],
            "original_text": _transcript_of(row),
            # Fall back to 16 kHz when the row carries no sampling rate.
            "audio_base64": _wav_base64(samples, audio.get("sampling_rate", 16000)),
        }

        produced += 1
        if produced >= limit:
            return


def main() -> None:
    """Stream a few audio samples from two Hugging Face datasets into one JSON file.

    Command line:
        --output DIR   directory for the result (created if missing).

    Environment:
        HF_TOKEN       optional access token passed to ``load_dataset``; a
                       warning is printed when it is absent.

    For every configured source, up to ``limit`` rows that contain an audio
    array are encoded as base64 WAV and collected together with their
    transcript. A source that raises is reported and skipped; samples gathered
    before the failure are kept. The collected list is written to
    ``DIR/hf_samples.json``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)

    token = os.environ.get("HF_TOKEN")
    if not token:
        print(
            f"{_GLYPH_WARNING} Warning: No HF_TOKEN found in environment. "
            "Common Voice 17.0 might fail because it is gated.",
            flush=True,
        )

    sources = [
        {"name": "CommonVoice", "path": "mozilla-foundation/common_voice_17_0", "config": "wo", "split": "test", "limit": 25},
        {"name": "FLEURS", "path": "google/fleurs", "config": "wo_sn", "split": "test", "limit": 25},
    ]

    results = []
    for source in sources:
        print(
            f"\n=> Loading {source['name']} ({source['path']} - {source['config']}) "
            f"limit {source['limit']}...",
            flush=True,
        )
        count = 0
        # Best effort per source: one failing dataset must not prevent the
        # other from being fetched or the output file from being written.
        try:
            for sample in _iter_samples(source, token):
                results.append(sample)
                count += 1
                if count % 5 == 0:
                    print(f"Downloaded {count}/{source['limit']} from {source['name']}", flush=True)
            print(f"{_GLYPH_SUCCESS} Success for {source['name']}: {count} samples.", flush=True)
        except Exception as e:
            print(f"{_GLYPH_FAILURE} Failed to load {source['name']}: {e}", flush=True)

    out_file = out_dir / "hf_samples.json"
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(results, f)

    print(f"\n{_GLYPH_DONE} Finished fetching. Saved {len(results)} total samples to {out_file}", flush=True)
|
|
# Run the fetch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|