Spaces:
Runtime error
Runtime error
import gradio as gr | |
import openai | |
import requests | |
import csv | |
import uuid | |
import whisper | |
import azure.cognitiveservices.speech as speechsdk | |
import base64 | |
import os | |
from polly_utils import PollyVoiceData, NEURAL_ENGINE | |
from azure_utils import AzureVoiceData | |
# Voice metadata helpers from the project-local polly_utils / azure_utils
# modules (presumably voice catalogues; confirm against those modules).
POLLY_VOICE_DATA = PollyVoiceData()
AZURE_VOICE_DATA = AzureVoiceData()

# Speech recognition: the Whisper "tiny" checkpoint is loaded once at import.
WHISPER_DETECT_LANG = "Chinese (Mandarin)"
WHISPER_MODEL = whisper.load_model("tiny")
print("WHISPER_MODEL", WHISPER_MODEL)

# Talking-head settings: idle video, the square size used for <video> tags,
# and the longest answer (in characters) for which a spoken video is attempted.
LOOPING_TALKING_HEAD = "videos/Michelle.mp4"
TALKING_HEAD_WIDTH = "192"
MAX_TALKING_HEAD_TEXT_LENGTH = 100

# Template name -> system prompt. The first entry is the built-in default;
# download_prompt_templates() adds more entries at load time.
prompt_templates = {"Default ChatGPT": ""}
def get_empty_state():
    """Return a fresh conversation state: zero tokens used and no messages.

    A new dict (and a new ``messages`` list) is built on every call so that
    callers never share mutable state.
    """
    return dict(total_tokens=0, messages=[])
def download_prompt_templates():
    """Fetch the awesome-chatgpt-prompts CSV and refresh the template dropdown.

    Each CSV row with at least two columns is stored in the module-level
    ``prompt_templates`` dict as ``act -> prompt``.

    Returns:
        A ``gr.update`` that selects the first (default) template and lists
        the default first, followed by all other template names sorted
        alphabetically. If the download fails, the update is built from the
        templates already known, so the dropdown still has a valid selection.
    """
    url = "https://raw.githubusercontent.com/f/awesome-chatgpt-prompts/main/prompts.csv"
    try:
        # A timeout keeps a stalled connection from hanging the page load;
        # raise_for_status stops an HTTP error page being parsed as CSV.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while downloading prompt templates: {e}")
    else:
        reader = csv.reader(response.text.splitlines())
        next(reader, None)  # skip the header row; tolerate an empty body
        for row in reader:
            if len(row) >= 2:
                act = row[0].strip('"')
                prompt = row[1].strip('"')
                prompt_templates[act] = prompt

    # Keep the built-in default on top, sort everything else.
    choices = list(prompt_templates.keys())
    choices = choices[:1] + sorted(choices[1:])
    return gr.update(value=choices[0], choices=choices)
def on_token_change(user_token):
    """Store the key typed into the API-key textbox as ``openai.api_key``.

    NOTE(review): this is a process-wide setting, so every session served by
    this process shares whichever key was written last.
    """
    openai.api_key = user_token
def on_type_change(type):
    """Log the newly selected request type; it has no other effect.

    The parameter name shadows the ``type`` builtin inside this function.
    """
    print(type)
def on_prompt_template_change(prompt_template):
    """Return the prompt text for the selected template name.

    Returns ``None`` when the dropdown value is not a string (for example
    when nothing is selected). Raises ``KeyError`` for an unknown name.
    """
    if isinstance(prompt_template, str):
        return prompt_templates[prompt_template]
    return None
def lan_detector(audio_file):
    """Return True when Whisper ranks English as the clip's most likely language.

    The audio is loaded, padded or trimmed by Whisper, converted to a log-mel
    spectrogram on the model's device, and passed to ``detect_language``.
    """
    print('reading the audio file')
    clip = whisper.pad_or_trim(whisper.load_audio(audio_file))
    mel = whisper.log_mel_spectrogram(clip).to(WHISPER_MODEL.device)
    _, probs = WHISPER_MODEL.detect_language(mel)
    top_language = max(probs, key=probs.get)
    return top_language == 'en'
