import asyncio

import ray
from ray.util.queue import Queue, Full


class FFMpegConverter:
    """Stream audio through an ``ffmpeg`` subprocess into a Ray queue.

    Bytes handed to :meth:`push_chunk` are written to ffmpeg's stdin.
    :meth:`run` reads the converted output (48 kHz, mono, in
    ``output_format``) from ffmpeg's stdout in chunks of exactly
    ``buffer_size`` bytes, stores each chunk in the Ray object store and puts
    the resulting object reference on ``output_queue``.

    :meth:`start_process` must be awaited before :meth:`run` or
    :meth:`push_chunk` is used; until then the pipes are ``None``.
    """

    # Upper bound, in seconds, that close() waits for the killed process.
    _EXIT_TIMEOUT = 1.0

    def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str = 's16le'):
        """Store the configuration; no subprocess is started here.

        Args:
            output_queue: Queue that receives Ray object refs of the chunks.
            buffer_size: Exact number of bytes read from ffmpeg per chunk.
            output_format: Value passed to ffmpeg's ``-f`` output option.
        """
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
        self.running = True

    @staticmethod
    def _kill_process(process):
        """Close *process*'s stdin and kill it, tolerating an exited process."""
        if process.stdin is not None:
            process.stdin.close()
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the returncode check and kill().
                pass

    async def run(self):
        """Forward converted chunks to the output queue until closed.

        Restarts ffmpeg when its stdout ends before a full chunk was read.
        The incomplete trailing bytes of the ended stream are discarded, so
        every queued chunk is exactly ``buffer_size`` bytes long.
        """
        while self.running:
            pipe = self.output_pipe
            try:
                chunk = await pipe.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError:
                # Exit if we have finished the process.
                if not self.running:
                    return
                # The pipe is broken. Restart only if nobody else (e.g.
                # push_chunk) has already replaced the process; otherwise a
                # second restart would orphan the replacement.
                if self.output_pipe is pipe:
                    await self.start_process()
                continue
            if not self.running:
                return
            chunk_ref = ray.put(chunk)
            # Retry until the queue accepts the ref, checking for shutdown
            # between attempts so close() is never blocked by a full queue.
            while True:
                try:
                    await self.output_queue.put_async(chunk_ref, timeout=0.01)
                    break
                except Full:
                    if not self.running:
                        return
                    await asyncio.sleep(0.01)

    async def start_process(self):
        """Spawn a fresh ffmpeg process and bind its stdin/stdout pipes.

        Any process previously started by this instance is killed first so a
        restart does not leave it running.
        """
        previous = self.process
        if previous is not None:
            self._kill_process(previous)

        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1',  # write to stdout
        ]

        # stderr is discarded: nothing in this class reads it, and an unread
        # PIPE would block ffmpeg once the OS pipe buffer fills with its logs.
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        self.process = process
        self.input_pipe = process.stdin
        self.output_pipe = process.stdout

        assert self.input_pipe is not None, "input_pipe was not initialized"

        if not self.running:
            # close() ran while the process was being spawned; do not leave
            # the new process behind.
            self._kill_process(process)

    async def push_chunk(self, chunk):
        """Write *chunk* to ffmpeg's stdin, restarting ffmpeg once on failure.

        Does nothing after :meth:`close`. If the write fails because the pipe
        is broken, the process is restarted (unless it was already replaced)
        and the chunk is written to the new process; a failure of that second
        write propagates to the caller.
        """
        if not self.running:
            return
        pipe = self.input_pipe
        try:
            pipe.write(chunk)
            await pipe.drain()
        except (BrokenPipeError, ConnectionResetError):
            # drain() reports a dead child with either exception type.
            if self.input_pipe is pipe:
                await self.start_process()
            if not self.running:
                return
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    async def close(self):
        """Stop :meth:`run` and kill the ffmpeg process, if one was started."""
        self.running = False  # Stop the loop inside run()
        process = self.process
        if process is None:
            return
        self._kill_process(process)
        try:
            # Bounded wait so the process is gone when close() returns, without
            # hanging if the exit notification is delayed.
            await asyncio.wait_for(process.wait(), timeout=self._EXIT_TIMEOUT)
        except asyncio.TimeoutError:
            pass