Spaces:
Configuration error
Configuration error
from neorvc.init.type_module import * | |
import torch | |
import os | |
from urllib.parse import urlparse, parse_qs | |
from pathlib import Path | |
import asyncio | |
import aiohttp | |
import aiofiles | |
import zipfile | |
import shutil | |
import re | |
import hashlib | |
import subprocess | |
import shlex | |
import argparse | |
import logging | |
import gc | |
from tqdm import tqdm | |
from pydub import AudioSegment | |
import soundfile as sf | |
from typing import Optional, Union | |
try: | |
from gradio.helpers import Progress as GradioProgress | |
except ImportError: | |
GradioProgress = None | |
# Select the torch device once at import time: CUDA when torch reports it as
# available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Printed as an import-time side effect so the chosen device shows up in logs.
print(f"Device: {device}")
def handle_progress(progress: Optional[Union[tqdm, 'GradioProgress']], description: Optional[str] = None, value: Optional[float] = None) -> None:
    """Forward a progress update to either a tqdm bar or a Gradio progress object.

    Args:
        progress: Target progress object. ``None`` turns the call into a no-op;
            objects that are neither tqdm nor Gradio progress are ignored.
        description: Optional status text to display.
        value: Optional completion on a 0-100 scale.
    """
    if progress is None:
        return

    # Gradio progress objects are callables taking a 0-1 fraction and ``desc``.
    if GradioProgress is not None and isinstance(progress, GradioProgress):
        if description and value is not None:
            # Send both in ONE call: two separate calls would first reset the
            # bar to 0 and then report the value without the description.
            progress(value / 100, desc=description)
        elif description:
            # No current value is tracked here, so a description-only update
            # still has to report a fraction; 0 matches the previous behaviour.
            progress(0, desc=description)
        elif value is not None:
            progress(value / 100)
        return

    # tqdm bars are advanced by increments, so convert the absolute value.
    if isinstance(progress, tqdm):
        if description:
            progress.set_description(description)
        # Never move the bar backwards; stale or repeated values are ignored.
        if value is not None and value > progress.n:
            progress.update(value - progress.n)