# UNCOMMENT TO USE WHISPER | |
def transcribe(aud_inp):
    """Transcribe a recorded audio file with the module-level Whisper model.

    Returns the transcript text, or an empty string when no audio was given.
    """
    if aud_inp is not None:
        result = WHISPER_MODEL.transcribe(aud_inp)
        transcript = result["text"]
        print("result.text", transcript)
        return transcript
    return ""
def create_html_video(file_name, width):
    """Build a muted, looping, autoplaying square ``<video>`` tag.

    ``width`` is used for both the width and the height attribute.

    NOTE(review): ``file_name`` is not used. The source URL always comes from
    the module-level ``tmp_file`` component, which must already exist when
    this function is called.
    """
    source_url = "/file=" + tmp_file.value['name']
    return f'<video width={width} height={width} autoplay muted loop><source src={source_url} type="video/mp4" poster="Michelle.png"></video>'
def ToBase64(file):
    """Read the file at path ``file`` and return its contents as base64 text."""
    with open(file, 'rb') as handle:
        encoded = base64.b64encode(handle.read())
    return encoded.decode()
def do_html_audio_speak_azure(words_to_speak):
    """Synthesize ``words_to_speak`` with Azure Speech into ``audios/tempfile.mp3``.

    Credentials are read from the ``SPEECH_KEY`` and ``SERVICE_REGION``
    environment variables.

    Returns:
        ``(html_audio, "audios/tempfile.mp3")`` where ``html_audio`` is an
        autoplay ``<audio>`` tag when synthesis completed. Returns
        ``(None, None)`` when synthesis was canceled or wrapping the output
        file raised ``IOError``. For any other result reason the placeholder
        ``'<pre>no audio</pre>'`` is returned together with the file path.

    Raises:
        KeyError: if one of the required environment variables is missing.
    """
    output_path = "audios/tempfile.mp3"
    html_audio = '<pre>no audio</pre>'

    speech_config = speechsdk.SpeechConfig(
        subscription=os.environ["SPEECH_KEY"],
        region=os.environ["SERVICE_REGION"],
    )
    # Note: the voice setting will not overwrite the voice element in input SSML.
    speech_config.speech_synthesis_voice_name = "zh-CN-XiaoxiaoNeural"
    # Set the path and file name of the output audio file.
    audio_config = speechsdk.audio.AudioOutputConfig(filename=output_path)
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
    outcome = synthesizer.speak_text_async(words_to_speak).get()

    if outcome.reason == speechsdk.ResultReason.Canceled:
        details = outcome.cancellation_details
        print("Speech synthesis canceled: {}".format(details.reason))
        if details.reason == speechsdk.CancellationReason.Error:
            print("Error details: {}".format(details.error_details))
        # The response didn't contain audio data, exit gracefully
        print("Could not stream audio")
        return None, None

    if outcome.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
        print("Speech synthesized for text [{}]".format(words_to_speak))
        try:
            # Wrapping the file in a gr.File yields the path used in the URL.
            audio_component = gr.File(output_path)
            audio_url = "/file=" + audio_component.value['name']
            html_audio = f'<audio autoplay><source src={audio_url} type="audio/mp3"></audio>'
        except IOError as error:
            # Could not access the file, exit gracefully
            print(error)
            return None, None

    return html_audio, output_path
