Spaces:
Sleeping
Sleeping
import numpy as np | |
from scipy import interpolate | |
def Singleton(cls): | |
_instance = {} | |
def _singleton(*args, **kargs): | |
if cls not in _instance: | |
_instance[cls] = cls(*args, **kargs) | |
return _instance[cls] | |
return _singleton | |
class RealtimeAudioDistribution(): | |
def __init__(self) -> None: | |
self.data = {} | |
self.max_len = 1024*1024 | |
self.rate = 48000 # 只读,每秒采样数量 | |
def clean_up(self): | |
self.data = {} | |
def feed(self, uuid, audio): | |
self.rate, audio_ = audio | |
# print('feed', len(audio_), audio_[-25:]) | |
if uuid not in self.data: | |
self.data[uuid] = audio_ | |
else: | |
new_arr = np.concatenate((self.data[uuid], audio_)) | |
if len(new_arr) > self.max_len: new_arr = new_arr[-self.max_len:] | |
self.data[uuid] = new_arr | |
def read(self, uuid): | |
if uuid in self.data: | |
res = self.data.pop(uuid) | |
print('\r read-', len(res), '-', max(res), end='', flush=True) | |
else: | |
res = None | |
return res | |
def change_sample_rate(audio, old_sr, new_sr): | |
duration = audio.shape[0] / old_sr | |
time_old = np.linspace(0, duration, audio.shape[0]) | |
time_new = np.linspace(0, duration, int(audio.shape[0] * new_sr / old_sr)) | |
interpolator = interpolate.interp1d(time_old, audio.T) | |
new_audio = interpolator(time_new).T | |
return new_audio.astype(np.int16) |