project_charles / local_speaker_service.py
sohojoe's picture
refactor
1a63d97
raw
history blame
No virus
1.72 kB
import subprocess
from threading import Thread
from queue import Queue, Empty
from typing import Iterator
import threading
import time
class LocalSpeakerService:
    """Play queued audio streams on the local machine through an ``mpv`` child process.

    A background worker thread starts ``mpv`` reading from its standard
    input, pulls audio streams off an internal queue in FIFO order and
    writes their chunks to the player's stdin pipe.  Call :meth:`close`
    to stop accepting new streams, let the already-queued ones finish and
    shut the player down.
    """

    # Seconds the worker blocks on an empty queue before re-checking the
    # running flag; this bounds how long close() waits on an idle worker.
    _POLL_TIMEOUT_SECONDS = 1

    def __init__(self):
        """Create the queue and start the worker thread (which launches mpv)."""
        self.queue = Queue()
        # Set while the service accepts new streams; cleared by close() or
        # when the worker thread exits for any reason.
        self._is_running = threading.Event()
        self._is_running.set()
        # Popen handle of the player; stays None until the worker starts it.
        self.mpv_process = None
        self.thread = Thread(target=self._process_audio_streams)
        self.thread.start()

    def add_audio_stream(self, audio_stream: Iterator[bytes]) -> None:
        """Queue *audio_stream* for playback.

        The stream is silently ignored when the service is no longer
        running (after :meth:`close`, or once the worker has exited).
        """
        if self._is_running.is_set():
            self.queue.put(audio_stream)

    def _process_audio_streams(self) -> None:
        """Worker loop: start mpv, play queued streams, then shut mpv down.

        The loop keeps going while the service is running and, after
        close() has been requested, until the queue has been drained.
        """
        try:
            self._start_mpv()
            while self._is_running.is_set() or not self.queue.empty():
                try:
                    audio_stream = self.queue.get(
                        timeout=self._POLL_TIMEOUT_SECONDS
                    )
                except Empty:
                    continue
                self._stream(audio_stream)
        finally:
            # Whatever ended the loop (normal shutdown, mpv failing to
            # start, a broken pipe, a failing stream iterator), stop
            # accepting new streams and never leave the player running.
            self._is_running.clear()
            self._close_mpv()

    def _stream(self, audio_stream: Iterator[bytes]) -> None:
        """Write every chunk of *audio_stream* to mpv's stdin, skipping ``None`` chunks."""
        pipe = self.mpv_process.stdin
        for chunk in audio_stream:
            if chunk is None:
                continue
            pipe.write(chunk)
            # Flush per chunk so audio reaches the player without waiting
            # for the pipe's buffer to fill.
            pipe.flush()

    def _start_mpv(self) -> None:
        """Launch mpv reading from its standard input (``fd://0``), with output discarded."""
        mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
        self.mpv_process = subprocess.Popen(
            mpv_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _close_mpv(self) -> None:
        """Close mpv's stdin pipe and wait for the process to exit.

        Does nothing when mpv was never started.
        """
        process = self.mpv_process
        if process is None:
            return
        if process.stdin:
            try:
                process.stdin.close()
            except OSError:
                # The player already went away (e.g. broken pipe while
                # flushing); there is nothing left to deliver, but the
                # process must still be reaped below.
                pass
        process.wait()

    def close(self) -> None:
        """Stop accepting streams and block until the worker thread has finished.

        The worker drains the streams that are already queued before it
        exits, so joining it is sufficient; polling the queue here would
        never return if the worker had died with items still queued.
        """
        self._is_running.clear()
        self.thread.join()