| import os |
| import gradio as gr |
| import spaces |
| from infer_rvc_python import BaseLoader |
| import random |
| import logging |
| import time |
| import soundfile as sf |
| from infer_rvc_python.main import download_manager, load_hu_bert, Config |
| import zipfile |
| import edge_tts |
| import asyncio |
| import librosa |
| import traceback |
| from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter |
| from pedalboard.io import AudioFile |
| from pydub import AudioSegment |
| import noisereduce as nr |
| import numpy as np |
| import urllib.request |
| import shutil |
| import threading |
| import argparse |
| import sys |
|
|
| |
# Command-line interface: --share turns on sharing, --theme selects the Gradio theme.
parser = argparse.ArgumentParser(
    description="Run the app with optional sharing",
)
parser.add_argument(
    "--share",
    action="store_true",
    help="Enable sharing mode",
)
parser.add_argument(
    "--theme",
    type=str,
    default="aliabid94/new-theme",
    help="Set the theme",
)
args = parser.parse_args()

# True when the google.colab module is loaded or when --share was passed.
IS_COLAB = bool("google.colab" in sys.modules or args.share)
# Raw value of the SPACES_ZERO_GPU environment variable (None when unset).
IS_ZERO_GPU = os.getenv("SPACES_ZERO_GPU")
|
|
# Only report errors from the infer_rvc_python logger.
logging.getLogger("infer_rvc_python").setLevel(logging.ERROR)

# Shared converter instance used by every inference request.
converter = BaseLoader(
    only_cpu=False,
    hubert_path=None,
    rmvpe_path=None,
)
# Load the HuBERT model once at start-up and attach it to the converter.
converter.hu_bert_model = load_hu_bert(
    Config(only_cpu=False),
    converter.hubert_path,
)
|
|
# Static texts shown in the interface.
title = "<center><strong><font size='7'>RVC⚡ZERO</font></strong></center>"
RESOURCES = "- You can also try `RVC⚡ZERO` in Colab’s free tier [link](https://github.com/R3gm/rvc_zero_ui?tab=readme-ov-file#rvczero)."
theme = args.theme

# The disclaimer and the cache-deletion timing depend on whether SPACES_ZERO_GPU is set.
if IS_ZERO_GPU:
    description = "This demo is provided for educational and research purposes only."
    delete_cache_time = (3200, 3200)
else:
    description = ""
    delete_cache_time = (86400, 86400)

# Choices offered by the "Pitch algorithm" dropdown.
PITCH_ALGO_OPT = [
    "pm",
    "harvest",
    "crepe",
    "rmvpe",
    "rmvpe+",
]
|
|
| |
async def get_voices_list(proxy=None):
    """Fetch the edge-tts voice catalogue and return a trimmed summary of each voice.

    Args:
        proxy: Forwarded unchanged to ``edge_tts.list_voices``.

    Returns:
        A list of dicts, ordered by ``ShortName``, each holding the keys
        ``ShortName``, ``Gender``, ``ContentCategories``,
        ``VoicePersonalities`` and ``FriendlyName``. The two tag fields are
        joined into comma-separated strings.
    """
    from edge_tts import list_voices

    catalogue = await list_voices(proxy=proxy)
    summaries = []
    for voice in sorted(catalogue, key=lambda entry: entry["ShortName"]):
        summaries.append(
            {
                "ShortName": voice["ShortName"],
                "Gender": voice["Gender"],
                "ContentCategories": ", ".join(voice["VoiceTag"]["ContentCategories"]),
                "VoicePersonalities": ", ".join(voice["VoiceTag"]["VoicePersonalities"]),
                "FriendlyName": voice["FriendlyName"],
            }
        )
    return summaries
|
|
| |
def find_files(directory):
    """Return the paths of the ``.pth``, ``.zip`` and ``.index`` files directly inside ``directory``.

    Sub-directories are not searched; each returned path is ``directory``
    joined with the entry name.
    """
    wanted_suffixes = ('.pth', '.zip', '.index')
    return [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.endswith(wanted_suffixes)
    ]