def do_html_video_speak_sad_talker(temp_aud_file):
    """Ask the remote ``sad_talker`` endpoint for a talking-head video of the audio.

    The portrait ``images/Michelle.png`` and the audio file are sent
    base64-encoded to ``$GRADIO_URL/run/sad_talker``. The returned video is
    referenced by URL on that remote server; nothing is downloaded.

    Args:
        temp_aud_file: path of the audio file to lip-sync.

    Returns:
        ``(html_video, "videos/tempfile.mp4")``. ``html_video`` is an
        autoplay ``<video>`` tag pointing at the remote file. The second
        element is a fixed path that this function does not write.

    Raises:
        KeyError: if ``GRADIO_URL`` is not set, or the response has no
            ``data`` field.
        requests.exceptions.RequestException: on network failure, timeout or
            a non-2xx HTTP status.
    """
    GRADIO_URL = os.environ["GRADIO_URL"]
    img_data = ToBase64("images/Michelle.png")
    audio_data = ToBase64(temp_aud_file)
    response = requests.post(
        GRADIO_URL + "/run/sad_talker",
        json={
            "data": [
                "data:image/png;base64," + img_data,
                {"name": "audio.wav", "data": "data:audio/wav;base64," + audio_data},
                "crop",
                False,
                False,
            ]
        },
        timeout=3000,
    )
    print(response.text)
    # Fail with a clear HTTP error, not a confusing JSON/KeyError below.
    response.raise_for_status()
    data = response.json()["data"]
    print(data)
    video_url = GRADIO_URL + "/file=" + data[0][0]['name']
    print(video_url)
    html_video = f'<video width={TALKING_HEAD_WIDTH} height={TALKING_HEAD_WIDTH} autoplay><source src={video_url} type="video/mp4" poster="Michelle.png"></video>'
    return html_video, "videos/tempfile.mp4"
def _download_image(url):
    """Save the image at ``url`` under /home/user/app/ and return its path, or None on failure."""
    picture_path = '/home/user/app/' + str(uuid.uuid1()) + '.png'
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Could not download generated image: {e}")
        return None
    with open(picture_path, 'wb') as f:
        f.write(response.content)
    return picture_path


def _history_to_chat_messages(history):
    """Pair consecutive history entries into (user, bot) tuples for the chatbot."""
    chat_messages = []
    for i in range(0, len(history) - 1, 2):
        asked, answered = history[i], history[i + 1]
        if asked['role'] != 'image':
            chat_messages.append((asked['content'], answered['content']))
            continue
        # Download each generated image once and remember where it was saved,
        # so later turns neither re-download it nor depend on the remote URL.
        local_path = answered.get('local_path')
        if local_path is None:
            local_path = _download_image(answered['content'])
            if local_path is not None:
                answered['local_path'] = local_path
        if local_path is None:
            # Fall back to showing the URL as text.
            chat_messages.append((asked['content'], answered['content']))
        else:
            image_his = {'name': local_path, 'mime_type': 'image/png', 'alt_text': None, 'data': None, 'is_file': True}
            chat_messages.append((asked['content'], image_his))
    return chat_messages


