|
import numpy as np |
|
from scipy import interpolate |
|
|
|
def Singleton(cls): |
|
_instance = {} |
|
|
|
def _singleton(*args, **kargs): |
|
if cls not in _instance: |
|
_instance[cls] = cls(*args, **kargs) |
|
return _instance[cls] |
|
|
|
return _singleton |
|
|
|
|
|
@Singleton |
|
class RealtimeAudioDistribution(): |
|
def __init__(self) -> None: |
|
self.data = {} |
|
self.max_len = 1024*1024 |
|
self.rate = 48000 |
|
|
|
def clean_up(self): |
|
self.data = {} |
|
|
|
def feed(self, uuid, audio): |
|
self.rate, audio_ = audio |
|
|
|
if uuid not in self.data: |
|
self.data[uuid] = audio_ |
|
else: |
|
new_arr = np.concatenate((self.data[uuid], audio_)) |
|
if len(new_arr) > self.max_len: new_arr = new_arr[-self.max_len:] |
|
self.data[uuid] = new_arr |
|
|
|
def read(self, uuid): |
|
if uuid in self.data: |
|
res = self.data.pop(uuid) |
|
print('\r read-', len(res), '-', max(res), end='', flush=True) |
|
else: |
|
res = None |
|
return res |
|
|
|
def change_sample_rate(audio, old_sr, new_sr): |
|
duration = audio.shape[0] / old_sr |
|
|
|
time_old = np.linspace(0, duration, audio.shape[0]) |
|
time_new = np.linspace(0, duration, int(audio.shape[0] * new_sr / old_sr)) |
|
|
|
interpolator = interpolate.interp1d(time_old, audio.T) |
|
new_audio = interpolator(time_new).T |
|
return new_audio.astype(np.int16) |