from neorvc.init.type_module import *
import torch
import os
from urllib.parse import urlparse, parse_qs
from pathlib import Path
import asyncio
import aiohttp
import aiofiles
import zipfile
import shutil
import re
import hashlib
import subprocess
import shlex
import argparse
import logging
import gc
from tqdm import tqdm
from pydub import AudioSegment
import soundfile as sf
from typing import Optional, Union

try:
    from gradio.helpers import Progress as GradioProgress
except ImportError:
    GradioProgress = None

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")


def handle_progress(
    progress: Optional[Union[tqdm, 'GradioProgress']],
    description: Optional[str] = None,
    value: Optional[float] = None,
) -> None:
    """Handle progress updates for both tqdm and Gradio Progress objects."""
    if progress is None:
        return
    # Gradio Progress: the object itself is callable.
    if GradioProgress is not None and isinstance(progress, GradioProgress):
        if description:
            progress(0, desc=description)  # Gradio sets the description via the callable
        if value is not None:
            progress(value / 100)  # Gradio expects progress as a fraction (0 to 1)
        return
    # tqdm: `value` is treated as an absolute percentage, so only advance forward.
    if isinstance(progress, tqdm):
        if description and hasattr(progress, 'set_description'):
            progress.set_description(description)
        if value is not None:
            progress.update(value - progress.n if progress.n < value else 0)


def get_youtube_video_id(url: str, ignore_playlist: bool = True) -> str | None:
    parsed = urlparse(url)
    if parsed.hostname == "youtu.be":
        return parsed.path.lstrip("/")
    if parsed.hostname in {"www.youtube.com", "youtube.com", "music.youtube.com"}:
        if not ignore_playlist and "list" in parse_qs(parsed.query):
            return parse_qs(parsed.query)["list"][0]
        if parsed.path == "/watch":
            return parse_qs(parsed.query)["v"][0]
        if parsed.path.startswith(("/embed/", "/v/")):
            return parsed.path.split("/")[-1]
    return None


async def yt_download(
    link: str,
    cookies_path: str = os.path.join(BASE_DIR, "neorvc", "config.txt"),
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> Path:
    if not os.path.exists(cookies_path):
        raise FileNotFoundError(f"Cookies file not found: {cookies_path}")
    video_id = get_youtube_video_id(link)
    if not video_id:
        raise ValueError("Invalid YouTube URL: could not extract video ID.")
    output_file = os.path.join(OUTPUT_DIR, f"{video_id}.mp3")
    if os.path.exists(output_file):
        # Reuse a previously downloaded file.
        return Path(output_file)
    handle_progress(progress, description="Downloading YouTube audio", value=10)
    cmd = [
        "yt-dlp",
        "--format", "bestaudio/best",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "192K",
        "--cookies", str(cookies_path),
        "--output", str(output_file),
        "--no-check-certificate",
        link,
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"yt-dlp failed: {stderr.decode(errors='ignore').strip()}")
    if not os.path.exists(output_file):
        raise RuntimeError(f"Downloaded file not found: {output_file}")
    handle_progress(progress, value=20)
    return Path(output_file)


def sanitize_model_name(dir_name: str) -> str:
    # Only allow letters, digits, underscores, and hyphens.
    if not dir_name or not re.match(r"^[a-zA-Z0-9_-]+$", dir_name):
        raise ValueError("Invalid model name")
    return dir_name


async def download_online_model(
    url: str,
    dir_name: str,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> str:
    dir_name = sanitize_model_name(dir_name)
    if not url or not dir_name:
        raise ValueError("URL and model name are required")
    if not url.startswith(("http://", "https://")):
        raise ValueError("Invalid URL format")
    extraction_folder = os.path.join(RVC_MODELS_DIR, dir_name)
    if os.path.exists(extraction_folder):
        raise ValueError(f"Model directory '{dir_name}' already exists")
    zip_name = url.split("/")[-1]
    if "pixeldrain.com" in url:
        # Rewrite pixeldrain page links to their direct-download API endpoint.
        zip_name = Path(zip_name).name
        url = f"https://pixeldrain.com/api/file/{zip_name}"
    handle_progress(progress, description=f"Downloading model '{dir_name}'")
    zip_path = os.path.join(OUTPUT_DIR, zip_name)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise ValueError(f"Failed to download model: HTTP {response.status}")
            total_size = int(response.headers.get("content-length", 0))
            downloaded = 0
            async with aiofiles.open(zip_path, "wb") as f:
                async for chunk in response.content.iter_chunked(1024):
                    await f.write(chunk)
                    downloaded += len(chunk)
                    if total_size:
                        # Map download progress onto the 20-50% range of the overall bar.
                        progress_value = 20 + (downloaded / total_size) * 30
                        handle_progress(progress, value=progress_value)
    handle_progress(progress, description="Extracting model")
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extraction_folder)
        os.unlink(zip_path)
    except (zipfile.BadZipFile, OSError) as e:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise ValueError(f"Error extracting zip: {e}") from e
    # Locate the model weights (.pth) and optional index file inside the archive.
    model_filepath = None
    index_filepath = None
    for file_path in Path(extraction_folder).rglob("*"):
        if file_path.suffix == ".pth" and file_path.stat().st_size > 40 * 1024 * 1024:
            model_filepath = file_path
        if file_path.suffix == ".index" and file_path.stat().st_size > 100 * 1024:
            index_filepath = file_path
    if not model_filepath:
        shutil.rmtree(extraction_folder, ignore_errors=True)
        raise ValueError(f"No valid .pth model file found in {extraction_folder}")
    # Move the model and index files to the top of the extraction folder,
    # then drop any leftover subdirectories.
    for filepath in (model_filepath, index_filepath):
        if filepath and filepath != Path(os.path.join(extraction_folder, filepath.name)):
            os.rename(filepath, os.path.join(extraction_folder, filepath.name))
    for item in Path(extraction_folder).iterdir():
        if item.is_dir():
            shutil.rmtree(item, ignore_errors=True)
    handle_progress(progress, value=10)
    print(f"Model '{dir_name}' downloaded")
    return f"Model '{dir_name}' downloaded"


def raise_exception(msg: str) -> None:
    raise ValueError(msg)


def get_rvc_model(voice_model: str) -> tuple[Path, Path | None]:
    model_dir = os.path.join(RVC_MODELS_DIR, voice_model)
    pth = None
    idx = None
    for f in Path(model_dir).iterdir():
        if f.suffix == ".pth":
            pth = f
        if f.suffix == ".index":
            idx = f
    if not pth:
        raise_exception(f"No model file in {model_dir}")
    return pth, idx


def get_audio_paths(song_dir: Path) -> tuple[Path | None, Path | None, Path | None, Path | None]:
    orig = inst = main_drb = backup = None
    for f in song_dir.iterdir():
        if f.name.endswith("_Instrumental.wav"):
            inst = f
            orig = Path(os.path.join(song_dir, f.name.replace("_Instrumental", "")))
        elif f.name.endswith("_Vocals_Main_DeReverb.wav"):
            main_drb = f
        elif f.name.endswith("_Vocals_Backup.wav"):
            backup = f
    return orig, inst, main_drb, backup


def convert_to_stereo(path: Path) -> Path:
    info = sf.info(path)
    if info.channels == 1:
        stereo = path.with_stem(f"{path.stem}_stereo")
        cmd = shlex.split(f'ffmpeg -y -loglevel error -i "{path}" -ac 2 "{stereo}"')
        subprocess.run(cmd, check=True)
        return stereo
    return path


def get_hash(fp: Path) -> str:
    h = hashlib.blake2b()
    with fp.open("rb") as f:
        while chunk := f.read(8192):
            h.update(chunk)
    return h.hexdigest()[:11]


async def preprocess_song(
    inp: str,
    sid: str,
    inp_type: str,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> tuple[Path | None, Path, Path, Path, Path, Path]:
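    """
    Split an input song into stems with the UVR separation models.

    Returns a 6-tuple of paths:
    (original input or None, full vocals, instrumental, de-reverbed vocals,
    backup vocals, main de-reverbed vocals). The original path is returned
    only for local inputs; YouTube downloads yield None in that slot.
    """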
    keep = False
    if inp_type == "yt":
        handle_progress(progress, description="Downloading audio")
        path = await yt_download(inp.split("&")[0], progress=progress)
    else:
        path = Path(inp.strip('"'))
        if not path.exists() or path.suffix.lower() not in AUDIO_EXTS:
            raise_exception(f"Invalid audio file: {path}")
        keep = True
    out_dir = os.path.join(UVR_OUTPUT_DIR, sid)
    os.makedirs(out_dir, exist_ok=True)
    separator = Separator(output_dir=out_dir, log_level=logging.WARNING)
    path = convert_to_stereo(path)
    base = path.stem
    inst = Path(os.path.join(out_dir, f"{base}_Instrumental.wav"))
    vocals = Path(os.path.join(out_dir, f"{base}_Vocals.wav"))
    vocals_no_reverb = Path(os.path.join(out_dir, f"{base}_Vocals_NoReverb.wav"))
    backup = Path(os.path.join(out_dir, f"{base}_Vocals_Backup.wav"))
    main_drb = Path(os.path.join(out_dir, f"{base}_Vocals_Main_DeReverb.wav"))
    required_files = [vocals, inst, vocals_no_reverb, backup, main_drb]
    if all(p.exists() for p in required_files):
        # All stems already exist from a previous run; skip separation.
        orig = path if keep else None
        return orig, vocals, inst, vocals_no_reverb, backup, main_drb
    # Stage 1: split the track into vocals and instrumental.
    handle_progress(progress, description="Separating vocals")
    separator.load_model(model_filename="model_bs_roformer_ep_317_sdr_12.9755.ckpt")
    voc_inst = separator.separate(str(path))
    Path(os.path.join(out_dir, voc_inst[0])).rename(inst)
    Path(os.path.join(out_dir, voc_inst[1])).rename(vocals)
    handle_progress(progress, value=10)
    # Stage 2: remove reverb/echo from the vocal stem.
    handle_progress(progress, description="DeReverbing vocals")
    separator.load_model(model_filename="UVR-DeEcho-DeReverb.pth")
    voc_no_reverb = separator.separate(str(vocals))
    Path(os.path.join(out_dir, voc_no_reverb[0])).rename(vocals_no_reverb)
    Path(os.path.join(out_dir, voc_no_reverb[1])).rename(
        Path(os.path.join(out_dir, f"{base}_Vocals_Reverb.wav"))
    )
    handle_progress(progress, value=20)
    # Stage 3: split the dry vocals into main and backup vocals.
    handle_progress(progress, description="Splitting main/backup vocals")
    separator.load_model(model_filename="mel_band_roformer_karaoke_aufr33_viperx_sdr_10.1956.ckpt")
    backing_voc = separator.separate(str(vocals_no_reverb))
    Path(os.path.join(out_dir, backing_voc[0])).rename(backup)
    Path(os.path.join(out_dir, backing_voc[1])).rename(main_drb)
    handle_progress(progress, value=30)
    orig = path if keep else None
    return orig, vocals, inst, vocals_no_reverb, backup, main_drb


def voice_change(
    model: str,
    vocals: Path,
    out: Path,
    pitch: int,
    f0: str,
    idx_rate: float,
    filt_rad: int,
    rms: float,
    prot: float,
    hop: int,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> None:
    pth, idx = get_rvc_model(model)
    handle_progress(progress, description="Converting voice")
    run_infer_script(
        pth_path=str(pth),
        index_path=str(idx) if idx else "",
        index_rate=idx_rate,
        input_path=str(vocals),
        output_path=str(out),
        pitch=pitch,
        f0_method=f0,
        filter_radius=filt_rad,
        volume_envelope=rms,
        protect=prot,
        hop_length=hop,
        split_audio=False,
        f0_autotune_strength=0.0,
        clean_audio=False,
        f0_autotune=False,
        clean_strength=0.0,
        export_format="wav",
        f0_file=None,
        embedder_model="contentvec",
    )
    gc.collect()
    handle_progress(progress, value=50)
    print(f"Voice conversion completed: {out}")


def combine_audio(
    paths: list[Path],
    out: Path,
    mg: float,
    bg: float,
    ig: float,
    fmt: str,
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> None:
    handle_progress(progress, description="Combining tracks")
    # Apply the user gains on top of fixed per-stem attenuations (in dB).
    main = AudioSegment.from_file(paths[0]) + mg - 4
    backup = AudioSegment.from_file(paths[1]) + bg - 6
    inst = AudioSegment.from_file(paths[2]) + ig - 7
    main.overlay(backup).overlay(inst).export(out, format=fmt)
    handle_progress(progress, value=60)
    print(f"Combined audio saved: {out}")
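

# Full-cover pipeline: resolve the input (YouTube URL or local file), split it
# into stems with preprocess_song, convert the main de-reverbed vocal with the
# selected RVC model, then mix the converted vocal, backup vocal, and
# instrumental back together with combine_audio.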
async def song_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> Path:
    if not song_input or not voice_model:
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")
    parsed = urlparse(song_input)
    if parsed.scheme.startswith("http"):
        inp_type = "yt"
        sid = get_youtube_video_id(song_input)
        if not sid:
            raise_exception("Invalid YouTube URL")
        base_filename = sid
    else:
        inp_type = "local"
        song_input = song_input.strip('"')
        path = Path(song_input)
        if path.exists():
            sid = get_hash(path)
            base_filename = path.stem
        else:
            raise_exception(f"File not found: {song_input}")
    song_dir = os.path.join(OUTPUT_DIR, sid)
    os.makedirs(song_dir, exist_ok=True)
    # Reuse stems from a previous run when they all exist; otherwise separate the song.
    orig_fp, inst_fp, main_drb_fp, backup_fp = get_audio_paths(Path(song_dir))
    if not keep_files and all((orig_fp, inst_fp, main_drb_fp, backup_fp)):
        orig, inst, main, backup, main_drb = orig_fp, inst_fp, main_drb_fp, backup_fp, main_drb_fp
    else:
        orig, vocals, inst, main, backup, main_drb = await preprocess_song(
            song_input, sid, inp_type, progress
        )
    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"{voice_model}_Generated_{base_filename}.wav"))
    ai_cover = Path(os.path.join(OUTPUT_DIR, f"{base_filename} ({voice_model} Ver).{output_format}"))
    voice_change(
        model=voice_model,
        vocals=main_drb,
        out=ai_vocals,
        pitch=pitch_change,
        f0=f0_method,
        idx_rate=index_rate,
        filt_rad=filter_radius,
        rms=rms_mix_rate,
        prot=protect,
        hop=crepe_hop_length,
        progress=progress,
    )
    combine_audio(
        [ai_vocals, backup, inst], ai_cover,
        main_gain, backup_gain, inst_gain, output_format, progress
    )
    if not keep_files:
        handle_progress(progress, description="Cleaning up")
        for f in (main, inst, backup):
            if f and f.exists():
                f.unlink()
    handle_progress(progress, value=65)
    print(f"Output saved: {ai_cover}")
    return ai_cover


async def vocal_cover_pipeline(
    song_input: str,
    voice_model: str,
    pitch_change: int,
    keep_files: bool,
    main_gain: float = 0,
    backup_gain: float = 0,
    inst_gain: float = 0,
    index_rate: float = 0.5,
    filter_radius: int = 3,
    rms_mix_rate: float = 0.25,
    f0_method: str = "rmvpe",
    crepe_hop_length: int = 128,
    protect: float = 0.33,
    output_format: str = "mp3",
    progress: Optional[Union[tqdm, 'GradioProgress']] = None,
) -> Path:
    if not song_input or not voice_model:
        raise_exception("Song input and voice model are required")
    handle_progress(progress, description="Starting pipeline")
    parsed = urlparse(song_input)
    if parsed.scheme.startswith("http"):
        inp_type = "yt"
        sid = get_youtube_video_id(song_input)
        if not sid:
            raise_exception("Invalid YouTube URL")
        song_input = str(await yt_download(song_input.split("&")[0], progress=progress))
    else:
        inp_type = "local"
        song_input = song_input.strip('"')
        path = Path(song_input)
        if path.exists():
            sid = get_hash(path)
        else:
            raise_exception(f"File not found: {song_input}")
    orig = Path(song_input)
    song_dir = os.path.join(OUTPUT_DIR, sid)
    ai_vocals = Path(os.path.join(OUTPUT_DIR, f"Cover_{orig.stem}_{voice_model}.wav"))
    # Convert the input audio directly, without separating stems or mixing.
    voice_change(
        voice_model, orig, ai_vocals, pitch_change, f0_method, index_rate,
        filter_radius, rms_mix_rate, protect, crepe_hop_length, progress
    )
    print(f"Output saved: {ai_vocals}")
    return ai_vocals
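

# Illustrative programmatic usage (a sketch only; the URL and "MyVoiceModel"
# are placeholders, not names shipped with the project):
#
#   cover_path = asyncio.run(song_cover_pipeline(
#       song_input="https://www.youtube.com/watch?v=<VIDEO_ID>",
#       voice_model="MyVoiceModel",
#       pitch_change=0,
#       keep_files=False,
#       output_format="mp3",
#   ))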
def parse_arguments() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Generate a song cover using voice conversion.")
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    infer_parser = subparsers.add_parser("infer", help="RVC Inference")
    infer_parser.add_argument("song_input", help="YouTube URL or local audio file path")
    infer_parser.add_argument("voice_model", help="Name of the RVC voice model")
    infer_parser.add_argument("--pitch_change", type=int, default=0, help="Pitch change in semitones")
    infer_parser.add_argument("--keep_files", action="store_true", help="Keep intermediate files")
    infer_parser.add_argument(
        "--output_type", choices=["full", "vocals"], default="full",
        help="Output type: full song or vocals only"
    )
    infer_parser.add_argument("--main_gain", type=float, default=0, help="Main vocals gain (dB)")
    infer_parser.add_argument("--backup_gain", type=float, default=0, help="Backup vocals gain (dB)")
    infer_parser.add_argument("--inst_gain", type=float, default=0, help="Instrumental gain (dB)")
    infer_parser.add_argument("--index_rate", type=float, default=0.5, help="Index rate for voice conversion")
    infer_parser.add_argument("--filter_radius", type=int, default=3, help="Filter radius for voice conversion")
    infer_parser.add_argument("--rms_mix_rate", type=float, default=0.25, help="RMS mix rate")
    infer_parser.add_argument("--f0_method", default="rmvpe", help="F0 extraction method")
    infer_parser.add_argument("--crepe_hop_length", type=int, default=128, help="CREPE hop length")
    infer_parser.add_argument("--protect", type=float, default=0.33, help="Protect voiceless consonants")
    infer_parser.add_argument("--output_format", default="mp3", help="Output format (e.g., mp3, wav)")

    download_parser = subparsers.add_parser("download", help="RVC Model Downloader")
    download_parser.add_argument("model_url", help="URL for RVC model")
    download_parser.add_argument("voice_model", help="Name of the RVC voice model")

    return parser.parse_args()


async def main() -> None:
    args = parse_arguments()
    if not args.command:
        print("Please run with '-h' for help")
        return
    with tqdm(total=100, desc="Starting...", unit="%") as pbar:
        if args.command == "infer":
            pipeline = song_cover_pipeline if args.output_type == "full" else vocal_cover_pipeline
            result = await pipeline(
                song_input=args.song_input,
                voice_model=args.voice_model,
                pitch_change=args.pitch_change,
                keep_files=args.keep_files,
                main_gain=args.main_gain,
                backup_gain=args.backup_gain,
                inst_gain=args.inst_gain,
                index_rate=args.index_rate,
                filter_radius=args.filter_radius,
                rms_mix_rate=args.rms_mix_rate,
                f0_method=args.f0_method,
                crepe_hop_length=args.crepe_hop_length,
                protect=args.protect,
                output_format=args.output_format,
                progress=pbar,
            )
            handle_progress(pbar, value=100)
            print(f"Completed: {result}")
        elif args.command == "download":
            result = await download_online_model(
                url=args.model_url,
                dir_name=args.voice_model,
                progress=pbar,
            )
            handle_progress(pbar, value=100)
            print("Download completed")


if __name__ == "__main__":
    asyncio.run(main())
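
# Example command lines (the script filename, model URL, and model name below
# are placeholders, chosen only to match the argparse definitions above):
#
#   python neorvc_cli.py infer "https://www.youtube.com/watch?v=<VIDEO_ID>" MyVoiceModel \
#       --pitch_change 2 --output_type full --output_format wav
#   python neorvc_cli.py infer song.wav MyVoiceModel --output_type vocals
#   python neorvc_cli.py download "https://example.com/model.zip" MyVoiceModel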