# egtts / api.py
# chaore's picture
# Upload 5 files
# 6ad2a4c verified
"""
Edge TTS API 接口实现
提供文本转语音功能的API接口
"""
import asyncio
import base64
import io
import json
import logging
import os
import shutil
import tempfile
import zipfile
from typing import Optional, Dict, Any

import aiohttp
import edge_tts
from pydub import AudioSegment
# Configure logging: set the root logger to INFO and create this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EdgeTTSAPI:
    """Text-to-speech helper built on ``edge_tts`` with a Hugging Face Spaces variant.

    The instance holds fixed lists of voice names grouped by language and
    offers coroutines that synthesize text into audio files, either locally
    through ``edge_tts`` or remotely through a Gradio endpoint.
    """

    # Base URL of the Gradio API exposed by the Hugging Face Space.
    HF_API_URL = "https://chaore-ttsedge.hf.space/gradio_api"
    # Upper bound, in seconds, on how long the remote job is polled.
    HF_MAX_WAIT_SECONDS = 30
    # Pause, in seconds, before each poll of the remote job.
    HF_POLL_INTERVAL_SECONDS = 2
    # Lower-cased name fragments that mark a voice as male; every voice whose
    # name contains none of them is reported as female.
    _MALE_NAME_MARKERS = (
        'guy', 'roger', 'ryan', 'keita', 'alvaro', 'conrad', 'henri',
        'jake', 'eric', 'tony',
        'yunjian', 'yunxi', 'yunxia', 'yunyang', 'wanlung', 'yunjhe', 'injoon',
    )

    def __init__(self):
        """Populate the per-language voice lists and the combined list."""
        # Chinese voices (mainland, regional dialects, Hong Kong, Taiwan)
        self.chinese_voices = [
            "zh-CN-XiaoxiaoNeural", "zh-CN-XiaoyiNeural", "zh-CN-YunjianNeural",
            "zh-CN-YunxiNeural", "zh-CN-YunxiaNeural", "zh-CN-YunyangNeural",
            "zh-CN-liaoning-XiaobeiNeural", "zh-CN-shaanxi-XiaoniNeural",
            "zh-HK-HiuGaaiNeural", "zh-HK-HiuMaanNeural", "zh-HK-WanLungNeural",
            "zh-TW-HsiaoChenNeural", "zh-TW-YunJheNeural", "zh-TW-HsiaoYuNeural"
        ]
        # English voices
        self.english_voices = [
            "en-US-AriaNeural", "en-US-GuyNeural", "en-US-JennyNeural",
            "en-US-RogerNeural", "en-GB-SoniaNeural", "en-GB-RyanNeural"
        ]
        # Japanese voices
        self.japanese_voices = [
            "ja-JP-NanamiNeural", "ja-JP-KeitaNeural"
        ]
        # Korean voices
        self.korean_voices = [
            "ko-KR-SunHiNeural", "ko-KR-InJoonNeural"
        ]
        # Voices for other languages (German, French, Spanish)
        self.other_voices = [
            "de-DE-KatjaNeural", "de-DE-ConradNeural",
            "fr-FR-DeniseNeural", "fr-FR-HenriNeural",
            "es-ES-ElviraNeural", "es-ES-AlvaroNeural"
        ]
        # Every voice, in the order of the groups above
        self.all_voices = (self.chinese_voices + self.english_voices +
                           self.japanese_voices + self.korean_voices +
                           self.other_voices)

    @staticmethod
    def _new_temp_file(suffix: str) -> str:
        """Create an empty temporary file and return its path.

        ``tempfile.mkstemp`` hands back an open OS-level descriptor; it is
        closed here so that the descriptor is not leaked on every call.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    @staticmethod
    def _remove_quietly(path: Optional[str]) -> None:
        """Delete ``path`` if it exists; cleanup is best-effort and never raises."""
        if path and os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                # Failing to clean up must not mask the original outcome.
                pass

    def get_available_voices(self, language: Optional[str] = None) -> list:
        """
        Return the available voice names.

        :param language: language code such as 'zh', 'en', 'ja' or 'ko';
            any other string is used as a name prefix; ``None`` selects
            every voice
        :return: a new list of voice names (mutating it does not affect
            the lists stored on the instance)
        """
        if language is None:
            return list(self.all_voices)
        groups = {
            'zh': self.chinese_voices,
            'en': self.english_voices,
            'ja': self.japanese_voices,
            'ko': self.korean_voices,
        }
        if language in groups:
            return list(groups[language])
        # Unknown code: filter by prefix
        return [voice for voice in self.all_voices if voice.startswith(language)]

    async def text_to_speech(self,
                             text: str,
                             voice: str = "zh-CN-XiaoxiaoNeural",
                             rate: int = 0,
                             pitch: int = 0,
                             output_file: Optional[str] = None,
                             output_format: str = "mp3") -> Optional[str]:
        """
        Convert text to speech with ``edge_tts`` and write it to a file.

        :param text: input text; empty or whitespace-only text is rejected
        :param voice: voice name; a name missing from ``all_voices`` is
            replaced by the default voice
        :param rate: speaking-rate adjustment in percent (documented range
            -50 to 50; the range is not enforced here)
        :param pitch: pitch adjustment in Hz (documented range -50 to 50;
            the range is not enforced here)
        :param output_file: output path; a temporary file is created when
            ``None``; the format extension is appended when missing
        :param output_format: "mp3" or "wav"
        :return: path of the generated file, or ``None`` on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None
        # Fall back to the default voice when the requested one is unknown
        if voice not in self.all_voices:
            logger.warning(f"语音 {voice} 不在可用列表中,使用默认语音")
            voice = "zh-CN-XiaoxiaoNeural"
        # Decide where the audio is written
        if output_file is None:
            output_file = self._new_temp_file(f".{output_format}")
        elif not output_file.lower().endswith((f'.{output_format}',)):
            # Make sure the output file carries the expected extension
            output_file += f'.{output_format}'
        # The service delivers MP3; other formats go through an intermediate file
        temp_mp3 = output_file if output_format == "mp3" else output_file + ".mp3"
        try:
            # edge_tts expects explicitly signed values such as "+10%" / "-5Hz"
            rate_str = f"{rate:+d}%"
            pitch_str = f"{pitch:+d}Hz"
            # Plain text is sent as-is (no SSML wrapping) so that markup is
            # never read aloud
            communicate = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)
            await communicate.save(temp_mp3)
            if output_format == "wav":
                # Convert the MP3 to WAV, then drop the intermediate file
                audio = AudioSegment.from_mp3(temp_mp3)
                audio.export(output_file, format="wav")
                os.remove(temp_mp3)
            logger.info(f"语音生成成功: {output_file}")
            return output_file
        except Exception as e:
            logger.error(f"语音转换失败: {str(e)}")
            # Remove whatever was written, including the intermediate MP3
            for leftover in {output_file, temp_mp3}:
                self._remove_quietly(leftover)
            return None

    async def text_to_speech_hf_sync(self,
                                     text: str,
                                     voice: str = "zh-CN-XiaoxiaoNeural",
                                     rate: int = 0,
                                     pitch: int = 0,
                                     output_file: Optional[str] = None,
                                     output_format: str = "mp3") -> Optional[str]:
        """
        Delegate to :meth:`text_to_speech_hf`.

        Despite its name this is a coroutine function and must be awaited.
        It awaits the delegate directly because ``asyncio.run`` cannot be
        called from a coroutine that is already running on an event loop.

        :param text: input text
        :param voice: voice name
        :param rate: speaking-rate adjustment (-50 to 50)
        :param pitch: pitch adjustment (-50 to 50)
        :param output_file: output path; a temporary file is created when ``None``
        :param output_format: "mp3" or "wav"
        :return: path of the generated file, or ``None`` on failure
        """
        return await self.text_to_speech_hf(text, voice, rate, pitch, output_file, output_format)

    async def text_to_speech_hf(self,
                                text: str,
                                voice: str = "zh-CN-XiaoxiaoNeural",
                                rate: int = 0,
                                pitch: int = 0,
                                output_file: Optional[str] = None,
                                output_format: str = "mp3") -> Optional[str]:
        """
        Convert text to speech through the Hugging Face Spaces Gradio API.

        :param text: input text; empty or whitespace-only text is rejected
        :param voice: voice name, forwarded to the remote service unchanged
        :param rate: speaking-rate adjustment (-50 to 50)
        :param pitch: pitch adjustment (-50 to 50)
        :param output_file: output path; a temporary file is created when ``None``
        :param output_format: "mp3" or "wav"
        :return: path of the generated file, or ``None`` on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None
        created_temp = output_file is None
        if created_temp:
            output_file = self._new_temp_file(f".{output_format}")
        try:
            result = await self._request_hf_audio(
                text, voice, rate, pitch, output_format, output_file)
        except Exception as e:
            logger.error(f"从Hugging Face API获取语音失败: {str(e)}")
            # Generation failed: remove the (possibly partial) output file
            self._remove_quietly(output_file)
            return None
        if result is None and created_temp:
            # Do not leave behind the empty temporary file created above
            self._remove_quietly(output_file)
        return result

    async def _request_hf_audio(self, text: str, voice: str, rate: int, pitch: int,
                                output_format: str, output_file: str) -> Optional[str]:
        """Submit the synthesis job to the Space and wait for its audio."""
        payload = {
            "data": [text, voice, rate, pitch, output_format],
            "event_data": None,
            "fn_index": 0,
            "session_hash": "abc123test"
        }
        headers = {
            "Content-Type": "application/json",
            "Origin": "https://chaore-ttsedge.hf.space",
            "Referer": "https://chaore-ttsedge.hf.space/"
        }
        # Named endpoint of the Gradio call API
        predict_url = f"{self.HF_API_URL}/call/text_to_speech"
        async with aiohttp.ClientSession() as session:
            async with session.post(predict_url, json=payload, headers=headers) as response:
                if response.status != 200:
                    logger.error(f"Hugging Face API请求失败: {response.status}")
                    logger.error(f"响应内容: {await response.text()}")
                    return None
                result = await response.json()
                if 'event_id' not in result:
                    logger.error(f"Hugging Face API返回格式错误: {result}")
                    return None
                result_url = f"{predict_url}/{result['event_id']}"
                return await self._poll_hf_result(session, result_url, headers, output_file)

    async def _poll_hf_result(self, session, result_url: str, headers: dict,
                              output_file: str) -> Optional[str]:
        """Poll the job's event stream until audio arrives, it fails, or time runs out."""
        waited = 0
        while waited < self.HF_MAX_WAIT_SECONDS:
            await asyncio.sleep(self.HF_POLL_INTERVAL_SECONDS)
            waited += self.HF_POLL_INTERVAL_SECONDS
            async with session.get(result_url, headers=headers) as status_response:
                if status_response.status != 200:
                    continue
                # Read the server-sent-event stream line by line
                async for raw_line in status_response.content:
                    try:
                        line_str = raw_line.decode('utf-8').strip()
                        if not line_str.startswith('data:'):
                            continue
                        data_content = line_str[5:].strip()
                        if not data_content or data_content.lower() == 'null':
                            continue
                        kind, data_url = self._parse_sse_payload(data_content)
                        if kind == "audio":
                            self._write_data_url(data_url, output_file)
                            logger.info(f"从Hugging Face API获取语音成功: {output_file}")
                            return output_file
                        if kind == "failed":
                            logger.error("Hugging Face API任务失败")
                            return None
                        if kind == "complete":
                            # Finished without inline audio: stop reading this
                            # stream; the outer loop polls again until timeout
                            break
                    except Exception as exc:
                        # One malformed line must not abort the whole stream
                        logger.debug("跳过无法处理的SSE行: %s", exc)
                        continue
        logger.error("Hugging Face API任务超时")
        return None

    @staticmethod
    def _parse_sse_payload(data_content: str) -> tuple:
        """Classify one SSE ``data:`` payload.

        :return: ``(kind, data_url)`` where ``kind`` is "audio" (``data_url``
            holds a base64 data URL), "complete", "failed" or "ignore"
        """
        try:
            json_data = json.loads(data_content)
        except json.JSONDecodeError:
            # Not JSON: accept a bare data URL sent as plain text
            if data_content.startswith(('data:audio/', 'data:application/')):
                return "audio", data_content
            return "ignore", None
        if not isinstance(json_data, dict):
            return "ignore", None
        items = json_data.get('data')
        if items:
            first_item = items[0]
            if isinstance(first_item, str) and first_item.startswith('data:'):
                return "audio", first_item
        status = json_data.get('status')
        if status == 'COMPLETE':
            return "complete", None
        if status == 'FAILED':
            return "failed", None
        return "ignore", None

    @staticmethod
    def _write_data_url(data_url: str, output_file: str) -> None:
        """Decode the base64 part of a data URL and write the bytes to ``output_file``."""
        _, encoded = data_url.split(',', 1)
        audio_data = base64.b64decode(encoded)
        with open(output_file, 'wb') as f:
            f.write(audio_data)

    async def get_voice_info(self, voice: str) -> Optional[Dict[str, Any]]:
        """
        Describe a voice.

        :param voice: voice name
        :return: dict with "name", "language", "gender" and "locale", or
            ``None`` when the voice is not in ``all_voices``
        """
        if voice not in self.all_voices:
            return None
        lang_parts = voice.split('-')
        language_code = f"{lang_parts[0]}-{lang_parts[1]}"
        # Gender is derived from the voice name
        lowered = voice.lower()
        is_male = any(marker in lowered for marker in self._MALE_NAME_MARKERS)
        # The third dash-separated component is appended to the language
        # code; for three-part names that component is the voice name itself
        locale_tail = lang_parts[2] if len(lang_parts) > 2 else 'General'
        return {
            "name": voice,
            "language": language_code,
            "gender": "Male" if is_male else "Female",
            "locale": f"{language_code}-{locale_tail}"
        }

    async def batch_text_to_speech(self,
                                   texts: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> list:
        """
        Convert several texts to speech, one after another.

        :param texts: list of texts
        :param voice: voice name
        :param rate: speaking-rate adjustment
        :param pitch: pitch adjustment
        :param output_format: output format
        :return: list aligned with ``texts`` holding the audio file path of
            each entry, or ``None`` for blank / non-string entries and
            failed conversions
        """
        results = []
        for text in texts:
            # Only non-blank strings are synthesized
            if isinstance(text, str) and text.strip():
                audio_file = await self.text_to_speech(
                    text, voice, rate, pitch, output_format=output_format)
                results.append(audio_file)
            else:
                results.append(None)
        return results

    async def create_audio_project(self,
                                   project_name: str,
                                   segments: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> Optional[str]:
        """
        Synthesize several text segments and merge them into one audio file.

        :param project_name: base name of the output file
        :param segments: list of dicts of the form
            ``{"text": "...", "delay": milliseconds}``; ``delay`` is the
            silence inserted before the segment
        :param voice: voice name
        :param rate: speaking-rate adjustment
        :param pitch: pitch adjustment
        :param output_format: output format
        :return: path of the merged file inside a fresh temporary
            directory, or ``None`` on failure
        """
        temp_dir = None
        try:
            # Temporary directory holding the individual segments
            temp_dir = tempfile.mkdtemp()
            segment_files = []
            for i, segment in enumerate(segments):
                text = segment.get("text") or ""
                if not text.strip():
                    continue
                delay = segment.get("delay") or 0  # milliseconds
                segment_file = os.path.join(temp_dir, f"segment_{i}.{output_format}")
                result = await self.text_to_speech(
                    text, voice, rate, pitch, segment_file, output_format)
                if result:
                    segment_files.append((result, delay))
            if not segment_files:
                logger.error("没有生成任何音频片段")
                shutil.rmtree(temp_dir, ignore_errors=True)
                return None
            # Concatenate the segments, inserting silence where requested
            combined_audio = AudioSegment.empty()
            for audio_file, delay in segment_files:
                if delay > 0:
                    combined_audio += AudioSegment.silent(duration=delay)
                combined_audio += AudioSegment.from_file(audio_file, format=output_format)
            output_path = os.path.join(temp_dir, f"{project_name}.{output_format}")
            combined_audio.export(output_path, format=output_format)
            # Remove the per-segment files, but never the merged output
            # (a project may be named like one of its segments)
            for audio_file, _ in segment_files:
                if audio_file != output_path:
                    self._remove_quietly(audio_file)
            return output_path
        except Exception as e:
            logger.error(f"创建音频项目失败: {str(e)}")
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            return None

    async def export_voice_settings(self) -> str:
        """
        Export the voice lists.

        :return: JSON string with one key per voice group plus "all_voices"
        """
        settings = {
            "chinese_voices": self.chinese_voices,
            "english_voices": self.english_voices,
            "japanese_voices": self.japanese_voices,
            "korean_voices": self.korean_voices,
            "other_voices": self.other_voices,
            "all_voices": self.all_voices
        }
        return json.dumps(settings, ensure_ascii=False, indent=2)
# Create the module-level API instance
tts_api = EdgeTTSAPI()
# Synchronous wrappers, convenient for callers outside an async context
def sync_text_to_speech(text: str, voice: str = "zh-CN-XiaoxiaoNeural",
                        rate: int = 0, pitch: int = 0, output_file: Optional[str] = None,
                        output_format: str = "mp3") -> Optional[str]:
    """
    Blocking counterpart of ``tts_api.text_to_speech``.

    Builds the coroutine on the module-level ``tts_api`` instance and drives
    it to completion with ``asyncio.run``, returning whatever it returns.
    """
    pending = tts_api.text_to_speech(
        text=text,
        voice=voice,
        rate=rate,
        pitch=pitch,
        output_file=output_file,
        output_format=output_format,
    )
    return asyncio.run(pending)
def sync_text_to_speech_hf(text: str, voice: str = "zh-CN-XiaoxiaoNeural",
                           rate: int = 0, pitch: int = 0, output_file: Optional[str] = None,
                           output_format: str = "mp3") -> Optional[str]:
    """
    Blocking counterpart of ``tts_api.text_to_speech_hf``.

    Builds the coroutine on the module-level ``tts_api`` instance and drives
    it to completion with ``asyncio.run``, returning whatever it returns.
    """
    pending = tts_api.text_to_speech_hf(
        text=text,
        voice=voice,
        rate=rate,
        pitch=pitch,
        output_file=output_file,
        output_format=output_format,
    )
    return asyncio.run(pending)