"""Volcengine (火山引擎) asynchronous long-text TTS backend and its Gradio config page."""
import io
import json
import os
import time
import uuid
from typing import Dict, Optional

import gradio as gr
import pandas as pd
import requests
from pydub import AudioSegment

from TTSs.base_tts import Base_TTS

# Choices of the "version" dropdown. The selected label is passed verbatim to
# update_dropdown() and _generate(), so these must match the UI strings.
_VERSION_NORMAL = "普通版(不支持情感预测)"
_VERSION_EMOTION = "情感预测版"

_API_HOST = "openspeech.bytedance.com"
# Per-request socket timeout (seconds). Without one, requests waits forever
# and the overall task deadline below could never trigger.
_HTTP_TIMEOUT = 30
# Seconds to wait between two task-status queries.
_POLL_INTERVAL = 2
# Maximum number of seconds to wait for a submitted task to finish.
_TASK_TIMEOUT = 300


class avaliable_voice_type:
    """One selectable voice, filled from a row of the voice list spreadsheet.

    The string form of an instance is the label shown in the voice dropdown
    and the key under which it is stored in ``Volcengine_long_TTS.useful_voice``.
    """

    语言: Optional[str] = ""
    场景: Optional[str] = ""
    音色名称: str
    voice_type: str
    时间戳支持: bool = False
    支持情感与风格类型: Optional[str] = ""
    支持语言类型: Optional[str] = ""

    def __repr__(self):
        """Return the voice name followed by every non-empty descriptive field.

        Fields are joined with "——". Attribute access (rather than
        ``self.__dict__``) is used so that the class-level defaults apply to
        optional fields that were never assigned on the instance.
        """
        text = f"{self.音色名称}"
        optional_fields = (
            self.语言,
            self.场景,
            self.支持情感与风格类型,
            self.支持语言类型,
        )
        for value in optional_fields:
            if value:
                text += f"——{value}"
        return text