def submit_message(type_select, user_token, prompt, prompt_template, temperature, max_tokens, context_length, state):
    """Handle one chat submission: call OpenAI, update history, build media.

    Args:
        type_select: ``'TEXT'`` for a chat completion, ``'IMAGE'`` for image
            generation.
        user_token: OpenAI API key; passed explicitly with each request.
        prompt: the user's message.
        prompt_template: name of the selected entry in ``prompt_templates``.
        temperature, max_tokens, context_length: advanced slider values.
        state: session dict with ``messages`` and ``total_tokens``; mutated
            in place.

    Returns:
        An 8-tuple matching the wired outputs: input box value, chatbot
        messages, token counter text, state, video HTML, video file, audio
        HTML, audio file. On the validation paths (empty prompt, missing type
        or key) the four media outputs are no-op ``gr.update()`` values.
    """
    print(type_select)
    history = state['messages']
    # Every return path must supply all eight outputs; these leave the media
    # components unchanged.
    no_media = (gr.update(), gr.update(), gr.update(), gr.update())

    if not prompt:
        return (gr.update(value=''), _history_to_chat_messages(history), f"Total tokens used: {state['total_tokens']}", state) + no_media

    # .get avoids a KeyError when no template is selected.
    template_text = prompt_templates.get(prompt_template, "")
    system_prompt = []
    if template_text:
        system_prompt = [{"role": "system", "content": template_text}]

    prompt_msg = {"role": "user", "content": prompt}

    precondition_error = None
    if not type_select:
        precondition_error = "Error: Type is not set."
    elif not user_token:
        precondition_error = "Error: OpenAI API Key is not set."
    if precondition_error is not None:
        history.append(prompt_msg)
        history.append({"role": "system", "content": precondition_error})
        return ('', _history_to_chat_messages(history), "Total tokens used: 0", state) + no_media

    html_video, temp_file, html_audio, temp_aud_file = None, None, None, None
    entries_before = len(history)
    try:
        if type_select == 'TEXT':
            text_history = [x for x in history if x['role'] != 'image']
            print(text_history)
            # Sliders may deliver floats; slicing and the API need integers.
            # The key is passed per request so concurrent sessions do not
            # depend on the shared openai.api_key global.
            completion = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=system_prompt + text_history[-int(context_length) * 2:] + [prompt_msg],
                temperature=temperature,
                max_tokens=int(max_tokens),
                api_key=user_token,
            )
            reply = completion.choices[0].message.to_dict()
            print(prompt_msg, reply)
            history.append(prompt_msg)
            history.append(reply)
            state['total_tokens'] += completion['usage']['total_tokens']

            answer = reply["content"]
            if len(answer) <= MAX_TALKING_HEAD_TEXT_LENGTH:
                html_audio, temp_aud_file = do_html_audio_speak_azure(answer)
                try:
                    html_video, temp_file = do_html_video_speak_sad_talker(temp_aud_file)
                    # The generated video carries the audio.
                    html_audio = None
                except Exception as e:
                    # Best effort: fall back to the idle video plus audio.
                    print(f"Talking-head generation failed: {e}")
                    temp_file = LOOPING_TALKING_HEAD
                    html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
            else:
                temp_file = LOOPING_TALKING_HEAD
                html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
                html_audio, temp_aud_file = do_html_audio_speak_azure(answer)
        elif type_select == 'IMAGE':
            response = openai.Image.create(
                prompt=prompt,
                n=1,
                size="512x512",
                api_key=user_token,
            )
            print("image result ", response)
            image_url = response['data'][0]['url']
            history.append({"role": "image", "content": prompt})
            history.append({"role": "image", "content": image_url})
            temp_file = LOOPING_TALKING_HEAD
            html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH)
    except Exception as e:
        if len(history) == entries_before:
            # Nothing was recorded for this turn: add prompt and error as a
            # pair so the (user, bot) alternation of history stays intact.
            history.append(prompt_msg)
            history.append({"role": "system", "content": f"Error: {e}"})
        else:
            # The reply is already in history; only media generation failed.
            print(f"Error after the reply was recorded: {e}")

    total_tokens_used_msg = f"Total tokens used: {state['total_tokens']}"
    chat_messages = _history_to_chat_messages(history)
    print(2, chat_messages)
    return '', chat_messages, total_tokens_used_msg, state, html_video, temp_file, html_audio, temp_aud_file
def clear_conversation(): | |
return gr.update(value=None, visible=True), None, "", get_empty_state() | |
# Page-level CSS; the ids below match the elem_id values set on components.
css = """
#col-container {max-width: 80%; margin-left: auto; margin-right: auto;}
#chatbox {min-height: 400px;}
#header {text-align: center;}
#prompt_template_preview {padding: 1em; border-width: 1px; border-style: solid; border-color: #e0e0e0; border-radius: 4px;}
#total_tokens_str {text-align: right; font-size: 0.8em; color: #666;}
#label {font-size: 0.8em; padding: 0.5em; margin: 0;}
.message { font-size: 1.2em; }
"""

