import os

import gradio as gr
import numpy as np
import scipy.io.wavfile as wav
import soundfile as sf
import torchaudio
from gradio_client import Client
from speechbrain.pretrained.interfaces import foreign_class

from paraformer import AudioReader, CttPunctuator, FSMNVad, ParaformerOffline

client = Client("Liusuthu/TextDepression")

# Tallies of how many clips in a batch scored above zero on each signal
# (audio-only, text-only, fused); reset at the start of run_results().
sum1 = 0
sum2 = 0
sum3 = 0

os.environ["no_proxy"] = "localhost,127.0.0.1,::1"

classifier = foreign_class(
    source="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
    pymodule_file="custom_interface.py",
    classname="CustomEncoderWav2vec2Classifier",
    savedir="pretrained_models/local-speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
)
ASR_model = ParaformerOffline()
vad = FSMNVad()
punc = CttPunctuator()


####################################### Accuracy testing #######################################
def text_api(text: str):
    """Query the remote text-depression model and return its raw string response."""
    result = client.predict(
        text,  # str, fed to the '输入文字' ("input text") Textbox component
        api_name="/predict",
    )
    return result


def text_score(text):
    """Turn the remote model's response into a signed sentiment score."""
    if text is None:
        gr.Warning("提交内容为空!")  # "Submission is empty!"
        return "空", 0  # "empty"; keep the (text, score) contract for callers
    string = text_api(text)
    print(string)
    if string == "[{}]":  # the remote model returned no prediction
        text = "空"
        score = 0
    else:
        # The response is a JSON-like string; extract the two-character
        # sentiment label after "text" and the confidence after "probability"
        # at fixed offsets.
        want1 = string.partition("text")[2]
        label = want1[4:6]  # e.g. "正向" (positive)
        want2 = string.partition("probability")[2]
        prob = float(want2[3:-4])
        # Log-compress the confidence; positive sentiment counts as negative.
        if label == "正向":
            score = -np.log10(prob * 10)
        else:
            score = np.log10(prob * 10)
    return text, score


def speech_score(audio):
    """Transcribe one clip, score audio emotion and text sentiment, and fuse them."""
    global sum1, sum2, sum3
    if audio is None:
        # "Submission is empty! Wait for the audio to finish loading, then resubmit."
        gr.Warning("提交内容为空!请等待音频加载完毕后再尝试提交!")
        return "空", 0
    sample_rate, signal = audio  # speech input from Gradio: (rate, ndarray)
    # Normalise to [-1, 1] and round-trip through disk to obtain a
    # 16 kHz, 16-bit PCM file for the downstream models.
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))
    sf.write("a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(signal)
    torchaudio.save("out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)

    # Speech recognition: VAD segmentation, Paraformer ASR, then punctuation.
    speech, sample_rate = AudioReader.read_wav_file("out.wav")
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # "any hot words, space-separated"
        )
        text_results += punc.punctuate(_result)[0]

    # Audio emotion recognition on the resampled waveform.
    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    prob = out_prob.squeeze(0).numpy()
    # Signed, log-compressed audio score from the two class probabilities.
    score2 = 10 * prob[0] - 10 * prob[1]
    if score2 >= 0:
        score2 = np.log10(score2)
    else:
        score2 = -np.log10(-score2)

    # Text score from the transcript (zero when nothing was recognised).
    if text_results != "[{}]":
        text, score1 = text_score(text_results)
    else:
        text = "空"
        score1 = 0

    # Late fusion: the audio score is weighted twice as heavily as the text score.
    score = (1 / 3) * score1 + (2 / 3) * score2
    print("audio score:", score2, ", text score:", score1, ", fused score:", score, ", text:", text)
    sum1 += int(score2 > 0)
    sum2 += int(score1 > 0)
    sum3 += int(score > 0)
    return text, score
#################################################################################################


def classify_continuous(audio):
    """Transcribe a clip and run audio emotion recognition (used by the Gradio demo)."""
    print(type(audio))
    print(audio)
    sample_rate, signal = audio  # speech input from Gradio: (rate, ndarray)
    signal = signal.astype(np.float32)
    signal /= np.max(np.abs(signal))
    sf.write("a.wav", signal, sample_rate)
    signal, sample_rate = torchaudio.load("a.wav")
    signal1 = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(signal)
    torchaudio.save("out.wav", signal1, 16000, encoding="PCM_S", bits_per_sample=16)

    speech, sample_rate = AudioReader.read_wav_file("out.wav")
    segments = vad.segments_offline(speech)
    text_results = ""
    for part in segments:
        _result = ASR_model.infer_offline(
            speech[part[0] * 16 : part[1] * 16],
            hot_words="任意热词 空格分开",  # "any hot words, space-separated"
        )
        text_results += punc.punctuate(_result)[0]
    out_prob, score, index, text_lab = classifier.classify_batch(signal1)
    return text_results, out_prob.squeeze(0).numpy(), text_lab[-1]


def run_results(file):
    """Score the 29 clips 01.wav .. 29.wav under `file` and tally the verdicts."""
    global sum1, sum2, sum3
    sum1 = 0
    sum2 = 0
    sum3 = 0
    for i in range(29):
        print("Clip", i + 1, ":")
        filename = f"{file}/{i + 1:02d}.wav"
        audio = wav.read(filename)
        text, score = speech_score(audio)
        with open("data.txt", "a", encoding="utf8") as f:
            f.write(str(score) + "\n")
    # After all clips are processed, dump the collected scores and the tallies.
    print("Batch results:")
    with open("data.txt", "r", encoding="utf8") as f:
        for j in f:
            print(j, end="")
    print("audio:", sum1, ", text:", sum2, ", fused:", sum3)
    return "测试完毕"  # "testing finished"


# with gr.Blocks() as demo:
#     with gr.Row():
#         with gr.Column(scale=1):
#             with gr.Row():
#                 input_audio = gr.Audio(sources="microphone")
#             with gr.Row():
#                 submit_button = gr.Button("提交音频")
#         with gr.Column(scale=2):
#             with gr.Row():
#                 with gr.Row():
#                     with gr.Column():
#                         with gr.Row():
#                             output_log1 = gr.Textbox(interactive=True)
#                         with gr.Row():
#                             output_log2 = gr.Textbox(interactive=False)
#                         with gr.Row():
#                             run_button = gr.Button("开始测试")
#     submit_button.click(
#         fn=speech_score,
#         inputs=[input_audio],
#         outputs=[output_log1, output_log2],
#     )
#     run_button.click(
#         fn=run_results,
#         inputs=[output_log1],
#         outputs=[output_log2],
#     )

demo = gr.Interface(
    classify_continuous,
    gr.Audio("02020027-HC/07.wav"),
    [
        gr.Text(label="语音识别结果"),   # ASR transcript
        gr.Text(label="音频情感识别1"),  # emotion probabilities
        gr.Text(label="音频情感识别2"),  # emotion label
    ],
)
demo.launch()