import torch
from torchaudio import load as torchaudio_load
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from sklearn.preprocessing import LabelEncoder
from librosa import load as librosa_load
import librosa.display
import math
import pandas as pd
import sys
from tqdm import tqdm
import numpy as np
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration, pipeline as transformers_pipeline
import pickle


"""Author: Frank"""
def extract_s2t_features(gpu):
    """Encode 'temp.wav' with a Speech2Text model and return one embedding per 10 s clip."""
    model_name = "medium"
    processor = Speech2TextProcessor.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    model = Speech2TextForConditionalGeneration.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    if gpu:
        model = model.cuda()
    # Load locally fine-tuned weights on top of the pretrained checkpoint.
    model.load_state_dict(torch.load('s2t_model'))
    model.eval()

    sample_rate = 16000
    embedding_window = 10  # in secs
    audio, _ = torchaudio_load('temp.wav')
    audio = torch.mean(audio, dim=0)  # downmix to mono

    embs = []
    # Split into fixed-length clips and drop the trailing partial clip.
    audio_clips = audio.split(embedding_window * sample_rate)
    if len(audio_clips) > 1:
        audio_clips = audio_clips[:-1]
    for clip in tqdm(audio_clips):
        with torch.no_grad():
            inputs = processor(clip, sampling_rate=16000, return_tensors="pt")
            features = inputs["input_features"]
            # All-zero decoder input ids; the decoder's last hidden state
            # (attending over the audio) is mean-pooled as the clip embedding.
            decoder_input = torch.zeros(features.shape[:2], dtype=torch.long)
            if gpu:
                features, decoder_input = features.cuda(), decoder_input.cuda()
            h = model.model(features, decoder_input_ids=decoder_input).last_hidden_state.cpu()
            emb = torch.mean(h, dim=1)
            embs.append(emb)
    return torch.cat(embs).numpy()


"""Author: Sichao"""
def extract_speaker_features(gpu):
    """Return per-clip speaker-diarization and adult/child labels (integer-encoded) for 'temp.wav'."""
    # sr=None keeps the native 16 kHz sampling rate written by audio_feature_extraction.
    x, sample_rate = librosa_load('temp.wav', sr=None)
    print('Input sample rate: {}, Length: {} s'.format(sample_rate, x.size / sample_rate))

    # Speaker diarization
    print('Start speaker diarization...')
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1",
                                        use_auth_token='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')
    diarization = pipeline('temp.wav')

    # Map every second of audio to the speaker(s) active during it.
    speaker_per_sec_dict = {i: 'UNKNOWN' for i in range(0, math.ceil(x.size / sample_rate))}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        for clip_start in range(math.ceil(turn.start), math.ceil(turn.end)):
            if speaker_per_sec_dict[clip_start] == 'UNKNOWN':
                speaker_per_sec_dict[clip_start] = speaker
            elif speaker_per_sec_dict[clip_start] != speaker:
                speaker_per_sec_dict[clip_start] = speaker_per_sec_dict[clip_start] + ' ' + speaker

    # Collapse the per-second labels into one label per 10 s clip.
    speaker_per_clip = []
    for i in range(0, math.ceil(x.size / sample_rate), 10):
        speakers = []
        for j in range(10):
            if i + j in speaker_per_sec_dict and speaker_per_sec_dict[i + j] != 'UNKNOWN':
                speakers.append(speaker_per_sec_dict[i + j])
        if len(speakers) > 0:
            is_single_speaker = all(s == speakers[0] for s in speakers)
            if is_single_speaker:
                speaker_per_clip.append(speakers[0])
            else:
                speaker_per_clip.append('MULTI SPEAKER')
        else:
            speaker_per_clip.append('UNKNOWN')

    # Adult/child classification
    print('Start adult child classification...')
    device = 0 if gpu else -1
    audio_classifier = transformers_pipeline(task="audio-classification",
                                             model="bookbot/wav2vec2-adult-child-cls",
                                             device=device)
    clip_idxs = [i for i in range(0, math.ceil(x.size / sample_rate), 10)]
    classifications = []
    for clip_start in tqdm(clip_idxs):
        with torch.no_grad():
            preds = audio_classifier(x[clip_start * sample_rate:(clip_start + 10) * sample_rate])
            preds = [{"score": round(pred["score"], 4), "label": pred["label"]} for pred in preds]
            classifications.append(preds[0]['label'])

    # Output
    print('Output...')
    output = {'clip_start': clip_idxs,
              'diarization': speaker_per_clip,
              'adult_child_classification': classifications}
    output_df = pd.DataFrame(output)
    # Create an instance of LabelEncoder and encode the categorical labels as integers.
    le = LabelEncoder()
    output_df['diarization_numeric'] = le.fit_transform(output_df['diarization'])
    output_df['adult_child_classification_numeric'] = le.fit_transform(output_df['adult_child_classification'])
    return output_df['diarization_numeric'].values, output_df['adult_child_classification_numeric'].values


def audio_feature_extraction(input_path, gpu=False):
    """Extract and pickle per-clip audio features (s2t embedding + speaker labels) for a video file."""
    output_path = 'audio_embedding'
    # Extract the audio track and write it to disk as 16 kHz PCM WAV.
    audioTrack = VideoFileClip(input_path).audio
    audioTrack.write_audiofile('temp.wav', codec='pcm_s16le', fps=16000)

    print('Extracting s2t features...')
    s2t_features = extract_s2t_features(gpu)
    print('Extracting speaker features...')
    diarization_features, adult_child_class_features = extract_speaker_features(gpu)
    # Drop the trailing partial clip so the label arrays align with the s2t embeddings.
    if len(diarization_features) > 1:
        diarization_features, adult_child_class_features = diarization_features[:-1], adult_child_class_features[:-1]

    audio_features = np.concatenate((s2t_features,
                                     diarization_features[:, None],
                                     adult_child_class_features[:, None]), axis=1)
    with open(output_path, 'wb') as f:
        pickle.dump(audio_features, f)
    return output_path
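

# Usage sketch (not part of the original module): a minimal command-line entry point,
# assuming the script is invoked as `python audio_features.py <video_path> [--gpu]`.
# The script name, the positional argument, and the --gpu flag are illustrative
# assumptions, not an interface defined elsewhere in this repository.
if __name__ == '__main__':
    # e.g. python audio_features.py interview.mp4 --gpu
    video_path = sys.argv[1]
    use_gpu = '--gpu' in sys.argv[2:]
    saved_to = audio_feature_extraction(video_path, gpu=use_gpu)
    print('Audio features pickled to: {}'.format(saved_to))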