with gr.Blocks(css=css) as demo:
    state = gr.State(get_empty_state())

    with gr.Column(elem_id="col-container"):
        gr.Markdown("""## OpenAI ChatGPT chat
Using the ofiicial API (gpt-3.5-turbo model)
""",
                    elem_id="header")

        # Top row: talking head | conversation | settings.
        with gr.Row():
            with gr.Column(scale=1, min_width=TALKING_HEAD_WIDTH, visible=True):
                my_file = gr.File(label="Upload a file", type="file", visible=False)
                # create_html_video reads this component, so it must be
                # created before the call below.
                tmp_file = gr.File(LOOPING_TALKING_HEAD, visible=False)
                htm_video = create_html_video(LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH)
                video_html = gr.HTML(htm_video)

                tmp_aud_file = gr.File("audios/tempfile.mp3", visible=False)
                tmp_aud_file_url = "/file=" + tmp_aud_file.value['name']
                htm_audio = f'<audio><source src={tmp_aud_file_url} type="audio/mp3"></audio>'
                audio_html = gr.HTML(htm_audio)
            with gr.Column(scale=6):
                chatbot = gr.Chatbot(elem_id="chatbox")
            with gr.Column(scale=3):
                gr.Markdown("Enter your OpenAI API Key. You can get one [here](https://platform.openai.com/account/api-keys).", elem_id="label")
                user_token = gr.Textbox(value='', placeholder="OpenAI API Key", type="password", show_label=False)
                prompt_template = gr.Dropdown(label="Set a custom insruction for the chatbot:", choices=list(prompt_templates.keys()))
                prompt_template_preview = gr.Markdown(elem_id="prompt_template_preview")
                with gr.Accordion("Advanced parameters", open=False):
                    temperature = gr.Slider(minimum=0, maximum=2.0, value=0.7, step=0.1, label="Temperature", info="Higher = more creative/chaotic")
                    max_tokens = gr.Slider(minimum=100, maximum=4096, value=1000, step=1, label="Max tokens per response")
                    context_length = gr.Slider(minimum=1, maximum=10, value=2, step=1, label="Context length", info="Number of previous messages to send to the chatbot. Be careful with high values, it can blow up the token budget quickly.")

        # Input row: request type | message box | submit button.
        with gr.Row():
            with gr.Column(min_width=TALKING_HEAD_WIDTH, visible=True):
                type_select = gr.Dropdown(show_label=False, choices=["TEXT", "IMAGE"], value="TEXT", interactive=True)
            with gr.Column(scale=6):
                input_message = gr.Textbox(show_label=False, placeholder="Enter text and press enter", visible=True).style(container=False)
            with gr.Column(scale=3):
                btn_submit = gr.Button("Submit")
                total_tokens_str = gr.Markdown(elem_id="total_tokens_str")

        with gr.Row():
            audio_comp = gr.Microphone(source="microphone", type="filepath", label="Just say it!",
                                       interactive=True, streaming=False)
            # A finished recording is transcribed into the message box.
            audio_comp.change(transcribe, inputs=[audio_comp], outputs=[input_message])

        with gr.Row():
            btn_clear_conversation = gr.Button("🔃 Start New Conversation")

    # The button and the Enter key trigger the same handler with the same
    # wiring, so the component lists are defined once.
    submit_inputs = [type_select, user_token, input_message, prompt_template, temperature, max_tokens, context_length, state]
    submit_outputs = [input_message, chatbot, total_tokens_str, state, video_html, my_file, audio_html, tmp_aud_file]

    type_select.change(on_type_change, inputs=[type_select], outputs=[])
    btn_submit.click(submit_message, submit_inputs, submit_outputs)
    input_message.submit(submit_message, submit_inputs, submit_outputs)
    btn_clear_conversation.click(clear_conversation, [], [input_message, chatbot, total_tokens_str, state])
    prompt_template.change(on_prompt_template_change, inputs=[prompt_template], outputs=[prompt_template_preview])
    user_token.change(on_token_change, inputs=[user_token], outputs=[])

    # queue=False (previously misspelled, so it never took effect) lets the
    # template download run on page load without waiting in the queue.
    demo.load(download_prompt_templates, inputs=None, outputs=[prompt_template], queue=False)

demo.queue(concurrency_count=10)
# To password-protect the app, pass auth=(user, password) with values read
# from the environment; never hard-code credentials in this file.
demo.launch(height='800px')