|
|
def unzip_in_folder(my_zip, my_dir):
    """Extract every file of the archive ``my_zip`` straight into ``my_dir``.

    Directory entries are skipped and each member is written under its base
    name only, so the archive's folder structure is flattened.
    """
    with zipfile.ZipFile(my_zip) as archive:
        file_members = [member for member in archive.infolist() if not member.is_dir()]
        for member in file_members:
            # Replacing the stored name with its basename drops any nested folders.
            member.filename = os.path.basename(member.filename)
            archive.extract(member, my_dir)
|
|
def find_my_model(a_, b_):
    """Resolve a model/index pair, downloading it first when given ``.txt`` link files.

    When ``a_`` is ``None`` or already a ``.pth`` path the inputs are returned
    untouched. Otherwise every ``.txt`` file among ``a_``/``b_`` is read, the
    URL on its first line is downloaded into the folder containing ``a_``,
    any ``.zip`` found there is unpacked, and the folder is searched for a
    ``.pth`` and an ``.index`` file.

    Args:
        a_: Model path (``.pth``), a ``.txt`` file holding a URL, or ``None``.
        b_: Index path, a ``.txt`` file holding a URL, or ``None``.

    Returns:
        ``(model_path, index_path)``; ``index_path`` may be ``None``.

    Raises:
        gr.Error: If no ``.pth`` file is present after downloading.
    """
    if a_ is None or a_.endswith(".pth"):
        return a_, b_

    txt_files = [f for f in [a_, b_] if f and f.endswith(".txt")]
    directory = os.path.dirname(a_)

    for txt in txt_files:
        with open(txt) as f:
            url = f.readline().strip()
        download_manager(url=url, path=directory, extension="")

    for f in find_files(directory):
        if f.endswith(".zip"):
            unzip_in_folder(f, directory)

    model = index = None
    for ff in find_files(directory):
        if ff.endswith(".pth"):
            model = ff
            gr.Info(f"Model found: {ff}")
        if ff.endswith(".index"):
            index = ff
            gr.Info(f"Index found: {ff}")

    if not model:
        # gr.Error is an exception class: it only reaches the user when raised.
        raise gr.Error("Model not found")
    if not index:
        gr.Warning("Index not found")
    return model, index
|
|
def ensure_valid_file(url):
    """Check that ``url`` points at Hugging Face and return the remote file size.

    The URL must use http(s) and its host must be ``huggingface.co`` or one
    of its sub-domains. A plain substring test would also accept things like
    ``file:///tmp/huggingface`` or ``https://evil.example/huggingface.zip``.

    Args:
        url: Download link supplied by the user.

    Returns:
        The ``Content-Length`` reported for a HEAD request (0 when absent).

    Raises:
        ValueError: If the URL is not an http(s) Hugging Face URL, or if the
            file is larger than 900 MB while running on Zero GPU.
    """
    from urllib.parse import urlsplit

    parts = urlsplit(url)
    host = (parts.hostname or "").lower()
    is_hf_host = host == "huggingface.co" or host.endswith(".huggingface.co")
    if parts.scheme not in ("http", "https") or not is_hf_host:
        raise ValueError("Only Hugging Face URLs allowed")

    req = urllib.request.Request(url, method="HEAD")
    # A timeout keeps an unresponsive server from blocking the request forever.
    with urllib.request.urlopen(req, timeout=30) as resp:
        size = int(resp.headers.get("Content-Length", 0))
    if size > 900_000_000 and IS_ZERO_GPU:
        raise ValueError("File too large for Zero GPU")
    return size
|
|
def clear_files(directory):
    """Wait 15 seconds, then delete ``directory`` and everything in it, ignoring errors."""
    delay_seconds = 15
    time.sleep(delay_seconds)
    shutil.rmtree(directory, ignore_errors=True)
