# speech.py: SpeechDepression demo (Hugging Face Space)
import os

import gradio as gr
import numpy as np
import scipy.io.wavfile as wav
import soundfile as sf
import torchaudio
from gradio_client import Client
from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline
from speechbrain.pretrained.interfaces import foreign_class

# Remote text-sentiment Space used to score the transcribed speech.
client = Client("Liusuthu/TextDepression")

# Running tallies of how many clips score positive per modality
# (speech-only, text-only, fused); reset and reported by run_results.
sum1 = 0
sum2 = 0
sum3 = 0

# Keep local requests off any configured proxy.
os.environ["no_proxy"] = "localhost,127.0.0.1,::1"
# Local SpeechBrain wav2vec2 emotion classifier fine-tuned on IEMOCAP.
classifier = foreign_class(
    source="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
    pymodule_file="custom_interface.py",
    classname="CustomEncoderWav2vec2Classifier",
    savedir="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
)

# Offline Chinese ASR pipeline: Paraformer recognizer, FSMN voice activity
# detection, and punctuation restoration.
ASR_model = ParaformerOffline()
vad = FSMNVad()
punc = CttPunctuator()
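# Minimal sketch of how the three components combine on an already-prepared
# 16 kHz mono file ("sample16k.wav" is a hypothetical path):
#
#   speech, _ = AudioReader.read_wav_file("sample16k.wav")
#   for start, end in vad.segments_offline(speech):
#       raw = ASR_model.infer_offline(speech[start * 16 : end * 16],
#                                     hot_words="任意热词 空格分开")
#       print(punc.punctuate(raw)[0])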
#################################### Accuracy testing ####################################
def text_api(text: str):
    """Send text to the TextDepression Space and return its raw string result."""
    result = client.predict(
        text,  # str, fed to the Space's '输入文字' (text input) Textbox
        api_name="/predict",
    )
    return result
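# The Space returns a plain string; the parsing in text_score below assumes a
# shape roughly like "[{'text': <two-character label>, 'probability': <float>...}]",
# with the literal "[{}]" meaning no classification. This is inferred from the
# slicing offsets, not from any documented contract.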
def text_score(text):
    """Map the remote sentiment result onto a signed depression score."""
    if text is None:
        gr.Warning("提交内容为空!")  # "Submission is empty!"
        return "空", 0
    string = text_api(text)
    print(string)
    if string == "[{}]":  # the Space returned no classification
        text = "空"
        score = 0
    else:
        # Parse positionally: a two-character label follows "text" and a
        # float follows "probability" in the returned string.
        part1 = string.partition("text")
        label = part1[2][4:6]
        part2 = string.partition("probability")
        prob = float(part2[2][3:-4])
        if label == "正向":  # positive sentiment lowers the score
            score = -np.log10(prob * 10)
        else:  # negative sentiment raises it
            score = np.log10(prob * 10)
    return text, score
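# Worked example of the mapping above (numbers are illustrative): a 正向
# (positive) label with probability 0.93 yields -log10(9.3) ≈ -0.97, while a
# negative label at the same confidence yields ≈ +0.97, so more confident
# negative sentiment pushes the score further positive.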
def speech_score(audio):
    """Score one clip: text sentiment (score1) fused with speech emotion (score2)."""
    global sum1, sum2, sum3
    if audio is None:
        gr.Warning("提交内容为空!请等待音频加载完毕后再尝试提交!")  # "Submission is empty! Wait for the audio to load before submitting!"
        return "空", 0  # sentinel values so callers can still unpack
    sample_rate, signal = audio  # Gradio-style (rate, ndarray) speech input
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))  # peak-normalize
    # Round-trip through disk to produce a 16 kHz, 16-bit PCM file.
    sf.write("a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
        signal
    )
    torchaudio.save("out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
    speech, sample_rate = AudioReader.read_wav_file("out.wav")
    # Transcribe each VAD segment, then restore punctuation.
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # "any hot words, space-separated"
        )
        text_results += punc.punctuate(_result)[0]
    # Speech-emotion pass on the resampled waveform.
    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    prob = out_prob.squeeze(0).numpy()
    print("from func:speech_score, emotion probabilities:", prob)
    print("from func:speech_score, predicted label:", text_lab[-1])
    # Compress the scaled probability margin onto a log scale, keeping its sign.
    score2 = 10 * prob[0] - 10 * prob[1]
    if score2 > 0:
        score2 = np.log10(score2)
    elif score2 < 0:
        score2 = -np.log10(-score2)
    # Text-sentiment pass on the transcript, if the ASR produced any text.
    if text_results != "":
        text, score1 = text_score(text_results)
    else:
        text = "空"
        score1 = 0
    # Fuse: text gets 1/3 of the weight, speech emotion 2/3.
    score = (1 / 3) * score1 + (2 / 3) * score2
    print("speech score:", score2, ", text score:", score1, ", fused score:", score, ", text:", text)
    sum1 += int(score2 > 0)
    sum2 += int(score1 > 0)
    sum3 += int(score > 0)
    return text, score
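# Direct-call sketch, mirroring what run_results does per clip ("sample.wav"
# is a hypothetical path):
#
#   audio = wav.read("sample.wav")  # -> (sample_rate, int16 ndarray)
#   text, score = speech_score(audio)
#   print(text, score)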
#############################################################################################
def classify_continuous(audio):
    """Transcribe a clip and return (transcript, emotion probabilities, top label)."""
    if audio is None:  # guard empty submissions
        return "none", "none", "haha"
    sample_rate, signal = audio  # Gradio-style (rate, ndarray) speech input
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))  # peak-normalize
    # Same disk round-trip as speech_score: 16 kHz, 16-bit PCM.
    sf.write("a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(
        signal
    )
    torchaudio.save("out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)
    speech, sample_rate = AudioReader.read_wav_file("out.wav")
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # "any hot words, space-separated"
        )
        text_results += punc.punctuate(_result)[0]
    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    return text_results, out_prob.squeeze(0).numpy(), text_lab[-1]
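# Direct-call sketch outside Gradio ("sample.wav" is a hypothetical path):
#
#   text, probs, label = classify_continuous(wav.read("sample.wav"))
#   # probs is a 1-D numpy array over the model's IEMOCAP emotion classes;
#   # label is the top prediction.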
def run_results(file):
    """Batch-score 29 clips named 01.wav .. 29.wav under directory `file`."""
    global sum1, sum2, sum3
    sum1 = 0
    sum2 = 0
    sum3 = 0
    open("data.txt", "w", encoding="utf8").close()  # clear scores from earlier runs
    for i in range(29):
        print("Clip", i + 1, ":")
        filename = f"{file}/{i + 1:02d}.wav"
        audio = wav.read(filename)
        text, score = speech_score(audio)
        with open("data.txt", "a", encoding="utf8") as f:
            f.write(str(score) + "\n")
    # After processing every clip, dump the per-clip scores and the tallies.
    print("Batch results:")
    with open("data.txt", "r", encoding="utf8") as f:
        for j in f:
            print(j, end="")
    print("speech:", sum1, ", text:", sum2, ", fused:", sum3)
    return "测试完毕"  # "Test finished"
# Alternative gr.Blocks UI, kept for reference but currently disabled:
# with gr.Blocks() as demo:
# with gr.Row():
# with gr.Column(scale=1):
# with gr.Row():
# input_audio=gr.Audio(sources='microphone')
# with gr.Row():
# submit_button=gr.Button("提交音频")
# with gr.Column(scale=2):
# with gr.Row():
# with gr.Row():
# with gr.Column():
# with gr.Row():
# output_log1=gr.Textbox(interactive=True)
# with gr.Row():
# output_log2=gr.Textbox(interactive=False)
# with gr.Row():
# run_button=gr.Button("开始测试")
# submit_button.click(
# fn=speech_score,
# inputs=[input_audio],
# outputs=[output_log1,output_log2]
# )
# run_button.click(
# fn=run_results,
# inputs=[output_log1],
# outputs=[output_log2]
# )
# Live demo: one audio input preloaded with a sample clip, three text outputs.
demo = gr.Interface(
    classify_continuous,
    gr.Audio("02020027-HC/07.wav"),
    [
        gr.Text(label="语音识别结果"),  # ASR transcript
        gr.Text(label="音频情感识别1"),  # audio emotion probabilities
        gr.Text(label="音频情感识别2"),  # audio emotion label
    ],
)
demo.launch()