Spaces:
Sleeping
Sleeping
import numpy as np | |
import cv2 | |
import pandas as pd | |
import operator | |
import matplotlib.pyplot as plt | |
import os | |
from sklearn.model_selection import train_test_split | |
from tensorflow.keras.utils import Sequence | |
from config import yolo_config | |
def load_weights(model, weights_file_path):
    """
    Load Darknet-format weights from a binary file into a Keras model.

    The file is consumed sequentially: a header of five int32 values, then for
    each of the 110 convolution layers either batch-norm parameters or a bias
    vector, followed by the convolution kernel.

    :param model: model exposing ``get_layer(name)``. Convolution layers must be
        named ``conv2d``, ``conv2d_1``, ... and batch-norm layers
        ``batch_normalization``, ``batch_normalization_1``, ...
    :param weights_file_path: path to the binary weights file
    :return: None; prints whether the whole file was consumed
    """
    conv_layer_size = 110
    # Convolutions at these indexes have no batch norm; they store a bias instead.
    conv_output_idxs = (93, 101, 109)

    with open(weights_file_path, 'rb') as file:
        # Header (major, minor, revision, seen, _): read only to skip past it.
        np.fromfile(file, dtype=np.int32, count=5)

        bn_idx = 0
        for conv_idx in range(conv_layer_size):
            conv_layer_name = f'conv2d_{conv_idx}' if conv_idx > 0 else 'conv2d'
            bn_layer_name = f'batch_normalization_{bn_idx}' if bn_idx > 0 else 'batch_normalization'

            conv_layer = model.get_layer(conv_layer_name)
            filters = conv_layer.filters
            kernel_size = conv_layer.kernel_size[0]
            input_dims = conv_layer.input_shape[-1]
            has_batch_norm = conv_idx not in conv_output_idxs

            if has_batch_norm:
                # darknet bn layer weights: [beta, gamma, mean, variance]
                bn_weights = np.fromfile(file, dtype=np.float32, count=4 * filters)
                # tf bn layer weights: [gamma, beta, mean, variance]
                bn_weights = bn_weights.reshape((4, filters))[[1, 0, 2, 3]]
                bn_layer = model.get_layer(bn_layer_name)
                bn_idx += 1
            else:
                conv_bias = np.fromfile(file, dtype=np.float32, count=filters)

            # darknet shape: (out_dim, input_dims, height, width)
            # tf shape: (height, width, input_dims, out_dim)
            conv_shape = (filters, input_dims, kernel_size, kernel_size)
            # np.prod replaces np.product, which no longer exists in NumPy 2.x.
            conv_weights = np.fromfile(file, dtype=np.float32, count=int(np.prod(conv_shape)))
            conv_weights = conv_weights.reshape(conv_shape).transpose([2, 3, 1, 0])

            if has_batch_norm:
                conv_layer.set_weights([conv_weights])
                bn_layer.set_weights(bn_weights)
            else:
                conv_layer.set_weights([conv_weights, conv_bias])

        # Read the tail exactly once: a second read() at EOF returns b'', which
        # is why the reported count used to be 0 no matter what was left over.
        remaining = file.read()

    if not remaining:
        print('all weights read')
    else:
        # The count is the length of the unread tail, i.e. in bytes.
        print(f'failed to read all weights, # of unread weights: {len(remaining)}')
def get_detection_data(img, model_outputs, class_names):
    """
    Turn raw inference outputs into a DataFrame of detections in pixel units.

    :param img: target raw image; only its height and width are read
    :param model_outputs: outputs from inference_model, ordered as
        (boxes, scores, classes, number of boxes), each indexed with a leading
        batch axis of which only entry 0 is used. Box coordinates are scaled by
        the image size, so they are presumably normalised to [0, 1] - confirm
        against the inference model.
    :param class_names: list of object class names, indexed by class id
    :return: DataFrame with columns x1, y1, x2, y2, class_name, score, w, h
    """
    detected = model_outputs[-1][0]
    boxes, scores, classes = (batch[0][:detected] for batch in model_outputs[:-1])
    img_h, img_w = img.shape[:2]

    frame = pd.DataFrame(boxes, columns=['x1', 'y1', 'x2', 'y2'])
    # Scale each axis by the matching image dimension and truncate to integers.
    for cols, size in ((['x1', 'x2'], img_w), (['y1', 'y2'], img_h)):
        frame[cols] = (frame[cols] * size).astype('int64')

    frame['class_name'] = np.array(class_names)[classes.astype('int64')]
    frame['score'] = scores
    frame['w'] = frame['x2'] - frame['x1']
    frame['h'] = frame['y2'] - frame['y1']

    print(f'# of bboxes: {detected}')
    return frame
