Update README.md
Browse files
README.md
CHANGED
@@ -1,242 +1 @@
|
|
1 |
-
|
2 |
-
STR_AUDIO_SIGNAL = 'audio_signal'
|
3 |
-
STR_TARGET_VECTOR = 'target_vector'
|
4 |
-
|
5 |
-
|
6 |
-
STR_CH_FIRST = 'channels_first'
|
7 |
-
STR_CH_LAST = 'channels_last'
|
8 |
-
|
9 |
-
import io
|
10 |
-
import os
|
11 |
-
import tqdm
|
12 |
-
import logging
|
13 |
-
import subprocess
|
14 |
-
from typing import Tuple
|
15 |
-
from pathlib import Path
|
16 |
-
|
17 |
-
# import librosa
|
18 |
-
import numpy as np
|
19 |
-
import soundfile as sf
|
20 |
-
|
21 |
-
import itertools
|
22 |
-
from numpy.fft import irfft
|
23 |
-
|
24 |
-
def _resample_load_ffmpeg(path: str, sample_rate: int, downmix_to_mono: bool) -> Tuple[np.ndarray, int]:
|
25 |
-
"""
|
26 |
-
Decoding, downmixing, and downsampling by librosa.
|
27 |
-
Returns a channel-first audio signal.
|
28 |
-
Args:
|
29 |
-
path:
|
30 |
-
sample_rate:
|
31 |
-
downmix_to_mono:
|
32 |
-
Returns:
|
33 |
-
(audio signal, sample rate)
|
34 |
-
"""
|
35 |
-
|
36 |
-
def _decode_resample_by_ffmpeg(filename, sr):
|
37 |
-
"""decode, downmix, and resample audio file"""
|
38 |
-
channel_cmd = '-ac 1 ' if downmix_to_mono else '' # downmixing option
|
39 |
-
resampling_cmd = f'-ar {str(sr)}' if sr else '' # downsampling option
|
40 |
-
cmd = f"ffmpeg -i \"{filename}\" {channel_cmd} {resampling_cmd} -f wav -"
|
41 |
-
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
42 |
-
out, err = p.communicate()
|
43 |
-
return out
|
44 |
-
|
45 |
-
src, sr = sf.read(io.BytesIO(_decode_resample_by_ffmpeg(path, sr=sample_rate)))
|
46 |
-
return src.T, sr
|
47 |
-
|
48 |
-
|
49 |
-
def _resample_load_librosa(path: str, sample_rate: int, downmix_to_mono: bool, **kwargs) -> Tuple[np.ndarray, int]:
|
50 |
-
"""
|
51 |
-
Decoding, downmixing, and downsampling by librosa.
|
52 |
-
Returns a channel-first audio signal.
|
53 |
-
"""
|
54 |
-
src, sr = librosa.load(path, sr=sample_rate, mono=downmix_to_mono, **kwargs)
|
55 |
-
return src, sr
|
56 |
-
|
57 |
-
|
58 |
-
def load_audio(
|
59 |
-
path: str or Path,
|
60 |
-
ch_format: str,
|
61 |
-
sample_rate: int = None,
|
62 |
-
downmix_to_mono: bool = False,
|
63 |
-
resample_by: str = 'ffmpeg',
|
64 |
-
**kwargs,
|
65 |
-
) -> Tuple[np.ndarray, int]:
|
66 |
-
"""A wrapper of librosa.load that:
|
67 |
-
- forces the returned audio to be 2-dim,
|
68 |
-
- defaults to sr=None, and
|
69 |
-
- defaults to downmix_to_mono=False.
|
70 |
-
The audio decoding is done by `audioread` or `soundfile` package and ultimately, often by ffmpeg.
|
71 |
-
The resampling is done by `librosa`'s child package `resampy`.
|
72 |
-
Args:
|
73 |
-
path: audio file path
|
74 |
-
ch_format: one of 'channels_first' or 'channels_last'
|
75 |
-
sample_rate: target sampling rate. if None, use the rate of the audio file
|
76 |
-
downmix_to_mono:
|
77 |
-
resample_by (str): 'librosa' or 'ffmpeg'. it decides backend for audio decoding and resampling.
|
78 |
-
**kwargs: keyword args for librosa.load - offset, duration, dtype, res_type.
|
79 |
-
Returns:
|
80 |
-
(audio, sr) tuple
|
81 |
-
"""
|
82 |
-
if ch_format not in (STR_CH_FIRST, STR_CH_LAST):
|
83 |
-
raise ValueError(f'ch_format is wrong here -> {ch_format}')
|
84 |
-
|
85 |
-
if os.stat(path).st_size > 8000:
|
86 |
-
if resample_by == 'librosa':
|
87 |
-
src, sr = _resample_load_librosa(path, sample_rate, downmix_to_mono, **kwargs)
|
88 |
-
elif resample_by == 'ffmpeg':
|
89 |
-
src, sr = _resample_load_ffmpeg(path, sample_rate, downmix_to_mono)
|
90 |
-
else:
|
91 |
-
raise NotImplementedError(f'resample_by: "{resample_by}" is not supposred yet')
|
92 |
-
else:
|
93 |
-
raise ValueError('Given audio is too short!')
|
94 |
-
return src, sr
|
95 |
-
|
96 |
-
# if src.ndim == 1:
|
97 |
-
# src = np.expand_dims(src, axis=0)
|
98 |
-
# # now always 2d and channels_first
|
99 |
-
|
100 |
-
# if ch_format == STR_CH_FIRST:
|
101 |
-
# return src, sr
|
102 |
-
# else:
|
103 |
-
# return src.T, sr
|
104 |
-
|
105 |
-
def ms(x):
|
106 |
-
"""Mean value of signal `x` squared.
|
107 |
-
:param x: Dynamic quantity.
|
108 |
-
:returns: Mean squared of `x`.
|
109 |
-
"""
|
110 |
-
return (np.abs(x)**2.0).mean()
|
111 |
-
|
112 |
-
def normalize(y, x=None):
|
113 |
-
"""normalize power in y to a (standard normal) white noise signal.
|
114 |
-
Optionally normalize to power in signal `x`.
|
115 |
-
#The mean power of a Gaussian with :math:`\\mu=0` and :math:`\\sigma=1` is 1.
|
116 |
-
"""
|
117 |
-
if x is not None:
|
118 |
-
x = ms(x)
|
119 |
-
else:
|
120 |
-
x = 1.0
|
121 |
-
return y * np.sqrt(x / ms(y))
|
122 |
-
|
123 |
-
def noise(N, color='white', state=None):
|
124 |
-
"""Noise generator.
|
125 |
-
:param N: Amount of samples.
|
126 |
-
:param color: Color of noise.
|
127 |
-
:param state: State of PRNG.
|
128 |
-
:type state: :class:`np.random.RandomState`
|
129 |
-
"""
|
130 |
-
try:
|
131 |
-
return _noise_generators[color](N, state)
|
132 |
-
except KeyError:
|
133 |
-
raise ValueError("Incorrect color.")
|
134 |
-
|
135 |
-
def white(N, state=None):
|
136 |
-
"""
|
137 |
-
White noise.
|
138 |
-
:param N: Amount of samples.
|
139 |
-
:param state: State of PRNG.
|
140 |
-
:type state: :class:`np.random.RandomState`
|
141 |
-
White noise has a constant power density. It's narrowband spectrum is therefore flat.
|
142 |
-
The power in white noise will increase by a factor of two for each octave band,
|
143 |
-
and therefore increases with 3 dB per octave.
|
144 |
-
"""
|
145 |
-
state = np.random.RandomState() if state is None else state
|
146 |
-
return state.randn(N)
|
147 |
-
|
148 |
-
def pink(N, state=None):
|
149 |
-
"""
|
150 |
-
Pink noise.
|
151 |
-
:param N: Amount of samples.
|
152 |
-
:param state: State of PRNG.
|
153 |
-
:type state: :class:`np.random.RandomState`
|
154 |
-
Pink noise has equal power in bands that are proportionally wide.
|
155 |
-
Power density decreases with 3 dB per octave.
|
156 |
-
"""
|
157 |
-
state = np.random.RandomState() if state is None else state
|
158 |
-
uneven = N % 2
|
159 |
-
X = state.randn(N // 2 + 1 + uneven) + 1j * state.randn(N // 2 + 1 + uneven)
|
160 |
-
S = np.sqrt(np.arange(len(X)) + 1.) # +1 to avoid divide by zero
|
161 |
-
y = (irfft(X / S)).real
|
162 |
-
if uneven:
|
163 |
-
y = y[:-1]
|
164 |
-
return normalize(y)
|
165 |
-
|
166 |
-
def blue(N, state=None):
|
167 |
-
"""
|
168 |
-
Blue noise.
|
169 |
-
:param N: Amount of samples.
|
170 |
-
:param state: State of PRNG.
|
171 |
-
:type state: :class:`np.random.RandomState`
|
172 |
-
Power increases with 6 dB per octave.
|
173 |
-
Power density increases with 3 dB per octave.
|
174 |
-
"""
|
175 |
-
state = np.random.RandomState() if state is None else state
|
176 |
-
uneven = N % 2
|
177 |
-
X = state.randn(N // 2 + 1 + uneven) + 1j * state.randn(N // 2 + 1 + uneven)
|
178 |
-
S = np.sqrt(np.arange(len(X))) # Filter
|
179 |
-
y = (irfft(X * S)).real
|
180 |
-
if uneven:
|
181 |
-
y = y[:-1]
|
182 |
-
return normalize(y)
|
183 |
-
|
184 |
-
def brown(N, state=None):
|
185 |
-
"""
|
186 |
-
Violet noise.
|
187 |
-
:param N: Amount of samples.
|
188 |
-
:param state: State of PRNG.
|
189 |
-
:type state: :class:`np.random.RandomState`
|
190 |
-
Power decreases with -3 dB per octave.
|
191 |
-
Power density decreases with 6 dB per octave.
|
192 |
-
"""
|
193 |
-
state = np.random.RandomState() if state is None else state
|
194 |
-
uneven = N % 2
|
195 |
-
X = state.randn(N // 2 + 1 + uneven) + 1j * state.randn(N // 2 + 1 + uneven)
|
196 |
-
S = (np.arange(len(X)) + 1) # Filter
|
197 |
-
y = (irfft(X / S)).real
|
198 |
-
if uneven:
|
199 |
-
y = y[:-1]
|
200 |
-
return normalize(y)
|
201 |
-
|
202 |
-
def violet(N, state=None):
|
203 |
-
"""
|
204 |
-
Violet noise. Power increases with 6 dB per octave.
|
205 |
-
:param N: Amount of samples.
|
206 |
-
:param state: State of PRNG.
|
207 |
-
:type state: :class:`np.random.RandomState`
|
208 |
-
Power increases with +9 dB per octave.
|
209 |
-
Power density increases with +6 dB per octave.
|
210 |
-
"""
|
211 |
-
state = np.random.RandomState() if state is None else state
|
212 |
-
uneven = N % 2
|
213 |
-
X = state.randn(N // 2 + 1 + uneven) + 1j * state.randn(N // 2 + 1 + uneven)
|
214 |
-
S = (np.arange(len(X))) # Filter
|
215 |
-
y = (irfft(X * S)).real
|
216 |
-
if uneven:
|
217 |
-
y = y[:-1]
|
218 |
-
return normalize(y)
|
219 |
-
|
220 |
-
_noise_generators = {
|
221 |
-
'white': white,
|
222 |
-
'pink': pink,
|
223 |
-
'blue': blue,
|
224 |
-
'brown': brown,
|
225 |
-
'violet': violet,
|
226 |
-
}
|
227 |
-
|
228 |
-
def noise_generator(N=44100, color='white', state=None):
|
229 |
-
"""Noise generator.
|
230 |
-
:param N: Amount of unique samples to generate.
|
231 |
-
:param color: Color of noise.
|
232 |
-
Generate `N` amount of unique samples and cycle over these samples.
|
233 |
-
"""
|
234 |
-
#yield from itertools.cycle(noise(N, color)) # Python 3.3
|
235 |
-
for sample in itertools.cycle(noise(N, color, state)):
|
236 |
-
yield sample
|
237 |
-
|
238 |
-
def heaviside(N):
|
239 |
-
"""Heaviside.
|
240 |
-
Returns the value 0 for `x < 0`, 1 for `x > 0`, and 1/2 for `x = 0`.
|
241 |
-
"""
|
242 |
-
return 0.5 * (np.sign(N) + 1)
|
|
|
1 |
+
import os
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|