import numpy as np
import mmcv
from collections import namedtuple
from tqdm import tqdm
from mmengine.registry import init_default_scope
from mmpose.apis import inference_topdown, init_model
from mmdet.apis import inference_detector, init_detector
from .utils import filter_by_catgory, filter_by_score
from .apis import build_onnx_model_and_task_processor, inference_onnx_model

# Result containers shared by both inferencers: one detection instance set
# and one pose instance set per frame.
DetInst = namedtuple('DetInst', ['bboxes', 'scores', 'labels'])
PoseInst = namedtuple('PoseInst', ['keypoints', 'pts_scores'])


class PoseInferencer:
    """Two-stage top-down pose inferencer: an mmdet detector proposes
    person boxes, then an mmpose model predicts keypoints inside each box.
    """

    def __init__(self,
                 det_cfg,
                 pose_cfg,
                 device='cpu') -> None:
        # unpack config paths and checkpoints for both stages
        self.det_model_cfg = det_cfg.model_cfg
        self.det_model_ckpt = det_cfg.model_ckpt
        self.pose_model_cfg = pose_cfg.model_cfg
        self.pose_model_ckpt = pose_cfg.model_ckpt
        
        self.detector = init_detector(self.det_model_cfg, 
                                      self.det_model_ckpt,
                                      device=device)
        self.pose_model = init_model(self.pose_model_cfg,
                                     self.pose_model_ckpt,
                                     device=device)
        # number of videos processed so far, used to label progress bars
        self.video_count = 0

    def process_one_image(self, img):
        """Run detection then top-down pose estimation on a single image."""
        # detection runs under the mmdet registry scope
        init_default_scope('mmdet')
        det_result = inference_detector(self.detector, img)
        det_inst = det_result.pred_instances.cpu().numpy()
        bboxes, scores, labels = (det_inst.bboxes,
                                  det_inst.scores,
                                  det_inst.labels)
        # keep confident detections of the 'person' category only
        bboxes, scores, labels = filter_by_score(bboxes, scores,
                                                 labels, 0.5)
        bboxes, scores, labels = filter_by_catgory(bboxes, scores, labels,
                                                   ['person'])
        # pose estimation runs under the mmpose registry scope
        init_default_scope('mmpose')
        pose_result = inference_topdown(self.pose_model, img, bboxes)
        if len(pose_result) == 0:
            # no detections: emit a single all-zero placeholder instance
            # (17 COCO keypoints)
            keypoints = np.zeros((1, 17, 2))
            pts_scores = np.zeros((1, 17))
            bboxes = np.zeros((1, 4))
            scores = np.zeros((1, ))
            labels = np.zeros((1, ))
        else:
            keypoints = np.concatenate(
                [r.pred_instances.keypoints for r in pose_result])
            pts_scores = np.concatenate(
                [r.pred_instances.keypoint_scores for r in pose_result])

        return DetInst(bboxes, scores, labels), PoseInst(keypoints, pts_scores)
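
    # Per-frame sketch (illustrative, not part of the original module):
    # `frame` is any HxWx3 BGR ndarray, e.g. a frame from mmcv.VideoReader.
    #
    #   det, pose = inferencer.process_one_image(frame)
    #   det.bboxes      # (N, 4) boxes in mmdet's xyxy convention
    #   pose.keypoints  # (N, 17, 2) keypoint coordinates per person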

    def inference_video(self, video_path):
        """ Inference a video with detector and pose model
        Return:
            all_pose: a list of PoseInst, check the namedtuple definition
            all_det: a list of DetInst
        """
        video_reader = mmcv.VideoReader(video_path)
        all_pose, all_det = [], []

        count = self.video_count + 1
        for frame in tqdm(video_reader, desc=f'Inference video {count}'):
            # run detection + pose estimation on this frame
            det, pose = self.process_one_image(frame)
            all_pose.append(pose)
            all_det.append(det)
        self.video_count += 1

        return all_det, all_pose
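
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal way to drive PoseInferencer. `SimpleNamespace` stands in for
# whatever config object the caller actually passes, as long as it exposes
# `model_cfg` / `model_ckpt` attributes; all paths are hypothetical.
#
#   from types import SimpleNamespace
#   det_cfg = SimpleNamespace(model_cfg='configs/det_config.py',
#                             model_ckpt='ckpts/det_model.pth')
#   pose_cfg = SimpleNamespace(model_cfg='configs/pose_config.py',
#                              model_ckpt='ckpts/pose_model.pth')
#   inferencer = PoseInferencer(det_cfg, pose_cfg, device='cuda:0')
#   all_det, all_pose = inferencer.inference_video('demo.mp4')
#   # all_pose[i].keypoints has shape (N, 17, 2) for frame i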


class PoseInferencerV2:
    """V2 runs the detection model through an ONNX backend while keeping
    the PyTorch pose model.
    """
    def __init__(self,
                 det_cfg,
                 pose_cfg,
                 device='cpu') -> None:
        # unpack ONNX deploy/backend configs for detection and the
        # pose config/checkpoint
        self.det_deploy_cfg = det_cfg.deploy_cfg
        self.det_model_cfg = det_cfg.model_cfg
        self.det_backend_files = det_cfg.backend_files

        self.pose_model_cfg = pose_cfg.model_cfg
        self.pose_model_ckpt = pose_cfg.model_ckpt
        
        self.detector, self.task_processor = \
            build_onnx_model_and_task_processor(self.det_model_cfg,
                                                self.det_deploy_cfg,
                                                self.det_backend_files,
                                                device)
        self.pose_model = init_model(self.pose_model_cfg,
                                     self.pose_model_ckpt,
                                     device=device)
        # number of videos processed so far, used to label progress bars
        self.video_count = 0

    def process_one_image(self, img):
        """Run ONNX detection then top-down pose estimation on one image."""
        # detection runs under the mmdet registry scope
        init_default_scope('mmdet')
        det_result = inference_onnx_model(self.detector,
                                          self.task_processor,
                                          self.det_deploy_cfg,
                                          img)
        det_inst = det_result[0].pred_instances.cpu().numpy()
        bboxes, scores, labels = (det_inst.bboxes,
                                  det_inst.scores,
                                  det_inst.labels)
        # keep confident detections of the 'person' category only
        bboxes, scores, labels = filter_by_score(bboxes, scores,
                                                 labels, 0.5)
        bboxes, scores, labels = filter_by_catgory(bboxes, scores, labels,
                                                   ['person'])
        # pose estimation runs under the mmpose registry scope
        init_default_scope('mmpose')
        pose_result = inference_topdown(self.pose_model, img, bboxes)
        if len(pose_result) == 0:
            # no detections: emit a single all-zero placeholder instance
            # (17 COCO keypoints)
            keypoints = np.zeros((1, 17, 2))
            pts_scores = np.zeros((1, 17))
            bboxes = np.zeros((1, 4))
            scores = np.zeros((1, ))
            labels = np.zeros((1, ))
        else:
            keypoints = np.concatenate(
                [r.pred_instances.keypoints for r in pose_result])
            pts_scores = np.concatenate(
                [r.pred_instances.keypoint_scores for r in pose_result])

        return DetInst(bboxes, scores, labels), PoseInst(keypoints, pts_scores)

    def inference_video(self, video_path):
        """ Inference a video with detector and pose model
        Return:
            all_pose: a list of PoseInst, check the namedtuple definition
            all_det: a list of DetInst
        """
        video_reader = mmcv.VideoReader(video_path)
        all_pose, all_det = [], []

        count = self.video_count + 1
        for frame in tqdm(video_reader, desc=f'Inference video {count}'):
            # run detection + pose estimation on this frame
            det, pose = self.process_one_image(frame)
            all_pose.append(pose)
            all_det.append(det)
        self.video_count += 1

        return all_det, all_pose
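
# --- Usage sketch for the ONNX variant (illustrative, not part of the
# original module) ---
# Here `det_cfg` additionally needs the fields consumed by
# build_onnx_model_and_task_processor: `deploy_cfg` and `backend_files`.
# All names and paths below are hypothetical placeholders.
#
#   from types import SimpleNamespace
#   det_cfg = SimpleNamespace(
#       deploy_cfg='configs/deploy/detection_onnxruntime_static.py',
#       model_cfg='configs/det_config.py',
#       backend_files=['work_dirs/end2end.onnx'])
#   pose_cfg = SimpleNamespace(model_cfg='configs/pose_config.py',
#                              model_ckpt='ckpts/pose_model.pth')
#   inferencer = PoseInferencerV2(det_cfg, pose_cfg, device='cpu')
#   all_det, all_pose = inferencer.inference_video('demo.mp4')
#   # each all_pose entry is a PoseInst: keypoints (N, 17, 2),
#   # pts_scores (N, 17); each all_det entry is a DetInst namedtuple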