Spaces:
Sleeping
Sleeping
File size: 2,431 Bytes
c4fe843 ae52b65 d91a673 c4fe843 2bb91de c4fe843 2bb91de d91a673 cf5e7f4 c4fe843 d91a673 c4fe843 9b7ec10 c4fe843 9b7ec10 c4fe843 2bb91de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
import asyncio
import ray
from ray.util.queue import Queue
class FFMpegConverterActor:
    """Stream audio through an ffmpeg subprocess and enqueue the converted output.

    Encoded audio is written to ffmpeg's stdin with :meth:`push_chunk`.
    :meth:`run` reads ffmpeg's stdout in fixed-size chunks (48 kHz, mono, in
    ``output_format``), stores each chunk with ``ray.put`` and puts the
    resulting object reference on ``output_queue``.

    Typical lifecycle: ``start_process()`` -> ``run()`` as a background task
    with ``push_chunk()`` calls alongside it -> ``close()``.
    """

    def __init__(self, output_queue: "Queue", buffer_size: int = 1920, output_format: str = 's16le'):
        """Store the configuration; no subprocess is started here.

        Args:
            output_queue: Queue that receives one Ray object reference per
                converted chunk.
            buffer_size: Exact number of bytes read from ffmpeg per chunk.
                With the default ``s16le`` mono 48 kHz output, 1920 bytes is
                20 ms of audio.
            output_format: Value passed to ffmpeg's ``-f`` output option.
        """
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.input_pipe = None   # ffmpeg stdin (StreamWriter) once started
        self.output_pipe = None  # ffmpeg stdout (StreamReader) once started
        self.process = None      # current asyncio subprocess handle, if any
        self.running = True      # cleared by close() to stop run()

    async def run(self):
        """Forward converted chunks to ``output_queue`` until :meth:`close` is called.

        If ffmpeg's stdout reaches EOF while the actor is still running, a new
        ffmpeg process is started and reading resumes from it. Bytes of a
        trailing chunk shorter than ``buffer_size`` are discarded.
        """
        while self.running:
            if self.output_pipe is None:
                # run() was scheduled before start_process(); start lazily
                # instead of failing with AttributeError on None.
                await self.start_process()
            try:
                chunk = await self.output_pipe.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError:
                # EOF on stdout: exit if close() was requested ...
                if not self.running:
                    return
                # ... otherwise the process went away; restart it.
                await self.start_process()
                continue
            chunk_ref = ray.put(chunk)
            await self.output_queue.put_async(chunk_ref)

    async def start_process(self):
        """Spawn ffmpeg reading from stdin and writing converted audio to stdout.

        Replaces ``self.process``, ``self.input_pipe`` and ``self.output_pipe``
        with the handles of the new process.
        """
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', '48000',
            '-ac', '1',
            'pipe:1',  # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Nothing in this class reads ffmpeg's log output. Leaving stderr
            # as an unread PIPE lets it fill up and block ffmpeg, so discard it.
            stderr=asyncio.subprocess.DEVNULL,
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        assert self.input_pipe is not None, "input_pipe was not initialized"

    async def push_chunk(self, chunk):
        """Write ``chunk`` to ffmpeg's stdin, restarting ffmpeg once if the pipe is dead.

        Args:
            chunk: Bytes of encoded audio to feed to ffmpeg.

        Raises:
            BrokenPipeError, ConnectionResetError: If the pipe is closed and
                the actor has already been closed, or if the write fails again
                after the restart.
        """
        if self.input_pipe is None:
            # Called before start_process(); start lazily.
            await self.start_process()
        try:
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()
        except (BrokenPipeError, ConnectionResetError):
            # drain() reports an already-lost pipe as ConnectionResetError, so
            # both exception types mean "ffmpeg's stdin is gone".
            if not self.running:
                # Do not resurrect ffmpeg after close().
                raise
            await self.start_process()
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    async def close(self):
        """Stop :meth:`run` and kill the ffmpeg process, if one was started.

        Safe to call when no process exists or when it has already exited.
        """
        self.running = False  # Stop the loop inside run()
        process = self.process
        if process is None:
            return
        if process.stdin is not None:
            process.stdin.close()
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the returncode check and kill().
                pass
|