import math
import pickle
import sys

import librosa.display
import numpy as np
import pandas as pd
import torch
from librosa import load as librosa_load
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from sklearn.preprocessing import LabelEncoder
from torchaudio import load as torchaudio_load
from tqdm import tqdm
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration, pipeline as transformers_pipeline
""""Author: Frank""" | |
def extract_s2t_features(gpu):
    """Embed each 10-second window of temp.wav with a fine-tuned Speech2Text model."""
    model_name = "medium"
    processor = Speech2TextProcessor.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    model = Speech2TextForConditionalGeneration.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    if gpu:
        model = model.cuda()
    # Load the fine-tuned checkpoint; map to CPU first so this also works without CUDA
    model.load_state_dict(torch.load('s2t_model', map_location='cpu'))
    model.eval()
    sample_rate = 16000
    embedding_window = 10  # in secs
    # Load the extracted audio track and mix it down to mono
    audio, _ = torchaudio_load('temp.wav')
    audio = torch.mean(audio, dim=0)
    embs = []
    # Split into fixed-length clips and drop the trailing partial clip
    audio_clips = audio.split(embedding_window * sample_rate)
    if len(audio_clips) > 1:
        audio_clips = audio_clips[:-1]
    for clip in tqdm(audio_clips):
        with torch.no_grad():
            inputs = processor(clip, sampling_rate=16000, return_tensors="pt")
            features = inputs["input_features"]
            # Dummy (all-zero) decoder input ids, used only to obtain hidden states
            decoder_input = torch.zeros(features.shape[:2], dtype=torch.int32)
            if gpu:
                features, decoder_input = features.cuda(), decoder_input.cuda()
            # Mean-pool the last hidden state into a single embedding per clip
            h = model.model(features, decoder_input_ids=decoder_input).last_hidden_state.cpu()
            emb = torch.mean(h, dim=1)
            embs.append(emb)
    return torch.cat(embs).numpy()
""""Author: Sichao""" | |
def extract_speaker_features(gpu):
    """Run speaker diarization and adult/child classification over 10-second clips of temp.wav."""
    # temp.wav is written at 16 kHz; keep the native rate so it matches the classifier's expected sampling rate
    x, sample_rate = librosa_load('temp.wav', sr=None)
    print('Input sample rate: {}, Length: {} s'.format(sample_rate, x.size / sample_rate))
    # Speaker diarization
    print('Start speaker diarization...')
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", use_auth_token='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')
    diarization = pipeline('temp.wav')
    # Map every second of audio to the speaker(s) active during it
    speaker_per_sec_dict = {i: 'UNKNOWN' for i in range(0, math.ceil(x.size / sample_rate))}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        for clip_start in range(math.ceil(turn.start), math.ceil(turn.end)):
            if speaker_per_sec_dict[clip_start] == 'UNKNOWN':
                speaker_per_sec_dict[clip_start] = speaker
            elif speaker_per_sec_dict[clip_start] != speaker:
                speaker_per_sec_dict[clip_start] = speaker_per_sec_dict[clip_start] + ' ' + speaker
    # Collapse the per-second labels into one label per 10-second clip
    speaker_per_clip = []
    for i in range(0, math.ceil(x.size / sample_rate), 10):
        speakers = []
        for j in range(10):
            if i + j in speaker_per_sec_dict and speaker_per_sec_dict[i + j] != 'UNKNOWN':
                speakers.append(speaker_per_sec_dict[i + j])
        if len(speakers) > 0:
            is_single_speaker = all(s == speakers[0] for s in speakers)
            if is_single_speaker:
                speaker_per_clip.append(speakers[0])
            else:
                speaker_per_clip.append('MULTI SPEAKER')
        else:
            speaker_per_clip.append('UNKNOWN')
    # Adult/child classification
    print('Start adult child classification...')
    device = 0 if gpu else -1
    audio_classifier = transformers_pipeline(task="audio-classification", model="bookbot/wav2vec2-adult-child-cls", device=device)
    clip_idxs = [i for i in range(0, math.ceil(x.size / sample_rate), 10)]
    classifications = []
    for clip_start in tqdm(clip_idxs):
        with torch.no_grad():
            preds = audio_classifier(x[clip_start * sample_rate:(clip_start + 10) * sample_rate])
            preds = [{"score": round(pred["score"], 4), "label": pred["label"]} for pred in preds]
            # Keep the highest-scoring label for the clip
            classifications.append(preds[0]['label'])
    # Output
    print('Output...')
    output = {'clip_start': clip_idxs, 'diarization': speaker_per_clip, 'adult_child_classification': classifications}
    output_df = pd.DataFrame(output)
    # Encode the categorical labels as integers
    le = LabelEncoder()
    output_df['diarization_numeric'] = le.fit_transform(output_df['diarization'])
    output_df['adult_child_classification_numeric'] = le.fit_transform(output_df['adult_child_classification'])
    return output_df['diarization_numeric'].values, output_df['adult_child_classification_numeric'].values


def audio_feature_extraction(input_path, gpu=False):
    """Extract audio features from a video file and pickle them to `output_path`."""
    output_path = 'audio_embedding'
    # Dump the video's audio track to a 16 kHz PCM wav file
    audioTrack = VideoFileClip(input_path).audio
    audioTrack.write_audiofile('temp.wav', codec='pcm_s16le', fps=16000)
    print('Extracting s2t features...')
    s2t_features = extract_s2t_features(gpu)
    print('Extracting speaker features...')
    diarization_features, adult_child_class_features = extract_speaker_features(gpu)
    # Drop the trailing partial clip so all feature streams align with the s2t clips
    if len(diarization_features) > 1:
        diarization_features, adult_child_class_features = diarization_features[:-1], adult_child_class_features[:-1]
    audio_features = np.concatenate((s2t_features, diarization_features[:, None], adult_child_class_features[:, None]), axis=1)
    with open(output_path, 'wb') as f:
        pickle.dump(audio_features, f)
    return output_path
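

# A minimal command-line entry point (a usage sketch, not part of the original
# pipeline): it assumes the first argument is the input video path and that an
# optional "--gpu" flag enables CUDA.
if __name__ == '__main__':
    video_path = sys.argv[1]
    use_gpu = '--gpu' in sys.argv[2:]
    saved_path = audio_feature_extraction(video_path, gpu=use_gpu)
    print('Audio features written to {}'.format(saved_path))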