File size: 2,916 Bytes
f78137c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b2ae017
f78137c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import whisper
import datetime
import subprocess
import wave
import contextlib


import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
from sklearn.cluster import AgglomerativeClustering
import numpy as np

model = whisper.load_model("large-v2")
embedding_model = PretrainedSpeakerEmbedding( 
    "speechbrain/spkrec-ecapa-voxceleb",
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)

def transcribe(audio, num_speakers):
  print(type(audio))
  path, error = convert_to_wav(audio)
  if error is not None:
    return error

  duration = get_duration(path)
  if duration > 4 * 60 * 60:
    return "Audio duration too long"

  result = model.transcribe(path)
  segments = result["segments"]

  num_speakers = min(max(round(num_speakers), 1), len(segments))
  if len(segments) == 1:
    segments[0]['speaker'] = 'SPEAKER 1'
  else:
    embeddings = make_embeddings(path, segments, duration)
    add_speaker_labels(segments, embeddings, num_speakers)
  output = get_output(segments)
  return output

def convert_to_wav(path):
  if path[-3:] != 'wav':
    new_path = '.'.join(path.split('.')[:-1]) + '.wav'
    try:
      subprocess.call(['ffmpeg', '-i', path, new_path, '-y'])
    except:
      return path, 'Error: Could not convert file to .wav'
    path = new_path
  return path, None

def get_duration(path):
  with contextlib.closing(wave.open(path,'r')) as f:
    frames = f.getnframes()
    rate = f.getframerate()
    return frames / float(rate)

def make_embeddings(path, segments, duration):
  embeddings = np.zeros(shape=(len(segments), 192))
  for i, segment in enumerate(segments):
    embeddings[i] = segment_embedding(path, segment, duration)
  return np.nan_to_num(embeddings)

audio = Audio()

def segment_embedding(path, segment, duration):
  start = segment["start"]
  # Whisper overshoots the end timestamp in the last segment
  end = min(duration, segment["end"])
  clip = Segment(start, end)
  waveform, sample_rate = audio.crop(path, clip)
  return embedding_model(waveform[None])

def add_speaker_labels(segments, embeddings, num_speakers):
  """Add speaker labels"""
  clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
  labels = clustering.labels_
  for i in range(len(segments)):
    segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

def time(secs):
  """Function to return time delta"""
  return datetime.timedelta(seconds=round(secs))

def get_output(segments):
  """Format and generate the output string"""
  output = ''
  for (i, segment) in enumerate(segments):
    if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
      if i != 0:
        output += '\n\n'
      output += segment["speaker"] + ' ' + str(time(segment["start"])) + '\n'
    output += segment["text"][1:] + ' '
  return output