File size: 2,431 Bytes
c4fe843
 
 
 
 
 
ae52b65
 
d91a673
c4fe843
 
 
 
2bb91de
c4fe843
 
2bb91de
 
 
 
 
 
 
 
 
 
d91a673
cf5e7f4
 
c4fe843
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d91a673
c4fe843
 
 
 
9b7ec10
c4fe843
 
 
 
9b7ec10
c4fe843
2bb91de
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

import asyncio
import ray
from ray.util.queue import Queue

class FFMpegConverterActor:
    def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
        self.running = True

    async def run(self):
        while self.running:
            try:
                chunk = await self.output_pipe.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError:
                # exit if we have finsihsed the process
                if self.running == False:
                    return
                # If the pipe is broken, restart the process.
                await self.start_process()
                continue
            # print(f"FFMpegConverterActor: read {len(chunk)} bytes")
            chunk_ref = ray.put(chunk)
            await self.output_queue.put_async(chunk_ref)       

    async def start_process(self):
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1'  # write to stdout
        ]

        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout

        assert self.input_pipe is not None, "input_pipe was not initialized"
        # print (f"input_pipe: {self.input_pipe}")

    async def push_chunk(self, chunk):
        try:
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()
        except BrokenPipeError:
            # If the pipe is broken, restart the process.
            await self.start_process()
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    async def close(self):
        self.running = False  # Stop the loop inside run()
        if self.process:
            self.process.stdin.transport.close()
            self.process.kill()
            self.process.terminate()
        # while not self.output_queue.empty():
        #     await self.output_queue.get_async()