|
|
def get_my_model(url_data, progress=gr.Progress(track_tqdm=True)):
    """Download a model (and optional index) from one or two comma-separated URLs.

    ``/blob/`` in each link is rewritten to ``/resolve/``. Each link is
    validated with ``ensure_valid_file`` and downloaded into a fresh folder
    under ``downloads/``; any ``.zip`` is unpacked there, and the folder is
    then searched for a ``.pth`` and an ``.index`` file.

    The download folder is kept on success because the returned paths are
    used later, at inference time. It is removed only when the download
    fails. Each call gets its own unique folder so concurrent or repeated
    requests never see each other's files.

    Args:
        url_data: A single URL, or ``"model_url, index_url"``.
        progress: Gradio progress tracker (tracks tqdm bars).

    Returns:
        ``(model_path, index_path)`` as absolute paths; ``index_path`` is
        ``None`` when no index was found. ``(None, None)`` when ``url_data``
        contains no link.

    Raises:
        ValueError: If more than two links are given, a link is rejected by
            ``ensure_valid_file``, or no ``.pth`` file is found.
    """
    import tempfile

    if not url_data:
        return None, None

    links = [
        part.strip().replace("/blob/", "/resolve/")
        for part in url_data.split(",")
        if part.strip()
    ]
    if not links:
        return None, None
    if len(links) > 2:
        raise ValueError("Provide at most two links: the model and the index")

    out_dir = "downloads"
    os.makedirs(out_dir, exist_ok=True)
    # mkdtemp guarantees a folder that no other request is using.
    directory = tempfile.mkdtemp(dir=out_dir)

    try:
        for link in links:
            ensure_valid_file(link)
            download_manager(url=link, path=directory, extension="")

        for f in find_files(directory):
            if f.endswith(".zip"):
                unzip_in_folder(f, directory)

        model = index = None
        for ff in find_files(directory):
            if ff.endswith(".pth"):
                model = ff
            if ff.endswith(".index"):
                index = ff

        if not model:
            raise ValueError("Model .pth not found")
        if not index:
            gr.Warning("Index not found")
    except Exception:
        # Nothing usable was produced: drop the partial download and re-raise.
        shutil.rmtree(directory, ignore_errors=True)
        raise

    return os.path.abspath(model), os.path.abspath(index) if index else None
|
|
| |
def _find_index_for(pth_path):
    """Return the index file that belongs to ``pth_path``, or ``None`` if there is none.

    First tries ``<model>.index`` and ``<model>.added.index`` next to the
    model. If neither exists, falls back to the ``.index`` files in the same
    folder, preferring one whose name contains the model's base name.
    """
    base = os.path.splitext(pth_path)[0]
    for ext in ('.index', '.added.index'):
        candidate = base + ext
        if os.path.isfile(candidate):
            return candidate

    dir_name = os.path.dirname(pth_path)
    stem = os.path.basename(base)
    siblings = sorted(
        name
        for name in os.listdir(dir_name)
        if name.endswith('.index') and os.path.isfile(os.path.join(dir_name, name))
    )
    if not siblings:
        return None
    matching = [name for name in siblings if stem in name]
    return os.path.join(dir_name, (matching or siblings)[0])


def scan_models():
    """Find ``.pth``/``.index`` pairs anywhere inside the ``logs`` folder.

    Builds the list used to populate the model dropdown. Models without an
    index file are left out. Results are ordered by model path so the first
    entry is stable between calls.

    Returns:
        A list of ``(display_name, pth_path, idx_path)`` tuples, where
        ``display_name`` is the model path relative to ``logs`` without its
        extension and with path separators shown as ``" > "``. Empty when
        ``logs`` does not exist.
    """
    logs_dir = "logs"
    if not os.path.isdir(logs_dir):
        return []

    pth_files = sorted(
        os.path.join(root, name)
        for root, _dirs, files in os.walk(logs_dir)
        for name in files
        if name.endswith(".pth")
    )

    models = []
    for pth_path in pth_files:
        idx_path = _find_index_for(pth_path)
        if idx_path is None:
            continue
        rel_path = os.path.relpath(pth_path, logs_dir)
        display_name = os.path.splitext(rel_path)[0].replace(os.sep, " > ")
        models.append((display_name, pth_path, idx_path))

    return models
