|
import numpy as np |
|
import soundfile as sf |
|
import time |
|
|
|
def audio_stream_generator(audio_file_path, chunk_size=4096, simulate_realtime=True): |
|
""" |
|
音频流生成器,从音频文件中读取数据并以流的方式输出 |
|
|
|
参数: |
|
audio_file_path: 音频文件路径 |
|
chunk_size: 每个数据块的大小(采样点数) |
|
simulate_realtime: 是否模拟实时流处理的速度 |
|
|
|
生成: |
|
numpy.ndarray: 每次生成一个chunk_size大小的np.float32数据块 |
|
""" |
|
|
|
audio_data, sample_rate = sf.read(audio_file_path) |
|
|
|
|
|
if audio_data.dtype != np.float32: |
|
audio_data = audio_data.astype(np.float32) |
|
|
|
|
|
if len(audio_data.shape) > 1 and audio_data.shape[1] > 1: |
|
audio_data = audio_data.mean(axis=1) |
|
|
|
print(f"已加载音频文件: {audio_file_path}") |
|
print(f"采样率: {sample_rate} Hz") |
|
print(f"音频长度: {len(audio_data)/sample_rate:.2f} 秒") |
|
|
|
|
|
chunk_duration = chunk_size / sample_rate if simulate_realtime else 0 |
|
|
|
|
|
audio_len = len(audio_data) |
|
for pos in range(0, audio_len, chunk_size): |
|
|
|
end_pos = min(pos + chunk_size, audio_len) |
|
chunk = audio_data[pos:end_pos] |
|
|
|
|
|
if len(chunk) < chunk_size: |
|
padded_chunk = np.zeros(chunk_size, dtype=np.float32) |
|
padded_chunk[:len(chunk)] = chunk |
|
chunk = padded_chunk |
|
|
|
|
|
if simulate_realtime: |
|
time.sleep(chunk_duration) |
|
|
|
yield chunk |
|
|
|
print("音频流处理完成") |