import asyncio from datetime import datetime import aiohttp import pickle import pandas as pd from utils import normalize_audio_loudness import os from dotenv import load_dotenv from pymongo import MongoClient from bson import Binary, ObjectId import zlib # 尝试直接获取环境变量 BASE_URL = os.environ.get("BASE_URL") AUDIO_URL = os.environ.get("AUDIO_URL") MONGO_URI = os.environ.get("MONGO_URI") DATABASE_NAME = os.environ.get("DATABASE_NAME") COLLECTION_NAME = os.environ.get("COLLECTION_NAME") CREATE_COLLECTION = os.environ.get("CREATE_COLLECTION") # 如果直接获取不到,则从.env文件加载 if BASE_URL is None or AUDIO_URL is None or MONGO_URI is None or DATABASE_NAME is None or COLLECTION_NAME is None or CREATE_COLLECTION is None: print("从.env文件加载环境变量") load_dotenv() BASE_URL = os.getenv("BASE_URL") AUDIO_URL = os.getenv("AUDIO_URL") MONGO_URI = os.getenv("MONGO_URI") DATABASE_NAME = os.getenv("DATABASE_NAME") COLLECTION_NAME = os.getenv("COLLECTION_NAME") CREATE_COLLECTION = os.getenv("CREATE_COLLECTION") client = MongoClient(MONGO_URI) db = client[DATABASE_NAME] collection = db[COLLECTION_NAME] create_collection = db[CREATE_COLLECTION] async def generate_api(voice_ids, text): timeout = aiohttp.ClientTimeout(total=10) # 设置10秒的总超时时间 try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(BASE_URL+"tts", json={"ids": voice_ids, "text": text}) as response: if response.status == 200: # 读取响应内容 audio_data = await response.read() # print(type(audio_data)) audio_data = normalize_audio_loudness(audio_data) return audio_data else: print(response) return f"合成失败: {response.status}" except asyncio.TimeoutError: return "请求超时,请稍后重试" except aiohttp.ClientError as e: return f"网络错误: {str(e)}" async def get_audio(voice_id): url = AUDIO_URL + voice_id + ".ogg" try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: return await response.read() else: return f"获取音频失败: {response.status}" except asyncio.TimeoutError: return "请求超时,请稍后重试" except aiohttp.ClientError as e: return f"网络错误: {str(e)}" def load_characters_csv(lang): # 从MongoDB集合中获取数据 cursor = collection.find({"language": lang, "is_public": True}) # 将查询结果转换为列表 data = list(cursor) # 创建一个空的DataFrame df = pd.DataFrame(columns=["类别", "id", "名称", "情绪", "头像", "voice_id"]) # 遍历数据并填充DataFrame for item in data: df = pd.concat([df, pd.DataFrame({ "类别": [item["category"]], "id": [str(item["id"])], # 确保id是字符串类型 "名称": [item["name"]], "情绪": [item["emotion"]], "头像": [item["avatar"]], "voice_id": [item["voice_id"]] })], ignore_index=True) 指定顺序 = { "zh": ["原神", "崩坏星穹铁道", "绝区零", "鸣潮"], "en": ["Genshin Impact", "Honkai: Star Rail", "Zenless Zone Zero", "Wuthering Waves"], "ja": ["原神[げんしん", "崩壊:スターレイル", "ゼンレスゾーンゼロ", "Wuthering Waves"], "ko": ["원신", "붕괴: 스타레일", "젠레스 존 제로", "Wuthering Waves"] } 当前语言顺序 = 指定顺序.get(lang, 指定顺序["en"]) 其他类别 = sorted(set(df['类别'].unique()) - set(当前语言顺序)) unique_categories = 当前语言顺序 + 其他类别 return df, unique_categories async def generate_voice(avatar, name, emotion, tags, gender, audio_data, language): # 将图像数据转换为二进制 avatar_binary = zlib.compress(pickle.dumps(avatar)) # 将音频数据转换为二进制 audio_binary = zlib.compress(pickle.dumps(audio_data)) # 创建声音对象 voice = { "avatar": Binary(avatar_binary), "name": name, "emotion": emotion, "tags": tags, "gender": gender, "audio_data": Binary(audio_binary), "language": language, "create_at": datetime.now().isoformat(), "is_public": False, "is_reviewed": False } result = create_collection.insert_one(voice) return result.inserted_id