# NOTE: web-page residue captured with this file, not part of the program:
# "Spaces: Sleeping Sleeping"
import wave

import numpy as np
import pyaudio
import webrtcvad

# Audio parameters.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 48000
CHUNK_SIZE = 960  # samples per chunk: 20 ms at 48 kHz

# Test clip read from disk at import time. PyAudio playback/capture of the
# clip is currently disabled; the file is only opened for reading.
# Alternative clip:
# wav = "/home/kevingeng/Disk2/laronix/Laronix_ASR_TTS_VC/wav/20221228_video_good_normed_5/take1_001_norm.wav"
wav = "/home/kevingeng/Disk2/laronix/Laronix_ASR_TTS_VC/wav/VAD_test.wav"
wf = wave.open(wav, "rb")
def streaming_VAD(wf):
    """Decide whether the next stretch of a WAV stream is voiced or silent.

    Reads ``CHUNK_SIZE``-sample frames from *wf*, runs WebRTC VAD on each one
    and takes a majority vote once a window of ``MAX_SILENCE_DURATION``
    milliseconds has been collected, or earlier if the stream ends first.

    Args:
        wf: An open ``wave.Wave_read``-like object; only ``readframes`` is
            called. The audio is assumed to be 16-bit mono PCM sampled at
            the module-level ``RATE`` -- this is not validated here.

    Returns:
        ``False`` if at least half of the analysed frames are non-speech, or
        if no frame could be read at all; ``True`` otherwise.
    """
    vad = webrtcvad.Vad()
    vad.set_mode(2)  # aggressiveness level passed to WebRTC VAD

    MAX_SILENCE_DURATION = 4000  # in ms

    # CHUNK_SIZE is a sample count while the duration above is in
    # milliseconds, so convert through RATE before dividing. With the file's
    # settings (960 samples at 48 kHz = 20 ms) this yields 200 frames.
    window_frames = max(1, MAX_SILENCE_DURATION * RATE // (1000 * CHUNK_SIZE))

    vad_buffer = []
    while len(vad_buffer) < window_frames:
        data = wf.readframes(CHUNK_SIZE)
        if not data:
            # End of stream: vote on whatever has been collected so far.
            break
        try:
            # webrtcvad takes the raw PCM bytes; handing it an int16 array
            # makes len() count samples rather than bytes.
            # NOTE(review): that appears to halve the analysed frame length
            # inside the wrapper -- confirm against the webrtcvad source.
            is_speech = vad.is_speech(data, RATE)
        except Exception:
            # Best effort: a frame the VAD rejects (e.g. a short trailing
            # chunk) counts as non-speech. Unlike a bare ``except`` this
            # lets KeyboardInterrupt and SystemExit propagate.
            is_speech = False
        vad_buffer.append(is_speech)

    if not vad_buffer:
        return False

    # Silence wins ties, matching the original ">= half the window" rule.
    return vad_buffer.count(False) * 2 < len(vad_buffer)