from transformers import ( WhisperForConditionalGeneration, WhisperProcessor, WhisperConfig, ) import torch import ffmpeg import torch import torch.nn.functional as F import numpy as np import os # load_audio and pad_or_trim functions SAMPLE_RATE = 16000 CHUNK_LENGTH = 30 # 30-second chunks N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE # 480000 samples in a 30-second chunk # audio = whisper.load_audio('test.wav') def load_audio(file: str, sr: int = SAMPLE_RATE, start_time: int = 0, dtype=np.float16): """ Load an audio file into a numpy array at the specified sampling rate. """ try: # This launches a subprocess to decode audio while down-mixing and resampling as necessary. # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed. out, _ = ( ffmpeg.input(file, ss=start_time, threads=0) .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr) .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True) ) except ffmpeg.Error as e: raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e # return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0 return np.frombuffer(out, np.int16).flatten().astype(dtype) / 32768.0 # audio = whisper.pad_or_trim(audio) def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1): """ Pad or trim the audio array to N_SAMPLES, as expected by the encoder. """ if torch.is_tensor(array): if array.shape[axis] > length: array = array.index_select( dim=axis, index=torch.arange(length, device=array.device) ) if array.shape[axis] < length: pad_widths = [(0, 0)] * array.ndim pad_widths[axis] = (0, length - array.shape[axis]) array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes]) else: if array.shape[axis] > length: array = array.take(indices=range(length), axis=axis) if array.shape[axis] < length: pad_widths = [(0, 0)] * array.ndim pad_widths[axis] = (0, length - array.shape[axis]) array = np.pad(array, pad_widths) return array class Model: def __init__( self, model_name_or_path: str, cuda_visible_device: str = "0", device: str = "cuda", # torch.device("cuda" if torch.cuda.is_available() else "cpu") ): os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_device self.DEVICE = device self.processor = WhisperProcessor.from_pretrained(model_name_or_path) self.tokenizer = self.processor.tokenizer self.config = WhisperConfig.from_pretrained(model_name_or_path) self.model = WhisperForConditionalGeneration( config=self.config ).from_pretrained( pretrained_model_name_or_path=model_name_or_path, torch_dtype=self.config.torch_dtype, # device_map=DEVICE, # 'balanced', 'balanced_low_0', 'sequential', 'cuda', 'cpu' low_cpu_mem_usage=True, ) # Move model to GPU if self.model.device.type != self.DEVICE: print(f"Moving model to {self.DEVICE}") self.model = self.model.to(self.DEVICE) self.model.eval() else: print(f"Model is already on {self.DEVICE}") self.model.eval() print("dtype of model acc to config: ", self.config.torch_dtype) print("dtype of loaded model: ", self.model.dtype) def transcribe( self, audio, language: str = "english", skip_special_tokens: bool = True ) -> str: input_features = ( self.processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt") .input_features.half() .to(self.DEVICE) ) with torch.no_grad(): predicted_ids = self.model.generate( input_features, num_beams=1, language=language, task="transcribe", use_cache=True, is_multilingual=True, return_timestamps=True, ) transcription = self.tokenizer.batch_decode( predicted_ids, skip_special_tokens=skip_special_tokens )[0] return transcription.strip()