import openai
import os
import time
import requests
from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain.chat_models import ChatOpenAI
from transformers import pipeline
# Credentials come from the environment: OPENAI_API_KEY is required
# (KeyError at import time if missing); DID_API_KEY is optional here and
# falls back to None if unset (requests will then fail at call time).
openai.api_key = os.environ["OPENAI_API_KEY"]
did_api_key = os.environ.get('DID_API_KEY')
# Conversation memory with a 2048-token budget; per LangChain docs this
# keeps recent turns verbatim and uses the LLM to summarize older ones.
memory = ConversationSummaryBufferMemory(
llm=ChatOpenAI(),
max_token_limit=2048
)
# The chat chain used by predict(); temperature 0.5, up to 2048 completion tokens.
conversation = ConversationChain(
llm=OpenAI(max_tokens=2048, temperature=0.5),
memory=memory,
)
# Portrait image used as the talking-head source for every D-ID video.
avatar_url = "https://cdn.discordapp.com/attachments/1065596492796153856/1095617463112187984/John_Carmack_Potrait_668a7a8d-1bb0-427d-8655-d32517f6583d.png"
# Ask D-ID to create a short talking-head video clip
def generate_talk(input, avatar_url, voice_type="microsoft", voice_id="zh-CN-YunyeNeural", api_key=did_api_key):
    """Submit a text script to the D-ID /talks endpoint.

    D-ID renders the text as speech (using the given TTS provider/voice)
    spoken by the portrait at ``avatar_url``.

    Returns the parsed JSON response; on success it contains the talk ``id``
    to poll with ``get_a_talk``.
    """
    request_body = {
        "script": {
            "type": "text",
            "provider": {
                "type": voice_type,
                "voice_id": voice_id,
            },
            "ssml": "false",
            "input": input,
        },
        "config": {
            "fluent": "false",
            "pad_audio": "0.0",
        },
        "source_url": avatar_url,
    }
    request_headers = {
        "accept": "application/json",
        "content-type": "application/json",
        # D-ID uses HTTP Basic auth with the API key as the credential.
        "authorization": "Basic " + api_key,
    }
    response = requests.post("https://api.d-id.com/talks", json=request_body, headers=request_headers)
    return response.json()
# Fetch the status/result of a previously created Talk
def get_a_talk(id, api_key=did_api_key):
    """Fetch a single D-ID talk by its id and return the parsed JSON.

    While rendering is in progress the payload has no ``result_url``;
    once finished it carries the final video URL.
    """
    endpoint = "https://api.d-id.com/talks/" + id
    auth_headers = {
        "accept": "application/json",
        "authorization": "Basic " + api_key,
    }
    return requests.get(endpoint, headers=auth_headers).json()
# Create a talk and poll until its mp4 video URL is ready
def get_mp4_video(input, avatar_url=avatar_url):
    """Generate a D-ID talk for ``input`` and wait for its video URL.

    Polls ``get_a_talk`` once per second, up to 30 attempts. Returns the
    ``result_url`` string when the video is ready, or "" on timeout.
    """
    creation = generate_talk(input=input, avatar_url=avatar_url)
    talk = get_a_talk(creation['id'])
    attempts = 0
    while attempts < 30:
        attempts += 1
        if 'result_url' in talk:
            return talk['result_url']
        # Not rendered yet: wait a second and re-fetch the talk.
        time.sleep(1)
        talk = get_a_talk(creation['id'])
    return ""
# Get the GPT response, then send it to D-ID to produce a video
def predict(input, history=None):
    """Run one chat turn and kick off video generation for the reply.

    Args:
        input: The user's message, or None for a no-op refresh.
        history: Flat list alternating [user1, bot1, user2, bot2, ...].

    Returns:
        (responses, video_html, history) where ``responses`` is the history
        re-paired as [(user, bot), ...] for a chatbot widget, and
        ``video_html`` is the embed markup (empty in this version).
    """
    # Fix: the original used a mutable default (history=[]), which is shared
    # across calls and silently accumulates state. Create a fresh list instead.
    if history is None:
        history = []
    video_html = ""
    if input is not None:
        history.append(input)
        response = conversation.predict(input=input)
        # Fire off video generation for the reply. NOTE(review): the returned
        # URL is currently unused — the original's f-string template was empty;
        # presumably it once embedded video_url in a <video> tag. Confirm.
        video_url = get_mp4_video(input=response, avatar_url=avatar_url)
        history.append(response)
    # ['user1', 'bot1', ...] ==> [('user1', 'bot1'), ...]
    responses = [(u, b) for u, b in zip(history[::2], history[1::2])]
    return responses, video_html, history
# Speech recognition via the OpenAI Whisper API
def transcribe(audio):
    """Transcribe a recorded audio file to text with OpenAI Whisper.

    Args:
        audio: Path to a temp audio file (no extension — e.g. from Gradio).

    Returns:
        The transcribed text string.
    """
    # Whisper needs a recognizable file extension, so rename the temp file.
    wav_path = audio + '.wav'
    os.rename(audio, wav_path)
    # Fix: the original opened the file and never closed it (handle leak);
    # a with-block guarantees the file is closed even if the API call fails.
    with open(wav_path, "rb") as audio_file:
        transcript = openai.Audio.transcribe(
            "whisper-1", audio_file, prompt="这是一段简体中文的问题。")
    return transcript['text']
# Alternative: local speech recognition via a HuggingFace Whisper pipeline (kept for reference)
# def transcribe(audio):
# os.rename(audio, audio + '.wav')
# audio_file = open(audio + '.wav', "rb")
# transcriber = pipeline(model="openai/whisper-medium", device=0)
# result = transcriber(audio_file)
# return result['text']
def process_audio(audio, history=None):
    """Transcribe recorded audio (if any) and feed the text to the chatbot.

    Args:
        audio: Path to a recorded audio file, or None if nothing was recorded.
        history: Flat alternating chat history list (see ``predict``).

    Returns:
        Whatever ``predict`` returns: (responses, video_html, history).
    """
    # Fix: the original used a mutable default (history=[]), shared across
    # calls; use None as the sentinel and build a fresh list per call.
    if history is None:
        history = []
    # Both branches of the original ended in the same predict() call —
    # collapse them: transcribe when audio exists, otherwise pass None.
    text = transcribe(audio) if audio is not None else None
    return predict(text, history)