Spaces:
Sleeping
Sleeping
import asyncio
import contextlib

import ray
from ray.util.queue import Queue
class FFMpegConverterActor:
    """Transcode an audio byte stream through an ``ffmpeg`` subprocess.

    Input bytes are written to ffmpeg's stdin with :meth:`push_chunk`.
    :meth:`run` reads the converted stream from ffmpeg's stdout in chunks of
    exactly ``buffer_size`` bytes, stores each chunk with ``ray.put`` and puts
    the resulting object reference on ``output_queue``.

    ``run()`` loops until ``close()`` is called, so it is meant to be running
    as a long-lived task while ``push_chunk()`` and ``close()`` are invoked
    concurrently on the same event loop.

    Args:
        output_queue: Queue that receives one ``ray.put`` reference per
            converted chunk (via ``put_async``).
        buffer_size: Number of bytes per emitted chunk. With the default
            output settings (16-bit samples, 48 kHz, mono) 1920 bytes is
            20 ms of audio.
        output_format: Value passed to ffmpeg's ``-f`` output option.
        sample_rate: Value passed to ffmpeg's ``-ar`` output option.
        channels: Value passed to ffmpeg's ``-ac`` output option.
    """

    def __init__(
        self,
        output_queue: "Queue",
        buffer_size: int = 1920,
        output_format: str = 's16le',
        sample_rate: int = 48000,
        channels: int = 1,
    ):
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.sample_rate = sample_rate
        self.channels = channels
        # Populated by start_process(); None until ffmpeg has been spawned.
        self.input_pipe = None
        self.output_pipe = None
        self.process = None
        # Cleared by close(); run() exits and push_chunk() discards afterwards.
        self.running = True

    async def run(self):
        """Forward converted audio to ``output_queue`` until closed.

        If ffmpeg's stdout reaches EOF while the actor is still running, the
        process is restarted and reading resumes on the new pipe. Bytes of an
        incomplete final chunk (fewer than ``buffer_size``) are discarded.
        """
        while self.running:
            pipe = self.output_pipe
            if pipe is None:
                # start_process() was never awaited; start ffmpeg on demand.
                await self.start_process()
                continue
            try:
                chunk = await pipe.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError:
                # Exit if we have finished the process.
                if not self.running:
                    return
                # The pipe is broken. Restart only if nobody else (e.g.
                # push_chunk) has already replaced the process while we were
                # waiting; otherwise just switch to the new pipe.
                if self.output_pipe is pipe:
                    await self.start_process()
                continue
            chunk_ref = ray.put(chunk)
            await self.output_queue.put_async(chunk_ref)

    async def start_process(self):
        """Spawn ffmpeg and bind ``input_pipe`` / ``output_pipe`` to it.

        ffmpeg reads from stdin (``pipe:0``) and writes the converted stream
        to stdout (``pipe:1``). Any previously stored process handle is
        simply replaced.
        """
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', str(self.sample_rate),
            '-ac', str(self.channels),
            'pipe:1',  # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Nothing ever reads ffmpeg's diagnostics; an unread PIPE would
            # eventually fill up and block the child, so discard them.
            stderr=asyncio.subprocess.DEVNULL,
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        assert self.input_pipe is not None, "input_pipe was not initialized"

    async def push_chunk(self, chunk):
        """Write ``chunk`` to ffmpeg's stdin and wait for it to be flushed.

        If the pipe turns out to be broken, ffmpeg is restarted once and the
        chunk is written again to the new process; an error on that second
        attempt propagates. Chunks pushed after ``close()`` are discarded so
        that a closed actor never spawns a new process.
        """
        if not self.running:
            return
        if self.input_pipe is None:
            # start_process() was never awaited; start ffmpeg on demand.
            await self.start_process()
        try:
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()
        except (BrokenPipeError, ConnectionResetError):
            # close() may have run while we were waiting in drain().
            if not self.running:
                return
            # If the pipe is broken, restart the process.
            await self.start_process()
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    async def close(self):
        """Stop ``run()`` and shut the ffmpeg process down.

        Safe to call when no process was started or when it has already
        exited. Items already placed on ``output_queue`` are left untouched.
        """
        self.running = False  # Stop the loop inside run()
        proc = self.process
        if proc is None:
            return
        if proc.stdin is not None:
            proc.stdin.close()
        if proc.returncode is None:
            # The child can still exit between the check and the signal.
            with contextlib.suppress(ProcessLookupError):
                proc.kill()