import gradio as gr import openai import requests import csv import uuid import whisper import azure.cognitiveservices.speech as speechsdk import base64 import os from polly_utils import PollyVoiceData, NEURAL_ENGINE from azure_utils import AzureVoiceData POLLY_VOICE_DATA = PollyVoiceData() AZURE_VOICE_DATA = AzureVoiceData() WHISPER_DETECT_LANG = "Chinese (Mandarin)" WHISPER_MODEL = whisper.load_model("tiny") print("WHISPER_MODEL", WHISPER_MODEL) LOOPING_TALKING_HEAD = "videos/Michelle.mp4" TALKING_HEAD_WIDTH = "192" MAX_TALKING_HEAD_TEXT_LENGTH = 100 prompt_templates = {"Default ChatGPT": ""} def get_empty_state(): return {"total_tokens": 0, "messages": []} def download_prompt_templates(): url = "https://raw.githubusercontent.com/f/awesome-chatgpt-prompts/main/prompts.csv" try: response = requests.get(url) reader = csv.reader(response.text.splitlines()) next(reader) # skip the header row for row in reader: if len(row) >= 2: act = row[0].strip('"') prompt = row[1].strip('"') prompt_templates[act] = prompt except requests.exceptions.RequestException as e: print(f"An error occurred while downloading prompt templates: {e}") return choices = list(prompt_templates.keys()) choices = choices[:1] + sorted(choices[1:]) return gr.update(value=choices[0], choices=choices) def on_token_change(user_token): openai.api_key = user_token def on_type_change(type): print(type) def on_prompt_template_change(prompt_template): if not isinstance(prompt_template, str): return return prompt_templates[prompt_template] def lan_detector(audio_file): print('reading the audio file') audio = whisper.load_audio(audio_file) audio = whisper.pad_or_trim(audio) mel = whisper.log_mel_spectrogram(audio).to(WHISPER_MODEL.device) _, probs = WHISPER_MODEL.detect_language(mel) if max(probs, key=probs.get) == 'en': return True return False # UNCOMMENT TO USE WHISPER def transcribe(aud_inp): if aud_inp is None: return "" text = WHISPER_MODEL.transcribe(aud_inp) print("result.text", text["text"]) return text["text"] def create_html_video(file_name, width): temp_file_url = "/file=" + tmp_file.value['name'] html_video = f'' return html_video def ToBase64(file): with open(file, 'rb') as fileObj: image_data = fileObj.read() base64_data = base64.b64encode(image_data) return base64_data.decode() def do_html_audio_speak_azure(words_to_speak): html_audio = '
no audio
' speech_key=os.environ["SPEECH_KEY"] service_region=os.environ["SERVICE_REGION"] speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region) # Note: the voice setting will not overwrite the voice element in input SSML. speech_config.speech_synthesis_voice_name = "zh-CN-XiaoxiaoNeural" # 设置输出的音频文件路径和文件名 audio_config = speechsdk.audio.AudioOutputConfig(filename="audios/tempfile.mp3") text = words_to_speak # use the default speaker as audio output. speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config) result = speech_synthesizer.speak_text_async(text).get() # Check result if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted: print("Speech synthesized for text [{}]".format(text)) try: temp_aud_file = gr.File("audios/tempfile.mp3") temp_aud_file_url = "/file=" + temp_aud_file.value['name'] html_audio = f'' except IOError as error: # Could not write to file, exit gracefully print(error) return None, None elif result.reason == speechsdk.ResultReason.Canceled: cancellation_details = result.cancellation_details print("Speech synthesis canceled: {}".format(cancellation_details.reason)) if cancellation_details.reason == speechsdk.CancellationReason.Error: print("Error details: {}".format(cancellation_details.error_details)) # The response didn't contain audio data, exit gracefully print("Could not stream audio") return None, None return html_audio, "audios/tempfile.mp3" def do_html_video_speak_sad_talker(temp_aud_file): GRADIO_URL=os.environ["GRADIO_URL"] img_data = ToBase64("images/Michelle.png") audio_data = ToBase64(temp_aud_file) response = requests.post(GRADIO_URL+"/run/sad_talker", json={ "data": [ "data:image/png;base64,"+img_data, {"name":"audio.wav","data":"data:audio/wav;base64,"+audio_data}, "crop", False, False, ] },timeout=3000) print(response.text) res = response.json() data = res["data"] print(data) video_rul = GRADIO_URL+"/file=" + data[0][0]['name'] print(video_rul) html_video = '
no video
' # with open('videos/tempfile.mp4', 'wb') as f: # f.write(response_stream.read()) # temp_file = gr.File("videos/tempfile.mp4") # temp_file_url = "/file=" + temp_file.value['name'] temp_file_url=video_rul html_video = f'' return html_video, "videos/tempfile.mp4" def submit_message(type_select,user_token, prompt, prompt_template, temperature, max_tokens, context_length, state): # print(type_select,user_token, prompt, prompt_template, temperature, max_tokens, context_length, state) history = state['messages'] if not prompt: return gr.update(value=''), [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)], f"Total tokens used: {state['total_tokens']}", state prompt_template = prompt_templates[prompt_template] system_prompt = [] if prompt_template: system_prompt = [{ "role": "system", "content": prompt_template }] prompt_msg = { "role": "user", "content": prompt } if not type_select: history.append(prompt_msg) history.append({ "role": "system", "content": "Error: Type is not set." }) return '', [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)], f"Total tokens used: 0", state if not user_token: history.append(prompt_msg) history.append({ "role": "system", "content": "Error: OpenAI API Key is not set." }) return '', [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)], f"Total tokens used: 0", state else: openai.api_key = user_token html_video, temp_file, html_audio, temp_aud_file = None, None, None, None try: if type_select=='TEXT': text_history = [x for x in history if x['role'] != 'image' ] print(text_history) completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=system_prompt + text_history[-context_length*2:] + [prompt_msg], temperature=temperature, max_tokens=max_tokens) print(prompt_msg,completion.choices[0].message.to_dict()) history.append(prompt_msg) history.append(completion.choices[0].message.to_dict()) state['total_tokens'] += completion['usage']['total_tokens'] answer = completion.choices[0].message.to_dict()["content"] if len(answer) <= MAX_TALKING_HEAD_TEXT_LENGTH: # html_video, temp_file = do_html_video_speak(output, translate_to) html_audio, temp_aud_file = do_html_audio_speak_azure(answer) try: html_video, temp_file = do_html_video_speak_sad_talker(temp_aud_file) html_audio = None except Exception as e: temp_file = LOOPING_TALKING_HEAD html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH) else: temp_file = LOOPING_TALKING_HEAD html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH) html_audio, temp_aud_file = do_html_audio_speak_azure(answer) elif type_select=='IMAGE': response = openai.Image.create( prompt=prompt, n=1, size="512x512" ) print("image result ",response) image_url = response['data'][0]['url'] history.append({ "role": "image", "content": prompt }) history.append({ "role": "image", "content": image_url }) state['total_tokens'] += 0 temp_file = LOOPING_TALKING_HEAD html_video = create_html_video(temp_file, TALKING_HEAD_WIDTH) except Exception as e: # history.append(prompt_msg) history.append({ "role": "system", "content": f"Error: {e}" }) total_tokens_used_msg = f"Total tokens used: {state['total_tokens']}" chat_messages = [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)] print(1,chat_messages) chat_messages=[] for i in range(0, len(history)-1, 2): print(history[i]) if(history[i]['role'] == 'image'): picture_name = str(uuid.uuid1())+'.png' reponse = requests.get(history[i+1]['content']) with open('/home/user/app/'+picture_name,'wb') as f: f.write(reponse.content) image_his = {'name': '/home/user/app/'+picture_name, 'mime_type': 'image/png', 'alt_text': None, 'data': None, 'is_file': True} chat_messages.append((history[i]['content'],image_his)) else: chat_messages.append((history[i]['content'], history[i+1]['content'])) print(2,chat_messages) return '', chat_messages, total_tokens_used_msg, state, html_video, temp_file, html_audio, temp_aud_file def clear_conversation(): return gr.update(value=None, visible=True), None, "", get_empty_state() css = """ #col-container {max-width: 80%; margin-left: auto; margin-right: auto;} #chatbox {min-height: 400px;} #header {text-align: center;} #prompt_template_preview {padding: 1em; border-width: 1px; border-style: solid; border-color: #e0e0e0; border-radius: 4px;} #total_tokens_str {text-align: right; font-size: 0.8em; color: #666;} #label {font-size: 0.8em; padding: 0.5em; margin: 0;} .message { font-size: 1.2em; } """ with gr.Blocks(css=css) as demo: state = gr.State(get_empty_state()) with gr.Column(elem_id="col-container"): gr.Markdown("""## OpenAI ChatGPT chat Using the ofiicial API (gpt-3.5-turbo model) """, elem_id="header") with gr.Row(): with gr.Column(scale=1, min_width=TALKING_HEAD_WIDTH, visible=True): # speak_text_cb = gr.Checkbox(label="Enable speech", value=False) # speak_text_cb.change(update_foo, inputs=[speak_text_cb, speak_text_state], # outputs=[speak_text_state]) my_file = gr.File(label="Upload a file", type="file", visible=False) tmp_file = gr.File(LOOPING_TALKING_HEAD, visible=False) # tmp_file_url = "/file=" + tmp_file.value['name'] htm_video = create_html_video(LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH) video_html = gr.HTML(htm_video) # my_aud_file = gr.File(label="Audio file", type="file", visible=True) tmp_aud_file = gr.File("audios/tempfile.mp3", visible=False) tmp_aud_file_url = "/file=" + tmp_aud_file.value['name'] htm_audio = f'' audio_html = gr.HTML(htm_audio) with gr.Column(scale=6): chatbot = gr.Chatbot(elem_id="chatbox") with gr.Column(scale=3): gr.Markdown("Enter your OpenAI API Key. You can get one [here](https://platform.openai.com/account/api-keys).", elem_id="label") user_token = gr.Textbox(value='', placeholder="OpenAI API Key", type="password", show_label=False) prompt_template = gr.Dropdown(label="Set a custom insruction for the chatbot:", choices=list(prompt_templates.keys())) prompt_template_preview = gr.Markdown(elem_id="prompt_template_preview") with gr.Accordion("Advanced parameters", open=False): temperature = gr.Slider(minimum=0, maximum=2.0, value=0.7, step=0.1, label="Temperature", info="Higher = more creative/chaotic") max_tokens = gr.Slider(minimum=100, maximum=4096, value=1000, step=1, label="Max tokens per response") context_length = gr.Slider(minimum=1, maximum=10, value=2, step=1, label="Context length", info="Number of previous messages to send to the chatbot. Be careful with high values, it can blow up the token budget quickly.") with gr.Row(): with gr.Column(min_width=TALKING_HEAD_WIDTH, visible=True): type_select = gr.Dropdown(show_label=False, choices= ["TEXT", "IMAGE"],value="TEXT",interactive=True) with gr.Column(scale=6): input_message = gr.Textbox(show_label=False, placeholder="Enter text and press enter", visible=True).style(container=False) with gr.Column(scale=3): btn_submit = gr.Button("Submit") total_tokens_str = gr.Markdown(elem_id="total_tokens_str") with gr.Row(): audio_comp = gr.Microphone(source="microphone", type="filepath", label="Just say it!", interactive=True, streaming=False) audio_comp.change(transcribe, inputs=[audio_comp], outputs=[input_message], api_name='audio_comp') with gr.Row(): btn_clear_conversation = gr.Button("🔃 Start New Conversation") # gr.HTML('''


You can duplicate this Space to skip the queue:Duplicate Space
#

visitors

''') type_select.change(on_type_change,inputs=[type_select], outputs=[]) btn_submit.click(submit_message, [type_select,user_token, input_message, prompt_template, temperature, max_tokens, context_length, state], [input_message, chatbot, total_tokens_str, state, video_html, my_file, audio_html, tmp_aud_file], api_name='submit_message') input_message.submit(submit_message, [type_select,user_token, input_message, prompt_template, temperature, max_tokens, context_length, state], [input_message, chatbot, total_tokens_str, state, video_html, my_file, audio_html, tmp_aud_file]) btn_clear_conversation.click(clear_conversation, [], [input_message, chatbot, total_tokens_str, state], api_name='clear_conversation') prompt_template.change(on_prompt_template_change, inputs=[prompt_template], outputs=[prompt_template_preview]) user_token.change(on_token_change, inputs=[user_token], outputs=[]) demo.load(download_prompt_templates, inputs=None, outputs=[prompt_template], queur=False) demo.queue(concurrency_count=10) demo.launch( # auth=("admin", "IBTGeE3NrPsrViDI"), height='800px')