def get_youtube_video_id(url: str, ignore_playlist: bool = True) -> str | None: | |
parsed = urlparse(url) | |
if parsed.hostname == "youtu.be": | |
return parsed.path.lstrip("/") | |
if parsed.hostname in {"www.youtube.com", "youtube.com", "music.youtube.com"}: | |
if not ignore_playlist and "list" in parse_qs(parsed.query): | |
return parse_qs(parsed.query)["list"][0] | |
if parsed.path == "/watch": | |
return parse_qs(parsed.query)["v"][0] | |
if parsed.path.startswith(("/embed/", "/v/")): | |
return parsed.path.split("/")[-1] | |
return None | |
async def yt_download(link: str, cookies_path: str = os.path.join(BASE_DIR, "neorvc", "config.txt"), progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> Path:
    """Download the audio track of a YouTube video as MP3 using ``yt-dlp``.

    The result is stored as ``<video_id>.mp3`` inside ``OUTPUT_DIR``; when that
    file already exists it is returned immediately without downloading again.

    Args:
        link: YouTube URL of the video.
        cookies_path: Cookies file handed to ``yt-dlp --cookies``.
        progress: Optional tqdm / Gradio progress object to update.

    Returns:
        Path of the downloaded (or previously cached) MP3 file.

    Raises:
        FileNotFoundError: If ``cookies_path`` does not exist.
        ValueError: If no video ID can be extracted from ``link``.
        RuntimeError: If ``yt-dlp`` exits with a non-zero status or the
            expected output file is missing afterwards.
    """
    if not os.path.exists(cookies_path):
        raise FileNotFoundError(f"Cookies file not found: {cookies_path}")

    video_id = get_youtube_video_id(link)
    if not video_id:
        raise ValueError("Invalid YouTube URL: could not extract video ID.")

    output_file = os.path.join(OUTPUT_DIR, f"{video_id}.mp3")
    if os.path.exists(output_file):
        # Cache hit: a previous run already fetched this video.
        return Path(output_file)

    handle_progress(progress, description="Downloading YouTube audio", value=10)
    cmd = [
        "yt-dlp",
        "--format", "bestaudio/best",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "192K",
        "--cookies", str(cookies_path),
        "--output", str(output_file),
        # NOTE(review): disables TLS certificate verification for the download;
        # confirm this is intentional before relying on it in production.
        "--no-check-certificate",
        link
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        # Surface yt-dlp's own diagnostics; a bare "failed" is undebuggable.
        detail = stderr.decode(errors="replace").strip() if stderr else ""
        message = f"yt-dlp failed (exit code {process.returncode})"
        raise RuntimeError(f"{message}: {detail}" if detail else message)
    if not os.path.exists(output_file):
        raise RuntimeError(f"Downloaded file not found: {output_file}")

    handle_progress(progress, value=20)
    return Path(output_file)
def sanitize_model_name(dir_name: str) -> str:
    """Validate a model directory name and return it unchanged.

    Only ASCII letters, digits, ``_`` and ``-`` are accepted, which keeps the
    name safe to use as a single path component.

    Args:
        dir_name: Candidate model name.

    Returns:
        ``dir_name`` itself when it is valid.

    Raises:
        ValueError: If the name is empty or contains any other character.
    """
    # ``fullmatch`` (unlike ``match`` with ``$``) also rejects a trailing newline.
    if not dir_name or not re.fullmatch(r"[a-zA-Z0-9_-]+", dir_name):
        raise ValueError("Invalid model name")
    return dir_name
async def download_online_model(url: str, dir_name: str, progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> str:
    """Download a zipped RVC model and unpack it into ``RVC_MODELS_DIR/<dir_name>``.

    The archive is streamed to ``OUTPUT_DIR``, extracted, and removed again.
    The ``.pth`` model (and ``.index`` file, if present) are moved to the top of
    the model directory and every sub-directory is deleted afterwards.

    Args:
        url: HTTP(S) URL of the zip archive. URLs containing ``pixeldrain.com``
            are rewritten to the pixeldrain file API.
        dir_name: Name of the model directory to create.
        progress: Optional tqdm / Gradio progress object to update.

    Returns:
        A human-readable confirmation message.

    Raises:
        ValueError: For an invalid name or URL, an already existing model
            directory, a non-200 HTTP response, an unreadable archive, or an
            archive that contains no suitable ``.pth`` file.
    """
    dir_name = sanitize_model_name(dir_name)
    if not url:
        raise ValueError("URL and model name are required")
    if not url.startswith(("http://", "https://")):
        raise ValueError("Invalid URL format")

    extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
    if os.path.exists(extraction_folder):
        raise ValueError(f"Model directory '{dir_name}' already exists")

    # Take the file name from the URL *path* only, so a query string such as
    # "?download=true" does not end up in the local file name.
    remote_name = Path(urlparse(url).path).name
    if "pixeldrain.com" in url:
        if not remote_name:
            raise ValueError("Invalid URL format")
        url = f"https://pixeldrain.com/api/file/{remote_name}"
    zip_name = remote_name or f"{dir_name}.zip"

    handle_progress(progress, description=f"Downloading model '{dir_name}'")
    zip_path = os.path.join(OUTPUT_DIR, zip_name)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise ValueError(f"Failed to download model: HTTP {response.status}")
            total_size = int(response.headers.get("content-length", 0))
            downloaded = 0
            try:
                async with aiofiles.open(zip_path, "wb") as f:
                    # 64 KiB chunks: far fewer writes/progress calls than 1 KiB.
                    async for chunk in response.content.iter_chunked(64 * 1024):
                        await f.write(chunk)
                        downloaded += len(chunk)
                        if total_size:
                            # Map the download onto the 20-50 % range of the bar.
                            progress_value = 20 + (downloaded / total_size) * 30
                            handle_progress(progress, value=progress_value)
            except BaseException:
                # Do not leave a truncated archive behind (this also covers
                # task cancellation); then let the original error propagate.
                if os.path.exists(zip_path):
                    os.unlink(zip_path)
                raise

    handle_progress(progress, description="Extracting model")
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extraction_folder)
    except (zipfile.BadZipFile, OSError) as e:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise ValueError(f"Error extracting zip: {e}") from e
    finally:
        # The archive is only a transport container; drop it on success AND failure.
        if os.path.exists(zip_path):
            os.unlink(zip_path)

    # Locate the model/index files anywhere in the extracted tree. The size
    # thresholds skip small files carrying the same extensions; when several
    # candidates exist, the last one visited wins.
    model_filepath = None
    index_filepath = None
    for file_path in Path(extraction_folder).rglob("*"):
        if file_path.suffix == ".pth" and file_path.stat().st_size > 40 * 1024 * 1024:
            model_filepath = file_path
        if file_path.suffix == ".index" and file_path.stat().st_size > 100 * 1024:
            index_filepath = file_path
    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise ValueError(f"No valid .pth model file found in {extraction_folder}")

    # Flatten: move the selected files to the model root ...
    for filepath in (model_filepath, index_filepath):
        if filepath is None:
            continue
        target = Path(os.path.join(extraction_folder, filepath.name))
        if filepath != target:
            os.rename(filepath, target)
    # ... and remove whatever directory structure the archive brought along.
    for item in Path(extraction_folder).iterdir():
        if item.is_dir():
            shutil.rmtree(item, ignore_errors=True)

    # Report completion; the previous final value (10) was lower than the
    # 20-50 range used while downloading and moved Gradio's bar backwards.
    handle_progress(progress, value=100)
    print(f"Model '{dir_name}' downloaded")
    return f"Model '{dir_name}' downloaded"
def raise_exception(msg: str) -> None:
    """Raise a ``ValueError`` carrying ``msg``; this function never returns."""
    error = ValueError(msg)
    raise error
def get_rvc_model(voice_model: str) -> tuple[Path, Path | None]:
    """Find the ``.pth`` model and optional ``.index`` file of a voice model.

    Only the top level of ``RVC_MODELS_DIR/<voice_model>`` is scanned. When
    several files share an extension, the last one yielded by the directory
    listing is used.

    Returns:
        ``(model_path, index_path)``; ``index_path`` is ``None`` if no
        ``.index`` file exists.

    Raises:
        ValueError: If the directory contains no ``.pth`` file.
    """
    model_dir = os.path.join(RVC_MODELS_DIR, voice_model)
    found: dict[str, Path | None] = {".pth": None, ".index": None}
    for entry in Path(model_dir).iterdir():
        extension = entry.suffix
        if extension in found:
            found[extension] = entry
    if not found[".pth"]:
        raise_exception(f"No model file in {model_dir}")
    return found[".pth"], found[".index"]
def get_audio_paths(song_dir: Path) -> tuple[Path | None, Path | None, Path | None, Path | None]: | |
orig = inst = main_drb = backup = None | |
for f in song_dir.iterdir(): | |
if f.name.endswith("_Instrumental.wav"): | |
inst = f | |
orig = Path(os.path.join(song_dir, f.name.replace("_Instrumental", ""))) | |
elif f.name.endswith("_Vocals_Main_DeReverb.wav"): | |
main_drb = f | |
elif f.name.endswith("_Vocals_Backup.wav"): | |
backup = f | |
return orig, inst, main_drb, backup | |
def convert_to_stereo(path: Path) -> Path:
    """Return a two-channel version of ``path``, converting mono files with ffmpeg.

    Args:
        path: Audio file to inspect.

    Returns:
        ``path`` itself unless the file has exactly one channel; for mono input,
        the path of a new sibling file named ``<stem>_stereo<suffix>``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    info = sf.info(path)
    if info.channels != 1:
        return path

    stereo = path.with_stem(f"{path.stem}_stereo")
    # Pass argv as a list: building a quoted string and re-splitting it breaks
    # on file names that contain a double quote.
    cmd = ["ffmpeg", "-y", "-loglevel", "error", "-i", str(path), "-ac", "2", str(stereo)]
    subprocess.run(cmd, check=True)
    return stereo
def get_hash(fp: Path) -> str:
    """Return the first 11 hex characters of the BLAKE2b digest of ``fp``'s content."""
    digest = hashlib.blake2b()
    with fp.open("rb") as stream:
        # Read in 8 KiB blocks until ``read`` signals end-of-file with b"".
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    full_hex = digest.hexdigest()
    return full_hex[:11]
async def preprocess_song(
    inp: str,
    sid: str,
    inp_type: str,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> tuple[Path | None, Path, Path, Path, Path, Path]:
    """Fetch a song if needed and split it into the stems used by the pipeline.

    Three separation passes are run: instrumental/vocals, de-reverb of the
    vocals, and a main/backup vocal split. All outputs are written to
    ``UVR_OUTPUT_DIR/<sid>``; when every expected stem already exists there,
    the separation is skipped entirely.

    Args:
        inp: YouTube URL (``inp_type == "yt"``) or local audio file path.
        sid: Song identifier used as the output sub-directory name.
        inp_type: ``"yt"`` to download first; any other value means local file.
        progress: Optional tqdm / Gradio progress object to update.

    Returns:
        ``(orig, vocals, inst, vocals_no_reverb, backup, main_drb)`` where
        ``orig`` is the (possibly stereo-converted) input path for local files
        and ``None`` for downloaded ones.

    Raises:
        ValueError: If a local input is missing or has an unsupported extension.
    """
    keep = False
    if inp_type == "yt":
        handle_progress(progress, description="Downloading audio")
        path = await yt_download(inp.split("&")[0], progress=progress)
    else:
        path = Path(inp.strip('"'))
        if not path.exists() or path.suffix.lower() not in AUDIO_EXTS:
            raise_exception(f"Invalid audio file: {path}")
        keep = True

    out_dir = os.path.join(UVR_OUTPUT_DIR, sid)
    os.makedirs(out_dir, exist_ok=True)

    path = convert_to_stereo(path)
    base = path.stem
    inst = Path(os.path.join(out_dir, f"{base}_Instrumental.wav"))
    vocals = Path(os.path.join(out_dir, f"{base}_Vocals.wav"))
    vocals_no_reverb = Path(os.path.join(out_dir, f"{base}_Vocals_NoReverb.wav"))
    backup = Path(os.path.join(out_dir, f"{base}_Vocals_Backup.wav"))
    main_drb = Path(os.path.join(out_dir, f"{base}_Vocals_Main_DeReverb.wav"))
    reverb = Path(os.path.join(out_dir, f"{base}_Vocals_Reverb.wav"))
    orig = path if keep else None

    required_files = [vocals, inst, vocals_no_reverb, backup, main_drb]
    if all(p.exists() for p in required_files):
        # Cache hit: return before the separator is even constructed.
        return orig, vocals, inst, vocals_no_reverb, backup, main_drb

    def _claim(produced: str, target: Path) -> None:
        # Move a separator output to its canonical name. ``replace`` overwrites
        # leftovers from an interrupted earlier run on every platform.
        Path(os.path.join(out_dir, produced)).replace(target)

    # Built only now, so cache hits above do not pay for the set-up.
    separator = Separator(output_dir=out_dir, log_level=logging.WARNING)

    handle_progress(progress, description="Separating vocals")
    separator.load_model(model_filename="model_bs_roformer_ep_317_sdr_12.9755.ckpt")
    voc_inst = separator.separate(str(path))
    # Assumes the separator lists the instrumental first, then the vocals — TODO confirm.
    _claim(voc_inst[0], inst)
    _claim(voc_inst[1], vocals)
    handle_progress(progress, value=10)

    handle_progress(progress, description="DeReverbing vocals")
    separator.load_model(model_filename="UVR-DeEcho-DeReverb.pth")
    voc_no_reverb = separator.separate(str(vocals))
    # Assumes output order [dry vocals, reverb residue] — TODO confirm.
    _claim(voc_no_reverb[0], vocals_no_reverb)
    _claim(voc_no_reverb[1], reverb)
    handle_progress(progress, value=20)

    handle_progress(progress, description="Splitting main/backup vocals")
    separator.load_model(model_filename="mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt")
    backing_voc = separator.separate(str(vocals_no_reverb))
    # Assumes output order [backup vocals, main vocals] — TODO confirm.
    _claim(backing_voc[0], backup)
    _claim(backing_voc[1], main_drb)
    handle_progress(progress, value=30)

    return orig, vocals, inst, vocals_no_reverb, backup, main_drb
def voice_change(
    model: str,
    vocals: Path,
    out: Path,
    pitch: int,
    f0: str,
    idx_rate: float,
    filt_rad: int,
    rms: float,
    prot: float,
    hop: int,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> None:
    """Run RVC inference on ``vocals`` with voice model ``model``, writing to ``out``.

    The model and optional index file are resolved through ``get_rvc_model``;
    the remaining arguments are forwarded to ``run_infer_script`` together with
    a fixed set of options (no splitting, no autotune, no cleaning, WAV export,
    ``contentvec`` embedder).
    """
    model_file, index_file = get_rvc_model(model)
    handle_progress(progress, description="Converting voice")

    inference_options = {
        "pth_path": str(model_file),
        # An empty string is passed when the model ships without an index.
        "index_path": str(index_file) if index_file else "",
        "index_rate": idx_rate,
        "input_path": str(vocals),
        "output_path": str(out),
        "pitch": pitch,
        "f0_method": f0,
        "filter_radius": filt_rad,
        "volume_envelope": rms,
        "protect": prot,
        "hop_length": hop,
        "split_audio": False,
        "f0_autotune_strength": 0.0,
        "clean_audio": False,
        "f0_autotune": False,
        "clean_strength": 0.0,
        "export_format": "wav",
        "f0_file": None,
        "embedder_model": "contentvec",
    }
    run_infer_script(**inference_options)

    # Explicit collection right after inference, before reporting progress.
    gc.collect()
    handle_progress(progress, value=50)
    print(f"Voice conversion completed: {out}")
def combine_audio(paths: list[Path], out: Path, mg: float, bg: float, ig: float, fmt: str, progress: Optional[Union[tqdm, 'GradioProgress']] = None) -> None:
    """Mix main vocals, backup vocals and instrumental into one file.

    Args:
        paths: ``[main_vocals, backup_vocals, instrumental]`` audio files.
        out: Destination of the mixed track.
        mg: Gain in dB added to the main vocals (a fixed 4 dB is then subtracted).
        bg: Gain in dB added to the backup vocals (a fixed 6 dB is then subtracted).
        ig: Gain in dB added to the instrumental (a fixed 7 dB is then subtracted).
        fmt: Export format handed to pydub.
        progress: Optional tqdm / Gradio progress object to update.
    """
    handle_progress(progress, description="Combining tracks")

    # (user gain, fixed attenuation) for main, backup and instrumental, in order.
    levels = ((mg, 4), (bg, 6), (ig, 7))
    adjusted = []
    for position, (gain, attenuation) in enumerate(levels):
        segment = AudioSegment.from_file(paths[position])
        adjusted.append(segment + gain - attenuation)

    lead, backing, instrumental = adjusted
    # The main vocal is the base layer; the other two are overlaid onto it.
    mixed = lead.overlay(backing)
    mixed = mixed.overlay(instrumental)
    mixed.export(out, format=fmt)

    handle_progress(progress, value=60)
    print(f"Combined audio saved: {out}")
async def song_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> Path:
    """Produce a full AI cover: separate stems, convert the lead vocal, remix.

    Args:
        song_input: YouTube URL (any ``http*`` scheme) or local audio file path.
        voice_model: Name of the RVC voice model to sing with.
        pitch_change: Pitch shift forwarded to the voice conversion.
        keep_files: When ``False``, the intermediate stems are deleted at the end.
        main_gain, backup_gain, inst_gain: Per-track gains (dB) for the mix.
        index_rate, filter_radius, rms_mix_rate, f0_method, crepe_hop_length,
            protect: Voice-conversion settings forwarded to ``voice_change``.
        output_format: Format/extension of the final mixed file.
        progress: Optional tqdm / Gradio progress object to update.

    Returns:
        Path of the mixed cover inside ``OUTPUT_DIR``.

    Raises:
        ValueError: For missing arguments, an unusable YouTube URL or a
            non-existent local file.
    """
    if not (song_input and voice_model):
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")

    # Anything with an http(s)-like scheme is treated as a YouTube link.
    if urlparse(song_input).scheme.startswith("http"):
        inp_type = "yt"
        sid = get_youtube_video_id(song_input)
        if not sid:
            raise_exception("Invalid YouTube URL")
        base_filename = sid
    else:
        inp_type = "local"
        song_input = song_input.strip('"')
        local_file = Path(song_input)
        if not local_file.exists():
            raise_exception(f"File not found: {song_input}")
        # Local songs are keyed by a content hash, named by their file stem.
        sid = get_hash(local_file)
        base_filename = local_file.stem

    song_dir = os.path.join(OUTPUT_DIR, sid)
    os.makedirs(song_dir, exist_ok=True)
    cached_stems = get_audio_paths(Path(song_dir))
    _cached_orig, cached_inst, cached_main_drb, cached_backup = cached_stems

    # NOTE(review): cached stems are reused only when keep_files is False;
    # confirm that coupling is intended.
    if keep_files or not all(cached_stems):
        _orig, _vocals, inst, main, backup, main_drb = await preprocess_song(
            song_input, sid, inp_type, progress
        )
    else:
        inst = cached_inst
        backup = cached_backup
        main = main_drb = cached_main_drb

    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"{voice_model}_Generated_{base_filename}.wav"))
    ai_cover = Path(os.path.join(OUTPUT_DIR, f"{base_filename} ({voice_model} Ver).{output_format}"))

    voice_change(
        model=voice_model,
        vocals=main_drb,
        out=ai_vocals,
        pitch=pitch_change,
        f0=f0_method,
        idx_rate=index_rate,
        filt_rad=filter_radius,
        rms=rms_mix_rate,
        prot=protect,
        hop=crepe_hop_length,
        progress=progress
    )
    stems_to_mix = [ai_vocals, backup, inst]
    combine_audio(stems_to_mix, ai_cover, main_gain, backup_gain, inst_gain, output_format, progress)

    if not keep_files:
        handle_progress(progress, description="Cleaning up")
        for leftover in (main, inst, backup):
            if leftover and leftover.exists():
                leftover.unlink()

    handle_progress(progress, value=65)
    print(f"Output saved: {ai_cover}")
    return ai_cover
async def vocal_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None
) -> Path:
    """Convert an audio file (or YouTube audio) directly, without stem separation.

    The input is passed to ``voice_change`` as-is and the result is written to
    ``OUTPUT_DIR/Cover_<input stem>_<voice_model>.wav``.

    ``keep_files``, the three gain values and ``output_format`` are accepted so
    the signature matches ``song_cover_pipeline``; this function does not use them.

    Args:
        song_input: YouTube URL (any ``http*`` scheme) or local audio file path.
        voice_model: Name of the RVC voice model.
        pitch_change: Pitch shift forwarded to the voice conversion.
        index_rate, filter_radius, rms_mix_rate, f0_method, crepe_hop_length,
            protect: Voice-conversion settings forwarded to ``voice_change``.
        progress: Optional tqdm / Gradio progress object to update.

    Returns:
        Path of the converted WAV file.

    Raises:
        ValueError: For missing arguments, an unusable YouTube URL or a
            non-existent local file.
    """
    if not song_input or not voice_model:
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")

    if urlparse(song_input).scheme.startswith("http"):
        # Validate up front so a bad link fails with the pipeline's own message.
        if not get_youtube_video_id(song_input):
            raise_exception("Invalid YouTube URL")
        song_input = str(await yt_download(song_input.split("&")[0], progress=progress))
    else:
        song_input = song_input.strip('"')
        # Existence check only: the content hash computed here previously fed
        # an unused directory name, so reading the whole file was wasted work.
        if not Path(song_input).exists():
            raise_exception(f"File not found: {song_input}")

    orig = Path(song_input)
    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"Cover_{orig.stem}_{voice_model}.wav"))
    voice_change(
        voice_model,
        orig,
        ai_vocals,
        pitch_change,
        f0_method,
        index_rate,
        filter_radius,
        rms_mix_rate,
        protect,
        crepe_hop_length,
        progress
    )
    print(f"Output saved: {ai_vocals}")
    return ai_vocals
