File size: 6,811 Bytes
6327a30 04dacae 6327a30 04dacae 6327a30 4465af7 8db92ed 579fccc ec8ba93 579fccc 8db92ed 63b64fa fe90cff 8db92ed 33551a3 515f8e3 8db92ed 7eeb257 09c6470 7eeb257 09c6470 7eeb257 09c6470 8db92ed 4bead6e 7eeb257 dd205e4 229bbd8 515f8e3 a8a860e 229bbd8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
'''
pip install datasets soundfile huggingface_hub librosa
from datasets import load_dataset
import soundfile as sf
import os
from collections import defaultdict
import io
def download_voices_with_dynamic_counting(output_folder='genshin_voices_sample_5', max_files_per_speaker=5):
"""动态统计并下载所有speaker的音频和转录文件(兼容bytes/path格式)"""
# 加载数据集(流式模式)
dataset = load_dataset('simon3000/genshin-voice', split='train', streaming=True)
# 过滤条件:中文、有转录、类型为对话
filtered_data = dataset.filter(
lambda x: (
x['language'] == 'Chinese' and
x['transcription'] != '' and
x['type'] == 'Dialog'
)
)
# 动态统计speaker计数和文件下载
speaker_counts = defaultdict(int)
speaker_file_indices = defaultdict(int)
os.makedirs(output_folder, exist_ok=True)
for voice in filtered_data:
speaker = voice['speaker']
# 如果该speaker已下载足够文件,跳过
if speaker_counts[speaker] >= max_files_per_speaker:
continue
# 更新speaker计数
speaker_counts[speaker] += 1
file_num = str(speaker_file_indices[speaker] + 1).zfill(5) # 从00001开始
# 创建speaker子文件夹
speaker_folder = os.path.join(output_folder, speaker)
os.makedirs(speaker_folder, exist_ok=True)
# 构建文件路径
audio_path = os.path.join(speaker_folder, f'{speaker}_{file_num}.wav')
transcription_path = os.path.join(speaker_folder, f'{speaker}_{file_num}.txt')
# 处理音频数据(兼容bytes或path格式)
audio_data = voice['audio']
try:
if 'bytes' in audio_data and audio_data['bytes'] is not None:
# 从bytes直接读取音频
with io.BytesIO(audio_data['bytes']) as audio_bytes:
data, samplerate = sf.read(audio_bytes)
sf.write(audio_path, data, samplerate)
#elif 'path' in audio_data and os.path.exists(audio_data['path']):
# 如果提供path且文件存在,直接复制
#data, samplerate = sf.read(audio_data['path'])
#sf.write(audio_path, data, samplerate)
else:
print(f"警告: {speaker}的音频数据格式不支持,跳过")
speaker_counts[speaker] -= 1 # 回滚计数
continue
except Exception as e:
print(f"处理{speaker}的音频时出错: {str(e)}")
speaker_counts[speaker] -= 1
continue
# 保存转录文件
with open(transcription_path, 'w', encoding='utf-8') as f:
f.write(voice['transcription'])
speaker_file_indices[speaker] += 1
print(
f"[下载进度] {speaker}_{file_num} | "
f"进度: {speaker_counts[speaker]}/{max_files_per_speaker}"
)
# 打印最终统计
print("\n=== 下载结果 ===")
for speaker, count in speaker_counts.items():
print(f"{speaker}: {count}个文件")
if __name__ == '__main__':
download_voices_with_dynamic_counting()
from gradio_client import Client, handle_file
client = Client("http://localhost:7860")
result = client.predict(
prompt=handle_file('genshin_voices_sample_5/Ahangar/Ahangar_00001.wav'),
text="偷窃者没有好下场",
api_name="/gen_single"
)
print(result)
from shutil import copy2
copy2(result["value"], result["value"].split("/")[-1])
'''
import spaces
import os
import shutil
import threading
import time
import sys
from huggingface_hub import snapshot_download
# Put this file's directory and its "indextts" subdirectory on sys.path,
# presumably so the `indextts` and `tools` packages imported right after
# this can be resolved regardless of the working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
sys.path.append(os.path.join(current_dir, "indextts"))
import gradio as gr
from indextts.infer import IndexTTS
from tools.i18n.i18n import I18nAuto
# I18nAuto instance configured for zh_CN.
# NOTE(review): not referenced again in the visible code.
i18n = I18nAuto(language="zh_CN")
# NOTE(review): MODE is not read anywhere in the visible code.
MODE = 'local'
# Import-time side effect: fetch the IndexTeam/IndexTTS-1.5 snapshot from the
# Hugging Face Hub into ./checkpoints before the model is constructed.
snapshot_download("IndexTeam/IndexTTS-1.5",local_dir="checkpoints",)
# Module-wide TTS engine, built from the checkpoint directory downloaded above.
tts = IndexTTS(model_dir="checkpoints", cfg_path="checkpoints/config.yaml")
# Creating outputs/tasks also creates outputs/, the directory infer() writes
# to when no explicit output path is given.
os.makedirs("outputs/tasks",exist_ok=True)
os.makedirs("prompts",exist_ok=True)
@spaces.GPU
def infer(voice, text, output_path=None):
    """Synthesize ``text`` with the module-level ``tts`` engine.

    Args:
        voice: Reference audio handed straight to ``tts.infer`` (the Gradio
            callers in this file pass the value of the prompt audio widget).
        text: Target text to speak.
        output_path: Where to write the generated audio. When falsy, a fresh
            file name under ``outputs/`` is generated.

    Returns:
        The path that was passed to ``tts.infer`` as the output location.

    Raises:
        RuntimeError: If the module-level ``tts`` object is falsy.
    """
    import uuid  # local import: only needed to build default file names

    if not tts:
        raise RuntimeError("Model not loaded")
    if not output_path:
        # A whole-second timestamp alone repeats for requests that land in the
        # same second, so the later synthesis would overwrite the earlier
        # one's file. A random suffix keeps every default name distinct while
        # preserving the sortable timestamp prefix.
        unique_name = f"spk_{int(time.time())}_{uuid.uuid4().hex[:8]}.wav"
        output_path = os.path.join("outputs", unique_name)
    tts.infer(voice, text, output_path)
    return output_path
