# ISL-SignLanguageTranslation / ISL_Model_parameter.py
import math

import cv2
import keras
import numpy as np
import torch
from keras.layers import TorchModuleWrapper
from scipy.ndimage import gaussian_filter
from skimage.measure import label

import util
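
# This module wraps pretrained PyTorch pose networks inside Keras models via
# keras.layers.TorchModuleWrapper (torch backend): a body25 network that
# returns part-affinity fields and keypoint heatmaps, and a hand network that
# returns 22 heatmap channels. ISLSignPos extracts body and hand keypoints
# from a single image; ISLSignPosTranslator turns a clip of frames into
# per-frame 156-value pose features and feeds a 20-frame window to a
# translation model. Helper routines (handDetect, padRightDownCorner,
# get_bodypose, get_handpose, npmax) come from the local `util` module.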
class ISLSignPos(keras.Model):
    """Keras wrapper around frozen PyTorch body25 and hand pose models for single-image keypoint extraction."""

    def __init__(self, pt_body_model, pt_hand_model):
        super().__init__()
        # Wrap the frozen PyTorch sub-models so they can be used inside a Keras model.
        self.pt_body = TorchModuleWrapper(pt_body_model)
        self.pt_body.trainable = False
        self.pt_hand = TorchModuleWrapper(pt_hand_model)
        self.pt_hand.trainable = False
        self.njoint_body = 26  # 25 body25 keypoints + 1 background channel
        self.npaf_body = 52    # part-affinity-field channels (x/y pair per limb)
def call(self, oriImg):
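        """Estimate body and hand keypoints for a single image tensor.

        Returns (candidate, subset, all_hand_peaks): body keypoint candidates
        as (x, y, score, id) rows, the per-person assembly table, and a list
        of 21x2 hand-peak arrays in full-image coordinates.
        """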
        img = oriImg.cpu().numpy()
        candidate, subset = self.bodypos(img)
        hands_list = util.handDetect(candidate, subset, img)
        all_hand_peaks = []
        for x, y, w, is_left in hands_list:
            peaks = self.handpos(img[y:y + w, x:x + w, :])
            # Shift detected hand peaks back into full-image coordinates; (0, 0) marks a missing peak.
            peaks[:, 0] = np.where(peaks[:, 0] == 0, peaks[:, 0], peaks[:, 0] + x)
            peaks[:, 1] = np.where(peaks[:, 1] == 0, peaks[:, 1], peaks[:, 1] + y)
            all_hand_peaks.append(peaks)
        return candidate, subset, all_hand_peaks
def bodypos(self, oriImg):
model_type = 'body25'
scale_search = [0.5]
boxsize = 368
stride = 8
padValue = 128
        thre1 = 0.1   # heatmap peak threshold
        thre2 = 0.05  # PAF sample score threshold
multiplier = [x * boxsize / oriImg.shape[0] for x in scale_search]
heatmap_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], self.njoint_body))
paf_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], self.npaf_body))
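        # Multi-scale inference: resize and pad the image for every scale in
        # scale_search, run the body network, then resize the heatmaps and PAFs
        # back to the original resolution and average them across scales.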
for m in range(len(multiplier)):
scale = multiplier[m]
imageToTest = cv2.resize(oriImg, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
imageToTest_padded, pad = util.padRightDownCorner(imageToTest, stride, padValue)
im = np.transpose(np.float32(imageToTest_padded[:, :, :, np.newaxis]), (3, 2, 0, 1)) / 256 - 0.5
im = np.ascontiguousarray(im)
data = torch.from_numpy(im).float()
if torch.cuda.is_available():
data = data.cuda()
with torch.no_grad():
Mconv7_stage6_L1, Mconv7_stage6_L2 = self.pt_body(data)
Mconv7_stage6_L1 = Mconv7_stage6_L1.cpu().numpy()
Mconv7_stage6_L2 = Mconv7_stage6_L2.cpu().numpy()
heatmap = np.transpose(np.squeeze(Mconv7_stage6_L2), (1, 2, 0)) # output 1 is heatmaps
heatmap = cv2.resize(heatmap, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
heatmap = heatmap[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
heatmap = cv2.resize(heatmap, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
paf = np.transpose(np.squeeze(Mconv7_stage6_L1), (1, 2, 0)) # output 0 is PAFs
paf = cv2.resize(paf, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
paf = paf[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
paf = cv2.resize(paf, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
            heatmap_avg += heatmap / len(multiplier)
            paf_avg += paf / len(multiplier)
all_peaks = []
peak_counter = 0
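        # Non-maximum suppression on each keypoint heatmap (background channel
        # excluded): smooth the channel, then keep pixels that are >= all four
        # shifted-neighbour maps and above thre1. Each surviving peak is stored
        # as (x, y, score, running id).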
for part in range(self.njoint_body-1):
map_ori = heatmap_avg[:, :, part]
one_heatmap = gaussian_filter(map_ori, sigma=3)
map_left = np.zeros(one_heatmap.shape)
map_left[1:, :] = one_heatmap[:-1, :]
map_right = np.zeros(one_heatmap.shape)
map_right[:-1, :] = one_heatmap[1:, :]
map_up = np.zeros(one_heatmap.shape)
map_up[:, 1:] = one_heatmap[:, :-1]
map_down = np.zeros(one_heatmap.shape)
map_down[:, :-1] = one_heatmap[:, 1:]
peaks_binary = np.logical_and.reduce(
(one_heatmap >= map_left, one_heatmap >= map_right, one_heatmap >= map_up, one_heatmap >= map_down, one_heatmap > thre1))
peaks = list(zip(np.nonzero(peaks_binary)[1], np.nonzero(peaks_binary)[0])) # note reverse
peaks_with_score = [x + (map_ori[x[1], x[0]],) for x in peaks]
peak_id = range(peak_counter, peak_counter + len(peaks))
peaks_with_score_and_id = [peaks_with_score[i] + (peak_id[i],) for i in range(len(peak_id))]
all_peaks.append(peaks_with_score_and_id)
peak_counter += len(peaks)
if model_type=='body25':
            # limb connection sequence: pairs of body25 keypoint indices
limbSeq = [[1,0],[1,2],[2,3],[3,4],[1,5],[5,6],[6,7],[1,8],[8,9],[9,10],\
[10,11],[8,12],[12,13],[13,14],[0,15],[0,16],[15,17],[16,18],\
[11,24],[11,22],[14,21],[14,19],[22,23],[19,20]]
            # PAF channel pairs corresponding to each limb
mapIdx = [[30, 31],[14, 15],[16, 17],[18, 19],[22, 23],[24, 25],[26, 27],[0, 1],[6, 7],\
[2, 3],[4, 5], [8, 9],[10, 11],[12, 13],[32, 33],[34, 35],[36,37],[38,39],\
[50,51],[46,47],[44,45],[40,41],[48,49],[42,43]]
else:
            # limb connection sequence: pairs of COCO-18 keypoint indices
limbSeq = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], \
[9, 10], [1, 11], [11, 12], [12, 13], [1, 0], [0, 14], [14, 16], \
[0, 15], [15, 17], [2, 16], [5, 17]]
            # PAF channel pairs corresponding to each limb
mapIdx = [[12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1], [2, 3], \
[4, 5], [6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35], [32, 33], \
[36, 37], [18, 19], [26, 27]]
connection_all = []
special_k = []
mid_num = 10
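        # Limb scoring via the part-affinity fields: for every candidate pair of
        # endpoints, sample mid_num points along the segment, project the PAF
        # vectors onto the limb direction, and accept the pair only if at least
        # 80% of the samples exceed thre2 and the distance-penalised mean score
        # is positive. Accepted pairs are then matched greedily by score.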
for k in range(len(mapIdx)):
score_mid = paf_avg[:, :, mapIdx[k]]
candA = all_peaks[limbSeq[k][0]]
candB = all_peaks[limbSeq[k][1]]
nA = len(candA)
nB = len(candB)
indexA, indexB = limbSeq[k]
if (nA != 0 and nB != 0):
connection_candidate = []
for i in range(nA):
for j in range(nB):
vec = np.subtract(candB[j][:2], candA[i][:2])
norm = math.sqrt(vec[0] * vec[0] + vec[1] * vec[1])
norm = max(0.001, norm)
vec = np.divide(vec, norm)
startend = list(zip(np.linspace(candA[i][0], candB[j][0], num=mid_num), \
np.linspace(candA[i][1], candB[j][1], num=mid_num)))
vec_x = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 0] \
for I in range(len(startend))])
vec_y = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 1] \
for I in range(len(startend))])
score_midpts = np.multiply(vec_x, vec[0]) + np.multiply(vec_y, vec[1])
score_with_dist_prior = sum(score_midpts) / len(score_midpts) + min(
0.5 * oriImg.shape[0] / norm - 1, 0)
criterion1 = len(np.nonzero(score_midpts > thre2)[0]) > 0.8 * len(score_midpts)
criterion2 = score_with_dist_prior > 0
if criterion1 and criterion2:
connection_candidate.append(
[i, j, score_with_dist_prior, score_with_dist_prior + candA[i][2] + candB[j][2]])
connection_candidate = sorted(connection_candidate, key=lambda x: x[2], reverse=True)
connection = np.zeros((0, 5))
for c in range(len(connection_candidate)):
i, j, s = connection_candidate[c][0:3]
if (i not in connection[:, 3] and j not in connection[:, 4]):
connection = np.vstack([connection, [candA[i][3], candB[j][3], s, i, j]])
if (len(connection) >= min(nA, nB)):
break
connection_all.append(connection)
else:
special_k.append(k)
connection_all.append([])
# last number in each row is the total parts number of that person
# the second last number in each row is the score of the overall configuration
subset = -1 * np.ones((0, self.njoint_body+1))
candidate = np.array([item for sublist in all_peaks for item in sublist])
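        # Assemble people: walk over the limb types and greedily attach each
        # connection to an existing person row that shares an endpoint, merge
        # two rows when a connection joins disjoint part sets, or start a new
        # row. Rows with fewer than 4 parts or a low average score are dropped
        # at the end.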
for k in range(len(mapIdx)):
if k not in special_k:
partAs = connection_all[k][:, 0]
partBs = connection_all[k][:, 1]
indexA, indexB = np.array(limbSeq[k])
for i in range(len(connection_all[k])): # = 1:size(temp,1)
found = 0
subset_idx = [-1, -1]
for j in range(len(subset)): # 1:size(subset,1):
if subset[j][indexA] == partAs[i] or subset[j][indexB] == partBs[i]:
subset_idx[found] = j
found += 1
if found == 1:
j = subset_idx[0]
if subset[j][indexB] != partBs[i]:
subset[j][indexB] = partBs[i]
subset[j][-1] += 1
subset[j][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
elif found == 2: # if found 2 and disjoint, merge them
j1, j2 = subset_idx
membership = ((subset[j1] >= 0).astype(int) + (subset[j2] >= 0).astype(int))[:-2]
if len(np.nonzero(membership == 2)[0]) == 0: # merge
subset[j1][:-2] += (subset[j2][:-2] + 1)
subset[j1][-2:] += subset[j2][-2:]
subset[j1][-2] += connection_all[k][i][2]
subset = np.delete(subset, j2, 0)
else: # as like found == 1
subset[j1][indexB] = partBs[i]
subset[j1][-1] += 1
subset[j1][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
# if find no partA in the subset, create a new subset
elif not found and k < self.njoint_body-2:
row = -1 * np.ones(self.njoint_body+1)
row[indexA] = partAs[i]
row[indexB] = partBs[i]
row[-1] = 2
row[-2] = sum(candidate[connection_all[k][i, :2].astype(int), 2]) + connection_all[k][i][2]
subset = np.vstack([subset, row])
# delete some rows of subset which has few parts occur
deleteIdx = []
for i in range(len(subset)):
if subset[i][-1] < 4 or subset[i][-2] / subset[i][-1] < 0.4:
deleteIdx.append(i)
subset = np.delete(subset, deleteIdx, axis=0)
        # subset: n x 27 array; columns 0-24 index into candidate (-1 = missing),
        # column -2 is the person's total score, column -1 its number of parts
        # candidate: each row is (x, y, score, id)
return candidate, subset
def handpos(self, oriImg):
scale_search = [0.5, 1.0, 1.5, 2.0]
# scale_search = [0.5]
boxsize = 368
stride = 8
padValue = 128
thre = 0.05
multiplier = [x * boxsize / oriImg.shape[0] for x in scale_search]
heatmap_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], 22))
# paf_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], 38))
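        # The hand network outputs 22 heatmap channels (21 keypoints plus
        # background); as in bodypos, the maps are averaged over the search scales.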
for m in range(len(multiplier)):
scale = multiplier[m]
imageToTest = cv2.resize(oriImg, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
imageToTest_padded, pad = util.padRightDownCorner(imageToTest, stride, padValue)
im = np.transpose(np.float32(imageToTest_padded[:, :, :, np.newaxis]), (3, 2, 0, 1)) / 256 - 0.5
im = np.ascontiguousarray(im)
data = torch.from_numpy(im).float()
if torch.cuda.is_available():
data = data.cuda()
# data = data.permute([2, 0, 1]).unsqueeze(0).float()
with torch.no_grad():
output = self.pt_hand(data).cpu().numpy()
                # output = self.model(data).numpy()
# extract outputs, resize, and remove padding
heatmap = np.transpose(np.squeeze(output), (1, 2, 0)) # output 1 is heatmaps
heatmap = cv2.resize(heatmap, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
heatmap = heatmap[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
heatmap = cv2.resize(heatmap, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
heatmap_avg += heatmap / len(multiplier)
all_peaks = []
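        # For each of the 21 hand keypoints, threshold the smoothed heatmap, keep
        # only the connected component with the largest summed response, and take
        # its argmax as the peak; [0, 0] marks a keypoint with no response above thre.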
for part in range(21):
map_ori = heatmap_avg[:, :, part]
one_heatmap = gaussian_filter(map_ori, sigma=3)
binary = np.ascontiguousarray(one_heatmap > thre, dtype=np.uint8)
            # all responses below the threshold: no peak for this keypoint
if np.sum(binary) == 0:
all_peaks.append([0, 0])
continue
label_img, label_numbers = label(binary, return_num=True, connectivity=binary.ndim)
max_index = np.argmax([np.sum(map_ori[label_img == i]) for i in range(1, label_numbers + 1)]) + 1
label_img[label_img != max_index] = 0
map_ori[label_img == 0] = 0
y, x = util.npmax(map_ori)
all_peaks.append([x, y])
return np.array(all_peaks)
class ISLSignPosTranslator(keras.Model):
    """Pose-feature extractor plus translation head for fixed-length sign clips."""

    def __init__(self, body_model, hand_model, translation_model):
        super().__init__()
        self.pt_body = TorchModuleWrapper(body_model)
        self.pt_body.trainable = False
        self.pt_hand = TorchModuleWrapper(hand_model)
        self.pt_hand.trainable = False
        self.njoint_body = 26   # 25 body25 keypoints + 1 background channel
        self.npaf_body = 52     # part-affinity-field channels (x/y pair per limb)
        self.model_type = 'body25'
        self.translation_layer = translation_model
def call(self, window):
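        """Translate a clip of frames into a sign prediction.

        For each frame (up to window_size), extract a 156-value pose feature
        vector, zero-pad the sequence to window_size frames, and feed the
        resulting (1, window_size, 156) array to the translation model.
        """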
        window_size = 20
        window_features = []
        blank_frame = np.zeros(156)  # all-zero feature vector used to pad short clips
        for idx, frame in enumerate(window.cpu()):
            # frame = frame.cpu().numpy()[:, :, ::-1]
            img = frame.cpu().numpy()
            candidate, subset = self.bodypos(img)
            hands_list = util.handDetect(candidate, subset, img)
            all_hand_peaks = []
            for x, y, w, is_left in hands_list:
                peaks = self.handpos(img[y:y + w, x:x + w, :])
                peaks[:, 0] = np.where(peaks[:, 0] == 0, peaks[:, 0], peaks[:, 0] + x)
                peaks[:, 1] = np.where(peaks[:, 1] == 0, peaks[:, 1], peaks[:, 1] + y)
                all_hand_peaks.append(peaks)
            bodypose_circles, bodypose_sticks = util.get_bodypose(candidate, subset, self.model_type)
            handpose_edges, handpose_peaks = util.get_handpose(all_hand_peaks)
            feature = self.populate_features(bodypose_circles, handpose_peaks)
            window_features.append(feature)
        # Zero-pad clips shorter than window_size so the translator always sees a full window.
        if len(window_features) < window_size:
            for _ in range(window_size - len(window_features)):
                window_features.append(blank_frame)
# self.writer.close()
# timeseries=self.create_timeseries_data(feature)
# self.frame_to_window(feature)
        return self.translation_layer(np.array(window_features).reshape(1, window_size, 156))
    def frame_to_window(self, frame):
        """Shift the rolling window ``self.window`` by one frame and append ``frame``.

        Assumes ``self.window`` has already been allocated as a fixed-size
        buffer of per-frame feature vectors (oldest frame first); the oldest
        frame is dropped and the new frame is written to the last slot.
        """
# Shift the window elements by 1 (oldest frame is dropped)
self.window[:-1] = self.window[1:]
# Add the new frame to the end of the window
self.window[-1] = frame
    def populate_features(self, bodypose_circles, handpose_peaks):
        # Build the 156-value per-frame feature vector:
        #   15 body-keypoint x-coordinates, 15 body-keypoint y-coordinates,
        #   then for each of the two hands: 21 x, 21 y and 21 peak labels (peaktxt).
        # Missing keypoints are zero-filled so the vector length is always 156.
        feature = []
for idx in range(15):
if(idx<len(bodypose_circles)):
feature.append(bodypose_circles[idx][0])
else:
feature.append(0)
for idx in range(15):
if(idx<len(bodypose_circles)):
feature.append(bodypose_circles[idx][1])
else:
feature.append(0)
for hand_idx in range(2):
for idx in range(21):
if(idx<len(handpose_peaks[hand_idx])):
feature.append(float(handpose_peaks[hand_idx][idx][0]))
else:
feature.append(0)
for idx in range(21):
if(idx<len(handpose_peaks[hand_idx])):
feature.append(float(handpose_peaks[hand_idx][idx][1]))
else:
feature.append(0)
for idx in range(21):
if(idx<len(handpose_peaks[hand_idx])):
feature.append(float(handpose_peaks[hand_idx][idx][2]))
else:
feature.append(0)
# for idx,handedges in enumerate(handpose_edges):
# for (peaktxt, (handedge_x1, handedge_y1), (handedge_x2, handedge_y2)) in handedges:
# feature[f'hand{idx}edge_x1_{peaktxt}']=handedge_x1
# feature[f'hand{idx}edge_y1_{peaktxt}']=handedge_y1
# feature[f'hand{idx}edge_x2_{peaktxt}']=handedge_x2
# feature[f'hand{idx}edge_y2_{peaktxt}']=handedge_y2
X=np.array(feature)
# time_steps = 12 # Number of time steps
# num_features = X.shape[0] // time_steps # Number of features per time step
# X_reshaped = X.reshape(1,time_steps, num_features)
return X
def bodypos(self, oriImg):
model_type = 'body25'
scale_search = [0.5]
boxsize = 368
stride = 8
padValue = 128
thre1 = 0.1
thre2 = 0.05
multiplier = [x * boxsize / oriImg.shape[0] for x in scale_search]
heatmap_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], self.njoint_body))
paf_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], self.npaf_body))
for m in range(len(multiplier)):
scale = multiplier[m]
imageToTest = cv2.resize(oriImg, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
imageToTest_padded, pad = util.padRightDownCorner(imageToTest, stride, padValue)
im = np.transpose(np.float32(imageToTest_padded[:, :, :, np.newaxis]), (3, 2, 0, 1)) / 256 - 0.5
im = np.ascontiguousarray(im)
data = torch.from_numpy(im).float()
if torch.cuda.is_available():
data = data.cuda()
with torch.no_grad():
Mconv7_stage6_L1, Mconv7_stage6_L2 = self.pt_body(data)
Mconv7_stage6_L1 = Mconv7_stage6_L1.cpu().numpy()
Mconv7_stage6_L2 = Mconv7_stage6_L2.cpu().numpy()
heatmap = np.transpose(np.squeeze(Mconv7_stage6_L2), (1, 2, 0)) # output 1 is heatmaps
heatmap = cv2.resize(heatmap, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
heatmap = heatmap[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
heatmap = cv2.resize(heatmap, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
paf = np.transpose(np.squeeze(Mconv7_stage6_L1), (1, 2, 0)) # output 0 is PAFs
paf = cv2.resize(paf, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
paf = paf[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
paf = cv2.resize(paf, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
            heatmap_avg += heatmap / len(multiplier)
            paf_avg += paf / len(multiplier)
all_peaks = []
peak_counter = 0
for part in range(self.njoint_body-1):
map_ori = heatmap_avg[:, :, part]
one_heatmap = gaussian_filter(map_ori, sigma=3)
map_left = np.zeros(one_heatmap.shape)
map_left[1:, :] = one_heatmap[:-1, :]
map_right = np.zeros(one_heatmap.shape)
map_right[:-1, :] = one_heatmap[1:, :]
map_up = np.zeros(one_heatmap.shape)
map_up[:, 1:] = one_heatmap[:, :-1]
map_down = np.zeros(one_heatmap.shape)
map_down[:, :-1] = one_heatmap[:, 1:]
peaks_binary = np.logical_and.reduce(
(one_heatmap >= map_left, one_heatmap >= map_right, one_heatmap >= map_up, one_heatmap >= map_down, one_heatmap > thre1))
peaks = list(zip(np.nonzero(peaks_binary)[1], np.nonzero(peaks_binary)[0])) # note reverse
peaks_with_score = [x + (map_ori[x[1], x[0]],) for x in peaks]
peak_id = range(peak_counter, peak_counter + len(peaks))
peaks_with_score_and_id = [peaks_with_score[i] + (peak_id[i],) for i in range(len(peak_id))]
all_peaks.append(peaks_with_score_and_id)
peak_counter += len(peaks)
if model_type=='body25':
            # limb connection sequence: pairs of body25 keypoint indices
limbSeq = [[1,0],[1,2],[2,3],[3,4],[1,5],[5,6],[6,7],[1,8],[8,9],[9,10],\
[10,11],[8,12],[12,13],[13,14],[0,15],[0,16],[15,17],[16,18],\
[11,24],[11,22],[14,21],[14,19],[22,23],[19,20]]
            # PAF channel pairs corresponding to each limb
mapIdx = [[30, 31],[14, 15],[16, 17],[18, 19],[22, 23],[24, 25],[26, 27],[0, 1],[6, 7],\
[2, 3],[4, 5], [8, 9],[10, 11],[12, 13],[32, 33],[34, 35],[36,37],[38,39],\
[50,51],[46,47],[44,45],[40,41],[48,49],[42,43]]
else:
            # limb connection sequence: pairs of COCO-18 keypoint indices
limbSeq = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], \
[9, 10], [1, 11], [11, 12], [12, 13], [1, 0], [0, 14], [14, 16], \
[0, 15], [15, 17], [2, 16], [5, 17]]
            # PAF channel pairs corresponding to each limb
mapIdx = [[12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1], [2, 3], \
[4, 5], [6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35], [32, 33], \
[36, 37], [18, 19], [26, 27]]
connection_all = []
special_k = []
mid_num = 10
for k in range(len(mapIdx)):
score_mid = paf_avg[:, :, mapIdx[k]]
candA = all_peaks[limbSeq[k][0]]
candB = all_peaks[limbSeq[k][1]]
nA = len(candA)
nB = len(candB)
indexA, indexB = limbSeq[k]
if (nA != 0 and nB != 0):
connection_candidate = []
for i in range(nA):
for j in range(nB):
vec = np.subtract(candB[j][:2], candA[i][:2])
norm = math.sqrt(vec[0] * vec[0] + vec[1] * vec[1])
norm = max(0.001, norm)
vec = np.divide(vec, norm)
startend = list(zip(np.linspace(candA[i][0], candB[j][0], num=mid_num), \
np.linspace(candA[i][1], candB[j][1], num=mid_num)))
vec_x = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 0] \
for I in range(len(startend))])
vec_y = np.array([score_mid[int(round(startend[I][1])), int(round(startend[I][0])), 1] \
for I in range(len(startend))])
score_midpts = np.multiply(vec_x, vec[0]) + np.multiply(vec_y, vec[1])
score_with_dist_prior = sum(score_midpts) / len(score_midpts) + min(
0.5 * oriImg.shape[0] / norm - 1, 0)
criterion1 = len(np.nonzero(score_midpts > thre2)[0]) > 0.8 * len(score_midpts)
criterion2 = score_with_dist_prior > 0
if criterion1 and criterion2:
connection_candidate.append(
[i, j, score_with_dist_prior, score_with_dist_prior + candA[i][2] + candB[j][2]])
connection_candidate = sorted(connection_candidate, key=lambda x: x[2], reverse=True)
connection = np.zeros((0, 5))
for c in range(len(connection_candidate)):
i, j, s = connection_candidate[c][0:3]
if (i not in connection[:, 3] and j not in connection[:, 4]):
connection = np.vstack([connection, [candA[i][3], candB[j][3], s, i, j]])
if (len(connection) >= min(nA, nB)):
break
connection_all.append(connection)
else:
special_k.append(k)
connection_all.append([])
# last number in each row is the total parts number of that person
# the second last number in each row is the score of the overall configuration
subset = -1 * np.ones((0, self.njoint_body+1))
candidate = np.array([item for sublist in all_peaks for item in sublist])
for k in range(len(mapIdx)):
if k not in special_k:
partAs = connection_all[k][:, 0]
partBs = connection_all[k][:, 1]
indexA, indexB = np.array(limbSeq[k])
for i in range(len(connection_all[k])): # = 1:size(temp,1)
found = 0
subset_idx = [-1, -1]
for j in range(len(subset)): # 1:size(subset,1):
if subset[j][indexA] == partAs[i] or subset[j][indexB] == partBs[i]:
subset_idx[found] = j
found += 1
if found == 1:
j = subset_idx[0]
if subset[j][indexB] != partBs[i]:
subset[j][indexB] = partBs[i]
subset[j][-1] += 1
subset[j][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
elif found == 2: # if found 2 and disjoint, merge them
j1, j2 = subset_idx
membership = ((subset[j1] >= 0).astype(int) + (subset[j2] >= 0).astype(int))[:-2]
if len(np.nonzero(membership == 2)[0]) == 0: # merge
subset[j1][:-2] += (subset[j2][:-2] + 1)
subset[j1][-2:] += subset[j2][-2:]
subset[j1][-2] += connection_all[k][i][2]
subset = np.delete(subset, j2, 0)
else: # as like found == 1
subset[j1][indexB] = partBs[i]
subset[j1][-1] += 1
subset[j1][-2] += candidate[partBs[i].astype(int), 2] + connection_all[k][i][2]
# if find no partA in the subset, create a new subset
elif not found and k < self.njoint_body-2:
row = -1 * np.ones(self.njoint_body+1)
row[indexA] = partAs[i]
row[indexB] = partBs[i]
row[-1] = 2
row[-2] = sum(candidate[connection_all[k][i, :2].astype(int), 2]) + connection_all[k][i][2]
subset = np.vstack([subset, row])
# delete some rows of subset which has few parts occur
deleteIdx = []
for i in range(len(subset)):
if subset[i][-1] < 4 or subset[i][-2] / subset[i][-1] < 0.4:
deleteIdx.append(i)
subset = np.delete(subset, deleteIdx, axis=0)
        # subset: n x 27 array; columns 0-24 index into candidate (-1 = missing),
        # column -2 is the person's total score, column -1 its number of parts
        # candidate: each row is (x, y, score, id)
return candidate, subset
def handpos(self, oriImg):
scale_search = [0.5, 1.0, 1.5, 2.0]
# scale_search = [0.5]
boxsize = 368
stride = 8
padValue = 128
thre = 0.05
multiplier = [x * boxsize / oriImg.shape[0] for x in scale_search]
heatmap_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], 22))
# paf_avg = np.zeros((oriImg.shape[0], oriImg.shape[1], 38))
for m in range(len(multiplier)):
scale = multiplier[m]
imageToTest = cv2.resize(oriImg, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
imageToTest_padded, pad = util.padRightDownCorner(imageToTest, stride, padValue)
im = np.transpose(np.float32(imageToTest_padded[:, :, :, np.newaxis]), (3, 2, 0, 1)) / 256 - 0.5
im = np.ascontiguousarray(im)
data = torch.from_numpy(im).float()
if torch.cuda.is_available():
data = data.cuda()
# data = data.permute([2, 0, 1]).unsqueeze(0).float()
with torch.no_grad():
output = self.pt_hand(data).cpu().numpy()
                # output = self.model(data).numpy()
# extract outputs, resize, and remove padding
heatmap = np.transpose(np.squeeze(output), (1, 2, 0)) # output 1 is heatmaps
heatmap = cv2.resize(heatmap, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
heatmap = heatmap[:imageToTest_padded.shape[0] - pad[2], :imageToTest_padded.shape[1] - pad[3], :]
heatmap = cv2.resize(heatmap, (oriImg.shape[1], oriImg.shape[0]), interpolation=cv2.INTER_CUBIC)
heatmap_avg += heatmap / len(multiplier)
all_peaks = []
for part in range(21):
map_ori = heatmap_avg[:, :, part]
one_heatmap = gaussian_filter(map_ori, sigma=3)
binary = np.ascontiguousarray(one_heatmap > thre, dtype=np.uint8)
            # all responses below the threshold: no peak for this keypoint
if np.sum(binary) == 0:
all_peaks.append([0, 0])
continue
label_img, label_numbers = label(binary, return_num=True, connectivity=binary.ndim)
max_index = np.argmax([np.sum(map_ori[label_img == i]) for i in range(1, label_numbers + 1)]) + 1
label_img[label_img != max_index] = 0
map_ori[label_img == 0] = 0
y, x = util.npmax(map_ori)
all_peaks.append([x, y])
return np.array(all_peaks)
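
# ---------------------------------------------------------------------------
# Usage sketch (illustration only): how the two wrappers might be wired up with
# pretrained PyTorch pose networks and a trained Keras translation model. The
# loader names below are placeholders, not functions defined in this project.
#
#   body_net = load_pretrained_body25_model()   # torch.nn.Module: image -> (PAFs, heatmaps)
#   hand_net = load_pretrained_hand_model()     # torch.nn.Module: crop -> 22 heatmap channels
#   translator = load_translation_model()       # Keras model expecting input of shape (1, 20, 156)
#
#   # Single-frame pose extraction (frame: H x W x 3 torch tensor):
#   pose_model = ISLSignPos(body_net, hand_net)
#   candidate, subset, hand_peaks = pose_model(frame)
#
#   # Sign translation over a clip of up to 20 frames (clip: frames x H x W x 3):
#   sign_translator = ISLSignPosTranslator(body_net, hand_net, translator)
#   prediction = sign_translator(clip)
# ---------------------------------------------------------------------------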