def parse_arguments() -> argparse.Namespace:
    """Build the CLI parser and parse ``sys.argv``.

    Two sub-commands are defined: ``infer`` (run a cover pipeline) and
    ``download`` (fetch an RVC model). The chosen sub-command is stored in
    ``command``, which is ``None`` when none was given.
    """
    root = argparse.ArgumentParser(description="Generate a song cover using voice conversion.")
    commands = root.add_subparsers(dest="command", help="Available commands")

    # (flags, keyword options) per argument, in the order shown by --help.
    infer_arguments = [
        (("song_input",), {"help": "YouTube URL or local audio file path"}),
        (("voice_model",), {"help": "Name of the RVC voice model"}),
        (("--pitch_change",), {"type": int, "default": 0, "help": "Pitch change in semitones"}),
        (("--keep_files",), {"action": "store_true", "help": "Keep intermediate files"}),
        (
            ("--output_type",),
            {
                "choices": ["full", "vocals"],
                "default": "full",
                "help": "Output type: full song or vocals only",
            },
        ),
        (("--main_gain",), {"type": float, "default": 0, "help": "Main vocals gain (dB)"}),
        (("--backup_gain",), {"type": float, "default": 0, "help": "Backup vocals gain (dB)"}),
        (("--inst_gain",), {"type": float, "default": 0, "help": "Instrumental gain (dB)"}),
        (("--index_rate",), {"type": float, "default": 0.5, "help": "Index rate for voice conversion"}),
        (("--filter_radius",), {"type": int, "default": 3, "help": "Filter radius for voice conversion"}),
        (("--rms_mix_rate",), {"type": float, "default": 0.25, "help": "RMS mix rate"}),
        (("--f0_method",), {"default": "rmvpe", "help": "F0 extraction method"}),
        (("--crepe_hop_length",), {"type": int, "default": 128, "help": "CREPE hop length"}),
        (("--protect",), {"type": float, "default": 0.33, "help": "Protect voiceless consonants"}),
        (("--output_format",), {"default": "mp3", "help": "Output format (e.g., mp3, wav)"}),
    ]
    download_arguments = [
        (("model_url",), {"help": "URL for RVC model"}),
        (("voice_model",), {"help": "Name of the RVC voice model"}),
    ]

    infer = commands.add_parser("infer", help="RVC Inference")
    for flags, options in infer_arguments:
        infer.add_argument(*flags, **options)

    download = commands.add_parser("download", help="RVC Model Downloader")
    for flags, options in download_arguments:
        download.add_argument(*flags, **options)

    return root.parse_args()