|
|
def update_model_paths(display_name):
    """Translate a dropdown selection into absolute model and index paths.

    Args:
        display_name: The name chosen in the model dropdown, as produced by
            ``scan_models``.

    Returns:
        ``(model_path, index_path)`` as absolute paths, or ``(None, None)``
        when the name is unknown or the model file no longer exists.
    """
    for name, pth, idx in scan_models():
        if name != display_name:
            continue

        abs_pth = os.path.abspath(pth)
        abs_idx = os.path.abspath(idx) if idx else None
        print(f"DEBUG: Selected model pth = {abs_pth}")
        print(f"DEBUG: Selected model index = {abs_idx}")

        if os.path.isfile(abs_pth):
            return abs_pth, abs_idx

        # A bare gr.Error(...) call shows nothing (it must be raised), and raising
        # would leave the previous paths in place. A warning toast informs the user
        # while the outputs are still cleared.
        gr.Warning(f"Model file missing: {abs_pth}")
        return None, None
    return None, None
|
|
| |
def add_audio_effects(audio_list, type_output):
    """Run each audio file through a high-pass filter, compressor and reverb chain.

    Processing is best-effort: when a file fails, the error is printed and the
    untouched input path is returned in its place.

    Args:
        audio_list: Paths of the audio files to process.
        type_output: Output format/extension handed to the exporter.

    Returns:
        A list with one path per input: ``<name>_effects.<type_output>`` on
        success, the original path on failure.
    """
    result = []
    for audio_path in audio_list:
        temp_wav = None
        try:
            out_path = f'{os.path.splitext(audio_path)[0]}_effects.{type_output}'
            # A fresh board per file so no effect state carries over between files.
            board = Pedalboard([
                HighpassFilter(),
                Compressor(ratio=4, threshold_db=-15),
                Reverb(room_size=0.1, dry_level=0.8, wet_level=0.2, damping=0.7)
            ])
            temp_wav = f'{os.path.splitext(audio_path)[0]}_temp.wav'
            with AudioFile(audio_path) as f:
                with AudioFile(temp_wav, 'w', f.samplerate, f.num_channels) as o:
                    # Process in chunks of f.samplerate frames; reset=False keeps
                    # the effect state continuous across chunk boundaries.
                    while f.tell() < f.frames:
                        chunk = f.read(int(f.samplerate))
                        o.write(board(chunk, f.samplerate, reset=False))
            AudioSegment.from_file(temp_wav).export(out_path, format=type_output)
            result.append(out_path)
        except Exception:
            # Keep the pipeline going, but leave a trace of what went wrong.
            traceback.print_exc()
            result.append(audio_path)
        finally:
            # Remove the intermediate WAV even when the export failed.
            if temp_wav and os.path.exists(temp_wav):
                try:
                    os.remove(temp_wav)
                except OSError:
                    traceback.print_exc()
    return result
|
|
def apply_noisereduce(audio_list, type_output):
    """Apply noise reduction to each audio file.

    Processing is best-effort: when a file fails, the error is printed and the
    untouched input path is returned in its place.

    Args:
        audio_list: Paths of the audio files to process.
        type_output: Output format/extension handed to the exporter.

    Returns:
        A list with one path per input: ``<name>_noisereduce.<type_output>``
        on success, the original path on failure.
    """
    result = []
    for audio_path in audio_list:
        out_path = f"{os.path.splitext(audio_path)[0]}_noisereduce.{type_output}"
        try:
            audio = AudioSegment.from_file(audio_path)
            samples = np.array(audio.get_array_of_samples())
            reduced = nr.reduce_noise(samples, sr=audio.frame_rate, prop_decrease=0.6)
            # Rebuild a segment with the source's rate, sample width and channel count.
            reduced_audio = AudioSegment(
                reduced.tobytes(),
                frame_rate=audio.frame_rate,
                sample_width=audio.sample_width,
                channels=audio.channels
            )
            reduced_audio.export(out_path, format=type_output)
            result.append(out_path)
        except Exception:
            # Keep the pipeline going, but leave a trace of what went wrong.
            traceback.print_exc()
            result.append(audio_path)
    return result