def read_annotation_lines(annotation_path, test_size=None, random_seed=5566):
    """
    Read an annotation file and optionally split its lines into two sets.

    :param annotation_path: path to the text file, one annotation per line
    :param test_size: forwarded to train_test_split; any falsy value (None, 0)
        disables splitting
    :param random_seed: random_state forwarded to train_test_split
    :return: the list of lines (line endings kept) when no split is requested,
        otherwise whatever train_test_split returns for those lines
    """
    with open(annotation_path) as annotation_file:
        lines = annotation_file.readlines()

    if not test_size:
        return lines
    return train_test_split(lines, test_size=test_size, random_state=random_seed)
def draw_bbox(img, detections, cmap, random_color=True, figsize=(10, 10), show_img=True, show_text=True):
    """
    Draw bounding boxes, and optionally labels, on a copy of the image.

    :param img: image to annotate; it is copied and the input is left untouched
    :param detections: pandas DataFrame whose rows unpack to
        (x1, y1, x2, y2, class name, score, w, h)
    :param cmap: object colormap indexed by class name; only read when
        random_color is False
    :param random_color: draw each box in a freshly sampled random colour
    :param figsize: matplotlib figure size used when show_img is True
    :param show_img: display the annotated image with matplotlib
    :param show_text: write "<class> <score>" above each box
    :return: the annotated copy of img
    """
    canvas = np.array(img)
    # Stroke and font sizes are scaled relative to a 416-pixel longest side.
    scale = max(canvas.shape[0:2]) / 416
    box_thickness = int(2 * scale)
    font_scale = max(0.3 * scale, 0.3)
    text_thickness = max(int(1 * scale), 1)

    for _, detection in detections.iterrows():
        left, top, right, bottom, label, score, _box_w, _box_h = detection.values
        if random_color:
            color = list(np.random.random(size=3) * 255)
        else:
            color = cmap[label]
        cv2.rectangle(canvas, (left, top), (right, bottom), color, box_thickness)

        if not show_text:
            continue

        caption = f'{label} {score:.2f}'
        font = cv2.FONT_HERSHEY_DUPLEX
        (text_width, text_height), _baseline = cv2.getTextSize(
            caption, font, fontScale=font_scale, thickness=text_thickness)
        # Filled background strip for the caption, sitting on the box's top edge.
        cv2.rectangle(canvas,
                      (left - box_thickness // 2, top - text_height),
                      (left + text_width, top),
                      color,
                      cv2.FILLED)
        cv2.putText(canvas, caption, (left, top), font, font_scale,
                    (255, 255, 255), text_thickness, cv2.LINE_AA)

    if show_img:
        plt.figure(figsize=figsize)
        plt.imshow(canvas)
        plt.show()
    return canvas
class DataGenerator(Sequence):
    """
    Generates data for Keras
    ref: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

    Each annotation line is "<image path> <x1,y1,x2,y2,class_id> ...", split on
    whitespace. Batch size, image size and anchors come from ``yolo_config``.
    """

    def __init__(self,
                 annotation_lines,
                 class_name_path,
                 folder_path,
                 max_boxes=100,
                 shuffle=True):
        """
        :param annotation_lines: list of annotation strings, one per image
        :param class_name_path: text file with one class name per line
        :param folder_path: directory that image paths are joined onto
        :param max_boxes: number of box slots per image (extra boxes are dropped)
        :param shuffle: reshuffle the sample order at the end of every epoch
        """
        # Let the Keras base class initialise its own state where it has any.
        super().__init__()
        self.annotation_lines = annotation_lines
        self.class_name_path = class_name_path
        # Context manager so the file handle is closed (it used to be leaked).
        with open(class_name_path) as class_file:
            self.num_classes = len(class_file.readlines())
        self.num_gpu = yolo_config['num_gpu']
        self.batch_size = yolo_config['batch_size'] * self.num_gpu
        self.target_img_size = yolo_config['img_size']
        self.anchors = np.array(yolo_config['anchors']).reshape((9, 2))
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.annotation_lines))
        self.folder_path = folder_path
        self.max_boxes = max_boxes
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last batch may be smaller)."""
        return int(np.ceil(len(self.annotation_lines) / self.batch_size))

    def __getitem__(self, index):
        """
        Generate one batch of data.

        :param index: batch index within the epoch
        :return: ([images, *per-stage targets, boxes], zeros with one entry per sample)
        """
        # Generate indexes of the batch
        idxs = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        lines = [self.annotation_lines[i] for i in idxs]
        # Generate data
        X, y_tensor, y_bbox = self.__data_generation(lines)
        return [X, *y_tensor, y_bbox], np.zeros(len(lines))

    def on_epoch_end(self):
        """Shuffle the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, annotation_lines):
        """
        Generates data containing batch_size samples

        :param annotation_lines: annotation strings for the samples of one batch
        :return: (images, per-stage target tensors, boxes) where the last two
            are the outputs of preprocess_true_boxes
        """
        X = np.empty((len(annotation_lines), *self.target_img_size), dtype=np.float32)
        y_bbox = np.empty((len(annotation_lines), self.max_boxes, 5), dtype=np.float32)  # x1y1x2y2
        # Every row of both buffers is overwritten below, so np.empty is safe.
        for i, line in enumerate(annotation_lines):
            img_data, box_data = self.get_data(line)
            X[i] = img_data
            y_bbox[i] = box_data
        y_tensor, y_true_boxes_xywh = preprocess_true_boxes(
            y_bbox, self.target_img_size[:2], self.anchors, self.num_classes)
        return X, y_tensor, y_true_boxes_xywh

    def get_data(self, annotation_line):
        """
        Load one image and its boxes, both resized to the target image size.

        :param annotation_line: "<image path> <x1,y1,x2,y2,class_id> ..."
        :return: (image scaled to [0, 1], array of shape (max_boxes, 5) whose
            unused rows are zero)
        :raises FileNotFoundError: if the image cannot be read
        """
        line = annotation_line.split()
        img_path = os.path.join(self.folder_path, line[0])
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'could not read image: {img_path}')
        img = img[:, :, ::-1]  # reverse the channel order
        ih, iw = img.shape[:2]
        h, w, c = self.target_img_size
        boxes = np.array([np.array(list(map(float, box.split(',')))) for box in line[1:]],
                         dtype=np.float32)  # x1y1x2y2
        scale_w, scale_h = w / iw, h / ih
        img = cv2.resize(img, (w, h))
        image_data = np.array(img) / 255.

        # correct boxes coordinates
        box_data = np.zeros((self.max_boxes, 5))
        if len(boxes) > 0:
            # Shuffle first so that truncation to max_boxes keeps a random subset.
            np.random.shuffle(boxes)
            boxes = boxes[:self.max_boxes]
            boxes[:, [0, 2]] = boxes[:, [0, 2]] * scale_w  # + dx
            boxes[:, [1, 3]] = boxes[:, [1, 3]] * scale_h  # + dy
            box_data[:len(boxes)] = boxes
        return image_data, box_data