async def main() -> None:
    """CLI driver: parse the arguments and run the requested sub-command.

    A single 0-100 tqdm bar is shared with the invoked pipeline/downloader and
    pushed to 100 once it returns.
    """
    args = parse_arguments()
    if not args.command:
        print("Please run with '-h' for help")
        return

    # Namespace attributes forwarded 1:1 as keyword arguments to the pipeline.
    pipeline_fields = (
        "song_input",
        "voice_model",
        "pitch_change",
        "keep_files",
        "main_gain",
        "backup_gain",
        "inst_gain",
        "index_rate",
        "filter_radius",
        "rms_mix_rate",
        "f0_method",
        "crepe_hop_length",
        "protect",
        "output_format",
    )

    with tqdm(total=100, desc="Starting...", unit="%") as pbar:
        if args.command == "download":
            await download_online_model(
                url=args.model_url,
                dir_name=args.voice_model,
                progress=pbar
            )
            handle_progress(pbar, value=100)
            print("Download completed")
        elif args.command == "infer":
            # "full" remixes the whole song; anything else converts vocals only.
            if args.output_type == "full":
                pipeline = song_cover_pipeline
            else:
                pipeline = vocal_cover_pipeline
            options = {field: getattr(args, field) for field in pipeline_fields}
            result = await pipeline(progress=pbar, **options)
            handle_progress(pbar, value=100)
            print(f"Completed: {result}")
# Script entry point: start the async CLI driver only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())