Spaces:
Sleeping
Sleeping
File size: 2,060 Bytes
c4fe843 ae52b65 d91a673 c4fe843 d91a673 ae52b65 c4fe843 d91a673 c4fe843 df0ea75 ae52b65 df0ea75 ae52b65 c4fe843 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
import asyncio
import ray
from ray.util.queue import Queue
@ray.remote
class FFMpegConverterActor:
def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
self.output_queue = output_queue
self.buffer_size = buffer_size
self.output_format = output_format
self.input_pipe = None
self.output_pipe = None
self.process = None
async def run(self):
while True:
chunk = await self.output_pipe.readexactly(self.buffer_size)
# print(f"FFMpegConverterActor: read {len(chunk)} bytes")
await self.output_queue.put_async(chunk)
async def start_process(self):
cmd = [
'ffmpeg',
'-i', 'pipe:0', # read from stdin
'-f', self.output_format,
'-ar', '48000',
'-ac', '1',
'pipe:1' # write to stdout
]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.input_pipe = self.process.stdin
self.output_pipe = self.process.stdout
assert self.input_pipe is not None, "input_pipe was not initialized"
# print (f"input_pipe: {self.input_pipe}")
async def push_chunk(self, chunk):
try:
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
except BrokenPipeError:
# If the pipe is broken, restart the process.
await self.start_process()
self.input_pipe.write(chunk)
# await self.input_pipe.drain()
# def has_processed_all_data(self):
# return self.process.poll() is not None
async def flush_output_queue(self):
while not self.output_queue.empty():
await self.output_queue.get_async()
def close(self):
self.input_pipe.close()
self.output_pipe.close()
self.process.wait() |