def tts_api(voice, text):
    """Run a synthesis and report the outcome as a ``(status, info, payload)`` tuple.

    On success the tuple is ``(200, {}, <bytes of the generated file>)``.
    If anything raises while synthesizing or reading the file, the tuple is
    ``(500, {"error": <exception message>}, None)`` instead of propagating.
    """
    try:
        with open(infer(voice, text), "rb") as wav_file:
            payload = wav_file.read()
    except Exception as exc:
        # Boundary handler: convert any failure into an error tuple.
        return (500, {"error": str(exc)}, None)
    return (200, {}, payload)
def gen_single(prompt, text):
    """Generate speech for ``text`` using ``prompt`` as the reference audio.

    Returns a Gradio update that sets the target component's value to the
    generated file path and makes the component visible.
    """
    return gr.update(visible=True, value=infer(prompt, text))
def update_prompt_audio():
    """Return a Gradio update that marks the target component as interactive."""
    return gr.update(interactive=True)
# Build the Gradio UI: one tab holding a reference-audio input, a text box,
# a generate button and an output player that starts hidden.
with gr.Blocks() as demo:
    # NOTE(review): this lock is never acquired in the visible code.
    mutex = threading.Lock()
    # Page header: project title and an arXiv badge link.
    gr.HTML('''
<h2><center>IndexTTS: An Industrial-Level Controllable and Efficient Zero-Shot Text-To-Speech System</h2>
<p align="center">
<a href='https://arxiv.org/abs/2502.05512'><img src='https://img.shields.io/badge/ArXiv-2502.05512-red'></a>
''')
    with gr.Tab("音频生成"):
        # NOTE(review): confirm that every widget below is meant to share
        # this single Row; widget creation order determines layout order.
        with gr.Row():
            os.makedirs("prompts",exist_ok=True)
            # Reference audio; its value is the first input to gen_single.
            prompt_audio = gr.Audio(label="请上传参考音频",key="prompt_audio",
                                    sources=["upload","microphone"],type="filepath")
            # NOTE(review): prompt_list / default are computed here but not
            # consumed anywhere in the visible code.
            prompt_list = os.listdir("prompts")
            default = ''
            if prompt_list:
                default = prompt_list[0]
            input_text_single = gr.Textbox(label="请输入目标文本",key="input_text_single")
            gen_button = gr.Button("生成语音",key="gen_button",interactive=True)
            # Hidden until gen_single returns an update with visible=True.
            output_audio = gr.Audio(label="生成结果", visible=False,key="output_audio")
    # After an upload, update_prompt_audio's result is applied to the button
    # (it sets interactive=True).
    prompt_audio.upload(update_prompt_audio,
                        inputs=[],
                        outputs=[gen_button])
    # Clicking the button synthesizes speech from the reference audio and the
    # text, then routes the result to the output player.
    gen_button.click(gen_single,
                     inputs=[prompt_audio, input_text_single],
                     outputs=[output_audio])
# 移除 Interface 相关内容,避免重复渲染
# 只保留 Blocks demo,UI和API共用
# 这样既有UI,也能通过Gradio HTTP API调用
# 通过POST /run/predict即可API调用
# 移除 add_api_route 和 mount_gradio_app,Spaces 不支持
def main():
    """Load the TTS normalizer, then serve the Gradio app.

    The server binds to 0.0.0.0 on port 7860 and is launched with
    ``share=True``.
    """
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    tts.load_normalizer()
    demo.launch(**launch_options)
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|