|
""" |
|
OpenAI voice service |
|
""" |
|
import json |
|
|
|
import openai |
|
|
|
from bridge.reply import Reply, ReplyType |
|
from common.log import logger |
|
from config import conf |
|
from voice.voice import Voice |
|
import requests |
|
from common import const |
|
import datetime, random |
|
|
|
class OpenaiVoice(Voice):
    """Voice service backed by the OpenAI audio APIs.

    Speech-to-text goes through ``openai.Audio.transcribe`` with the
    ``whisper-1`` model; text-to-speech posts to the ``/audio/speech``
    HTTP endpoint and stores the returned audio under ``tmp/``.
    """

    def __init__(self):
        # The openai SDK reads the API key from module-level state.
        openai.api_key = conf().get("open_ai_api_key")

    def voiceToText(self, voice_file):
        """Transcribe an audio file into text.

        Args:
            voice_file: Path of the audio file to transcribe.

        Returns:
            A ``Reply`` of type ``ReplyType.TEXT`` carrying the transcript,
            or of type ``ReplyType.ERROR`` if reading or transcribing failed.
        """
        logger.debug("[Openai] voice file name={}".format(voice_file))
        try:
            # Context manager guarantees the handle is closed even when the
            # transcription call raises.
            with open(voice_file, "rb") as audio_file:
                result = openai.Audio.transcribe("whisper-1", audio_file)
            text = result["text"]
            reply = Reply(ReplyType.TEXT, text)
            logger.info("[Openai] voiceToText text={} voice file name={}".format(text, voice_file))
        except Exception as e:
            # Log the cause instead of dropping it, matching textToVoice.
            logger.error(e)
            reply = Reply(ReplyType.ERROR, "我暂时还无法听清您的语音,请稍后再试吧~")
        # Deliberately not returned from a ``finally`` clause: that would
        # swallow KeyboardInterrupt/SystemExit and then hit an unbound
        # ``reply``.
        return reply

    def textToVoice(self, text):
        """Synthesize speech for ``text`` and save it as an mp3 file.

        The model and voice are read from the ``text_to_voice_model`` and
        ``tts_voice_id`` config keys, falling back to ``const.TTS_1`` and
        ``"alloy"``.

        Args:
            text: The text to convert to speech.

        Returns:
            A ``Reply`` of type ``ReplyType.VOICE`` carrying the path of the
            written mp3 file, or of type ``ReplyType.ERROR`` if the request
            or the file write failed.
        """
        try:
            api_base = conf().get("open_ai_api_base") or "https://api.openai.com/v1"
            url = f"{api_base}/audio/speech"
            headers = {
                "Authorization": "Bearer " + conf().get("open_ai_api_key"),
                "Content-Type": "application/json",
            }
            data = {
                "model": conf().get("text_to_voice_model") or const.TTS_1,
                "input": text,
                "voice": conf().get("tts_voice_id") or "alloy",
            }
            response = requests.post(url, headers=headers, json=data)
            # Without this check an HTTP error body would be written to the
            # .mp3 file and handed back as if it were valid audio.
            response.raise_for_status()
            # Timestamp plus a random suffix keeps names distinct for
            # requests that land in the same second.
            file_name = "tmp/" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + str(random.randint(0, 1000)) + ".mp3"
            logger.debug(f"[OPENAI] text_to_Voice file_name={file_name}, input={text}")
            with open(file_name, "wb") as f:
                f.write(response.content)
            logger.info("[OPENAI] text_to_Voice success")
            reply = Reply(ReplyType.VOICE, file_name)
        except Exception as e:
            logger.error(e)
            reply = Reply(ReplyType.ERROR, "遇到了一点小问题,请稍后再问我吧")
        return reply
|
|