import requests
from typing import Union, Optional, Dict, List, Any
from collections import defaultdict

import torch
import numpy as np

from transformers.pipelines.audio_utils import ffmpeg_read
from transformers.pipelines.automatic_speech_recognition import AutomaticSpeechRecognitionPipeline, chunk_iter
from transformers.utils import is_torchaudio_available
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor
from pyannote.audio import Pipeline
from pyannote.core.annotation import Annotation
from punctuators.models import PunctCapSegModelONNX
from diarizers import SegmentationModel


class Punctuator:
    """Add punctuation to transcribed chunks using a `punctuators` ONNX model."""

    # If the raw text already contains any of these marks it is returned untouched.
    ja_punctuations = ["!", "?", "、", "。"]

    def __init__(self, model: str = "pcs_47lang"):
        """Load the punctuation model identified by ``model``."""
        self.punctuation_model = PunctCapSegModelONNX.from_pretrained(model)

    def _validate_punctuation(self, raw: str, punctuated: str) -> str:
        """Return ``punctuated`` if it looks sane, otherwise fall back to ``raw``.

        The raw text is kept when the model output contains "unk" (case
        insensitive) or when the raw text already carries punctuation. When
        the model emitted several "。", only the last one is kept.
        """
        if "unk" in punctuated.lower() or any(p in raw for p in self.ja_punctuations):
            return raw
        if punctuated.count("。") > 1:
            last = punctuated.rfind("。")
            # Strip the earlier marks from the prefix only, so the surviving
            # "。" stays exactly where the last one was.
            punctuated = punctuated[:last].replace("。", "") + punctuated[last:]
        return punctuated

    def punctuate(self, pipeline_chunk: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Punctuate the ``text`` of every chunk.

        Args:
            pipeline_chunk: chunks holding ``timestamp``, ``speaker`` and ``text`` keys.

        Returns:
            New chunk dicts with the same ``timestamp`` and ``speaker`` and the
            (possibly) punctuated ``text``.
        """
        text_edit = self.punctuation_model.infer([c["text"] for c in pipeline_chunk])
        return [
            {
                "timestamp": c["timestamp"],
                "speaker": c["speaker"],
                "text": self._validate_punctuation(c["text"], "".join(e)),
            }
            for c, e in zip(pipeline_chunk, text_edit)
        ]


class SpeakerDiarization:
    """Thin wrapper around a pyannote diarization pipeline."""

    def __init__(self,
                 device: torch.device,
                 model_id: str = "pyannote/speaker-diarization-3.1",
                 model_id_diarizers: Optional[str] = None):
        """Load the pyannote pipeline and move it to ``device``.

        When ``model_id_diarizers`` is given, the pipeline's segmentation model
        is replaced by that `diarizers` checkpoint.
        """
        self.device = device
        self.pipeline = Pipeline.from_pretrained(model_id)
        self.pipeline = self.pipeline.to(self.device)
        if model_id_diarizers:
            self.pipeline._segmentation.model = SegmentationModel().from_pretrained(
                model_id_diarizers
            ).to_pyannote_model().to(self.device)

    def __call__(self,
                 audio: Union[torch.Tensor, np.ndarray],
                 sampling_rate: int,
                 num_speakers: Optional[int] = None,
                 min_speakers: Optional[int] = None,
                 max_speakers: Optional[int] = None) -> Annotation:
        """Run diarization on ``audio``.

        Args:
            audio: waveform, either 1-D ``(time,)`` or 2-D ``(channel, time)``.
            sampling_rate: sampling rate of ``audio``.
            num_speakers: forwarded to the pyannote pipeline.
            min_speakers: forwarded to the pyannote pipeline.
            max_speakers: forwarded to the pyannote pipeline.

        Returns:
            The annotation produced by the pyannote pipeline.

        Raises:
            ValueError: if ``sampling_rate`` is None or ``audio`` is neither 1-D nor 2-D.
        """
        if sampling_rate is None:
            raise ValueError("sampling_rate must be provided")
        audio = torch.as_tensor(audio, dtype=torch.float32)
        if audio.ndim == 1:
            # Promote mono (time,) input to (channel, time).
            audio = audio.unsqueeze(0)
        elif audio.ndim != 2:
            raise ValueError("audio shape must be (channel, time)")
        audio = {"waveform": audio.to(self.device), "sample_rate": sampling_rate}
        output = self.pipeline(
            audio, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers
        )
        return output


class KotobaWhisperPipeline(AutomaticSpeechRecognitionPipeline):
    """Whisper ASR pipeline that additionally assigns speakers to each chunk."""

    def __init__(self,
                 model: "PreTrainedModel",
                 model_pyannote: str = "pyannote/speaker-diarization-3.1",
                 model_diarizers: Optional[str] = "diarizers-community/speaker-segmentation-fine-tuned-callhome-jpn",
                 feature_extractor: Union["SequenceFeatureExtractor", str] = None,
                 tokenizer: Optional[PreTrainedTokenizer] = None,
                 device: Union[int, "torch.device"] = None,
                 device_pyannote: Union[int, "torch.device"] = None,
                 torch_dtype: Optional[Union[str, "torch.dtype"]] = None,
                 **kwargs):
        """Build the diarization model, then initialise the ASR pipeline.

        ``device`` defaults to ``"cpu"`` and ``device_pyannote`` defaults to
        ``device``. The punctuator is created lazily on first use.
        """
        self.type = "seq2seq_whisper"
        if device is None:
            device = "cpu"
        if device_pyannote is None:
            device_pyannote = device
        device_pyannote = self._as_torch_device(device_pyannote)
        self.model_speaker_diarization = SpeakerDiarization(
            device=device_pyannote,
            model_id=model_pyannote,
            model_id_diarizers=model_diarizers
        )
        self.punctuator = None
        super().__init__(
            model=model,
            feature_extractor=feature_extractor,
            tokenizer=tokenizer,
            device=device,
            torch_dtype=torch_dtype,
            **kwargs
        )

    @staticmethod
    def _as_torch_device(device):
        """Convert a ``str`` or ``int`` device spec into a ``torch.device``.

        Integers follow the usual pipeline convention: negative means CPU,
        otherwise the CUDA device index. Any other value is returned unchanged.
        """
        if isinstance(device, torch.device):
            return device
        if isinstance(device, str):
            return torch.device(device)
        if isinstance(device, int):
            return torch.device("cpu") if device < 0 else torch.device(f"cuda:{device}")
        return device

    def _sanitize_parameters(self,
                             chunk_length_s=None,
                             stride_length_s=None,
                             ignore_warning=None,
                             decoder_kwargs=None,
                             return_timestamps=None,
                             return_language=None,
                             generate_kwargs=None,
                             max_new_tokens=None,
                             add_punctuation: bool = False,
                             return_unique_speaker: bool = True,
                             num_speakers: Optional[int] = None,
                             min_speakers: Optional[int] = None,
                             max_speakers: Optional[int] = None):
        """Split call-time keyword arguments into preprocess/forward/postprocess dicts.

        Returns:
            A ``(preprocess_params, forward_params, postprocess_params)`` tuple.

        Raises:
            ValueError: if ``max_new_tokens`` is given both directly and inside
                ``generate_kwargs``, or if ``return_timestamps`` /
                ``return_language`` is not valid for ``self.type``.
        """
        preprocess_params = {}
        if chunk_length_s is not None:
            preprocess_params["chunk_length_s"] = chunk_length_s
        if stride_length_s is not None:
            preprocess_params["stride_length_s"] = stride_length_s

        forward_params = defaultdict(dict)
        if max_new_tokens is not None:
            forward_params["max_new_tokens"] = max_new_tokens
        if generate_kwargs is not None:
            if max_new_tokens is not None and "max_new_tokens" in generate_kwargs:
                raise ValueError(
                    "`max_new_tokens` is defined both as an argument and inside `generate_kwargs` argument, please use"
                    " only 1 version"
                )
            forward_params.update(generate_kwargs)

        postprocess_params = {}
        if decoder_kwargs is not None:
            postprocess_params["decoder_kwargs"] = decoder_kwargs
        if return_timestamps is not None:
            # Check whether we have a valid setting for return_timestamps and throw an error before we perform a
            # forward pass
            if self.type == "seq2seq" and return_timestamps:
                raise ValueError("We cannot return_timestamps yet on non-CTC models apart from Whisper!")
            if self.type == "ctc_with_lm" and return_timestamps != "word":
                raise ValueError("CTC with LM can only predict word level timestamps, set `return_timestamps='word'`")
            if self.type == "ctc" and return_timestamps not in ["char", "word"]:
                raise ValueError(
                    "CTC can either predict character level timestamps, or word level timestamps. "
                    "Set `return_timestamps='char'` or `return_timestamps='word'` as required."
                )
            if self.type == "seq2seq_whisper" and return_timestamps == "char":
                raise ValueError(
                    "Whisper cannot return `char` timestamps, only word level or segment level timestamps. "
                    "Use `return_timestamps='word'` or `return_timestamps=True` respectively."
                )
            forward_params["return_timestamps"] = return_timestamps
            postprocess_params["return_timestamps"] = return_timestamps
        if return_language is not None:
            if self.type != "seq2seq_whisper":
                raise ValueError("Only Whisper can return language for now.")
            postprocess_params["return_language"] = return_language
        postprocess_params["add_punctuation"] = add_punctuation
        postprocess_params["return_unique_speaker"] = return_unique_speaker
        postprocess_params["num_speakers"] = num_speakers
        postprocess_params["min_speakers"] = min_speakers
        postprocess_params["max_speakers"] = max_speakers
        return preprocess_params, forward_params, postprocess_params

    def preprocess(self, inputs, chunk_length_s=0, stride_length_s=None):
        """Turn ``inputs`` into model features, yielding one item per chunk.

        ``inputs`` may be a URL or file path, raw audio bytes, a 1-D numpy array,
        or a dict with ``sampling_rate`` and ``raw``/``array`` keys. Every yielded
        item also carries the full waveform under ``audio_array`` so that
        diarization can run on it in ``postprocess``.

        Raises:
            ValueError: on a malformed dict, an oversized stride, a non-ndarray or
                multi-channel waveform, or a chunk shorter than its strides.
            ImportError: if resampling is needed and torchaudio is unavailable.
        """
        if isinstance(inputs, str):
            if inputs.startswith("http://") or inputs.startswith("https://"):
                # We need to actually check for a real protocol, otherwise it's impossible to use a local file
                # like http_huggingface_co.png
                inputs = requests.get(inputs).content
            else:
                with open(inputs, "rb") as f:
                    inputs = f.read()

        if isinstance(inputs, bytes):
            inputs = ffmpeg_read(inputs, self.feature_extractor.sampling_rate)

        stride = None
        extra = {}
        if isinstance(inputs, dict):
            # Work on a shallow copy so the caller's dictionary is not emptied by the pops below.
            inputs = inputs.copy()
            stride = inputs.pop("stride", None)
            # Accepting `"array"` which is the key defined in `datasets` for
            # better integration
            if not ("sampling_rate" in inputs and ("raw" in inputs or "array" in inputs)):
                raise ValueError(
                    "When passing a dictionary to AutomaticSpeechRecognitionPipeline, the dict needs to contain a "
                    '"raw" key containing the numpy array representing the audio and a "sampling_rate" key, '
                    "containing the sampling_rate associated with that array"
                )

            _inputs = inputs.pop("raw", None)
            if _inputs is None:
                # Remove path which will not be used from `datasets`.
                inputs.pop("path", None)
                _inputs = inputs.pop("array", None)
            in_sampling_rate = inputs.pop("sampling_rate")
            extra = inputs
            inputs = _inputs
            if in_sampling_rate != self.feature_extractor.sampling_rate:
                if is_torchaudio_available():
                    from torchaudio import functional as F
                else:
                    raise ImportError(
                        "torchaudio is required to resample audio samples in AutomaticSpeechRecognitionPipeline. "
                        "The torchaudio package can be installed through: `pip install torchaudio`."
                    )

                inputs = F.resample(
                    torch.from_numpy(inputs), in_sampling_rate, self.feature_extractor.sampling_rate
                ).numpy()
                ratio = self.feature_extractor.sampling_rate / in_sampling_rate
            else:
                ratio = 1
            if stride is not None:
                if stride[0] + stride[1] > inputs.shape[0]:
                    raise ValueError("Stride is too large for input")

                # Stride needs to get the chunk length here, it's going to get
                # swallowed by the `feature_extractor` later, and then batching
                # can add extra data in the inputs, so we need to keep track
                # of the original length in the stride so we can cut properly.
                stride = (inputs.shape[0], int(round(stride[0] * ratio)), int(round(stride[1] * ratio)))
        if not isinstance(inputs, np.ndarray):
            raise ValueError(f"We expect a numpy ndarray as input, got `{type(inputs)}`")
        if len(inputs.shape) != 1:
            raise ValueError("We expect a single channel audio input for AutomaticSpeechRecognitionPipeline")

        if chunk_length_s:
            if stride_length_s is None:
                stride_length_s = chunk_length_s / 6

            if isinstance(stride_length_s, (int, float)):
                stride_length_s = [stride_length_s, stride_length_s]

            # XXX: Carefully, this variable will not exist in `seq2seq` setting.
            # Currently chunking is not possible at this level for `seq2seq` so
            # it's ok.
            align_to = getattr(self.model.config, "inputs_to_logits_ratio", 1)
            sampling_rate = self.feature_extractor.sampling_rate
            chunk_len = int(round(chunk_length_s * sampling_rate / align_to) * align_to)
            stride_left = int(round(stride_length_s[0] * sampling_rate / align_to) * align_to)
            stride_right = int(round(stride_length_s[1] * sampling_rate / align_to) * align_to)

            if chunk_len < stride_left + stride_right:
                raise ValueError("Chunk length must be superior to stride length")

            for item in chunk_iter(
                    inputs, self.feature_extractor, chunk_len, stride_left, stride_right, self.torch_dtype
            ):
                # Every chunk carries the full waveform; postprocess diarizes it once.
                item["audio_array"] = inputs
                yield item
        else:
            if inputs.shape[0] > self.feature_extractor.n_samples:
                processed = self.feature_extractor(
                    inputs,
                    sampling_rate=self.feature_extractor.sampling_rate,
                    truncation=False,
                    padding="longest",
                    return_tensors="pt",
                )
            else:
                processed = self.feature_extractor(
                    inputs, sampling_rate=self.feature_extractor.sampling_rate, return_tensors="pt"
                )
            if self.torch_dtype is not None:
                processed = processed.to(dtype=self.torch_dtype)
            if stride is not None:
                processed["stride"] = stride
            yield {"is_last": True, "audio_array": inputs, **processed, **extra}

    def _forward(self, model_inputs, **generate_kwargs):
        """Run generation on one preprocessed item.

        Returns a dict with ``tokens``, ``is_last``, ``audio_array``, the
        ``stride`` when one was supplied, and any leftover keys of ``model_inputs``.

        Raises:
            ValueError: if ``model_inputs`` has neither ``input_features`` nor ``input_values``.
        """
        attention_mask = model_inputs.pop("attention_mask", None)
        stride = model_inputs.pop("stride", None)
        is_last = model_inputs.pop("is_last")
        audio_array = model_inputs.pop("audio_array")
        encoder = self.model.get_encoder()
        # Consume values so we can let extra information flow freely through
        # the pipeline (important for `partial` in microphone)
        if "input_features" in model_inputs:
            inputs = model_inputs.pop("input_features")
        elif "input_values" in model_inputs:
            inputs = model_inputs.pop("input_values")
        else:
            raise ValueError(
                "Seq2Seq speech recognition model requires either a "
                f"`input_features` or `input_values` key, but only has {model_inputs.keys()}"
            )

        # custom processing for Whisper timestamps and word-level timestamps
        generate_kwargs["return_timestamps"] = True
        if inputs.shape[-1] > self.feature_extractor.nb_max_frames:
            generate_kwargs["input_features"] = inputs
        else:
            generate_kwargs["encoder_outputs"] = encoder(inputs, attention_mask=attention_mask)
        tokens = self.model.generate(attention_mask=attention_mask, **generate_kwargs)
        # whisper longform generation stores timestamps in "segments"
        out = {"tokens": tokens}
        if self.type == "seq2seq_whisper":
            if stride is not None:
                out["stride"] = stride

        # Leftover
        extra = model_inputs
        return {"is_last": is_last, "audio_array": audio_array, **out, **extra}

    def postprocess(self,
                    model_outputs,
                    decoder_kwargs: Optional[Dict] = None,
                    return_language=None,
                    add_punctuation: bool = False,
                    return_unique_speaker: bool = True,
                    num_speakers: Optional[int] = None,
                    min_speakers: Optional[int] = None,
                    max_speakers: Optional[int] = None,
                    *args,
                    **kwargs):
        """Decode the transcription, diarize the audio and attach speakers to chunks.

        Args:
            model_outputs: outputs of ``_forward``; must not be empty.
            decoder_kwargs: forwarded to the parent ``postprocess``.
            return_language: forwarded to the parent ``postprocess``.
            add_punctuation: run the punctuator over the chunk texts.
            return_unique_speaker: keep only the first speaker found for each chunk;
                otherwise keep every distinct speaker in first-seen order.
            num_speakers: forwarded to the diarization model.
            min_speakers: forwarded to the diarization model.
            max_speakers: forwarded to the diarization model.

        Returns:
            The parent's output dict extended with per-chunk ``speaker`` lists,
            ``speakers``, ``chunks/<speaker>``, ``text/<speaker>`` and
            ``diarization_result``.

        Raises:
            ValueError: if ``model_outputs`` is empty.
        """
        if len(model_outputs) == 0:
            raise ValueError("`model_outputs` must not be empty")
        outputs = super().postprocess(
            model_outputs=model_outputs,
            decoder_kwargs=decoder_kwargs,
            return_timestamps=True,
            return_language=return_language
        )
        audio_array = outputs.pop("audio_array")[0]
        sd = self.model_speaker_diarization(
            audio_array,
            num_speakers=num_speakers,
            min_speakers=min_speakers,
            max_speakers=max_speakers,
            sampling_rate=self.feature_extractor.sampling_rate
        )
        diarization_result = {s: [[i.start, i.end] for i in sd.label_timeline(s)] for s in sd.labels()}
        timelines = sd.get_timeline()
        chunks = outputs["chunks"]
        n_segments = len(timelines)

        # Walk the transcription chunks and the diarization segments in parallel.
        pointer_ts = 0
        pointer_chunk = 0
        new_chunks = []
        while pointer_chunk < len(chunks):
            if pointer_ts == n_segments:
                # Segments are exhausted: remaining chunks take the last segment's
                # speakers, or none at all when diarization found no segment.
                tail_labels = list(sd.get_labels(timelines[-1])) if n_segments else []
                for chunk in chunks[pointer_chunk:]:
                    chunk["speaker"] = list(tail_labels)
                    new_chunks.append(chunk)
                break
            ts = timelines[pointer_ts]
            chunk = chunks[pointer_chunk]
            if "speaker" not in chunk:
                chunk["speaker"] = []
            start, end = chunk["timestamp"]
            if end is None:
                # No end timestamp was predicted for this chunk; treat it as open-ended.
                end = float("inf")
            if ts.end <= start:
                # Segment lies entirely before the chunk.
                pointer_ts += 1
            elif end <= ts.start:
                # Chunk lies entirely before the segment; borrow its speakers only
                # if nothing overlapped the chunk so far.
                if len(chunk["speaker"]) == 0:
                    chunk["speaker"] += list(sd.get_labels(ts))
                new_chunks.append(chunk)
                pointer_chunk += 1
            else:
                # Segment and chunk overlap.
                chunk["speaker"] += list(sd.get_labels(ts))
                if ts.end >= end:
                    new_chunks.append(chunk)
                    pointer_chunk += 1
                else:
                    pointer_ts += 1

        for chunk in new_chunks:
            found = chunk.get("speaker", [])
            if return_unique_speaker:
                chunk["speaker"] = list(found[:1])
            else:
                # Deduplicate while keeping first-seen order.
                chunk["speaker"] = list(dict.fromkeys(found))
        outputs["chunks"] = new_chunks

        if add_punctuation:
            if self.punctuator is None:
                self.punctuator = Punctuator()
            outputs["chunks"] = self.punctuator.punctuate(outputs["chunks"])
        outputs["text"] = "".join([c["text"] for c in outputs["chunks"]])

        outputs["speakers"] = sd.labels()
        speakers = []
        for s in outputs["speakers"]:
            chunk_s = [c for c in outputs["chunks"] if s in c["speaker"]]
            if len(chunk_s) != 0:
                outputs[f"chunks/{s}"] = chunk_s
                outputs[f"text/{s}"] = "".join([c["text"] for c in chunk_s])
                speakers.append(s)
        # Only speakers that actually own at least one chunk are reported.
        outputs["speakers"] = speakers
        outputs["diarization_result"] = diarization_result
        return outputs