class Volcengine_long_TTS(Base_TTS):
    """TTS backend that submits text to the Volcengine async TTS HTTP API."""

    def get_name(self):
        """Return the display name of this backend."""
        return '火山引擎精品长语音'

    def __init__(self):
        # Voices for the normal version: dropdown label -> avaliable_voice_type.
        self.useful_voice = self.get_data_map()
        # Voices for the emotion-prediction version: display name -> voice_type id.
        self.emo_voice = {
            "擎苍": "BV701_streaming",
            "阳光青年": "BV123_streaming",
            "反卷青年": "BV120_streaming",
            "通用赘婿": "BV119_streaming",
            "古风少御": "BV115_streaming",
            "霸气青叔": "BV107_streaming",
            "质朴青年": "BV100_streaming",
            "温柔淑女": "BV104_streaming",
            "开朗青年": "BV004_streaming",
            "甜宠少御": "BV113_streaming",
            "儒雅青年": "BV102_streaming",
        }

    def get_data_map(self, filename="voice_list.xlsx") -> Dict[str, avaliable_voice_type]:
        """Load the voice list spreadsheet that sits next to this module.

        Args:
            filename: Excel file name, resolved relative to this file's directory.

        Returns:
            A dict mapping ``str(voice)`` to the ``avaliable_voice_type`` built
            from each row. Rows that produce the same label overwrite each other.
        """
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)
        # Empty cells become '' so that __repr__ can skip them by truthiness.
        df = pd.read_excel(path).fillna('')

        useful_voice = {}
        for _, row in df.iterrows():
            data = avaliable_voice_type()
            data.语言 = row['语言']
            data.场景 = row['场景']
            data.音色名称 = row['音色名称']
            data.voice_type = row['voice_type']
            data.时间戳支持 = row['时间戳']
            data.支持情感与风格类型 = row['支持情感/风格类型']
            data.支持语言类型 = row['支持语言类型']
            useful_voice[str(data)] = data
        return useful_voice

    def update_dropdown(self, version):
        """Return a voice dropdown whose choices match the selected version.

        Args:
            version: One of the labels offered by the version dropdown.

        Returns:
            A ``gr.Dropdown`` carrying the new choices, with the first one
            selected (or no selection when the list is empty).

        Raises:
            ValueError: If ``version`` is not a known version label.
        """
        if version == _VERSION_NORMAL:
            voices = list(self.useful_voice)
        elif version == _VERSION_EMOTION:
            voices = list(self.emo_voice)
        else:
            # Previously fell through and failed with an UnboundLocalError.
            raise ValueError(f"不支持的版本: {version!r}")
        return gr.Dropdown(choices=voices, value=voices[0] if voices else None)

    def _get_config_page(self):
        """Build the (initially hidden) Gradio configuration group.

        Returns:
            A tuple ``(group, inputs)`` where ``inputs`` lists the components
            in the order ``_generate`` expects them after ``text``.
        """
        # NOTE(review): the nesting of the rows below was reconstructed from a
        # whitespace-collapsed source; confirm the intended layout.
        with gr.Group(visible=False) as config_volcengine:
            voices = list(self.useful_voice)
            with gr.Row():
                volcengine_appid = gr.Textbox(
                    label="volcengine的appid(默认为环境变量值)",
                    placeholder="请输入volcengine的appid",
                    type="password",
                    interactive=True,
                    value=os.environ.get('VOLCENGINE_APPID', ''),
                )
                volcengine_access_token = gr.Textbox(
                    label="volcengine的access_token(默认为环境变量值)",
                    placeholder="请输入volengine的access_token",
                    type="password",
                    interactive=True,
                    value=os.environ.get('VOLCENGINE_ACCESS_TOKEN', ''),
                )
                version = gr.Dropdown(
                    choices=[_VERSION_NORMAL, _VERSION_EMOTION],
                    value=_VERSION_NORMAL,
                    label="使用版本",
                    interactive=True,
                )
                voice_type = gr.Dropdown(
                    choices=voices,
                    # Guard against an empty spreadsheet instead of raising IndexError.
                    value=voices[0] if voices else None,
                    label="音色选择",
                    interactive=True,
                )
            with gr.Row():
                speed_ratio = gr.Slider(minimum=0.2, maximum=3, value=1, step=0.1,
                                        label="语速", interactive=True)
                volume_ratio = gr.Slider(minimum=0.1, maximum=3, value=1, step=0.1,
                                         label="音量", interactive=True)
                pitch_ratio = gr.Slider(minimum=0.1, maximum=3, value=1, step=0.1,
                                        label="音高", interactive=True)
            with gr.Row():
                # These two boxes are displayed but are not part of `inputs`,
                # so their values never reach _generate.
                emotion = gr.Textbox(label="情感/风格(还未适配)", placeholder="请输入情感",
                                     interactive=True)
                language = gr.Textbox(label="语言类型(还未适配)", placeholder="请输入语言",
                                      interactive=True)
            # Switching version swaps the list of selectable voices.
            version.change(self.update_dropdown, inputs=[version], outputs=[voice_type])

        inputs = [
            volcengine_appid,
            version,
            volcengine_access_token,
            voice_type,
            speed_ratio,
            volume_ratio,
            pitch_ratio,
        ]
        return config_volcengine, inputs

    def _generate(self, text, appid, version, access_token, voice,
                  speed_ratio, volume_ratio, pitch_ratio):
        """Synthesize ``text`` through the async API and return the audio.

        A task is submitted, its status is polled until it completes, and the
        resulting MP3 is downloaded and decoded.

        Args:
            text: Text to synthesize.
            appid: Volcengine application id.
            version: Version label selecting the endpoint and voice table.
            access_token: Token sent in the ``Authorization`` header.
            voice: Key into ``useful_voice`` (normal version) or ``emo_voice``
                (emotion version).
            speed_ratio: Sent as ``speed``.
            volume_ratio: Sent as ``volume``.
            pitch_ratio: Sent as ``pitch``.

        Returns:
            The synthesized audio as a ``pydub.AudioSegment``.

        Raises:
            ValueError: If ``version`` is not a known version label.
            KeyError: If ``voice`` is unknown for the selected version.
            TimeoutError: If the task has not finished within the deadline.
            Exception: With the server payload, if the submission is rejected
                or the task is reported as failed.
            requests.RequestException: On network failure, per-request timeout,
                or an HTTP error status while downloading the audio.
        """
        if version == _VERSION_NORMAL:
            api_name, resource_id = "tts_async", "volc.tts_async.default"
            voice_type = self.useful_voice[voice].voice_type
        elif version == _VERSION_EMOTION:
            api_name, resource_id = "tts_async_with_emotion", "volc.tts_async.emotion"
            voice_type = self.emo_voice[voice]
        else:
            # Previously fell through and failed with an UnboundLocalError.
            raise ValueError(f"不支持的版本: {version!r}")

        submit_api_url = f"https://{_API_HOST}/api/v1/{api_name}/submit"
        query_api_url = f"https://{_API_HOST}/api/v1/{api_name}/query"
        header = {
            "Authorization": f"Bearer;{access_token}",
            "Resource-Id": resource_id,
        }

        request_json = {
            "appid": appid,
            "format": "mp3",
            "reqid": str(uuid.uuid4()),
            "voice_type": voice_type,
            "speed": speed_ratio,
            "volume": volume_ratio,
            "pitch": pitch_ratio,
            "text": text,
        }
        resp = requests.post(
            submit_api_url,
            data=json.dumps(request_json),
            headers=header,
            timeout=_HTTP_TIMEOUT,
        ).json()
        # .get() so that an error payload lacking 'task_status' is reported
        # with its content instead of surfacing as a bare KeyError.
        if resp.get('task_status') != 0:
            raise Exception(resp)
        task_id = resp['task_id']

        # Poll the task status every _POLL_INTERVAL seconds, giving up once
        # _TASK_TIMEOUT seconds have elapsed.
        query_json = {
            "appid": appid,
            "task_id": task_id,
        }
        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + _TASK_TIMEOUT
        while True:
            time.sleep(_POLL_INTERVAL)
            query_resp = requests.get(
                query_api_url,
                params=query_json,
                headers=header,
                timeout=_HTTP_TIMEOUT,
            ).json()
            # A missing status is treated like status 2, i.e. as a failure.
            status = query_resp.get('task_status', 2)
            if status == 2:
                raise Exception(query_resp)
            if status == 1:
                break
            if time.monotonic() > deadline:
                raise TimeoutError("等待任务超时")

        audio_url = query_resp['audio_url']
        audio_resp = requests.get(audio_url, timeout=_HTTP_TIMEOUT)
        # Fail here rather than handing an HTTP error page to the MP3 decoder.
        audio_resp.raise_for_status()
        original_audio = AudioSegment.from_mp3(io.BytesIO(audio_resp.content))
        return original_audio