File size: 8,599 Bytes
9e56ba5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import torch.nn as nn
import torch
from pathlib import Path
import numpy as np
import torchvision.transforms as transforms
import cv2
import pandas as pd
import gdown
from os.path import exists as file_exists
from .deep.reid_model_factory import show_downloadeable_models, get_model_url, get_model_name

from torchreid.reid.utils import FeatureExtractor
from torchreid.reid.utils.tools import download_url


def check_suffix(file='yolov5s.pt', suffix=('.pt',), msg=''):
    # Check file(s) for acceptable suffix
    if file and suffix:
        if isinstance(suffix, str):
            suffix = [suffix]
        for f in file if isinstance(file, (list, tuple)) else [file]:
            s = Path(f).suffix.lower()  # file suffix
            if len(s):
                assert s in suffix, f"{msg}{f} acceptable suffix is {suffix}"


class ReIDDetectMultiBackend(nn.Module):
    # ReID models MultiBackend class for python inference on various backends
    def __init__(self, weights='osnet_x0_25_msmt17.pt', device=torch.device('cpu'), fp16=False):
        super().__init__()
        w = str(weights[0] if isinstance(weights, list) else weights)
        self.pt, self.jit, self.onnx, self.xml, self.engine, self.coreml, \
            self.saved_model, self.pb, self.tflite, self.edgetpu, self.tfjs = self.model_type(w)  # get backend
        
        if self.pt:  # PyTorch
            model_name = get_model_name(weights)
            model_url = get_model_url(weights)

            if not file_exists(weights) and model_url is not None:
                gdown.download(model_url, str(weights), quiet=False)
            elif file_exists(weights):
                pass
            elif model_url is None:
                print('No URL associated to the chosen DeepSort weights. Choose between:')
                show_downloadeable_models()
                exit()

            self.extractor = FeatureExtractor(
                # get rid of dataset information DeepSort model name
                model_name=model_name,
                model_path=weights,
                device=str(device)
            )
            self.extractor.model.half() if fp16 else  self.extractor.model.float()
        elif self.onnx:  # ONNX Runtime
            # LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
            cuda = torch.cuda.is_available()
            #check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime'))
            import onnxruntime
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider']
            self.session = onnxruntime.InferenceSession(w, providers=providers)
        
        elif self.tflite:
            try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
                from tflite_runtime.interpreter import Interpreter, load_delegate
            except ImportError:
                import tensorflow as tf
                Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate,
            self.interpreter = tf.lite.Interpreter(model_path=weights)
            self.interpreter.allocate_tensors()
            # Get input and output tensors.
            self.input_details = self.interpreter.get_input_details()
            self.output_details = self.interpreter.get_output_details()
            
            # Test model on random input data.
            input_data = np.array(np.random.random_sample((1,256,128,3)), dtype=np.float32)
            self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
            
            self.interpreter.invoke()

            # The function `get_tensor()` returns a copy of the tensor data.
            output_data = self.interpreter.get_tensor(self.output_details[0]['index'])
            print(output_data.shape)
        else:
            print('This model framework is not supported yet!')
            exit()
            
        pixel_mean=[0.485, 0.456, 0.406]
        pixel_std=[0.229, 0.224, 0.225]
        self.norm = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(pixel_mean, pixel_std),
        ])
        self.size = (256, 128)
        self.fp16 = fp16
        self.device = device
        
    def export_formats(self):
        # YOLOv5 export formats
        x = [
            ['PyTorch', '-', '.pt', True, True],
            ['TorchScript', 'torchscript', '.torchscript', True, True],
            ['ONNX', 'onnx', '.onnx', True, True],
            ['OpenVINO', 'openvino', '_openvino_model', True, False],
            ['TensorRT', 'engine', '.engine', False, True],
            ['CoreML', 'coreml', '.mlmodel', True, False],
            ['TensorFlow SavedModel', 'saved_model', '_saved_model', True, True],
            ['TensorFlow GraphDef', 'pb', '.pb', True, True],
            ['TensorFlow Lite', 'tflite', '.tflite', True, False],
            ['TensorFlow Edge TPU', 'edgetpu', '_edgetpu.tflite', False, False],
            ['TensorFlow.js', 'tfjs', '_web_model', False, False],]
        return pd.DataFrame(x, columns=['Format', 'Argument', 'Suffix', 'CPU', 'GPU'])
    

    def model_type(self, p='path/to/model.pt'):
        # Return model type from model path, i.e. path='path/to/model.onnx' -> type=onnx

        suffixes = list(self.export_formats().Suffix) + ['.xml']  # export suffixes
        check_suffix(p, suffixes)  # checks
        p = Path(p).name  # eliminate trailing separators
        pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, xml2 = (s in p for s in suffixes)
        xml |= xml2  # *_openvino_model or *.xml
        tflite &= not edgetpu  # *.tflite
        return pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs
    
    def warmup(self, imgsz=(1, 256, 128, 3)):
        # Warmup model by running inference once
        warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb
        if any(warmup_types) and self.device.type != 'cpu':
            im = torch.zeros(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device)  # input
            im = im.cpu().numpy()
            print(im.shape)
            for _ in range(2 if self.jit else 1):  #
                self.forward(im)  # warmup

    def preprocess(self, im_crops):
        def _resize(im, size):
            return cv2.resize(im.astype(np.float32), size)

        im = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops], dim=0).float()
        im = im.float().to(device=self.device)
        return im
    
    def forward(self, im_batch):
        im_batch = self.preprocess(im_batch)
        b, ch, h, w = im_batch.shape  # batch, channel, height, width
        features = []
        for i in range(0, im_batch.shape[0]):
            im = im_batch[i, :, :, :].unsqueeze(0)
            if self.fp16 and im.dtype != torch.float16:
                im = im.half()  # to FP16
            if self.pt:  # PyTorch
                y = self.extractor.model(im)[0]
            elif self.jit:  # TorchScript
                y = self.model(im)[0]
            elif self.onnx:  # ONNX Runtime
                im = im.permute(0, 1, 3, 2).cpu().numpy()  # torch to numpy  # torch to numpy
                y = self.session.run([self.session.get_outputs()[0].name], {self.session.get_inputs()[0].name: im})[0]
            elif self.xml:  # OpenVINO
                im = im.cpu().numpy()  # FP32
                y = self.executable_network([im])[self.output_layer]
            else:  # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
                im = im.permute(0, 3, 2, 1).cpu().numpy()  # torch BCHW to numpy BHWC shape(1,320,192,3)
                input, output = self.input_details[0], self.output_details[0]
                int8 = input['dtype'] == np.uint8  # is TFLite quantized uint8 model
                if int8:
                    scale, zero_point = input['quantization']
                    im = (im / scale + zero_point).astype(np.uint8)  # de-scale
                self.interpreter.set_tensor(input['index'], im)
                self.interpreter.invoke()
                y = torch.tensor(self.interpreter.get_tensor(output['index']))
                if int8:
                    scale, zero_point = output['quantization']
                    y = (y.astype(np.float32) - zero_point) * scale  # re-scale
            
            if isinstance(y, np.ndarray):
                y = torch.tensor(y, device=self.device)
            features.append(y.squeeze())

        
        return features