import subprocess
import numpy as np


def ffmpeg_stream(youtube_url, sampling_rate=16_000, chunk_duration_ms=5000, pad_duration_ms=200):
    """
    Stream the audio of a URL as overlapping float32 chunks, decoded by ffmpeg.

    ``yt-dlp`` downloads the best audio stream of ``youtube_url`` and pipes it
    into ``ffmpeg``, which converts it to mono 32-bit float PCM at
    ``sampling_rate``. The decoded samples are yielded chunk by chunk.

    Args:
        youtube_url: URL handed to ``yt-dlp``.
        sampling_rate: Output sampling rate in Hz.
        chunk_duration_ms: Duration of new audio read for each chunk.
        pad_duration_ms: Padding duration; consecutive chunks overlap by
            twice this duration.

    Yields:
        1-D ``np.float32`` arrays. The first chunk holds up to
        ``chunk_len + 2 * pad_len`` samples; every later chunk starts with the
        last ``2 * pad_len`` samples of the previous chunk followed by up to
        ``chunk_len`` new samples. The final chunk may be shorter.

    Raises:
        ValueError: If the ``ffmpeg`` or ``yt-dlp`` executable cannot be found.
    """
    chunk_len = int(sampling_rate * chunk_duration_ms / 1000)
    pad_len = int(sampling_rate * pad_duration_ms / 1000)
    overlap_len = pad_len * 2
    read_chunk_len = chunk_len + overlap_len

    ar = f"{sampling_rate}"
    ac = "1"
    format_for_conversion = "f32le"
    dtype = np.float32
    size_of_sample = np.dtype(dtype).itemsize

    ffmpeg_command = [
        "ffmpeg",
        "-i",
        "pipe:",
        "-ac",
        ac,
        "-ar",
        ar,
        "-f",
        format_for_conversion,
        "-hide_banner",
        "-loglevel",
        "quiet",
        "pipe:1",
    ]
    ytdl_command = ["yt-dlp", "-f", "bestaudio", youtube_url, "--quiet", "-o", "-"]

    try:
        ffmpeg_process = subprocess.Popen(
            ffmpeg_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=-1
        )
    except FileNotFoundError as err:
        raise ValueError("ffmpeg was not found but is required to stream audio files from filename") from err

    try:
        ytdl_process = subprocess.Popen(ytdl_command, stdout=ffmpeg_process.stdin)
    except FileNotFoundError as err:
        # Do not leave the already-started ffmpeg process behind.
        ffmpeg_process.kill()
        ffmpeg_process.stdin.close()
        ffmpeg_process.stdout.close()
        ffmpeg_process.wait()
        raise ValueError("yt-dlp was not found but is required to stream audio from a URL") from err

    # yt-dlp now owns the write end of the pipe. Closing our copy lets ffmpeg
    # see EOF once yt-dlp exits, so the read loop below cannot block forever.
    ffmpeg_process.stdin.close()

    leftover = np.zeros((0,), dtype=dtype)
    try:
        # Read until ffmpeg's output is exhausted rather than until yt-dlp
        # exits: the download usually finishes while decoded audio is still
        # buffered in the pipes.
        while True:
            raw = ffmpeg_process.stdout.read(read_chunk_len * size_of_sample)
            # Drop a trailing partial sample so np.frombuffer cannot fail.
            raw = raw[: len(raw) - len(raw) % size_of_sample]
            if not raw:
                break

            audio = np.concatenate([leftover, np.frombuffer(raw, dtype=dtype)])
            if len(audio) < overlap_len:
                # TODO: handle end of stream better than this
                break
            yield audio

            # Index from the front: ``audio[-0:]`` would select the whole
            # array when pad_len is 0 and make every chunk grow unboundedly.
            leftover = audio[len(audio) - overlap_len :]
            # Only the first read includes the padding; afterwards the
            # overlap comes from ``leftover``.
            read_chunk_len = chunk_len
    finally:
        # Runs on exhaustion and when the consumer closes the generator early.
        for process in (ytdl_process, ffmpeg_process):
            if process.poll() is None:
                process.terminate()
        ffmpeg_process.stdout.close()
        ytdl_process.wait()
        ffmpeg_process.wait()