# fal2022-videoanalysis-v2 / audio_feature_extraction_final.py
import torch
from torchaudio import load as torchaudio_load
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from sklearn.preprocessing import LabelEncoder
from librosa import load as librosa_load
import librosa.display
import math
import pandas as pd
import sys
from tqdm import tqdm
import numpy as np
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration, pipeline as transformers_pipeline
import pickle


def extract_s2t_features(gpu):
    """Author: Frank

    Extract Speech2Text (s2t) features from 'temp.wav': one mean-pooled
    hidden-state embedding per 10-second window.
    """
    model_name = "medium"
    processor = Speech2TextProcessor.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    model = Speech2TextForConditionalGeneration.from_pretrained("facebook/s2t-{}-librispeech-asr".format(model_name))
    if gpu:
        model = model.cuda()
    model.load_state_dict(torch.load('s2t_model'))
    model.eval()

    sample_rate = 16000
    embedding_window = 10  # in secs

    audio, _ = torchaudio_load('temp.wav')
    audio = torch.mean(audio, dim=0)  # downmix to mono

    embs = []
    audio_clips = audio.split(embedding_window * sample_rate)
    if len(audio_clips) > 1:
        # Drop the trailing partial window so all clips have the same length.
        audio_clips = audio_clips[:-1]
    for clip in tqdm(audio_clips):
        with torch.no_grad():
            inputs = processor(clip, sampling_rate=16000, return_tensors="pt")
            features = inputs["input_features"]
            decoder_input = torch.zeros(features.shape[:2], dtype=torch.int32)
            if gpu:
                features, decoder_input = features.cuda(), decoder_input.cuda()
            # Mean-pool the model's last hidden state into one embedding per clip.
            h = model.model(features, decoder_input_ids=decoder_input).last_hidden_state.cpu()
            emb = torch.mean(h, axis=1)
            embs.append(emb)
    return torch.cat(embs).numpy()

def extract_speaker_features(gpu):
    """Author: Sichao

    Run speaker diarization and adult/child classification on 'temp.wav',
    producing one integer-encoded label of each kind per 10-second clip.
    """
    # temp.wav is written at 16 kHz; load it at that rate so the clip slicing
    # below and the wav2vec2 classifier see audio at the expected sample rate.
    x, sample_rate = librosa_load('temp.wav', sr=16000)
    print('Input sample rate: {}, Length: {} s'.format(sample_rate, x.size / sample_rate))

    # Speaker diarization
    print('Start speaker diarization...')
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", use_auth_token='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')
    diarization = pipeline('temp.wav')

    # Map each second of audio to the speaker(s) active during it.
    speaker_per_sec_dict = {i: 'UNKNOWN' for i in range(0, math.ceil(x.size / sample_rate))}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        for clip_start in range(math.ceil(turn.start), math.ceil(turn.end)):
            if speaker_per_sec_dict[clip_start] == 'UNKNOWN':
                speaker_per_sec_dict[clip_start] = speaker
            elif speaker_per_sec_dict[clip_start] != speaker:
                speaker_per_sec_dict[clip_start] = speaker_per_sec_dict[clip_start] + ' ' + speaker

    # Collapse the per-second labels into one label per 10-second clip.
    speaker_per_clip = []
    for i in range(0, math.ceil(x.size / sample_rate), 10):
        speakers = []
        for j in range(10):
            if i + j in speaker_per_sec_dict and speaker_per_sec_dict[i + j] != 'UNKNOWN':
                speakers.append(speaker_per_sec_dict[i + j])
        if len(speakers) > 0:
            is_single_speaker = all(s == speakers[0] for s in speakers)
            if is_single_speaker:
                speaker_per_clip.append(speakers[0])
            else:
                speaker_per_clip.append('MULTI SPEAKER')
        else:
            speaker_per_clip.append('UNKNOWN')

    # Adult/child classification
    print('Start adult child classification...')
    device = 0 if gpu else -1
    audio_classifier = transformers_pipeline(task="audio-classification", model="bookbot/wav2vec2-adult-child-cls", device=device)
    clip_idxs = [i for i in range(0, math.ceil(x.size / sample_rate), 10)]
    classifications = []
    for clip_start in tqdm(clip_idxs):
        with torch.no_grad():
            preds = audio_classifier(x[clip_start * sample_rate:(clip_start + 10) * sample_rate])
            preds = [{"score": round(pred["score"], 4), "label": pred["label"]} for pred in preds]
            classifications.append(preds[0]['label'])

    # Output
    print('Output...')
    output = {'clip_start': clip_idxs, 'diarization': speaker_per_clip, 'adult_child_classification': classifications}
    output_df = pd.DataFrame(output)

    # Integer-encode the categorical labels (each column is fit independently).
    le = LabelEncoder()
    output_df['diarization_numeric'] = le.fit_transform(output_df['diarization'])
    output_df['adult_child_classification_numeric'] = le.fit_transform(output_df['adult_child_classification'])
    return output_df['diarization_numeric'].values, output_df['adult_child_classification_numeric'].values

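# Usage sketch (assumes 'temp.wav' exists): the two arrays returned above are
# aligned, one integer-coded label per 10-second clip; the codes come from
# independent LabelEncoder fits, so they are only comparable within one run.
#
#   diar_ids, adult_child_ids = extract_speaker_features(gpu=False)
#   assert len(diar_ids) == len(adult_child_ids)
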
def audio_feature_extraction(input_path, gpu=False):
    output_path = 'audio_embedding'

    # Write the video's audio track to temp.wav as 16-bit PCM at 16 kHz.
    audioTrack = VideoFileClip(input_path).audio
    audioTrack.write_audiofile('temp.wav', codec='pcm_s16le', fps=16000)

    print('Extracting s2t features...')
    s2t_features = extract_s2t_features(gpu)

    print('Extracting speaker features...')
    diarization_features, adult_child_class_features = extract_speaker_features(gpu)
    if len(diarization_features) > 1:
        # Drop the trailing partial clip to stay aligned with the s2t features.
        diarization_features, adult_child_class_features = diarization_features[:-1], adult_child_class_features[:-1]

    # Concatenate per-clip features: s2t embedding + diarization id + adult/child id.
    audio_features = np.concatenate((s2t_features, diarization_features[:, None], adult_child_class_features[:, None]), axis=1)
    with open(output_path, 'wb') as f:
        pickle.dump(audio_features, f)
    return output_path

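# Example command-line entry point (a sketch, not in the original script):
# run as `python audio_feature_extraction_final.py <video_path>`.
if __name__ == '__main__':
    video_path = sys.argv[1]
    use_gpu = torch.cuda.is_available()
    features_path = audio_feature_extraction(video_path, gpu=use_gpu)
    print('Saved audio features to {}'.format(features_path))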