File size: 5,747 Bytes
4904f3e
162d5c8
 
 
 
 
cf5e7f4
162d5c8
 
62a21bd
cf5e7f4
72e4889
62a21bd
 
162d5c8
 
62a21bd
d91a673
62a21bd
e2846c4
09ede70
e2846c4
72e4889
aec6f97
e2846c4
149eeaf
e2846c4
149eeaf
e2846c4
ad67495
162d5c8
 
9ed41df
 
cf5e7f4
62a21bd
149eeaf
 
28b5e08
 
 
cf5e7f4
4904f3e
 
 
 
 
cf5e7f4
4904f3e
 
 
 
cf5e7f4
 
72e4889
aec6f97
 
cf5e7f4
 
7925882
 
cf5e7f4
 
 
 
 
 
 
9ed41df
62a21bd
 
cf5e7f4
162d5c8
 
 
 
 
62a21bd
e2846c4
 
62a21bd
e2846c4
62a21bd
 
 
 
 
 
 
 
 
 
 
 
72e4889
62a21bd
 
 
162d5c8
 
d91a673
 
 
 
 
 
a642a9f
 
28b5e08
a642a9f
 
 
d91a673
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162d5c8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
from typing import List
import av
import asyncio
from collections import deque
import threading
import cv2

import numpy as np
import ray
from ray.util.queue import Queue
from app_interface_actor import AppInterfaceActor
import pydub
import torch

class StreamlitAVQueue:
    def __init__(self, audio_bit_rate=16000):
        self._output_channels = 2
        self._audio_bit_rate = audio_bit_rate
        self._listening = True
        self._looking = False
        self._lock = threading.Lock()
        self.app_interface_actor = AppInterfaceActor.get_singleton()
        self._video_output_frame = None 
        
    def set_looking_listening(self, looking, listening: bool):
        with self._lock:
            self._looking = looking
            self._listening = listening
            
    async def queued_video_frames_callback(
                self,
                frames: List[av.VideoFrame],
            ) -> av.VideoFrame:
        updated_frames = []
        try:
            with self._lock:
                should_look = self._looking
            video_output_frames = await self.app_interface_actor.dequeue_video_output_frames_async.remote()
            if len(video_output_frames) > 0:
                self._video_output_frame = video_output_frames[-1]
            for i, frame in enumerate(frames):
                
                # supress the ffmpeg warning
                saved_stderr_fd = os.dup(2)
                stderr_fd = os.open(os.devnull, os.O_WRONLY)
                os.dup2(stderr_fd, 2)
                user_image = frame.to_ndarray(format="rgb24")
                os.dup2(saved_stderr_fd, 2)
                os.close(stderr_fd)
                os.close(saved_stderr_fd)

                if should_look:
                    shared_tensor_ref = ray.put(user_image)
                    await self.app_interface_actor.enqueue_video_input_frame.remote(shared_tensor_ref)
                if self._video_output_frame is not None:
                    frame = self._video_output_frame
                    # resize user image to 1/4 size
                    user_frame = cv2.resize(user_image, (user_image.shape[1]//4, user_image.shape[0]//4), interpolation=cv2.INTER_AREA)
                    # flip horizontally
                    user_frame = cv2.flip(user_frame, 1)
                    x_user = 0
                    y_user = frame.shape[0] - user_frame.shape[0]
                    final_frame = frame.copy()
                    final_frame[y_user:y_user+user_frame.shape[0], x_user:x_user+user_frame.shape[1]] = user_frame
                    frame = av.VideoFrame.from_ndarray(final_frame, format="rgb24")

                updated_frames.append(frame)
                # print (f"tesnor len: {len(shared_tensor)}, tensor shape: {shared_tensor.shape}, tensor type:{shared_tensor.dtype} tensor ref: {shared_tensor_ref}")
        except Exception as e:
            print (e)
        return updated_frames

    async def queued_audio_frames_callback(
                self,
                frames: List[av.AudioFrame],
            ) -> av.AudioFrame:
        try:
            with self._lock:
                should_listed = self._listening
            sound_chunk = pydub.AudioSegment.empty()
            if len(frames) > 0 and should_listed:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                await self.app_interface_actor.enqueue_audio_input_frame.remote(shared_buffer_ref)
        except Exception as e:
            print (e)
            
        # return empty frames to avoid echo
        new_frames = []
        try:
            for frame in frames:
                required_samples = frame.samples
                # print (f"frame: {frame.format.name}, {frame.layout.name}, {frame.sample_rate}, {frame.samples}")
                assert frame.format.bytes == 2
                assert frame.format.name == 's16'
                import time
                start_time = time.time()
                frame_as_bytes = await self.app_interface_actor.dequeue_audio_output_frame_async.remote()
                elapsed_time = time.time() - start_time
                if elapsed_time > 0.1:
                    print (f"app_interface_actor.dequeue_audio_output_frame_async() elapsed_time: {elapsed_time}")
                if frame_as_bytes:
                    # print(f"frame_as_bytes: {len(frame_as_bytes)}")
                    assert len(frame_as_bytes) == frame.samples * frame.format.bytes
                    samples = np.frombuffer(frame_as_bytes, dtype=np.int16)
                else:
                    samples = np.zeros((required_samples * 2 * 1), dtype=np.int16)
                if self._output_channels == 2:
                    samples = np.vstack((samples, samples)).reshape((-1,), order='F')
                samples = samples.reshape(1, -1)
                layout = 'stereo' if self._output_channels == 2 else 'mono'
                new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
                new_frame.sample_rate = frame.sample_rate
                new_frames.append(new_frame)
        except Exception as e:
            print (e)
        return new_frames