project_charles / ffmpeg_converter.py
sohojoe's picture
fix audio issue when starting a new responce
a642a9f
import asyncio
import ray
from ray.util.queue import Queue, Full
class FFMpegConverter:
def __init__(self, output_queue: Queue, buffer_size: int = 1920, output_format: str='s16le'):
self.output_queue = output_queue
self.buffer_size = buffer_size
self.output_format = output_format
self.input_pipe = None
self.output_pipe = None
self.process = None
self.running = True
async def run(self):
while self.running:
try:
chunk = await self.output_pipe.readexactly(self.buffer_size)
except asyncio.IncompleteReadError:
# exit if we have finsihsed the process
if self.running == False:
return
# If the pipe is broken, restart the process.
await self.start_process()
continue
if self.running == False:
return
# print(f"FFMpegConverter: read {len(chunk)} bytes")
chunk_ref = ray.put(chunk)
keep_trying = True
while keep_trying:
try:
await self.output_queue.put_async(chunk_ref, timeout=0.01)
keep_trying = False
except Full:
if self.running == False:
return
await asyncio.sleep(0.01)
async def start_process(self):
cmd = [
'ffmpeg',
'-i', 'pipe:0', # read from stdin
'-f', self.output_format,
'-ar', '48000',
'-ac', '1',
'pipe:1' # write to stdout
]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self.input_pipe = self.process.stdin
self.output_pipe = self.process.stdout
assert self.input_pipe is not None, "input_pipe was not initialized"
# print (f"input_pipe: {self.input_pipe}")
async def push_chunk(self, chunk):
if self.running == False:
return
try:
self.input_pipe.write(chunk)
await self.input_pipe.drain()
except BrokenPipeError:
# If the pipe is broken, restart the process.
await self.start_process()
self.input_pipe.write(chunk)
await self.input_pipe.drain()
async def close(self):
self.running = False # Stop the loop inside run()
if self.process:
self.process.stdin.transport.close()
self.process.kill()
self.process.terminate()