import subprocess
import threading
import time
from queue import Empty, Queue
from threading import Thread
from typing import Iterator


class LocalSpeakerService:
    """Play queued audio streams through a single local ``mpv`` process.

    Constructing the service starts a worker thread. The worker launches
    ``mpv`` reading from its standard input and then writes the chunks of
    each queued stream to that pipe, one stream at a time, in FIFO order.
    Call :meth:`close` to stop accepting streams, let the worker drain what
    is already queued, and wait for ``mpv`` to exit.
    """

    def __init__(self):
        # Streams waiting to be played, consumed in FIFO order by the worker.
        self.queue = Queue()
        # Set while the service accepts new streams; cleared by close() or
        # by the worker itself when it stops.
        self._is_running = threading.Event()
        self._is_running.set()
        # Popen handle for mpv; stays None until the worker has started it.
        self.mpv_process = None
        self.thread = Thread(target=self._process_audio_streams)
        self.thread.start()

    def add_audio_stream(self, audio_stream: Iterator[bytes]) -> None:
        """Queue *audio_stream* for playback.

        The stream is silently dropped when the service is no longer
        running (after :meth:`close`, or after the worker has stopped).
        """
        if self._is_running.is_set():
            self.queue.put(audio_stream)

    def _process_audio_streams(self) -> None:
        """Worker loop: start mpv, play queued streams, then shut mpv down.

        The loop keeps going while the service is running, and after that
        until the queue has been drained. Exceptions (mpv failing to start,
        a broken pipe, a stream iterator raising) still propagate, but the
        ``finally`` clause guarantees the service is marked as stopped and
        the mpv process is closed and reaped.
        """
        try:
            self._start_mpv()
            while self._is_running.is_set() or not self.queue.empty():
                try:
                    # The timeout lets the loop re-check the running flag
                    # instead of blocking forever on an empty queue.
                    audio_stream = self.queue.get(timeout=1)
                except Empty:
                    continue
                self._stream(audio_stream)
        finally:
            # Stop producers from queueing work nobody will consume.
            self._is_running.clear()
            self._close_mpv()

    def _stream(self, audio_stream: Iterator[bytes]) -> None:
        """Write every non-``None`` chunk of *audio_stream* to mpv's stdin."""
        stdin = self.mpv_process.stdin
        for chunk in audio_stream:
            if chunk is not None:
                stdin.write(chunk)
                # Flush per chunk so audio reaches mpv without buffering lag.
                stdin.flush()

    def _start_mpv(self) -> None:
        """Launch mpv reading audio from its standard input (``fd://0``)."""
        mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
        self.mpv_process = subprocess.Popen(
            mpv_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _close_mpv(self) -> None:
        """Close mpv's stdin and wait for the process to exit.

        Does nothing when mpv was never started.
        """
        process = self.mpv_process
        if process is None:
            return
        try:
            if process.stdin:
                process.stdin.close()
        except OSError:
            # Closing flushes the pipe; that fails if mpv already exited.
            # There is nothing left to deliver, so just reap the process.
            pass
        finally:
            process.wait()

    def close(self) -> None:
        """Stop accepting streams and wait for the worker to finish.

        The worker drains the streams that are already queued before it
        exits, so joining the thread is sufficient. Polling the queue here
        would spin forever if the worker had died with items still queued.
        """
        self._is_running.clear()
        self.thread.join()