|
|
@spaces.GPU()
def convert_now(audio_files, random_tag, converter, type_output, steps):
    """Run the converter ``steps`` times, feeding each pass's output into the next.

    Args:
        audio_files: Input passed to the first conversion pass.
        random_tag: Tag identifying the configuration applied to ``converter``.
        converter: Callable performing the conversion.
        type_output: Output format forwarded to ``converter``.
        steps: Number of passes.

    Returns:
        Whatever the last pass returned (``audio_files`` unchanged when
        ``steps`` is 0).
    """
    worker_count = 2 if IS_COLAB else 8
    current = audio_files
    for _pass in range(steps):
        current = converter(
            current,
            random_tag,
            overwrite=False,
            parallel_workers=worker_count,
            type_output=type_output,
        )
    return current
|
|
def run(audio_files, file_m, pitch_alg, pitch_lvl, file_index, index_inf, r_m_f, e_r, c_b_p, active_noise_reduce, audio_effects, type_output, steps):
    """Convert the given audio with the selected model and optional post-processing.

    When ``file_m`` is empty or not an existing file, the first model found
    by ``scan_models`` is used instead. A ``.txt`` model path is resolved
    through ``find_my_model``.

    Args:
        audio_files: One path or a list of paths to convert.
        file_m: Model path.
        pitch_alg: Pitch algorithm name.
        pitch_lvl: Pitch level.
        file_index: Index path (may be ``None``).
        index_inf: Index influence.
        r_m_f: Respiration median filtering.
        e_r: Envelope ratio.
        c_b_p: Consonant breath protection.
        active_noise_reduce: Apply noise reduction to the result when truthy.
        audio_effects: Apply the effects chain to the result when truthy.
        type_output: Output audio format.
        steps: Number of conversion passes.

    Returns:
        The list of resulting audio paths.

    Raises:
        ValueError: If no model is available or no audio was provided.
    """
    print("DEBUG: file_m received =", file_m)
    print("DEBUG: file_index received =", file_index)

    # Fall back to the first scanned model when the given one is unusable.
    model_is_usable = bool(file_m) and os.path.isfile(str(file_m))
    if not model_is_usable:
        available = scan_models()
        if not available:
            raise ValueError("No model available. Please upload a model to logs/ folder.")
        _, file_m, file_index = available[0]
        print(f"WARNING: Using fallback model: {file_m}")

    if not audio_files:
        raise ValueError("Please provide audio files")

    # A single path is wrapped so the rest of the pipeline always sees a list.
    if isinstance(audio_files, str):
        audio_files = [audio_files]

    # The duration is informational only; failures are printed and ignored.
    try:
        duration_base = librosa.get_duration(filename=audio_files[0])
        print("Duration:", duration_base)
    except Exception as err:
        print(err)

    if file_m is not None and file_m.endswith(".txt"):
        file_m, file_index = find_my_model(file_m, file_index)
        print(file_m, file_index)

    random_tag = f"USER_{random.randint(10000000, 99999999)}"

    settings = dict(
        tag=random_tag,
        file_model=file_m,
        pitch_algo=pitch_alg,
        pitch_lvl=pitch_lvl,
        file_index=file_index,
        index_influence=index_inf,
        respiration_median_filtering=r_m_f,
        envelope_ratio=e_r,
        consonant_breath_protection=c_b_p,
        resample_sr=0,
    )
    converter.apply_conf(**settings)
    time.sleep(0.1)

    result = convert_now(audio_files, random_tag, converter, type_output, steps)

    # Optional post-processing, in this order: noise reduction, then effects.
    if active_noise_reduce:
        result = apply_noisereduce(result, type_output)
    if audio_effects:
        result = add_audio_effects(result, type_output)

    return result
|
|
| |
def audio_input_conf():
    """Create the single-clip audio input.

    The app offers two kinds of input:
    1. ``gr.Audio`` (this widget) - record from the microphone or upload one file.
    2. ``gr.File`` - upload several files at once.
    """
    widget = gr.Audio(
        sources=["microphone", "upload"],
        type="filepath",
        label="🎤 Record or Upload Audio",
    )
    return widget
|
|
def multi_audio_conf():
    """Create the file input that accepts several audio files at once."""
    accepted_extensions = [".wav", ".mp3", ".flac", ".m4a", ".ogg"]
    widget = gr.File(
        type="filepath",
        file_types=accepted_extensions,
        file_count="multiple",
        label="📁 Upload Multiple Audio Files (Optional)",
    )
    return widget
