# NOTE(review): web-page residue kept as a comment, not part of the program:
# "Spaces: Runtime error / Runtime error"
import csv
import json
import math
import os
import struct
import time

import numpy as np
import pyaudio
import scipy.io.wavfile as wav
import sounddevice as sd
import soundfile as sf
import torch
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
from pydub import AudioSegment
from speechbrain.pretrained.interfaces import foreign_class
from tqdm.contrib import tqdm

from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline
# Module-level recording flag, initially off.
# NOTE(review): no code visible in this file reads this module-level name.
recording = False

# Shared buffer of audio blocks appended by the input-stream callback in
# luyin(); luyin() empties it again once the blocks have been stacked.
recorded_audio = []
def check_prefix(string, prefix):
    """Return True if *string* begins with *prefix*, otherwise False.

    An empty *prefix* matches every *string*. A *prefix* longer than
    *string* never matches.
    """
    # Slice comparison (rather than str.startswith) so that any sliceable
    # sequence keeps working exactly as before.
    return string[:len(prefix)] == prefix
def _read_int(prompt):
    """Show *prompt* on stdin and return the reply as an int, or None if it is not a number."""
    try:
        return int(input(prompt))
    except ValueError:
        return None


def luyin(self):
    """Interactively record microphone audio and return it as a float tensor.

    The user enters 1 to start recording and 2 to stop it. The captured
    audio is also written to ``out.wav`` in the current working directory.

    Args:
        self: Any object exposing a ``sample_rate`` attribute (the
            ``Recorder`` instance in this file).

    Returns:
        A ``torch`` float tensor holding the squeezed recording, or the
        string ``"none"`` when recording was not started or no audio block
        was captured.
    """
    if _read_int('请输入数字1开始:') != 1:
        # Recording never started: there is no stream to stop and no start
        # time to measure from, so report "nothing recorded" right away.
        return "none"

    # Discard blocks that an earlier, interrupted call may have left behind.
    recorded_audio.clear()

    def callback(indata, frames, time_info, status):
        # Invoked by sounddevice for every captured block. The third
        # parameter is named time_info so it does not shadow the time module.
        if status:
            print('录音错误:', status)
        # indata is copied before it is stored in the shared buffer.
        recorded_audio.append(indata.copy())

    # The context manager starts the stream on entry and stops and closes it
    # on exit, even if reading from stdin raises.
    with sd.InputStream(callback=callback, channels=1,
                        samplerate=self.sample_rate, blocksize=4096):
        begin = time.time()
        # Keep recording until the user explicitly asks to stop.
        while _read_int('请输入数字2停止:') != 2:
            pass
        print("Stop recording")
    elapsed = time.time() - begin
    print('录音时间为%ds' % elapsed)

    if not recorded_audio:
        return "none"

    signal = np.vstack(recorded_audio)
    recorded_audio.clear()
    # Persist the take; classify_continuous() re-reads this file for ASR.
    sf.write("out.wav", signal, self.sample_rate)
    return torch.from_numpy(np.squeeze(signal)).float()
class Recorder:
    """Record audio from the microphone and return the signal tensor."""

    def __init__(self):
        self.sample_rate = 16000  # sample rate used for recording and for out.wav
        self.channels = 1  # number of audio channels

    def record(self, path=None):
        """Run one interactive recording session via ``luyin``.

        Args:
            path: Unused; accepted only so existing callers that pass a
                positional argument keep working.

        Returns:
            Whatever ``luyin`` returns: the recorded signal tensor, or the
            string ``"none"`` when nothing was recorded.
        """
        return luyin(self)
class ContinuousInferencer:
    """Record an utterance, transcribe it and classify its emotion.

    NOTE: ``classify_continuous`` reads the module-level names ``vad``,
    ``ASR_model`` and ``punc``. This file only creates them in its
    ``__main__`` block, so they must exist before the method is called.
    """

    # Built with os.path.join so the local model directory also resolves on
    # non-Windows hosts; on Windows the resulting string is identical to the
    # previous backslash-separated literal.
    _MODEL_DIR = os.path.join(
        "pretrained_models",
        "speechbrain",
        "emotion-recognition-wav2vec2-IEMOCAP",
    )

    def __init__(self):
        self.recorder = Recorder()  # microphone front end
        self.classifier = foreign_class(
            source=self._MODEL_DIR,
            pymodule_file="custom_interface.py",
            classname="CustomEncoderWav2vec2Classifier",
            savedir=self._MODEL_DIR,
        )

    def classify_continuous(self):
        """Record once, print and return the emotion prediction.

        The recording is obtained from ``self.recorder``. Its ``out.wav``
        copy is segmented with ``vad``, transcribed with ``ASR_model`` and
        punctuated with ``punc``; the transcript is printed only.

        Returns:
            ``"none"`` when nothing was recorded, otherwise a tuple of the
            class probabilities as a NumPy array and the last predicted
            text label.
        """
        signal = self.recorder.record(0)  # speech input

        # Check the sentinel before touching out.wav: when nothing was
        # recorded the file is missing or left over from an earlier take.
        # isinstance avoids comparing a tensor with a string.
        if isinstance(signal, str) and signal == "none":
            return "none"

        speech, _sample_rate = AudioReader.read_wav_file("out.wav")
        segments = vad.segments_offline(speech)

        # Segment bounds are scaled by 16 to index samples — presumably
        # milliseconds at 16 kHz; TODO confirm against FSMNVad.
        text_results = "".join(
            punc.punctuate(
                ASR_model.infer_offline(
                    speech[part[0] * 16 : part[1] * 16],
                    hot_words="任意热词 空格分开",
                )
            )[0]
            for part in segments
        )

        out_prob, score, index, text_lab = self.classifier.classify_batch(signal)
        probabilities = out_prob.squeeze(0).numpy()
        label = text_lab[-1]
        print(probabilities, label)
        print("文本内容:", text_results)
        return probabilities, label
if __name__ == "__main__":
    print("inference start")
    inferencer = ContinuousInferencer()
    # These three names are read as module globals by
    # ContinuousInferencer.classify_continuous, so they must stay at module
    # scope rather than move into a function.
    ASR_model = ParaformerOffline()
    vad = FSMNVad()
    punc = CttPunctuator()
    try:
        # Record and classify one utterance after another until interrupted.
        while True:
            inferencer.classify_continuous()
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C or closed stdin ends the session without a traceback.
        print("inference stopped")