# Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import csv from pathlib import Path import zipfile from functools import reduce from multiprocessing import cpu_count from typing import Any, Dict, List, Optional, Union import io import numpy as np import pandas as pd import sentencepiece as sp from fairseq.data.audio.audio_utils import ( convert_waveform, _get_kaldi_fbank, _get_torchaudio_fbank, is_npy_data, is_sf_audio_data ) import torch import soundfile as sf from tqdm import tqdm UNK_TOKEN, UNK_TOKEN_ID = "", 3 BOS_TOKEN, BOS_TOKEN_ID = "", 0 EOS_TOKEN, EOS_TOKEN_ID = "", 2 PAD_TOKEN, PAD_TOKEN_ID = "", 1 def gen_vocab( input_path: Path, output_path_prefix: Path, model_type="bpe", vocab_size=1000, special_symbols: Optional[List[str]] = None ): # Train SentencePiece Model arguments = [ f"--input={input_path.as_posix()}", f"--model_prefix={output_path_prefix.as_posix()}", f"--model_type={model_type}", f"--vocab_size={vocab_size}", "--character_coverage=1.0", f"--num_threads={cpu_count()}", f"--unk_id={UNK_TOKEN_ID}", f"--bos_id={BOS_TOKEN_ID}", f"--eos_id={EOS_TOKEN_ID}", f"--pad_id={PAD_TOKEN_ID}", ] if special_symbols is not None: _special_symbols = ",".join(special_symbols) arguments.append(f"--user_defined_symbols={_special_symbols}") sp.SentencePieceTrainer.Train(" ".join(arguments)) # Export fairseq dictionary spm = sp.SentencePieceProcessor() spm.Load(output_path_prefix.as_posix() + ".model") vocab = {i: spm.IdToPiece(i) for i in range(spm.GetPieceSize())} assert ( vocab.get(UNK_TOKEN_ID) == UNK_TOKEN and vocab.get(PAD_TOKEN_ID) == PAD_TOKEN and vocab.get(BOS_TOKEN_ID) == BOS_TOKEN and vocab.get(EOS_TOKEN_ID) == EOS_TOKEN ) vocab = { i: s for i, s in vocab.items() if s not in {UNK_TOKEN, BOS_TOKEN, EOS_TOKEN, PAD_TOKEN} } with open(output_path_prefix.as_posix() + ".txt", "w") as f_out: for _, s in sorted(vocab.items(), key=lambda x: x[0]): f_out.write(f"{s} 1\n") def extract_fbank_features( waveform: torch.FloatTensor, sample_rate: int, output_path: Optional[Path] = None, n_mel_bins: int = 80, overwrite: bool = False, ): if output_path is not None and output_path.is_file() and not overwrite: return _waveform = convert_waveform(waveform, sample_rate, to_mono=True) # Kaldi compliance: 16-bit signed integers _waveform = _waveform * (2 ** 15) _waveform = _waveform[0].numpy() features = _get_kaldi_fbank(_waveform, sample_rate, n_mel_bins) if features is None: features = _get_torchaudio_fbank(_waveform, sample_rate, n_mel_bins) if features is None: raise ImportError( "Please install pyKaldi or torchaudio to enable fbank feature extraction" ) if output_path is not None: np.save(output_path.as_posix(), features) return features def create_zip(data_root: Path, zip_path: Path): paths = list(data_root.glob("*.npy")) with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_STORED) as f: for path in tqdm(paths): f.write(path, arcname=path.name) def get_zip_manifest( zip_path: Path, zip_root: Optional[Path] = None, is_audio=False ): _zip_path = Path.joinpath(zip_root or Path(""), zip_path) with zipfile.ZipFile(_zip_path, mode="r") as f: info = f.infolist() paths, lengths = {}, {} for i in tqdm(info): utt_id = Path(i.filename).stem offset, file_size = i.header_offset + 30 + len(i.filename), i.file_size paths[utt_id] = f"{zip_path.as_posix()}:{offset}:{file_size}" with open(_zip_path, "rb") as f: f.seek(offset) byte_data = f.read(file_size) assert len(byte_data) > 1 if is_audio: assert is_sf_audio_data(byte_data), i else: assert is_npy_data(byte_data), i byte_data_fp = io.BytesIO(byte_data) if is_audio: lengths[utt_id] = sf.info(byte_data_fp).frames else: lengths[utt_id] = np.load(byte_data_fp).shape[0] return paths, lengths def gen_config_yaml( manifest_root: Path, spm_filename: Optional[str] = None, vocab_name: Optional[str] = None, yaml_filename: str = "config.yaml", specaugment_policy: Optional[str] = "lb", prepend_tgt_lang_tag: bool = False, sampling_alpha: Optional[float] = None, input_channels: Optional[int] = 1, input_feat_per_channel: Optional[int] = 80, audio_root: str = "", cmvn_type: str = "utterance", gcmvn_path: Optional[Path] = None, extra=None ): manifest_root = manifest_root.absolute() writer = S2TDataConfigWriter(manifest_root / yaml_filename) assert spm_filename is not None or vocab_name is not None vocab_name = spm_filename.replace(".model", ".txt") if vocab_name is None \ else vocab_name writer.set_vocab_filename(vocab_name) if input_channels is not None: writer.set_input_channels(input_channels) if input_feat_per_channel is not None: writer.set_input_feat_per_channel(input_feat_per_channel) specaugment_setters = { "lb": writer.set_specaugment_lb_policy, "ld": writer.set_specaugment_ld_policy, "sm": writer.set_specaugment_sm_policy, "ss": writer.set_specaugment_ss_policy, } specaugment_setter = specaugment_setters.get(specaugment_policy, None) if specaugment_setter is not None: specaugment_setter() if spm_filename is not None: writer.set_bpe_tokenizer( { "bpe": "sentencepiece", "sentencepiece_model": (manifest_root / spm_filename).as_posix(), } ) if prepend_tgt_lang_tag: writer.set_prepend_tgt_lang_tag(True) if sampling_alpha is not None: writer.set_sampling_alpha(sampling_alpha) if cmvn_type not in ["global", "utterance"]: raise NotImplementedError if specaugment_policy is not None: writer.set_feature_transforms( "_train", [f"{cmvn_type}_cmvn", "specaugment"] ) writer.set_feature_transforms("*", [f"{cmvn_type}_cmvn"]) if cmvn_type == "global": if gcmvn_path is None: raise ValueError("Please provide path of global cmvn file.") else: writer.set_global_cmvn(gcmvn_path.as_posix()) if len(audio_root) > 0: writer.set_audio_root(audio_root) if extra is not None: writer.set_extra(extra) writer.flush() def load_df_from_tsv(path: Union[str, Path]) -> pd.DataFrame: _path = path if isinstance(path, str) else path.as_posix() return pd.read_csv( _path, sep="\t", header=0, encoding="utf-8", escapechar="\\", quoting=csv.QUOTE_NONE, na_filter=False, ) def save_df_to_tsv(dataframe, path: Union[str, Path]): _path = path if isinstance(path, str) else path.as_posix() dataframe.to_csv( _path, sep="\t", header=True, index=False, encoding="utf-8", escapechar="\\", quoting=csv.QUOTE_NONE, ) def load_tsv_to_dicts(path: Union[str, Path]) -> List[dict]: with open(path, "r") as f: reader = csv.DictReader( f, delimiter="\t", quotechar=None, doublequote=False, lineterminator="\n", quoting=csv.QUOTE_NONE, ) rows = [dict(e) for e in reader] return rows def filter_manifest_df( df, is_train_split=False, extra_filters=None, min_n_frames=5, max_n_frames=3000 ): filters = { "no speech": df["audio"] == "", f"short speech (<{min_n_frames} frames)": df["n_frames"] < min_n_frames, "empty sentence": df["tgt_text"] == "", } if is_train_split: filters[f"long speech (>{max_n_frames} frames)"] = df["n_frames"] > max_n_frames if extra_filters is not None: filters.update(extra_filters) invalid = reduce(lambda x, y: x | y, filters.values()) valid = ~invalid print( "| " + ", ".join(f"{n}: {f.sum()}" for n, f in filters.items()) + f", total {invalid.sum()} filtered, {valid.sum()} remained." ) return df[valid] def cal_gcmvn_stats(features_list): features = np.concatenate(features_list) square_sums = (features ** 2).sum(axis=0) mean = features.mean(axis=0) features = np.subtract(features, mean) var = square_sums / features.shape[0] - mean ** 2 std = np.sqrt(np.maximum(var, 1e-8)) return {"mean": mean.astype("float32"), "std": std.astype("float32")} class S2TDataConfigWriter(object): DEFAULT_VOCAB_FILENAME = "dict.txt" DEFAULT_INPUT_FEAT_PER_CHANNEL = 80 DEFAULT_INPUT_CHANNELS = 1 def __init__(self, yaml_path: Path): try: import yaml except ImportError: print("Please install PyYAML for S2T data config YAML files") self.yaml = yaml self.yaml_path = yaml_path self.config = {} def flush(self): with open(self.yaml_path, "w") as f: self.yaml.dump(self.config, f) def set_audio_root(self, audio_root=""): self.config["audio_root"] = audio_root def set_vocab_filename(self, vocab_filename: str = "dict.txt"): self.config["vocab_filename"] = vocab_filename def set_specaugment( self, time_wrap_w: int, freq_mask_n: int, freq_mask_f: int, time_mask_n: int, time_mask_t: int, time_mask_p: float, ): self.config["specaugment"] = { "time_wrap_W": time_wrap_w, "freq_mask_N": freq_mask_n, "freq_mask_F": freq_mask_f, "time_mask_N": time_mask_n, "time_mask_T": time_mask_t, "time_mask_p": time_mask_p, } def set_specaugment_lb_policy(self): self.set_specaugment( time_wrap_w=0, freq_mask_n=1, freq_mask_f=27, time_mask_n=1, time_mask_t=100, time_mask_p=1.0, ) def set_specaugment_ld_policy(self): self.set_specaugment( time_wrap_w=0, freq_mask_n=2, freq_mask_f=27, time_mask_n=2, time_mask_t=100, time_mask_p=1.0, ) def set_specaugment_sm_policy(self): self.set_specaugment( time_wrap_w=0, freq_mask_n=2, freq_mask_f=15, time_mask_n=2, time_mask_t=70, time_mask_p=0.2, ) def set_specaugment_ss_policy(self): self.set_specaugment( time_wrap_w=0, freq_mask_n=2, freq_mask_f=27, time_mask_n=2, time_mask_t=70, time_mask_p=0.2, ) def set_input_channels(self, input_channels: int = 1): self.config["input_channels"] = input_channels def set_input_feat_per_channel(self, input_feat_per_channel: int = 80): self.config["input_feat_per_channel"] = input_feat_per_channel def set_bpe_tokenizer(self, bpe_tokenizer: Dict[str, Any]): self.config["bpe_tokenizer"] = bpe_tokenizer def set_global_cmvn(self, stats_npz_path: str): self.config["global_cmvn"] = {"stats_npz_path": stats_npz_path} def set_feature_transforms(self, split: str, transforms: List[str]): if "transforms" not in self.config: self.config["transforms"] = {} self.config["transforms"][split] = transforms def set_prepend_tgt_lang_tag(self, flag: bool = True): self.config["prepend_tgt_lang_tag"] = flag def set_sampling_alpha(self, sampling_alpha: float = 1.0): self.config["sampling_alpha"] = sampling_alpha def set_extra(self, data): self.config.update(data)