|
|
def model_dropdown_conf():
    """Create the model dropdown, filled with the names found by ``scan_models``.

    The first model is preselected; the dropdown starts empty when none exist.
    """
    choices = [entry[0] for entry in scan_models()]
    initial = choices[0] if choices else None
    return gr.Dropdown(
        interactive=True,
        value=initial,
        choices=choices,
        label="🤖 Select Model",
    )
|
|
def hidden_model_path_conf():
    """Create the invisible textbox that holds the selected model path."""
    holder = gr.Textbox(visible=False)
    return holder
|
|
def hidden_index_path_conf():
    """Create the invisible textbox that holds the selected index path."""
    holder = gr.Textbox(visible=False)
    return holder
|
|
def pitch_algo_conf():
    """Create the "Pitch algorithm" dropdown, preset to ``rmvpe+``."""
    return gr.Dropdown(
        PITCH_ALGO_OPT,
        label="Pitch algorithm",
        value="rmvpe+",
    )
|
|
def pitch_lvl_conf():
    """Create the "Pitch level" slider: -24 to 24 in steps of 1, starting at 0."""
    lowest, highest = -24, 24
    return gr.Slider(lowest, highest, label="Pitch level", step=1, value=0)
|
|
def index_inf_conf():
    """Create the "Index influence" slider: 0 to 1, starting at 0.75."""
    lowest, highest = 0, 1
    return gr.Slider(lowest, highest, label="Index influence", value=0.75)
|
|
def respiration_filter_conf():
    """Create the "Respiration median filtering" slider: 0 to 7 in steps of 1, starting at 3."""
    lowest, highest = 0, 7
    return gr.Slider(
        lowest,
        highest,
        label="Respiration median filtering",
        step=1,
        value=3,
    )
|
|
def envelope_ratio_conf():
    """Create the "Envelope ratio" slider: 0 to 1, starting at 0.25."""
    lowest, highest = 0, 1
    return gr.Slider(lowest, highest, label="Envelope ratio", value=0.25)
|
|
def consonant_protec_conf():
    """Create the "Consonant breath protection" slider: 0 to 0.5, starting at 0.5."""
    lowest, highest = 0, 0.5
    return gr.Slider(
        lowest,
        highest,
        label="Consonant breath protection",
        value=0.5,
    )
|
|
def button_conf():
    """Create the primary button that starts inference."""
    caption = "🚀 Inference"
    return gr.Button(caption, variant="primary")
|
|
def output_conf():
    """Create the read-only, multi-file component that shows the results."""
    return gr.File(
        interactive=False,
        file_count="multiple",
        label="✅ Result",
    )
|
|
def active_tts_conf():
    """Create the TTS checkbox, unticked by default."""
    ticked = False
    return gr.Checkbox(ticked, container=False, label="🔊 TTS")
|
|
def tts_voice_conf(voices):
    """Create the initially hidden "TTS Voice" dropdown offering ``voices``."""
    return gr.Dropdown(
        visible=False,
        choices=voices,
        label="TTS Voice",
    )
|
|
def tts_text_conf():
    """Create the initially hidden, three-line textbox for the TTS text."""
    return gr.Textbox(
        lines=3,
        visible=False,
        label="Text",
        placeholder="Write the text here...",
    )
|
|
def tts_button_conf():
    """Create the initially hidden secondary button that triggers TTS."""
    caption = "Process TTS"
    return gr.Button(caption, visible=False, variant="secondary")
|
|
def tts_play_conf():
    """Create the initially hidden "Play" checkbox, unticked by default."""
    ticked = False
    return gr.Checkbox(ticked, visible=False, container=False, label="Play")
|
|
def sound_gui():
    """Create the read-only, auto-playing audio component with element id ``audio_tts``."""
    return gr.Audio(
        elem_id="audio_tts",
        interactive=False,
        visible=True,
        autoplay=True,
        type="filepath",
    )
|
|
def steps_conf():
    """Create the "Steps" slider: 1 to 3 in steps of 1, starting at 1."""
    lowest, highest = 1, 3
    return gr.Slider(lowest, highest, label="Steps", step=1, value=1)
