File size: 1,845 Bytes
ac3675c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import numpy as np
import soundfile as sf
import time

def audio_stream_generator(audio_file_path, chunk_size: int = 4096, simulate_realtime: bool = True):
    """
    Stream an audio file as fixed-size, mono, float32 chunks.

    The whole file is decoded up front with ``soundfile.read`` and then
    handed out ``chunk_size`` samples at a time. Multi-channel audio is
    down-mixed to mono by averaging the channels. The last chunk is
    zero-padded so that every yielded array has exactly ``chunk_size``
    samples.

    Because this is a generator function, nothing (including argument
    validation and file loading) happens until the first item is requested.

    Args:
        audio_file_path: Path of the audio file to read (passed to
            ``soundfile.read``).
        chunk_size: Number of samples per yielded chunk. Must be positive.
        simulate_realtime: When true, sleep for the duration of one chunk
            (``chunk_size / sample_rate`` seconds) before yielding each
            chunk, to mimic the pace of a live stream.

    Yields:
        numpy.ndarray: 1-D ``np.float32`` array of length ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is not greater than zero.
    """
    # Fail fast, before any I/O: a zero step makes range() raise an opaque
    # error only after the file was loaded, and a negative step would
    # silently produce an empty stream.
    if chunk_size <= 0:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )

    # Decode straight to float32 instead of the float64 default, avoiding a
    # second full-size copy of the audio just for the dtype conversion.
    audio_data, sample_rate = sf.read(audio_file_path, dtype="float32")

    # Down-mix any 2-D (frames, channels) array to mono. Checking ndim also
    # flattens a single-channel 2-D array, which the padding below requires.
    if audio_data.ndim > 1:
        audio_data = audio_data.mean(axis=1)

    print(f"已加载音频文件: {audio_file_path}")
    print(f"采样率: {sample_rate} Hz")
    print(f"音频长度: {len(audio_data)/sample_rate:.2f} 秒")

    # Playback time covered by one chunk, in seconds.
    chunk_duration = chunk_size / sample_rate

    audio_len = len(audio_data)
    for pos in range(0, audio_len, chunk_size):
        # Slicing clamps at the end of the array, so the last chunk may be short.
        chunk = audio_data[pos:pos + chunk_size]

        # Zero-pad a short final chunk so consumers always get chunk_size samples.
        if len(chunk) < chunk_size:
            padded_chunk = np.zeros(chunk_size, dtype=np.float32)
            padded_chunk[:len(chunk)] = chunk
            chunk = padded_chunk

        # Wait one chunk's worth of audio time before handing it out.
        if simulate_realtime:
            time.sleep(chunk_duration)

        yield chunk

    print("音频流处理完成")