File size: 4,121 Bytes
d20eb01
 
 
 
 
 
 
 
dc2726e
d20eb01
 
 
 
 
dc2726e
d20eb01
 
 
 
 
 
 
 
dc2726e
d20eb01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dc2726e
d20eb01
dc2726e
d20eb01
 
 
 
 
 
 
 
 
 
 
dc2726e
d20eb01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import io
import os
from contextlib import closing

import boto3
import gradio as gr
import requests

from config.config import TALKING_HEAD_WIDTH, LOOPING_TALKING_HEAD_VIDEO_PATH
from utilities.audio import AZURE_VOICE_DATA, POLLY_VOICE_DATA
from utilities.polly_utils import NEURAL_ENGINE


def create_html_video(file_name, width):
    """Build an HTML ``<video>`` snippet that plays a video file on a silent loop.

    Args:
        file_name: Path of the video file to embed. When falsy, the configured
            ``LOOPING_TALKING_HEAD_VIDEO_PATH`` is used instead.
        width: Value written to both the ``width`` and ``height`` attributes
            of the ``<video>`` element (the element is rendered square).

    Returns:
        An HTML string with an autoplaying, muted, looping ``<video>`` element
        whose source points at the ``/file=`` URL of the given file.
    """
    # Honour the caller's file; fall back to the looping clip so calls that
    # pass nothing useful still render the default talking head.
    video_path = file_name or LOOPING_TALKING_HEAD_VIDEO_PATH

    # The hidden gr.File is only used to obtain the name Gradio assigns to the
    # file. NOTE(review): relies on ``value['name']`` being the path reachable
    # under the "/file=" route -- confirm against the pinned Gradio version.
    tmp_file = gr.File(video_path, visible=False)
    temp_file_url = "/file=" + tmp_file.value['name']

    # NOTE(review): ``poster`` is placed on <source>, where browsers presumably
    # ignore it (it is a <video> attribute); left untouched to keep the markup
    # callers currently receive.
    html_video = f'<video width={width} height={width} autoplay muted loop><source src={temp_file_url} type="video/mp4" poster="Masahiro.png"></video>'
    return html_video

def update_talking_head(widget, state):
    """Return the talking-head state and the HTML to display for it.

    Args:
        widget: Value of the controlling widget; only its truthiness decides
            which branch is taken, and it becomes the returned state when truthy.
        state: Incoming state value. It is never read -- the returned state is
            derived from ``widget`` alone.

    Returns:
        ``(widget, html)`` with the looping talking-head video markup when
        ``widget`` is truthy, otherwise ``(None, "<pre></pre>")``.
    """
    # Guard clause: a falsy widget clears the state and shows an empty placeholder.
    if not widget:
        return None, "<pre></pre>"

    looping_video_html = create_html_video(LOOPING_TALKING_HEAD_VIDEO_PATH, TALKING_HEAD_WIDTH)
    return widget, looping_video_html



def do_html_audio_speak(words_to_speak, polly_language):
    """Synthesize speech with Amazon Polly and wrap it in an HTML ``<audio>`` tag.

    Args:
        words_to_speak: Text passed to Polly as the ``Text`` to synthesize.
        polly_language: Language key used to look up a "Male" voice in
            ``POLLY_VOICE_DATA``.

    Returns:
        ``(html_audio, audio_path)`` on success, where ``html_audio`` is an
        autoplaying ``<audio>`` element and ``audio_path`` is the MP3 file
        that was written. ``(None, None)`` when the response carries no
        audio stream or the file cannot be written.

    Raises:
        KeyError: If any of the AWS_* environment variables read below is unset.
    """
    audio_path = "assets/audios/tempfile.mp3"

    # Credentials and region are read explicitly from the environment.
    session = boto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name=os.environ["AWS_DEFAULT_REGION"],
    )
    polly = session.client('polly')

    voice_id, language_code, engine = POLLY_VOICE_DATA.get_voice(polly_language, "Male")
    if not voice_id:
        # No voice found for this language: fall back to a US English neural voice.
        voice_id, language_code, engine = "Matthew", "en-US", NEURAL_ENGINE

    response = polly.synthesize_speech(
        Text=words_to_speak,
        OutputFormat='mp3',
        VoiceId=voice_id,
        LanguageCode=language_code,
        Engine=engine,
    )

    if "AudioStream" not in response:
        # The response didn't contain audio data, exit gracefully
        print("Could not stream audio")
        return None, None

    # closing() guarantees the Polly stream is released even if writing fails.
    with closing(response["AudioStream"]) as stream:
        try:
            with open(audio_path, 'wb') as out:
                out.write(stream.read())
            audio_file = gr.File(audio_path)
            audio_url = "/file=" + audio_file.value['name']
            html_audio = f'<audio autoplay><source src={audio_url} type="audio/mp3"></audio>'
        except IOError as error:
            # Could not write to file, exit gracefully
            print(error)
            return None, None

    return html_audio, audio_path




def do_html_video_speak(words_to_speak, azure_language):
    """Request a lip-synced talking-head video and wrap it in an HTML ``<video>`` tag.

    Posts the text and voice settings to the exh.ai lipsync endpoint and, when
    the request succeeds, stores the returned bytes as ``videos/tempfile.mp4``.

    Args:
        words_to_speak: Text sent to the API as ``bot_response``.
        azure_language: Language key used to look up a "Male" voice in
            ``AZURE_VOICE_DATA``; ``en-US-ChristopherNeural`` is used when the
            lookup yields nothing.

    Returns:
        ``(html_video, video_path)``. ``html_video`` is an autoplaying
        ``<video>`` element on success and ``'<pre>no video</pre>'`` otherwise.
        ``video_path`` is always ``"videos/tempfile.mp4"``; on failure nothing
        is written by this call, so the file may be missing or stale.

    Raises:
        KeyError: If the ``EXHUMAN_API_KEY`` environment variable is unset.
    """
    video_path = "videos/tempfile.mp4"

    azure_voice = AZURE_VOICE_DATA.get_voice(azure_language, "Male")
    if not azure_voice:
        azure_voice = "en-US-ChristopherNeural"

    headers = {"Authorization": f"Bearer {os.environ['EXHUMAN_API_KEY']}"}
    body = {
        'bot_name': 'Masahiro',
        'bot_response': words_to_speak,
        'azure_voice': azure_voice,
        'azure_style': 'friendly',
        'animation_pipeline': 'high_speed',
    }
    api_endpoint = "https://api.exh.ai/animations/v1/generate_lipsync"
    res = requests.post(api_endpoint, json=body, headers=headers)
    print("res.status_code: ", res.status_code)

    html_video = '<pre>no video</pre>'
    # ``res.content`` is always ``bytes``, so a type check can never detect a
    # failed request; gate on the HTTP status (and a non-empty body) instead so
    # an error payload is not saved and served as if it were an MP4.
    if res.ok and res.content:
        print("len(res.content)): ", len(res.content))

        with open(video_path, 'wb') as f:
            f.write(res.content)
        temp_file = gr.File(video_path)
        temp_file_url = "/file=" + temp_file.value['name']
        html_video = f'<video width={TALKING_HEAD_WIDTH} height={TALKING_HEAD_WIDTH} autoplay><source src={temp_file_url} type="video/mp4" poster="Masahiro.png"></video>'
    else:
        print('video url unknown')
    return html_video, video_path