|
import asyncio
|
|
from datetime import datetime
|
|
import aiohttp
|
|
import pickle
|
|
|
|
import pandas as pd
|
|
|
|
from utils import normalize_audio_loudness
|
|
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from pymongo import MongoClient
|
|
from bson import Binary, ObjectId
|
|
import zlib
|
|
|
|
|
|
BASE_URL = os.environ.get("BASE_URL")
|
|
AUDIO_URL = os.environ.get("AUDIO_URL")
|
|
MONGO_URI = os.environ.get("MONGO_URI")
|
|
DATABASE_NAME = os.environ.get("DATABASE_NAME")
|
|
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
|
|
CREATE_COLLECTION = os.environ.get("CREATE_COLLECTION")
|
|
|
|
|
|
if BASE_URL is None or AUDIO_URL is None or MONGO_URI is None or DATABASE_NAME is None or COLLECTION_NAME is None or CREATE_COLLECTION is None:
|
|
print("从.env文件加载环境变量")
|
|
load_dotenv()
|
|
BASE_URL = os.getenv("BASE_URL")
|
|
AUDIO_URL = os.getenv("AUDIO_URL")
|
|
MONGO_URI = os.getenv("MONGO_URI")
|
|
DATABASE_NAME = os.getenv("DATABASE_NAME")
|
|
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
|
|
CREATE_COLLECTION = os.getenv("CREATE_COLLECTION")
|
|
|
|
client = MongoClient(MONGO_URI)
|
|
db = client[DATABASE_NAME]
|
|
collection = db[COLLECTION_NAME]
|
|
create_collection = db[CREATE_COLLECTION]
|
|
|
|
async def generate_api(voice_ids, text):
|
|
timeout = aiohttp.ClientTimeout(total=10)
|
|
try:
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
async with session.post(BASE_URL+"tts", json={"ids": voice_ids, "text": text}) as response:
|
|
if response.status == 200:
|
|
|
|
audio_data = await response.read()
|
|
|
|
audio_data = normalize_audio_loudness(audio_data)
|
|
return audio_data
|
|
else:
|
|
print(response)
|
|
return f"合成失败: {response.status}"
|
|
except asyncio.TimeoutError:
|
|
return "请求超时,请稍后重试"
|
|
except aiohttp.ClientError as e:
|
|
return f"网络错误: {str(e)}"
|
|
|
|
async def get_audio(voice_id):
|
|
url = AUDIO_URL + voice_id + ".ogg"
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
return await response.read()
|
|
else:
|
|
return f"获取音频失败: {response.status}"
|
|
except asyncio.TimeoutError:
|
|
return "请求超时,请稍后重试"
|
|
except aiohttp.ClientError as e:
|
|
return f"网络错误: {str(e)}"
|
|
|
|
def load_characters_csv(lang):
|
|
|
|
cursor = collection.find({"language": lang, "is_public": True})
|
|
|
|
|
|
data = list(cursor)
|
|
|
|
|
|
df = pd.DataFrame(columns=["类别", "id", "名称", "情绪", "头像", "voice_id"])
|
|
|
|
|
|
for item in data:
|
|
df = pd.concat([df, pd.DataFrame({
|
|
"类别": [item["category"]],
|
|
"id": [str(item["id"])],
|
|
"名称": [item["name"]],
|
|
"情绪": [item["emotion"]],
|
|
"头像": [item["avatar"]],
|
|
"voice_id": [item["voice_id"]]
|
|
})], ignore_index=True)
|
|
|
|
指定顺序 = {
|
|
"zh": ["原神", "崩坏星穹铁道", "绝区零", "鸣潮"],
|
|
"en": ["Genshin Impact", "Honkai: Star Rail", "Zenless Zone Zero", "Wuthering Waves"],
|
|
"ja": ["原神[げんしん", "崩壊:スターレイル", "ゼンレスゾーンゼロ", "Wuthering Waves"],
|
|
"ko": ["원신", "붕괴: 스타레일", "젠레스 존 제로", "Wuthering Waves"]
|
|
}
|
|
当前语言顺序 = 指定顺序.get(lang, 指定顺序["en"])
|
|
其他类别 = sorted(set(df['类别'].unique()) - set(当前语言顺序))
|
|
unique_categories = 当前语言顺序 + 其他类别
|
|
|
|
return df, unique_categories
|
|
|
|
|
|
|
|
async def generate_voice(avatar, name, emotion, tags, gender, audio_data, language):
|
|
|
|
avatar_binary = zlib.compress(pickle.dumps(avatar))
|
|
|
|
audio_binary = zlib.compress(pickle.dumps(audio_data))
|
|
|
|
|
|
voice = {
|
|
"avatar": Binary(avatar_binary),
|
|
"name": name,
|
|
"emotion": emotion,
|
|
"tags": tags,
|
|
"gender": gender,
|
|
"audio_data": Binary(audio_binary),
|
|
"language": language,
|
|
"create_at": datetime.now().isoformat(),
|
|
"is_public": False,
|
|
"is_reviewed": False
|
|
}
|
|
result = create_collection.insert_one(voice)
|
|
return result.inserted_id
|
|
|