File size: 2,765 Bytes
162d5c8
 
 
 
 
 
 
62a21bd
 
 
 
162d5c8
 
62a21bd
 
ad67495
 
 
 
 
162d5c8
 
 
 
62a21bd
 
 
 
 
 
 
162d5c8
 
 
 
 
 
62a21bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162d5c8
 
 
 
 
 
 
 
 
 
 
768e92a
 
 
62a21bd
768e92a
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from typing import List
import av
import asyncio
from collections import deque
import threading

import numpy as np
import ray
from input_av_queue_actor import InputAVQueueActor
import pydub
import torch

class StreamlitAVQueue:
    """Bridge between streamlit-webrtc AV callbacks and a named Ray queue actor.

    Incoming video frames are forwarded to the actor as torch tensors and
    incoming audio is downmixed/resampled and forwarded as a numpy buffer.
    The audio callback hands silent frames back to the browser so the
    captured microphone input is not echoed to the local speakers.
    """

    def __init__(self, audio_bit_rate: int = 16000):
        # Target sample rate (Hz) that captured audio is resampled to
        # before being enqueued.
        self._audio_bit_rate = audio_bit_rate
        # get_if_exists=True makes this idempotent across Streamlit reruns /
        # sessions: all callers share the single named actor instead of
        # failing when it already exists.
        self.queue_actor = InputAVQueueActor.options(
            name="InputAVQueueActor",
            get_if_exists=True,
            ).remote()

    async def queued_video_frames_callback(
                self,
                frames: List[av.VideoFrame],
            ) -> List[av.VideoFrame]:
        """Enqueue each captured video frame and pass the frames through.

        Each frame is converted to a torch tensor and placed in the Ray
        object store once (``ray.put``) so the actor receives a shared
        object ref instead of a per-hop serialized copy. The original
        frames are returned unchanged for local rendering.
        """
        try:
            for frame in frames:
                shared_tensor = torch.from_numpy(frame.to_ndarray())
                shared_tensor_ref = ray.put(shared_tensor)
                self.queue_actor.enqueue_video_frame.remote(shared_tensor_ref)
        except Exception as e:
            # Best-effort: a failed enqueue must not break the WebRTC
            # callback loop, so report with context and keep streaming.
            print(f"queued_video_frames_callback error: {e}")
        return frames

    async def queued_audio_frames_callback(
                self,
                frames: List[av.AudioFrame],
            ) -> List[av.AudioFrame]:
        """Downmix, resample and enqueue captured audio; return silence.

        All frames in the batch are concatenated into a single mono
        pydub segment at ``self._audio_bit_rate`` Hz and shipped to the
        queue actor as one numpy buffer. Zero-filled frames matching the
        input layout/rate are returned so the browser does not play back
        (echo) the microphone input.
        """
        try:
            sound_chunk = pydub.AudioSegment.empty()
            if frames:
                for frame in frames:
                    sound = pydub.AudioSegment(
                        data=frame.to_ndarray().tobytes(),
                        sample_width=frame.format.bytes,
                        frame_rate=frame.sample_rate,
                        channels=len(frame.layout.channels),
                    )
                    sound = sound.set_channels(1)
                    sound = sound.set_frame_rate(self._audio_bit_rate)
                    sound_chunk += sound
                shared_buffer = np.array(sound_chunk.get_array_of_samples())
                shared_buffer_ref = ray.put(shared_buffer)
                self.queue_actor.enqueue_audio_frame.remote(shared_buffer_ref)
        except Exception as e:
            # Best-effort: keep the callback loop alive even if the
            # conversion or enqueue fails.
            print(f"queued_audio_frames_callback error: {e}")

        # Return zero-filled frames of the same shape/dtype/layout/rate
        # as the input to avoid echo.
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)
        return new_frames

    async def get_audio_frames_async(self) -> List[np.ndarray]:
        """Fetch the buffered audio from the queue actor.

        NOTE(review): the enqueue path stores numpy sample buffers, so the
        actor is presumed to return those — confirm against
        InputAVQueueActor.get_audio_frames.
        """
        shared_buffers = await self.queue_actor.get_audio_frames.remote()
        return shared_buffers

    async def get_video_frames_async(self) -> List[torch.Tensor]:
        """Fetch the buffered video tensors from the queue actor.

        NOTE(review): the enqueue path stores torch tensors, so the actor
        is presumed to return those — confirm against
        InputAVQueueActor.get_video_frames.
        """
        shared_tensors = await self.queue_actor.get_video_frames.remote()
        return shared_tensors