def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
    '''Preprocess true boxes to training input format

    Parameters
    ----------
    true_boxes: array, shape=(bs, max boxes per img, 5)
        Absolute x_min, y_min, x_max, y_max, class_id relative to input_shape.
        Rows whose width is not positive are treated as padding and skipped.
    input_shape: array-like, hw, multiples of 32
    anchors: array, shape=(N, 2), (9, wh)
    num_classes: int

    Returns
    -------
    y_true: list of 3 arrays, one per stage (strides 8, 16, 32), each of shape
        (bs, grid_h, grid_w, 3, 5 + num_classes). The last axis holds absolute
        centre xy, absolute wh, confidence and the one-hot class.
    y_true_boxes_xywh: array, shape=(bs, max boxes per img, 4), absolute
        centre xy and wh of every input row.
    '''
    num_stages = 3  # default setting for yolo, tiny yolo will be 2
    anchor_mask = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    bbox_per_grid = 3
    strides = (8, 16, 32)

    true_boxes = np.array(true_boxes, dtype='float32')
    true_boxes_abs = np.array(true_boxes, dtype='float32')
    input_shape = np.array(input_shape, dtype='int32')
    true_boxes_xy = (true_boxes_abs[..., 0:2] + true_boxes_abs[..., 2:4]) // 2  # (bs, max_boxes, 2)
    true_boxes_wh = true_boxes_abs[..., 2:4] - true_boxes_abs[..., 0:2]  # (bs, max_boxes, 2)

    # Normalize x, y, w, h relative to img size -> (0~1); only used to find grid cells
    true_boxes[..., 0:2] = true_boxes_xy / input_shape[::-1]  # xy
    true_boxes[..., 2:4] = true_boxes_wh / input_shape[::-1]  # wh

    bs = true_boxes.shape[0]
    grid_sizes = [input_shape // strides[stage] for stage in range(num_stages)]
    # e.g. for a 416x416 input:
    # [(bs, 52, 52, 3, 5+num_classes), (bs, 26, 26, 3, ...), (bs, 13, 13, 3, ...)]
    y_true = [np.zeros((bs,
                        grid_sizes[s][0],
                        grid_sizes[s][1],
                        bbox_per_grid,
                        5 + num_classes), dtype='float32')
              for s in range(num_stages)]
    y_true_boxes_xywh = np.concatenate((true_boxes_xy, true_boxes_wh), axis=-1)

    # Expand dim to apply broadcasting.
    anchors = np.expand_dims(anchors, 0)  # (1, 9, 2)
    anchor_maxes = anchors / 2.  # (1, 9, 2)
    anchor_mins = -anchor_maxes  # (1, 9, 2)
    valid_mask = true_boxes_wh[..., 0] > 0  # (bs, max_boxes)

    for batch_idx in range(bs):
        # Keep the ORIGINAL row index of every valid box. Indexing the
        # per-batch arrays by position in the compressed list picked the wrong
        # box whenever a zero-width row preceded a valid one.
        valid_idxs = np.flatnonzero(valid_mask[batch_idx])
        if valid_idxs.size == 0:
            continue

        wh = np.expand_dims(true_boxes_wh[batch_idx, valid_idxs], -2)  # (# of bbox, 1, 2)
        box_maxes = wh / 2.  # (# of bbox, 1, 2)
        box_mins = -box_maxes  # (# of bbox, 1, 2)

        # Compute IoU between each anchor and true box (both centred at the
        # origin) for responsibility assignment
        intersect_mins = np.maximum(box_mins, anchor_mins)  # (# of bbox, 9, 2)
        intersect_maxes = np.minimum(box_maxes, anchor_maxes)
        intersect_wh = np.maximum(intersect_maxes - intersect_mins, 0.)
        intersect_area = np.prod(intersect_wh, axis=-1)  # (# of bbox, 9)
        box_area = wh[..., 0] * wh[..., 1]  # (# of bbox, 1)
        anchor_area = anchors[..., 0] * anchors[..., 1]  # (1, 9)
        iou = intersect_area / (box_area + anchor_area - intersect_area)  # (# of bbox, 9)

        # Find best anchor for each true box
        best_anchors = np.argmax(iou, axis=-1)  # (# of bbox,)

        for box_idx, best_anchor in zip(valid_idxs, best_anchors):
            for stage in range(num_stages):
                if best_anchor not in anchor_mask[stage]:
                    continue
                x_offset = true_boxes[batch_idx, box_idx, 0] * grid_sizes[stage][1]
                y_offset = true_boxes[batch_idx, box_idx, 1] * grid_sizes[stage][0]
                # Grid Index
                grid_col = np.floor(x_offset).astype('int32')
                grid_row = np.floor(y_offset).astype('int32')
                anchor_idx = anchor_mask[stage].index(best_anchor)
                class_idx = true_boxes[batch_idx, box_idx, 4].astype('int32')

                cell = y_true[stage][batch_idx, grid_row, grid_col, anchor_idx]
                cell[:2] = true_boxes_xy[batch_idx, box_idx, :]  # abs xy
                cell[2:4] = true_boxes_wh[batch_idx, box_idx, :]  # abs wh
                cell[4] = 1  # confidence
                cell[5 + class_idx] = 1  # one-hot encoding

    return y_true, y_true_boxes_xywh
def voc_ap(rec, prec):
    """
    Calculate the AP given the recall and precision array
    1st) We compute a version of the measured precision/recall curve with
         precision monotonically decreasing
    2nd) We compute the AP as the area under this curve by numerical integration.

    --- Official matlab code VOC2012---
    mrec=[0 ; rec ; 1];
    mpre=[0 ; prec ; 0];
    for i=numel(mpre)-1:-1:1
        mpre(i)=max(mpre(i),mpre(i+1));
    end
    i=find(mrec(2:end)~=mrec(1:end-1))+1;
    ap=sum((mrec(i)-mrec(i-1)).*mpre(i));

    :param rec: sequence of recall values; it is not modified
    :param prec: sequence of precision values, same length as rec; it is not modified
    :return: (ap, mrec, mpre) where mrec is rec padded with 0.0 and 1.0 and mpre
        is prec padded with 0.0 on both ends and made monotonically decreasing
    """
    # Build padded copies instead of inserting into the caller's lists, so the
    # arguments are left untouched and any sequence type is accepted.
    mrec = [0.0, *rec, 1.0]
    mpre = [0.0, *prec, 0.0]

    # Make the precision monotonically decreasing (sweep from the end to the
    # beginning). matlab indexes start at 1 and python's range excludes its
    # end, hence range(len(mpre) - 2, -1, -1).
    for i in range(len(mpre) - 2, -1, -1):
        mpre[i] = max(mpre[i], mpre[i + 1])

    # The Average Precision (AP) is the area under the curve: sum one
    # rectangle for every index at which the recall changes.
    ap = 0.0
    for i in range(1, len(mrec)):
        if mrec[i] != mrec[i - 1]:
            ap += (mrec[i] - mrec[i - 1]) * mpre[i]
    return ap, mrec, mpre
def draw_plot_func(dictionary, n_classes, window_title, plot_title, x_label, output_path, to_show, plot_color, true_p_bar):
    """
    Draw plot using Matplotlib

    Draws a horizontal bar chart with one bar per key of ``dictionary``, writes
    each value next to its bar, saves the figure and shows it.

    :param dictionary: mapping of label -> numeric value, one bar per entry
    :param n_classes: number of bars; expected to equal len(dictionary)
    :param window_title: title of the figure window
    :param plot_title: title drawn above the plot
    :param x_label: label of the x axis
    :param output_path: file the figure is saved to
    :param to_show: accepted for compatibility but currently ignored; the
        figure is always shown and is not closed afterwards
    :param plot_color: bar and text colour of the plain (non-stacked) chart
    :param true_p_bar: "" for a plain chart, otherwise a mapping of
        label -> true-positive count used to split every bar into FP and TP
    :return: None
    """
    # sort the dictionary by increasing value, into a list of tuples
    sorted_dic_by_value = sorted(dictionary.items(), key=operator.itemgetter(1))
    print(sorted_dic_by_value)
    # unpacking the list of tuples into two lists
    sorted_keys, sorted_values = zip(*sorted_dic_by_value)

    stacked = true_p_bar != ""
    if stacked:
        # Special case to draw in:
        #  - green -> TP: True Positives (object detected and matches ground-truth)
        #  - red -> FP: False Positives (object detected but does not match ground-truth)
        fp_sorted = [dictionary[key] - true_p_bar[key] for key in sorted_keys]
        tp_sorted = [true_p_bar[key] for key in sorted_keys]
        plt.barh(range(n_classes), fp_sorted, align='center', color='crimson', label='False Positive')
        plt.barh(range(n_classes), tp_sorted, align='center', color='forestgreen', label='True Positive', left=fp_sorted)
        # add legend
        plt.legend(loc='lower right')
    else:
        plt.barh(range(n_classes), sorted_values, color=plot_color)

    # Write number on side of bar
    fig = plt.gcf()  # gcf - get current figure
    axes = plt.gca()
    r = fig.canvas.get_renderer()
    last_idx = len(sorted_values) - 1
    for i, val in enumerate(sorted_values):
        if stacked:
            fp_str_val = " " + str(fp_sorted[i])
            tp_str_val = fp_str_val + " " + str(tp_sorted[i])
            # trick to paint multicolor with offset:
            # first paint everything and then repaint the first number
            t = plt.text(val, i, tp_str_val, color='forestgreen', va='center', fontweight='bold')
            plt.text(val, i, fp_str_val, color='crimson', va='center', fontweight='bold')
        else:
            str_val = " " + str(val)  # add a space before
            if val < 1.0:
                str_val = " {0:.2f}".format(val)
            t = plt.text(val, i, str_val, color=plot_color, va='center', fontweight='bold')
        # re-set axes to show number inside the figure
        if i == last_idx:  # largest bar
            adjust_axes(r, t, fig, axes)

    # set window title; the canvas-level set_window_title was removed from
    # Matplotlib, so go through the figure manager when the canvas has one
    manager = getattr(fig.canvas, 'manager', None)
    if manager is not None:
        manager.set_window_title(window_title)

    # write classes in y axis
    tick_font_size = 12
    plt.yticks(range(n_classes), sorted_keys, fontsize=tick_font_size)

    # Re-scale height accordingly
    init_height = fig.get_figheight()
    # compute the matrix height in points and inches
    dpi = fig.dpi
    height_pt = n_classes * (tick_font_size * 1.4)  # 1.4 (some spacing)
    height_in = height_pt / dpi
    # compute the required figure height
    top_margin = 0.15  # in percentage of the figure height
    bottom_margin = 0.05  # in percentage of the figure height
    figure_height = height_in / (1 - top_margin - bottom_margin)
    # set new height
    if figure_height > init_height:
        fig.set_figheight(figure_height)

    # set plot title
    plt.title(plot_title, fontsize=14)
    # set axis titles
    plt.xlabel(x_label, fontsize='large')
    # adjust size of window
    fig.tight_layout()
    # save the plot
    fig.savefig(output_path)
    # show image (deliberately unconditional, see the to_show note above)
    plt.show()
def adjust_axes(r, t, fig, axes):
    """
    Plot - adjust axes

    Stretch the upper x limit so that the text ``t`` fits inside the figure.

    :param r: renderer used to measure the text
    :param t: text object providing get_window_extent
    :param fig: figure providing dpi and get_figwidth
    :param axes: axes whose x limits are read and rewritten
    :return: None
    """
    # text width, converted to inches, for re-scaling
    text_extent = t.get_window_extent(renderer=r)
    text_inches = text_extent.width / fig.dpi

    # ratio between the figure width with and without room for the text
    fig_width = fig.get_figwidth()
    widened = fig_width + text_inches
    ratio = widened / fig_width

    # grow only the upper limit by that ratio
    lower, upper = axes.get_xlim()[0], axes.get_xlim()[1]
    axes.set_xlim([lower, upper * ratio])
def read_txt_to_list(path):
    """
    Read a text file into a list with one entry per line.

    :param path: path of the text file
    :return: list of lines with leading and trailing whitespace (including the
        newline) stripped; blank lines are kept as empty strings
    """
    with open(path) as txt_file:
        return [raw_line.strip() for raw_line in txt_file]