# joytou's picture
# init project
# 882ea5e
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import librosa
import numpy as np
import torch
from lightning import LightningDataModule
from torch.utils.data import DataLoader, Dataset
from fish_speech.utils import RankedLogger
# Module-level logger; in this file it reports samples that fail to load.
# NOTE(review): rank_zero_only=False presumably lets every distributed rank
# emit messages — confirm against RankedLogger.
logger = RankedLogger(__name__, rank_zero_only=False)
class VQGANDataset(Dataset):
    """Audio dataset driven by a plain-text file list.

    Each non-blank line of the file list names one audio file; the path is
    resolved relative to the directory that contains the file list.
    """

    def __init__(
        self,
        filelist: str,
        sample_rate: int = 32000,
        hop_length: int = 640,
        slice_frames: Optional[int] = None,
    ):
        """Read the file list and remember the loading parameters.

        Args:
            filelist: Path to a UTF-8 text file with one audio path per line.
            sample_rate: Sample rate handed to ``librosa.load``.
            hop_length: Number of samples per frame; together with
                ``slice_frames`` it defines the crop length in samples.
            slice_frames: Crop length in frames, or ``None`` to keep clips
                at full length.
        """
        super().__init__()

        listing = Path(filelist)
        base_dir = listing.parent

        entries = []
        for raw_line in listing.read_text(encoding="utf-8").splitlines():
            relative = raw_line.strip()
            # Blank / whitespace-only lines are ignored.
            if relative:
                entries.append(base_dir / relative)

        self.files = entries
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self.slice_frames = slice_frames

    def __len__(self):
        """Return the number of audio files in the list."""
        return len(self.files)

    def get_item(self, idx):
        """Load one clip and return ``{"audio": tensor}``, or ``None`` if empty.

        The clip is loaded as mono at ``self.sample_rate``. When
        ``slice_frames`` is set and the clip is longer than
        ``slice_frames * hop_length`` samples, a random window of exactly
        that many samples is taken. Clips whose peak magnitude exceeds 1.0
        are rescaled by that peak.
        """
        path = self.files[idx]
        audio, _sr = librosa.load(path, sr=self.sample_rate, mono=True)

        # Optional random crop to a fixed number of samples.
        if self.slice_frames is not None:
            window = self.slice_frames * self.hop_length
            total = audio.shape[0]
            if total > window:
                offset = np.random.randint(0, total - window)
                audio = audio[offset : offset + window]

        if len(audio) == 0:
            return None

        # Only scale down clips that exceed the [-1, 1] range.
        peak = np.abs(audio).max()
        if peak > 1.0:
            audio = audio / peak

        return {"audio": torch.from_numpy(audio)}

    def __getitem__(self, idx):
        """Return the sample at ``idx``; log and return ``None`` on failure."""
        try:
            sample = self.get_item(idx)
        except Exception as e:
            import traceback

            traceback.print_exc()
            logger.error(f"Error loading {self.files[idx]}: {e}")
            sample = None
        return sample
@dataclass
class VQGANCollator:
    """Collate ``{"audio": tensor}`` samples into one zero-padded batch.

    ``None`` entries (samples that failed to load) are dropped before
    collation. Each remaining waveform is right-padded with zeros to the
    length of the longest one in the batch.
    """

    def __call__(self, batch):
        """Build the batch dictionary.

        Args:
            batch: List of samples; each is ``None`` or a dict holding a
                waveform tensor under ``"audio"`` (expected to be 1-D, since
                padding is applied along the last dimension).

        Returns:
            A dict with ``"audios"`` (stacked, zero-padded waveforms) and
            ``"audio_lengths"`` (original length of each waveform).

        Raises:
            RuntimeError: If no sample in ``batch`` is usable.
        """
        samples = [x for x in batch if x is not None]

        # Without this guard an all-None batch fails inside torch.max with an
        # opaque reduction error; fail with an actionable message instead.
        if not samples:
            raise RuntimeError(
                "VQGANCollator received a batch with no valid samples "
                "(all items were None)"
            )

        audio_lengths = torch.tensor([len(x["audio"]) for x in samples])
        # Plain int so the pad widths below are Python ints, not 0-dim tensors.
        audio_maxlen = int(audio_lengths.max())

        audios = [
            torch.nn.functional.pad(x["audio"], (0, audio_maxlen - len(x["audio"])))
            for x in samples
        ]

        return {
            "audios": torch.stack(audios),
            "audio_lengths": audio_lengths,
        }
class VQGANDataModule(LightningDataModule):
    """Lightning data module wrapping a train and a validation VQGANDataset."""

    def __init__(
        self,
        train_dataset: VQGANDataset,
        val_dataset: VQGANDataset,
        batch_size: int = 32,
        num_workers: int = 4,
        val_batch_size: Optional[int] = None,
    ):
        """Store the datasets and loader settings.

        Args:
            train_dataset: Dataset served by ``train_dataloader``.
            val_dataset: Dataset served by ``val_dataloader``.
            batch_size: Training batch size.
            num_workers: Worker process count for both loaders; ``0`` loads
                data in the main process.
            val_batch_size: Validation batch size; falls back to
                ``batch_size`` when ``None`` (or ``0``).
        """
        super().__init__()

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.batch_size = batch_size
        self.val_batch_size = val_batch_size or batch_size
        self.num_workers = num_workers

    def train_dataloader(self):
        """Return the shuffled training loader."""
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            collate_fn=VQGANCollator(),
            num_workers=self.num_workers,
            shuffle=True,
            # DataLoader rejects persistent_workers=True when num_workers == 0,
            # so only request it when worker processes actually exist.
            persistent_workers=self.num_workers > 0,
        )

    def val_dataloader(self):
        """Return the validation loader (no shuffling)."""
        return DataLoader(
            self.val_dataset,
            batch_size=self.val_batch_size,
            collate_fn=VQGANCollator(),
            num_workers=self.num_workers,
            # See train_dataloader: persistent workers require num_workers > 0.
            persistent_workers=self.num_workers > 0,
        )
if __name__ == "__main__":
    # Manual smoke test: collate one batch from the training file list and
    # print its contents.
    dataset = VQGANDataset("data/LibriTTS_R/vq_train_filelist.txt")
    dataloader = DataLoader(
        dataset, batch_size=4, shuffle=False, collate_fn=VQGANCollator()
    )

    for batch in dataloader:
        # VQGANCollator only emits "audios" and "audio_lengths"; the former
        # "features" / "feature_lengths" lookups raised KeyError.
        print(batch["audios"].shape)
        print(batch["audio_lengths"])
        break