|
|
def format_output_gui():
    """Create the output-format dropdown (wav, mp3 or flac), preset to wav."""
    formats = ["wav", "mp3", "flac"]
    return gr.Dropdown(label="Format output", value="wav", choices=formats)
|
|
def denoise_conf():
    """Create the denoise checkbox, unticked by default."""
    ticked = False
    return gr.Checkbox(ticked, container=False, label="🧹 Denoise")
|
|
def effects_conf():
    """Create the reverb/effects checkbox, unticked by default."""
    ticked = False
    return gr.Checkbox(ticked, container=False, label="🎚️ Reverb")
|
|
| |
def infer_tts_audio(tts_voice, tts_text, play_tts):
    """Synthesize ``tts_text`` with edge-tts and save it as an MP3.

    The file is written to ``output/USER_<random>/tts.mp3``.

    Args:
        tts_voice: Dropdown value of the form ``<ShortName>-<Gender>``; the
            part after the last ``-`` is dropped before calling edge-tts.
        tts_text: Text to speak.
        play_tts: When truthy, the path is also returned for playback.

    Returns:
        ``([path], path)`` when ``play_tts`` is truthy, else ``([path], None)``.

    Raises:
        ValueError: If no voice is selected or the text is empty. The voice
            dropdown has no default, so ``tts_voice`` can be ``None``.
    """
    if not tts_voice:
        raise ValueError("Please select a TTS voice")
    if not tts_text or not tts_text.strip():
        raise ValueError("Please provide the text for TTS")

    out_dir = "output"
    folder_tts = "USER_" + str(random.randint(10000, 99999))
    os.makedirs(os.path.join(out_dir, folder_tts), exist_ok=True)
    out_path = os.path.join(out_dir, folder_tts, "tts.mp3")

    # Everything before the last "-" is the voice name (empty if there is no "-").
    voice_name = tts_voice.rpartition("-")[0]
    asyncio.run(edge_tts.Communicate(tts_text, voice_name).save(out_path))

    if play_tts:
        return [out_path], out_path
    return [out_path], None
|
|
def show_components_tts(val):
    """Return four identical visibility updates, one per TTS component."""
    toggle = gr.update(visible=val)
    return (toggle, toggle, toggle, toggle)
|
|
def down_active_conf():
    """Create the URL-to-Model checkbox, unticked by default."""
    ticked = False
    return gr.Checkbox(ticked, container=False, label="🌐 URL-to-Model")
|
|
def down_url_conf():
    """Create the initially hidden textbox for the model URL."""
    return gr.Textbox(
        visible=False,
        label="Enter URL",
        placeholder="Write the url here...",
    )
|
|
def down_button_conf():
    """Create the initially hidden secondary button that processes the URL."""
    caption = "Process"
    return gr.Button(caption, visible=False, variant="secondary")
|
|
def show_components_down(val):
    """Return three identical visibility updates, one per URL-download component."""
    toggle = gr.update(visible=val)
    return (toggle, toggle, toggle)
