Open in Colab

In [None]:
# %%capture
! pip install git+https://github.com/nateraw/so-vits-svc-fork@main
! pip install openai-whisper yt-dlp huggingface_hub demucs

---

# Restart runtime

After running the cell above, you'll need to restart the Colab runtime because we installed a different version of numpy.

`Runtime -> Restart runtime`

---

In [None]:
# Authenticate with the Hugging Face Hub; later cells create a repo and upload files to it.
from huggingface_hub import login

login()

## Settings

In [None]:
# Name passed to create_dataset(); used for download/dataset sub-directories and file name prefixes.
CHARACTER = "kanye"
# When True, create_dataset() runs Demucs on every downloaded clip and overwrites it with the extracted stem.
DO_EXTRACT_VOCALS = False
# Hugging Face Hub repo id written into the training config and used as the upload target for kmeans.pt.
MODEL_REPO_ID = "dog/kanye"

## Data Preparation

Create a `data.csv` file whose first line is the header `ytid,start,end` (these are the expected column names). Each following row gives a YouTube video ID and the start and end times, in seconds, of one segment to use for training. Try to pick segments that contain dry vocals only, as those give the best results.

An example is given below for Kanye West.

In [None]:
%%writefile data.csv
ytid,start,end
lkK4de9nbzQ,0,137
gXU9Am2Seo0,30,69
gXU9Am2Seo0,94,135
iVgrhWvQpqU,0,55
iVgrhWvQpqU,58,110
UIV-q-gneKA,85,99
UIV-q-gneKA,110,125
UIV-q-gneKA,127,141
UIV-q-gneKA,173,183
GmlyYCGE9ak,0,102
x-7aWcPmJ60,25,43
x-7aWcPmJ60,47,72
x-7aWcPmJ60,98,113
DK2LCIzIBrU,0,56
DK2LCIzIBrU,80,166
_W56nZk0fCI,184,224

In [None]:
import subprocess
from pathlib import Path
import librosa
from scipy.io import wavfile
import numpy as np
from demucs.pretrained import get_model, DEFAULT_MODEL
from demucs.apply import apply_model
import torch
import csv
import whisper


def download_youtube_clip(video_identifier, start_time, end_time, output_filename, num_attempts=5, url_base="https://www.youtube.com/watch?v="):
    """Download one audio section of a video as a WAV file using the ``yt-dlp`` CLI.

    Args:
        video_identifier: Video id; it is appended to ``url_base`` to build the URL.
        start_time: Start of the section, forwarded to ``--download-sections``.
        end_time: End of the section, forwarded to ``--download-sections``.
        output_filename: Destination path (``str`` or ``Path``). If it already
            exists nothing is downloaded.
        num_attempts: Maximum number of ``yt-dlp`` invocations. Values below 1
            are treated as 1 so the retry loop always terminates.
        url_base: URL prefix placed in front of ``video_identifier``.

    Returns:
        A ``(status, message)`` tuple. ``status`` is True when the output file
        exists. ``message`` is ``"Already Downloaded"``, ``"Downloaded"``, the
        captured ``yt-dlp`` output (bytes) of the last failed attempt, or a
        short error description.
    """
    output_path = Path(output_filename)
    if output_path.exists():
        return True, "Already Downloaded"

    # Pass an argument list (no shell) so ids/paths containing quotes, spaces
    # or shell metacharacters cannot break or inject into the command line.
    command = [
        "yt-dlp",
        "--quiet",
        "--no-warnings",
        "-x",
        "--audio-format", "wav",
        "-f", "bestaudio",
        "-o", str(output_filename),
        "--download-sections", f"*{start_time}-{end_time}",
        f"{url_base}{video_identifier}",
    ]

    last_output = None
    for _ in range(max(1, num_attempts)):
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            # Keep the output of the most recent failure and retry.
            last_output = err.output
        except OSError as err:
            # The executable could not be started (e.g. yt-dlp is not installed);
            # retrying cannot help.
            return False, str(err)
        else:
            break
    else:
        # Every attempt failed.
        return False, last_output

    if output_path.exists():
        return True, "Downloaded"
    return False, "yt-dlp finished but the output file was not created"


def split_long_audio(model, filepaths, character_name, save_dir="data_dir", out_sr=44100):
    """Cut audio files into one 16-bit WAV per transcribed segment.

    Each file is transcribed with ``model.transcribe`` and the ``start``/``end``
    times of the returned segments are used as cut points. The audio is loaded
    as mono, silence-trimmed, resampled to ``out_sr`` and peak-normalised before
    slicing. Segments are written to
    ``<save_dir>/<character_name>/<character_name>_<file_idx>_<segment_idx>.wav``.

    Args:
        model: Object exposing ``transcribe(path, ...)`` that returns a dict
            with a ``'segments'`` list (a Whisper model in this notebook).
        filepaths: A single path or a list of paths to audio files.
        character_name: Used for the output sub-directory and file name prefix.
        save_dir: Root directory for the generated segments.
        out_sr: Sample rate of the written WAV files.
    """
    if isinstance(filepaths, (str, Path)):
        filepaths = [filepaths]

    # The output directory does not depend on the file, so create it once.
    save_path = Path(save_dir) / character_name
    save_path.mkdir(exist_ok=True, parents=True)
    int16_max = np.iinfo(np.int16).max

    for file_idx, filepath in enumerate(filepaths):
        print(f"Transcribing file {file_idx}: '{filepath}' to segments...")
        result = model.transcribe(str(filepath), word_timestamps=True, task="transcribe", beam_size=5, best_of=5)
        segments = result['segments']

        wav, sr = librosa.load(filepath, sr=None, offset=0, duration=None, mono=True)
        # trim() also returns the [start, end] sample interval it kept. The
        # segment timestamps refer to the untrimmed file, so remember how much
        # leading audio was removed and shift the cut points accordingly.
        wav, trim_interval = librosa.effects.trim(wav, top_db=20)
        if wav.size == 0:
            print(f"Skipping '{filepath}': no audio left after trimming silence")
            continue
        trim_offset = trim_interval[0] / sr

        peak = np.abs(wav).max()
        if peak > 1.0:
            wav = 0.98 * wav / peak
        wav2 = librosa.resample(wav, orig_sr=sr, target_sr=out_sr)
        peak2 = np.abs(wav2).max()
        if peak2 == 0:
            # Pure silence: normalising would divide by zero.
            print(f"Skipping '{filepath}': audio is silent")
            continue
        wav2 = wav2 / peak2

        n_samples = len(wav2)
        for i, seg in enumerate(segments):
            start_sample = max(0, int((seg['start'] - trim_offset) * out_sr))
            end_sample = min(n_samples, int((seg['end'] - trim_offset) * out_sr))
            if end_sample <= start_sample:
                # Segment is empty or lies entirely in the trimmed region.
                continue
            wav_seg = wav2[start_sample:end_sample]
            wav_seg_name = f"{character_name}_{file_idx}_{i}.wav"
            out_fpath = save_path / wav_seg_name
            wavfile.write(out_fpath, rate=out_sr, data=(wav_seg * int16_max).astype(np.int16))


def extract_vocal_demucs(model, filename, out_filename, sr=44100, device=None, shifts=1, split=True, overlap=0.25, jobs=0):
    """Separate the vocal stem of an audio file with Demucs and write it as WAV.

    The audio is loaded at ``sr`` without down-mixing, standardised with the
    mean/std of its channel average, passed through ``apply_model`` and
    de-standardised again. The selected stem is scaled so its peak stays below
    1 and written to ``out_filename``.

    Args:
        model: Demucs model passed to ``apply_model``.
        filename: Input audio path.
        out_filename: Output WAV path (may equal ``filename`` to overwrite).
        sr: Sample rate to load and write at.
        device, shifts, split, overlap: Forwarded to ``apply_model``.
        jobs: Forwarded to ``apply_model`` as ``num_workers``.

    Returns:
        ``out_filename``.
    """
    wav, sr = librosa.load(filename, mono=False, sr=sr)
    wav = torch.tensor(wav)
    if wav.dim() == 1:
        # librosa returns a 1-D array for mono files; the code below needs a
        # (channels, samples) tensor. Duplicate the channel to the count the
        # model declares (assumed 2 when the attribute is absent — TODO confirm).
        wav = wav[None].repeat(getattr(model, "audio_channels", 2), 1)

    ref = wav.mean(0)
    mean = ref.mean()
    # Guard against a zero std (pure silence), which would yield NaNs.
    std = ref.std().clamp_min(1e-8)
    wav = (wav - mean) / std
    sources = apply_model(
        model,
        wav[None],
        device=device,
        shifts=shifts,
        split=split,
        overlap=overlap,
        progress=True,
        num_workers=jobs
    )[0]
    sources = sources * std + mean

    # Pick the stem by name when the model lists its sources; otherwise fall
    # back to the last stem, as the original code did.
    source_names = list(getattr(model, "sources", None) or [])
    vocals_idx = source_names.index("vocals") if "vocals" in source_names else -1

    wav = sources[vocals_idx]
    wav = wav / max(1.01 * wav.abs().max(), 1)
    # .cpu() is a no-op on CPU tensors and makes .numpy() safe otherwise.
    wavfile.write(out_filename, rate=sr, data=wav.cpu().numpy().T)
    return out_filename


def create_dataset(
    clips_csv_filepath = "data.csv",
    character = "somebody",
    do_extract_vocals = False,
    whisper_size = "medium",
    # Where raw yt clips will be downloaded to
    dl_dir = "downloads",
    # Where actual data will be organized
    data_dir = "dataset_raw",
    **kwargs
):
    """Download the clips listed in a CSV and split them into training segments.

    For every row of ``clips_csv_filepath`` (columns ``ytid``, ``start``,
    ``end``) the clip is downloaded to ``<dl_dir>/<character>/<character>_<row>.wav``.
    When ``do_extract_vocals`` is set, each downloaded clip is overwritten with
    the output of ``extract_vocal_demucs``. Finally every WAV in the download
    directory is passed to ``split_long_audio`` with a Whisper model of size
    ``whisper_size``, writing segments under ``data_dir``.

    Clips that fail to download are reported and skipped. ``**kwargs`` is
    accepted for call-site compatibility and is not used.
    """
    dl_path = Path(dl_dir) / character
    dl_path.mkdir(exist_ok=True, parents=True)
    demucs_model = get_model(DEFAULT_MODEL) if do_extract_vocals else None

    # newline="" is what the csv module expects for files it reads.
    with Path(clips_csv_filepath).open(newline="") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            outfile_path = dl_path / f"{character}_{i:04d}.wav"
            ok, message = download_youtube_clip(row['ytid'], row['start'], row['end'], outfile_path)
            if not ok:
                # Without the file, vocal extraction would crash; skip the clip.
                print(f"Skipping clip {i} ({row['ytid']}): download failed: {message}")
                continue
            if do_extract_vocals:
                extract_vocal_demucs(demucs_model, outfile_path, outfile_path)

    filenames = sorted(str(x) for x in dl_path.glob("*.wav"))
    if not filenames:
        # Nothing to transcribe; avoid loading the Whisper model for no work.
        print(f"No .wav files found in '{dl_path}'; nothing to split.")
        return
    whisper_model = whisper.load_model(whisper_size)
    split_long_audio(whisper_model, filenames, character, data_dir)

In [None]:
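"""
Here, we override the generated training config: batch size, eval interval,
dataloader workers and the Hugging Face Hub push settings.

Note: num_workers is set to the CPU count below. On HF Spaces Docker, where
/dev/shm is limited, use num_workers=0 (with persistent_workers=False) instead.
"""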

import json
from pathlib import Path
import multiprocessing

def update_config(config_file="configs/44k/config.json"):
    """Rewrite the ``train`` section of a JSON config file in place.

    Sets the batch size, evaluation interval, dataloader worker settings and
    the Hugging Face Hub push options (the repo id comes from the notebook-level
    ``MODEL_REPO_ID``), then saves the file with 2-space indentation.
    """
    cfg_file = Path(config_file)
    with cfg_file.open() as fh:
        config = json.load(fh)

    config['train'].update({
        'batch_size': 32,
        'eval_interval': 500,
        'num_workers': multiprocessing.cpu_count(),
        'persistent_workers': True,
        'push_to_hub': True,
        'repo_id': MODEL_REPO_ID,
        'private': True,
    })

    cfg_file.write_text(json.dumps(config, indent=2))

## Run all Preprocessing Steps

In [None]:
# Download the clips listed in data.csv and cut them into per-segment WAV files (see create_dataset).
create_dataset(character=CHARACTER, do_extract_vocals=DO_EXTRACT_VOCALS)
# Preprocessing steps of the so-vits-svc-fork CLI, run in this order.
! svc pre-resample
! svc pre-config
# NOTE(review): "-fm crepe" presumably selects the f0 estimation method — confirm against `svc pre-hubert --help`.
! svc pre-hubert -fm crepe
# Patch configs/44k/config.json (update_config's default path) with the batch size, worker and Hub settings.
update_config()

## Training

In [None]:
from __future__ import annotations

import os
import re
import warnings
from logging import getLogger
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any

import lightning.pytorch as pl
import torch
from lightning.pytorch.accelerators import MPSAccelerator, TPUAccelerator
from lightning.pytorch.loggers import TensorBoardLogger
from lightning.pytorch.strategies.ddp import DDPStrategy
from lightning.pytorch.tuner import Tuner
from torch.cuda.amp import autocast
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard.writer import SummaryWriter

import so_vits_svc_fork.f0
import so_vits_svc_fork.modules.commons as commons
import so_vits_svc_fork.utils

from so_vits_svc_fork import utils
from so_vits_svc_fork.dataset import TextAudioCollate, TextAudioDataset
from so_vits_svc_fork.logger import is_notebook
from so_vits_svc_fork.modules.descriminators import MultiPeriodDiscriminator
from so_vits_svc_fork.modules.losses import discriminator_loss, feature_loss, generator_loss, kl_loss
from so_vits_svc_fork.modules.mel_processing import mel_spectrogram_torch
from so_vits_svc_fork.modules.synthesizers import SynthesizerTrn

from so_vits_svc_fork.train import VitsLightning, VCDataModule

# Module-level logger; train() uses it to report the selected strategy.
LOG = getLogger(__name__)
# Select PyTorch's "high" float32 matmul precision mode (see the torch.set_float32_matmul_precision docs).
torch.set_float32_matmul_precision("high")


from pathlib import Path

from huggingface_hub import create_repo, upload_folder, login, list_repo_files, delete_file

# if os.environ.get("HF_TOKEN"):
# login(os.environ.get("HF_TOKEN"))


class HuggingFacePushCallback(pl.Callback):
    """Lightning callback that mirrors the model directory to a Hugging Face Hub repo.

    At the end of every validation epoch the repo is created if needed, the
    contents of ``pl_module.hparams.model_dir`` are uploaded, and checkpoint
    files (``G_<n>.pth`` / ``D_<n>.pth``) that exist in the repo but no longer
    exist locally are deleted from the repo. ``G_0.pth`` and ``D_0.pth`` are
    never deleted.

    Args:
        repo_id: Hub repo id to push to.
        private: Whether the repo is created as private.
        every: Stored on the instance but not read by this class.
    """

    def __init__(self, repo_id, private=False, every=100):
        super().__init__()
        self.repo_id = repo_id
        self.private = private
        # NOTE(review): `every` is kept for interface compatibility; nothing in
        # this class uses it, so pushes happen on every validation epoch end.
        self.every = every

    def on_validation_epoch_end(self, trainer, pl_module):
        """Upload the model directory and prune stale checkpoints from the repo."""
        self.repo_url = create_repo(
            repo_id=self.repo_id,
            exist_ok=True,
            private=self.private
        )
        # Use the repo id reported back by the Hub for all later calls.
        self.repo_id = self.repo_url.repo_id
        if pl_module.global_step == 0:
            # Nothing has been trained yet, so there is nothing to push.
            return
        print(f"\n🤗 Pushing to Hugging Face Hub: {self.repo_url}...")
        model_dir = pl_module.hparams.model_dir
        upload_folder(
            repo_id=self.repo_id,
            folder_path=model_dir,
            path_in_repo=".",
            commit_message="🍻 cheers",
            ignore_patterns=["*.git*", "*README.md*", "*__pycache__*"],
        )

        # Remove checkpoints from the repo that were removed locally.
        ckpt_pattern = r'^(D_|G_)\d+\.pth$'
        repo_ckpts = [x for x in list_repo_files(self.repo_id) if re.match(ckpt_pattern, x) and x not in ["G_0.pth", "D_0.pth"]]
        local_ckpts = [x.name for x in Path(model_dir).glob("*.pth") if re.match(ckpt_pattern, x.name)]
        to_delete = set(repo_ckpts) - set(local_ckpts)

        for fname in to_delete:
            print(f"🗑 Deleting {fname} from repo")
            delete_file(fname, self.repo_id)


def train(
    config_path: Path | str, model_path: Path | str, reset_optimizer: bool = False
):
    """Train a ``VitsLightning`` model, optionally pushing results to the Hugging Face Hub.

    Hyperparameters are obtained with ``utils.get_backup_hparams(config_path,
    model_path)``. When ``train.push_to_hub`` is enabled in the config, a
    ``HuggingFacePushCallback`` is attached using ``train.repo_id`` and
    ``train.private``. ``train.batch_size`` may be a number or a string of the
    form ``"<mode>[-<init_val>[-<max_trials>]]"`` where mode is ``auto``,
    ``power`` or ``binsearch``; in that case Lightning's batch size tuner runs
    before fitting.

    Args:
        config_path: Path to the JSON config file.
        model_path: Directory used for logs and passed to the hparams/pretrained helpers.
        reset_optimizer: Forwarded to ``VitsLightning``.

    Raises:
        ValueError: If ``train.push_to_hub`` is enabled but ``train.repo_id`` is not set.
    """
    config_path = Path(config_path)
    model_path = Path(model_path)

    hparams = utils.get_backup_hparams(config_path, model_path)
    utils.ensure_pretrained_model(model_path, hparams.model.get("type_", "hifi-gan"))

    datamodule = VCDataModule(hparams)

    # Use DDP only with more than one CUDA device; on Windows ("nt") an explicit
    # gloo-backed DDPStrategy is built instead of the named strategy.
    if torch.cuda.device_count() > 1:
        if os.name != "nt":
            strategy = "ddp_find_unused_parameters_true"
        else:
            strategy = DDPStrategy(find_unused_parameters=True, process_group_backend="gloo")
    else:
        strategy = "auto"
    LOG.info(f"Using strategy: {strategy}")

    callbacks = []
    # Read the Hub settings with .get() so a config that was not patched with
    # them trains without pushing instead of failing on a missing attribute.
    if hparams.train.get("push_to_hub", False):
        repo_id = hparams.train.get("repo_id")
        if not repo_id:
            raise ValueError("train.push_to_hub is enabled but train.repo_id is not set in the config")
        callbacks.append(HuggingFacePushCallback(repo_id, hparams.train.get("private", False)))
    if not is_notebook():
        callbacks.append(pl.callbacks.RichProgressBar())

    if hparams.train.fp16_run:
        precision = "16-mixed"
    elif hparams.train.get("bf16_run", False):
        precision = "bf16-mixed"
    else:
        precision = 32

    trainer = pl.Trainer(
        logger=TensorBoardLogger(
            model_path, "lightning_logs", hparams.train.get("log_version", 0)
        ),
        # profiler="simple",
        val_check_interval=hparams.train.eval_interval,
        max_epochs=hparams.train.epochs,
        check_val_every_n_epoch=None,
        precision=precision,
        strategy=strategy,
        # Pass None rather than an empty list, as the original code did.
        callbacks=callbacks or None,
        benchmark=True,
        enable_checkpointing=False,
    )
    tuner = Tuner(trainer)
    model = VitsLightning(reset_optimizer=reset_optimizer, **hparams)

    # automatic batch size scaling
    batch_size = hparams.train.batch_size
    batch_split = str(batch_size).split("-")
    batch_size = batch_split[0]
    init_val = 2 if len(batch_split) <= 1 else int(batch_split[1])
    max_trials = 25 if len(batch_split) <= 2 else int(batch_split[2])
    if batch_size == "auto":
        batch_size = "binsearch"
    if batch_size in ["power", "binsearch"]:
        model.tuning = True
        tuner.scale_batch_size(
            model,
            mode=batch_size,
            datamodule=datamodule,
            steps_per_trial=1,
            init_val=init_val,
            max_trials=max_trials,
        )
        model.tuning = False
    else:
        # Not a tuner mode: the value must be a plain integer (raises ValueError otherwise).
        batch_size = int(batch_size)

    # automatic learning rate scaling is not supported for multiple optimizers
    # if hparams.train.learning_rate == "auto":
    #     lr_finder = tuner.lr_find(model)
    #     LOG.info(lr_finder.results)
    #     fig = lr_finder.plot(suggest=True)
    #     fig.savefig(model_path / "lr_finder.png")

    trainer.fit(model, datamodule=datamodule)

# Start training with the config at configs/44k/config.json; logs/44k is passed as the model directory.
if __name__ == '__main__':
 train('configs/44k/config.json', 'logs/44k')

## Train Cluster Model

In [None]:
# Train the cluster model with the so-vits-svc-fork CLI.
# NOTE(review): presumably this writes logs/44k/kmeans.pt, which the next cell uploads — confirm.
! svc train-cluster

In [None]:
from pathlib import Path

from huggingface_hub import upload_file

# Use a path relative to the working directory, matching the 'logs/44k' model
# directory used for training, instead of the Colab-only absolute '/content/...'.
kmeans_path = Path("logs/44k/kmeans.pt")
if not kmeans_path.exists():
    raise FileNotFoundError(f"{kmeans_path} not found; run the cluster training cell first")

# Upload the cluster model next to the checkpoints in the model repo.
upload_file(path_or_fileobj=str(kmeans_path), repo_id=MODEL_REPO_ID, path_in_repo="kmeans.pt")