import openai
import os
import time

import requests
from langchain import OpenAI
from langchain.chains import ConversationChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain.chat_models import ChatOpenAI
from transformers import pipeline

openai.api_key = os.environ["OPENAI_API_KEY"]
did_api_key = os.environ.get('DID_API_KEY')

# Summarizing buffer memory keeps the running conversation within the token budget.
memory = ConversationSummaryBufferMemory(
    llm=ChatOpenAI(),
    max_token_limit=2048,
)
conversation = ConversationChain(
    llm=OpenAI(max_tokens=2048, temperature=0.5),
    memory=memory,
)

# Portrait used as the D-ID avatar source image.
avatar_url = "https://cdn.discordapp.com/attachments/1065596492796153856/1095617463112187984/John_Carmack_Potrait_668a7a8d-1bb0-427d-8655-d32517f6583d.png"


# Ask D-ID to create a short talking-head video.
def generate_talk(input, avatar_url,
                  voice_type="microsoft", voice_id="zh-CN-YunyeNeural",
                  api_key=did_api_key):
    """Submit a text script to the D-ID ``/talks`` endpoint.

    Returns the parsed JSON response; it contains the talk ``id`` that is
    later polled via :func:`get_a_talk` for the rendered video.
    """
    url = "https://api.d-id.com/talks"
    payload = {
        "script": {
            "type": "text",
            "provider": {
                "type": voice_type,
                "voice_id": voice_id
            },
            "ssml": "false",
            "input": input
        },
        "config": {
            "fluent": "false",
            "pad_audio": "0.0"
        },
        "source_url": avatar_url
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": "Basic " + api_key
    }
    response = requests.post(url, json=payload, headers=headers)
    return response.json()


# Fetch the status/result of a previously created talk.
def get_a_talk(id, api_key=did_api_key):
    """Return the JSON status of talk ``id``; includes ``result_url`` once rendered."""
    url = "https://api.d-id.com/talks/" + id
    headers = {
        "accept": "application/json",
        "authorization": "Basic " + api_key
    }
    response = requests.get(url, headers=headers)
    return response.json()


# Poll until the rendered MP4 is ready and return its URL.
def get_mp4_video(input, avatar_url=avatar_url):
    """Generate a talk for ``input`` and poll D-ID until ``result_url`` appears.

    Polls roughly once per second for up to 30 attempts; returns the video
    URL, or "" if the render did not finish in time.
    """
    response = generate_talk(input=input, avatar_url=avatar_url)
    for _ in range(30):
        talk = get_a_talk(response['id'])
        if 'result_url' in talk:
            return talk['result_url']
        time.sleep(1)
    return ""


# Get a chat completion for the user input and render it as a talking-head video.
def predict(input, history=None):
    """Run one chat turn and return ``(chatbot_pairs, video_html, history)``.

    ``history`` is a flat list ``[user1, bot1, user2, bot2, ...]``; pairs are
    zipped for the chatbot widget. When ``input`` is None only the static
    avatar image is shown.
    """
    # Fix: the original used a mutable default list, shared across calls.
    history = [] if history is None else history
    if input is not None:
        history.append(input)
        response = conversation.predict(input=input)
        video_url = get_mp4_video(input=response, avatar_url=avatar_url)
        # NOTE(review): the HTML here arrived stripped (empty f-string);
        # reconstructed a minimal autoplaying <video> tag — confirm against
        # the intended UI markup.
        video_html = f"""<video width="320" height="240" controls autoplay><source src="{video_url}" type="video/mp4"></video>"""
        history.append(response)
        # ['user input 1', 'bot reply 1', ...] ==> [('user input 1', 'bot reply 1'), ...]
        responses = [(u, b) for u, b in zip(history[::2], history[1::2])]
        return responses, video_html, history
    else:
        # No input: show the static avatar image instead of a video.
        # NOTE(review): markup reconstructed; only the text 'John Carmack'
        # survived stripping — verify attributes against the original.
        video_html = f'<img src="{avatar_url}" width="320" height="240" alt="John Carmack">'
        responses = [(u, b) for u, b in zip(history[::2], history[1::2])]
        return responses, video_html, history


# Speech-to-text via the OpenAI Whisper API.
def transcribe(audio):
    """Transcribe the recorded audio file and return the recognized text."""
    # Incoming temp file has no extension; give it one the API accepts
    # (presumably a Gradio-style temp path — TODO confirm with the caller).
    os.rename(audio, audio + '.wav')
    # Fix: close the file handle deterministically (the original leaked it).
    with open(audio + '.wav', "rb") as audio_file:
        transcript = openai.Audio.transcribe(
            "whisper-1", audio_file, prompt="这是一段简体中文的问题。")
    return transcript['text']


# Alternative: local speech-to-text with a Hugging Face Whisper pipeline.
# def transcribe(audio):
#     os.rename(audio, audio + '.wav')
#     audio_file = open(audio + '.wav', "rb")
#     transcriber = pipeline(model="openai/whisper-medium", device=0)
#     result = transcriber(audio_file)
#     return result['text']


def process_audio(audio, history=None):
    """Transcribe ``audio`` (if any) and feed the text through :func:`predict`."""
    # Fix: mutable default list replaced with None sentinel.
    history = [] if history is None else history
    if audio is not None:
        text = transcribe(audio)
    else:
        text = None
    return predict(text, history)