|
|
| |
# Stylesheet handed to gr.Blocks: hides the element with id "audio_tts" and
# shrinks it to zero size.
CSS = """
#audio_tts {
visibility: hidden; height: 0px; width: 0px; max-width: 0px; max-height: 0px;
}
"""
|
|
def get_gui(theme, voices):
    """Build the Gradio Blocks application and wire up its event handlers.

    Args:
        theme: Theme passed to ``gr.Blocks``.
        voices: Choices offered by the TTS voice dropdown.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    # NOTE(review): the nesting of the Row/Column containers below is assumed —
    # confirm against the rendered UI.
    with gr.Blocks(theme=theme, css=CSS, delete_cache=delete_cache_time) as app:
        gr.Markdown(title)
        gr.Markdown(description)

        # --- Text-to-speech controls (created hidden; shown via the TTS checkbox) ---
        active_tts = active_tts_conf()
        with gr.Row():
            with gr.Column(scale=1):
                tts_text = tts_text_conf()
            with gr.Column(scale=2):
                with gr.Row():
                    tts_voice = tts_voice_conf(voices)
                    tts_active_play = tts_play_conf()
                tts_button = tts_button_conf()
                tts_play = sound_gui()

        # Toggle the four TTS widgets whenever the checkbox changes.
        active_tts.change(show_components_tts, [active_tts], [tts_voice, tts_text, tts_button, tts_active_play])

        # --- Audio inputs: one recorded/uploaded clip, or several uploaded files ---
        gr.Markdown("## 📥 Input Audio")
        with gr.Row():
            audio_record = audio_input_conf()
            audio_multi = multi_audio_conf()

        # The synthesized speech is placed in the multi-file input and,
        # when "Play" is ticked, also sent to the audio_tts player.
        tts_button.click(infer_tts_audio, [tts_voice, tts_text, tts_active_play], [audio_multi, tts_play])

        # --- Optional model download from a URL ---
        down_active = down_active_conf()
        down_info = gr.Markdown(
            "Provide a link to a zip file, or separate links with comma for .pth and .index files.",
            visible=False
        )
        with gr.Row():
            down_url = down_url_conf()
            down_button = down_button_conf()

        # Invisible textboxes carrying the model/index paths to the inference call.
        hidden_model = hidden_model_path_conf()
        hidden_index = hidden_index_path_conf()

        down_active.change(show_components_down, [down_active], [down_info, down_url, down_button])

        def update_from_url(url_data):
            """Download the model behind ``url_data`` and return its (model, index) paths."""
            model_p, index_p = get_my_model(url_data)
            return model_p, index_p

        down_button.click(update_from_url, [down_url], [hidden_model, hidden_index])

        # --- Local model selection ---
        model_dropdown = model_dropdown_conf()
        # The hidden paths are refreshed only when the dropdown value changes.
        model_dropdown.change(update_model_paths, [model_dropdown], [hidden_model, hidden_index])

        # --- Conversion settings ---
        with gr.Accordion("⚙️ Advanced settings", open=False):
            algo = pitch_algo_conf()
            algo_lvl = pitch_lvl_conf()
            idx_inf = index_inf_conf()
            res_fc = respiration_filter_conf()
            env_r = envelope_ratio_conf()
            cons = consonant_protec_conf()
            steps_gui = steps_conf()
            fmt_out = format_output_gui()
            with gr.Row():
                denoise_gui = denoise_conf()
                effects_gui = effects_conf()

        btn = button_conf()
        out = output_conf()

        def combined_audio_inputs(record_audio, multi_files):
            """
            Give priority to multi_files when it contains files;
            otherwise use record_audio.
            """
            if multi_files:
                return multi_files
            elif record_audio:
                return record_audio
            else:
                return None

        # The first two inputs are merged into one audio argument; the
        # remaining inputs are passed to run() in the order listed.
        btn.click(
            lambda rec, multi, *rest: run(combined_audio_inputs(rec, multi), *rest),
            inputs=[
                audio_record, audio_multi,
                hidden_model, algo, algo_lvl, hidden_index,
                idx_inf, res_fc, env_r, cons,
                denoise_gui, effects_gui, fmt_out, steps_gui
            ],
            outputs=out
        )

        gr.Markdown(RESOURCES)

    return app
|
|
if __name__ == "__main__":
    # Fetch the voice catalogue on a private event loop and close it afterwards,
    # so the loop and its resources are not leaked for the life of the process.
    voices_loop = asyncio.new_event_loop()
    try:
        tts_voice_list = voices_loop.run_until_complete(get_voices_list(proxy=None))
    finally:
        voices_loop.close()

    # Dropdown choices as (label, value) pairs, sorted by label. The label is the
    # friendly name with its "-"-separated parts reversed and the vendor wording
    # trimmed; the value is "<ShortName>-<Gender>".
    voices = sorted(
        (
            " - ".join(reversed(v["FriendlyName"].split("-"))).replace("Microsoft ", "").replace("Online (Natural)", f"({v['Gender']})").strip(),
            f"{v['ShortName']}-{v['Gender']}",
        )
        for v in tts_voice_list
    )

    app = get_gui(theme, voices)
    app.queue(default_concurrency_limit=40)
    app.launch(max_threads=40, share=IS_COLAB, show_error=True, quiet=False, debug=IS_COLAB, ssr_mode=False)