# kotoba-whisper-v2.2 / pipeline / kotoba_whisper.py
import requests
from typing import Union, Optional, Dict, List, Any
from collections import defaultdict
import torch
import numpy as np
from transformers.pipelines.audio_utils import ffmpeg_read
from transformers.pipelines.automatic_speech_recognition import AutomaticSpeechRecognitionPipeline, chunk_iter
from transformers.utils import is_torchaudio_available
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor
from pyannote.audio import Pipeline
from pyannote.core.annotation import Annotation
from punctuators.models import PunctCapSegModelONNX
from diarizers import SegmentationModel
class Punctuator:
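    """Restore Japanese punctuation on ASR chunks with an ONNX punctuation model.

    A punctuated candidate is discarded (the raw text is kept) when the model emits an
    unknown token or when the raw text already contains Japanese punctuation, and at
    most one sentence-final "。" is kept per chunk.
    """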
ja_punctuations = ["!", "?", "、", "。"]
def __init__(self, model: str = "pcs_47lang"):
self.punctuation_model = PunctCapSegModelONNX.from_pretrained(model)
def punctuate(self, pipeline_chunk: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        def validate_punctuation(raw: str, punctuated: str):
            # Keep the raw text when the model emitted an unknown token or when the
            # chunk already carries Japanese punctuation.
            if 'unk' in punctuated.lower() or any(p in raw for p in self.ja_punctuations):
                return raw
            # Keep at most one "。" per chunk, at the position of the last one.
            if punctuated.count("。") > 1:
                ind = punctuated.rfind("。")
                punctuated = punctuated.replace("。", "")
                punctuated = punctuated[:ind] + "。" + punctuated[ind:]
            return punctuated
text_edit = self.punctuation_model.infer([c['text'] for c in pipeline_chunk])
return [
{
'timestamp': c['timestamp'],
'speaker': c['speaker'],
'text': validate_punctuation(c['text'], "".join(e))
} for c, e in zip(pipeline_chunk, text_edit)
]
class SpeakerDiarization:
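    """Thin wrapper around the pyannote speaker-diarization pipeline.

    Optionally replaces the segmentation model with a fine-tuned checkpoint loaded
    through the `diarizers` library.
    """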
def __init__(self,
device: torch.device,
model_id: str = "pyannote/speaker-diarization-3.1",
model_id_diarizers: Optional[str] = None):
self.device = device
self.pipeline = Pipeline.from_pretrained(model_id)
self.pipeline = self.pipeline.to(self.device)
if model_id_diarizers:
self.pipeline._segmentation.model = SegmentationModel().from_pretrained(
model_id_diarizers
).to_pyannote_model().to(self.device)
def __call__(self,
audio: Union[torch.Tensor, np.ndarray],
sampling_rate: int,
num_speakers: Optional[int] = None,
min_speakers: Optional[int] = None,
max_speakers: Optional[int] = None) -> Annotation:
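        """Run speaker diarization on a waveform and return a pyannote ``Annotation``."""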
if sampling_rate is None:
raise ValueError("sampling_rate must be provided")
        audio = torch.as_tensor(audio, dtype=torch.float32)
        if audio.ndim == 1:
            # pyannote expects a (channel, time) tensor; add a channel axis for mono input.
            audio = audio.unsqueeze(0)
        elif audio.ndim > 2:
            raise ValueError("audio shape must be (channel, time)")
audio = {"waveform": audio.to(self.device), "sample_rate": sampling_rate}
output = self.pipeline(audio, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers)
return output
class KotobaWhisperPipeline(AutomaticSpeechRecognitionPipeline):
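    """Whisper ASR pipeline that augments each transcription chunk with speaker labels
    from pyannote diarization and, optionally, restored Japanese punctuation."""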
def __init__(self,
model: "PreTrainedModel",
model_pyannote: str = "pyannote/speaker-diarization-3.1",
model_diarizers: Optional[str] = "diarizers-community/speaker-segmentation-fine-tuned-callhome-jpn",
feature_extractor: Union["SequenceFeatureExtractor", str] = None,
tokenizer: Optional[PreTrainedTokenizer] = None,
device: Union[int, "torch.device"] = None,
device_pyannote: Union[int, "torch.device"] = None,
torch_dtype: Optional[Union[str, "torch.dtype"]] = None,
**kwargs):
self.type = "seq2seq_whisper"
if device is None:
device = "cpu"
if device_pyannote is None:
device_pyannote = device
        if isinstance(device_pyannote, str):
            device_pyannote = torch.device(device_pyannote)
self.model_speaker_diarization = SpeakerDiarization(
device=device_pyannote,
model_id=model_pyannote,
model_id_diarizers=model_diarizers
)
self.punctuator = None
super().__init__(
model=model,
feature_extractor=feature_extractor,
tokenizer=tokenizer,
device=device,
torch_dtype=torch_dtype,
**kwargs
)
def _sanitize_parameters(self,
chunk_length_s=None,
stride_length_s=None,
ignore_warning=None,
decoder_kwargs=None,
return_timestamps=None,
return_language=None,
generate_kwargs=None,
max_new_tokens=None,
                             add_punctuation: bool = False,
                             return_unique_speaker: bool = True,
num_speakers: Optional[int] = None,
min_speakers: Optional[int] = None,
max_speakers: Optional[int] = None):
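        """Route user-supplied kwargs to the preprocess, forward, and postprocess steps."""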
preprocess_params = {}
if chunk_length_s is not None:
preprocess_params["chunk_length_s"] = chunk_length_s
if stride_length_s is not None:
preprocess_params["stride_length_s"] = stride_length_s
forward_params = defaultdict(dict)
if max_new_tokens is not None:
forward_params["max_new_tokens"] = max_new_tokens
if generate_kwargs is not None:
if max_new_tokens is not None and "max_new_tokens" in generate_kwargs:
raise ValueError(
"`max_new_tokens` is defined both as an argument and inside `generate_kwargs` argument, please use"
" only 1 version"
)
forward_params.update(generate_kwargs)
postprocess_params = {}
if decoder_kwargs is not None:
postprocess_params["decoder_kwargs"] = decoder_kwargs
if return_timestamps is not None:
# Check whether we have a valid setting for return_timestamps and throw an error before we perform a forward pass
if self.type == "seq2seq" and return_timestamps:
raise ValueError("We cannot return_timestamps yet on non-CTC models apart from Whisper!")
if self.type == "ctc_with_lm" and return_timestamps != "word":
raise ValueError("CTC with LM can only predict word level timestamps, set `return_timestamps='word'`")
if self.type == "ctc" and return_timestamps not in ["char", "word"]:
raise ValueError(
"CTC can either predict character level timestamps, or word level timestamps. "
"Set `return_timestamps='char'` or `return_timestamps='word'` as required."
)
if self.type == "seq2seq_whisper" and return_timestamps == "char":
raise ValueError(
"Whisper cannot return `char` timestamps, only word level or segment level timestamps. "
"Use `return_timestamps='word'` or `return_timestamps=True` respectively."
)
forward_params["return_timestamps"] = return_timestamps
postprocess_params["return_timestamps"] = return_timestamps
if return_language is not None:
if self.type != "seq2seq_whisper":
raise ValueError("Only Whisper can return language for now.")
            postprocess_params["return_language"] = return_language
postprocess_params["add_punctuation"] = add_punctuation
postprocess_params["return_unique_speaker"] = return_unique_speaker
postprocess_params["num_speakers"] = num_speakers
postprocess_params["min_speakers"] = min_speakers
postprocess_params["max_speakers"] = max_speakers
return preprocess_params, forward_params, postprocess_params
def preprocess(self, inputs, chunk_length_s=0, stride_length_s=None):
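        """Load, resample, and feature-extract the audio, yielding model inputs that also
        carry the full raw waveform under ``audio_array`` so diarization can run on it at
        postprocessing time."""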
if isinstance(inputs, str):
if inputs.startswith("http://") or inputs.startswith("https://"):
# We need to actually check for a real protocol, otherwise it's impossible to use a local file
# like http_huggingface_co.png
inputs = requests.get(inputs).content
else:
with open(inputs, "rb") as f:
inputs = f.read()
if isinstance(inputs, bytes):
inputs = ffmpeg_read(inputs, self.feature_extractor.sampling_rate)
stride = None
extra = {}
if isinstance(inputs, dict):
stride = inputs.pop("stride", None)
# Accepting `"array"` which is the key defined in `datasets` for
# better integration
if not ("sampling_rate" in inputs and ("raw" in inputs or "array" in inputs)):
raise ValueError(
"When passing a dictionary to AutomaticSpeechRecognitionPipeline, the dict needs to contain a "
'"raw" key containing the numpy array representing the audio and a "sampling_rate" key, '
"containing the sampling_rate associated with that array"
)
_inputs = inputs.pop("raw", None)
if _inputs is None:
# Remove path which will not be used from `datasets`.
inputs.pop("path", None)
_inputs = inputs.pop("array", None)
in_sampling_rate = inputs.pop("sampling_rate")
extra = inputs
inputs = _inputs
if in_sampling_rate != self.feature_extractor.sampling_rate:
if is_torchaudio_available():
from torchaudio import functional as F
else:
raise ImportError(
"torchaudio is required to resample audio samples in AutomaticSpeechRecognitionPipeline. "
"The torchaudio package can be installed through: `pip install torchaudio`."
)
inputs = F.resample(
torch.from_numpy(inputs), in_sampling_rate, self.feature_extractor.sampling_rate
).numpy()
ratio = self.feature_extractor.sampling_rate / in_sampling_rate
else:
ratio = 1
if stride is not None:
if stride[0] + stride[1] > inputs.shape[0]:
raise ValueError("Stride is too large for input")
# Stride needs to get the chunk length here, it's going to get
# swallowed by the `feature_extractor` later, and then batching
# can add extra data in the inputs, so we need to keep track
# of the original length in the stride so we can cut properly.
stride = (inputs.shape[0], int(round(stride[0] * ratio)), int(round(stride[1] * ratio)))
if not isinstance(inputs, np.ndarray):
raise ValueError(f"We expect a numpy ndarray as input, got `{type(inputs)}`")
if len(inputs.shape) != 1:
raise ValueError("We expect a single channel audio input for AutomaticSpeechRecognitionPipeline")
if chunk_length_s:
if stride_length_s is None:
stride_length_s = chunk_length_s / 6
if isinstance(stride_length_s, (int, float)):
stride_length_s = [stride_length_s, stride_length_s]
            # XXX: Careful, this variable will not exist in the `seq2seq` setting.
# Currently chunking is not possible at this level for `seq2seq` so
# it's ok.
align_to = getattr(self.model.config, "inputs_to_logits_ratio", 1)
chunk_len = int(round(chunk_length_s * self.feature_extractor.sampling_rate / align_to) * align_to)
stride_left = int(round(stride_length_s[0] * self.feature_extractor.sampling_rate / align_to) * align_to)
stride_right = int(round(stride_length_s[1] * self.feature_extractor.sampling_rate / align_to) * align_to)
if chunk_len < stride_left + stride_right:
                raise ValueError("Chunk length must be at least the sum of the left and right stride lengths")
for item in chunk_iter(
inputs, self.feature_extractor, chunk_len, stride_left, stride_right, self.torch_dtype
):
item["audio_array"] = inputs
yield item
else:
if inputs.shape[0] > self.feature_extractor.n_samples:
processed = self.feature_extractor(
inputs,
sampling_rate=self.feature_extractor.sampling_rate,
truncation=False,
padding="longest",
return_tensors="pt",
)
else:
processed = self.feature_extractor(
inputs, sampling_rate=self.feature_extractor.sampling_rate, return_tensors="pt"
)
if self.torch_dtype is not None:
processed = processed.to(dtype=self.torch_dtype)
if stride is not None:
processed["stride"] = stride
yield {"is_last": True, "audio_array": inputs, **processed, **extra}
def _forward(self, model_inputs, **generate_kwargs):
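        """Run Whisper generation on one (possibly chunked) input, passing the raw waveform
        and stride through to postprocessing."""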
attention_mask = model_inputs.pop("attention_mask", None)
stride = model_inputs.pop("stride", None)
is_last = model_inputs.pop("is_last")
audio_array = model_inputs.pop("audio_array")
encoder = self.model.get_encoder()
# Consume values so we can let extra information flow freely through
# the pipeline (important for `partial` in microphone)
if "input_features" in model_inputs:
inputs = model_inputs.pop("input_features")
elif "input_values" in model_inputs:
inputs = model_inputs.pop("input_values")
else:
            raise ValueError(
                "Seq2Seq speech recognition model requires either an "
                f"`input_features` or `input_values` key, but only has {model_inputs.keys()}"
            )
# custom processing for Whisper timestamps and word-level timestamps
generate_kwargs["return_timestamps"] = True
if inputs.shape[-1] > self.feature_extractor.nb_max_frames:
generate_kwargs["input_features"] = inputs
else:
generate_kwargs["encoder_outputs"] = encoder(inputs, attention_mask=attention_mask)
tokens = self.model.generate(attention_mask=attention_mask, **generate_kwargs)
# whisper longform generation stores timestamps in "segments"
out = {"tokens": tokens}
if self.type == "seq2seq_whisper":
if stride is not None:
out["stride"] = stride
# Leftover
extra = model_inputs
return {"is_last": is_last, "audio_array": audio_array, **out, **extra}
def postprocess(self,
model_outputs,
decoder_kwargs: Optional[Dict] = None,
return_language=None,
add_punctuation: bool = False,
return_unique_speaker: bool = True,
num_speakers: Optional[int] = None,
min_speakers: Optional[int] = None,
max_speakers: Optional[int] = None,
*args,
**kwargs):
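        """Decode timestamped text, diarize the raw waveform, attach speaker labels to each
        chunk, and optionally restore punctuation; also returns per-speaker chunks and text
        plus the raw diarization result."""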
assert len(model_outputs) > 0
outputs = super().postprocess(
model_outputs=model_outputs,
decoder_kwargs=decoder_kwargs,
return_timestamps=True,
return_language=return_language
)
audio_array = outputs.pop("audio_array")[0]
sd = self.model_speaker_diarization(
audio_array,
num_speakers=num_speakers,
min_speakers=min_speakers,
max_speakers=max_speakers,
sampling_rate=self.feature_extractor.sampling_rate
)
diarization_result = {s: [[i.start, i.end] for i in sd.label_timeline(s)] for s in sd.labels()}
timelines = sd.get_timeline()
pointer_ts = 0
pointer_chunk = 0
new_chunks = []
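        # Walk the diarization timeline and the ASR chunks in parallel: every diarization
        # segment overlapping a chunk's [start, end] contributes its speaker labels to that
        # chunk, and a chunk that ends before the next segment falls back to that segment's
        # labels when it has none of its own.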
while True:
            if pointer_ts == len(timelines):
                # No diarization segments left: label any remaining chunks with the
                # speakers of the last segment.
                ts = timelines[-1]
                for chunk in outputs["chunks"][pointer_chunk:]:
                    chunk["speaker"] = list(sd.get_labels(ts))
                    new_chunks.append(chunk)
                break
if pointer_chunk == len(outputs["chunks"]):
break
ts = timelines[pointer_ts]
chunk = outputs["chunks"][pointer_chunk]
if "speaker" not in chunk:
chunk["speaker"] = []
start, end = chunk["timestamp"]
if ts.end <= start:
pointer_ts += 1
elif end <= ts.start:
if len(chunk["speaker"]) == 0:
chunk["speaker"] += list(sd.get_labels(ts))
new_chunks.append(chunk)
pointer_chunk += 1
else:
chunk["speaker"] += list(sd.get_labels(ts))
if ts.end >= end:
new_chunks.append(chunk)
pointer_chunk += 1
else:
pointer_ts += 1
for i in new_chunks:
if "speaker" in i:
if return_unique_speaker:
i["speaker"] = [i["speaker"][0]]
else:
i["speaker"] = list(set(i["speaker"]))
else:
i["speaker"] = []
outputs["chunks"] = new_chunks
if add_punctuation:
if self.punctuator is None:
self.punctuator = Punctuator()
outputs["chunks"] = self.punctuator.punctuate(outputs["chunks"])
outputs["text"] = "".join([c["text"] for c in outputs["chunks"]])
outputs["speakers"] = sd.labels()
speakers = []
for s in outputs["speakers"]:
chunk_s = [c for c in outputs["chunks"] if s in c["speaker"]]
if len(chunk_s) != 0:
outputs[f"chunks/{s}"] = chunk_s
outputs[f"text/{s}"] = "".join([c["text"] for c in outputs["chunks"] if s in c["speaker"]])
speakers.append(s)
outputs["speakers"] = speakers
outputs["diarization_result"] = diarization_result
return outputs
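

if __name__ == "__main__":
    # Minimal usage sketch: this custom pipeline is meant to be loaded through
    # `transformers.pipeline` with `trust_remote_code=True`. The model id mirrors this
    # repository; the audio path and generation settings below are illustrative
    # placeholders rather than fixed requirements of the pipeline.
    from transformers import pipeline

    pipe = pipeline(
        model="kotoba-tech/kotoba-whisper-v2.2",
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        device="cuda:0" if torch.cuda.is_available() else "cpu",
        trust_remote_code=True,
    )
    # "sample.wav" is a placeholder; any ffmpeg-readable audio file works.
    result = pipe("sample.wav", chunk_length_s=15, add_punctuation=True)
    print(result["text"])
    for chunk in result["chunks"]:
        print(chunk["speaker"], chunk["timestamp"], chunk["text"])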