"""
CUDA: 12.1
cuDNN Version: 8.9.2.26_1.0-1_amd64
Tensorflow Version: 2.12.0
Torch Version: 2.1.0.dev20230606+cu121
Transformers Version: 4.30.2
BENCHMARK:
    - RAM: 2.8 GB
    - VRAM: 1812 MB
    - test.wav: 23 s 
        - GPU (3060) -> 1.1s    (TensorCore is used for fp16 inference)
        - GPU (1660S) -> 3.3s
        - CPU -> torch.float16 not supported on CPU (Ryzen 5 3600)
    - Punctuation: True
"""

import os

import ffmpeg  # ffmpeg-python bindings; the ffmpeg CLI must also be on PATH
import numpy as np
import torch
import torch.nn.functional as F
from transformers import (
    WhisperConfig,
    WhisperForConditionalGeneration,
    WhisperProcessor,
)

# load_audio and pad_or_trim functions
SAMPLE_RATE = 16000
CHUNK_LENGTH = 30  # 30-second chunks
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE  # 480000 samples in a 30-second chunk

class Model:
    def __init__(self, 
                 model_name_or_path: str, 
                 cuda_visible_device: str = "0", 
                 device: str = 'cuda'   # torch.device("cuda" if torch.cuda.is_available() else "cpu")
                 ):
        
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_device
        self.DEVICE = device
        
        self.processor = WhisperProcessor.from_pretrained(model_name_or_path)
        self.tokenizer = self.processor.tokenizer

        self.config = WhisperConfig.from_pretrained(model_name_or_path)

        # from_pretrained is a classmethod; instantiating from the config first would
        # only build a throwaway randomly-initialised model.
        self.model = WhisperForConditionalGeneration.from_pretrained(
            model_name_or_path,
            torch_dtype=self.config.torch_dtype,
            # device_map=DEVICE,  # 'balanced', 'balanced_low_0', 'sequential', 'cuda', 'cpu'
            low_cpu_mem_usage=True,
        )
            
        # Move the model to the target device (no-op if already there) and switch to eval mode.
        if self.model.device.type != self.DEVICE:
            print(f'Moving model to {self.DEVICE}')
            self.model = self.model.to(self.DEVICE)
        else:
            print(f'Model is already on {self.DEVICE}')
        self.model.eval()
            
        print('dtype of model according to config: ', self.config.torch_dtype)
        print('dtype of loaded model: ', self.model.dtype)
        
    
    # audio = whisper.load_audio('test.wav')
    def load_audio(self, file: str, sr: int = SAMPLE_RATE, start_time: int = 0, dtype=np.float16):
        try:
            # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
            # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
            out, _ = (
                ffmpeg.input(file, ss=start_time, threads=0)
                .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
                .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
            )
        except ffmpeg.Error as e:
            raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

        # return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
        return np.frombuffer(out, np.int16).flatten().astype(dtype) / 32768.0


    # audio = whisper.pad_or_trim(audio)
    def _pad_or_trim(self, array, length: int = N_SAMPLES, *, axis: int = -1):
        """
        Pad or trim the audio array to N_SAMPLES, as expected by the encoder.
        """
        if torch.is_tensor(array):
            if array.shape[axis] > length:
                array = array.index_select(
                    dim=axis, index=torch.arange(length, device=array.device)
                )

            if array.shape[axis] < length:
                pad_widths = [(0, 0)] * array.ndim
                pad_widths[axis] = (0, length - array.shape[axis])
                array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
        else:
            if array.shape[axis] > length:
                array = array.take(indices=range(length), axis=axis)

            if array.shape[axis] < length:
                pad_widths = [(0, 0)] * array.ndim
                pad_widths[axis] = (0, length - array.shape[axis])
                array = np.pad(array, pad_widths)

        return array
    
    def transcribe(self, audio: np.ndarray, language: str = "english"):
        # audio = self.load_audio('test.wav')
        audio = self._pad_or_trim(audio)
        # Cast the features to the model's dtype (fp16 on GPU per the benchmark above),
        # so fp32 checkpoints do not hit a dtype mismatch.
        input_features = (
            self.processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt")
            .input_features
            .to(self.DEVICE, dtype=self.model.dtype)
        )
        with torch.no_grad():
            predicted_ids = self.model.generate(
                input_features,
                num_beams=1,
                language=language,
                task="transcribe",
                use_cache=True,
                is_multilingual=True,
                return_timestamps=True,
            )

        transcription = self.tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        return transcription.strip()
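

# Example usage (a minimal sketch): the checkpoint name and audio path below are
# placeholders for illustration, not values taken from this module.
if __name__ == "__main__":
    asr = Model("openai/whisper-small", cuda_visible_device="0", device="cuda")
    audio = asr.load_audio("test.wav")  # ffmpeg-decoded, mono, float16, 16 kHz
    print(asr.transcribe(audio, language="english"))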