# File size: 1,845 Bytes | ac3675c
import numpy as np
import soundfile as sf
import time
def audio_stream_generator(audio_file_path, chunk_size=4096, simulate_realtime=True):
    """
    Stream an audio file as fixed-size mono float32 chunks.

    The whole file is decoded with ``soundfile.read``; multi-channel audio is
    down-mixed to mono by averaging the channels, and the samples are then
    yielded ``chunk_size`` at a time. The last chunk is zero-padded so that
    every yielded array has exactly ``chunk_size`` samples.

    Args:
        audio_file_path: Path of the audio file to read.
        chunk_size: Number of samples per yielded chunk. Must be positive.
        simulate_realtime: When true, sleep for one chunk's duration
            (``chunk_size / sample_rate`` seconds) before yielding each chunk.

    Yields:
        numpy.ndarray: 1-D ``np.float32`` array of length ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not positive. Because this is a
            generator, the error surfaces on the first iteration.
    """
    # Fail fast, before decoding the file: zero would only blow up later in
    # range(), and a negative size would silently produce an empty stream.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Decode straight to float32 so no float64 copy of the whole file is held.
    audio_data, sample_rate = sf.read(audio_file_path, dtype="float32")

    # Multi-channel data arrives as (frames, channels); average down to mono.
    if audio_data.ndim > 1 and audio_data.shape[1] > 1:
        audio_data = audio_data.mean(axis=1)

    total_samples = len(audio_data)
    print(f"已加载音频文件: {audio_file_path}")
    print(f"采样率: {sample_rate} Hz")
    print(f"音频长度: {total_samples/sample_rate:.2f} 秒")

    # Wall-clock duration of one chunk, used to pace the stream.
    chunk_duration = chunk_size / sample_rate if simulate_realtime else 0

    for start in range(0, total_samples, chunk_size):
        # Slicing clamps at the end of the array, so the tail may be short.
        chunk = audio_data[start:start + chunk_size]

        # Zero-pad the tail so consumers always receive chunk_size samples.
        if len(chunk) < chunk_size:
            padded_chunk = np.zeros(chunk_size, dtype=np.float32)
            padded_chunk[:len(chunk)] = chunk
            chunk = padded_chunk

        # Delay delivery to mimic the pace of a live audio source.
        if simulate_realtime:
            time.sleep(chunk_duration)

        yield chunk

    print("音频流处理完成")