BenjiELCA's picture
no subprocess problem
7440d31
raw
history blame
32.3 kB
import numpy as np
import torch
from modules.utils import class_dict, object_dict, arrow_dict, find_closest_object, find_other_keypoint, filter_overlap_boxes, iou
from tqdm import tqdm
from modules.toXML import get_size_elements, calculate_pool_bounds, create_BPMN_id
from modules.utils import is_vertical
import streamlit as st
def non_maximum_suppression(boxes, scores, labels=None, iou_threshold=0.5):
exception = ['pool', 'lane']
idxs = np.argsort(scores) # Sort the boxes according to their scores in ascending order
selected_boxes = []
while len(idxs) > 0:
last = len(idxs) - 1
i = idxs[last]
# Skip if the label is a lane
if labels is not None and (class_dict[labels[i]] in exception):
selected_boxes.append(i)
idxs = np.delete(idxs, last)
continue
selected_boxes.append(i)
# Find the intersection of the box with the rest
suppress = [last]
for pos in range(0, last):
j = idxs[pos]
if iou(boxes[i], boxes[j]) > iou_threshold:
suppress.append(pos)
idxs = np.delete(idxs, suppress)
# Return only the boxes that were selected
return selected_boxes
def keypoint_correction(keypoints, boxes, labels, model_dict=arrow_dict, distance_treshold=15):
for idx, (key1, key2) in enumerate(keypoints):
if labels[idx] not in [list(model_dict.values()).index('sequenceFlow'),
list(model_dict.values()).index('messageFlow'),
list(model_dict.values()).index('dataAssociation')]:
continue
# Calculate the Euclidean distance between the two keypoints
distance = np.linalg.norm(key1[:2] - key2[:2])
if distance < distance_treshold:
print('Key modified for index:', idx)
x_new,y_new, x,y = find_other_keypoint(idx, keypoints, boxes)
keypoints[idx][0][:2] = [x_new,y_new]
keypoints[idx][1][:2] = [x,y]
return keypoints
def object_prediction(model, image, score_threshold=0.5, iou_threshold=0.5):
model.eval()
with torch.no_grad():
image_tensor = image.unsqueeze(0).to(torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'))
predictions = model(image_tensor)
boxes = predictions[0]['boxes'].cpu().numpy()
labels = predictions[0]['labels'].cpu().numpy()
scores = predictions[0]['scores'].cpu().numpy()
idx = np.where(scores > score_threshold)[0]
boxes = boxes[idx]
scores = scores[idx]
labels = labels[idx]
selected_boxes = non_maximum_suppression(boxes, scores, labels=labels, iou_threshold=iou_threshold)
#find orientation of the task by checking the size of all the boxes and delete the one that are not in the same orientation
vertical = 0
for i in range(len(labels)):
if labels[i] != list(object_dict.values()).index('task'):
continue
if is_vertical(boxes[i]):
vertical += 1
horizontal = len(labels) - vertical
for i in range(len(labels)):
if labels[i] != list(object_dict.values()).index('task'):
continue
if vertical < horizontal:
if is_vertical(boxes[i]):
#find the element in the list and remove it
if i in selected_boxes:
selected_boxes.remove(i)
elif vertical > horizontal:
if is_vertical(boxes[i]) == False:
#find the element in the list and remove it
if i in selected_boxes:
selected_boxes.remove(i)
else:
pass
boxes = boxes[selected_boxes]
scores = scores[selected_boxes]
labels = labels[selected_boxes]
#modify the label of the sub-process to task
for i in range(len(labels)):
if labels[i] == list(object_dict.values()).index('subProcess'):
labels[i] = list(object_dict.values()).index('task')
prediction = {
'boxes': boxes,
'scores': scores,
'labels': labels,
}
image = image.permute(1, 2, 0).cpu().numpy()
image = (image * 255).astype(np.uint8)
return image, prediction
def arrow_prediction(model, image, score_threshold=0.5, iou_threshold=0.5, distance_treshold=15):
model.eval()
with torch.no_grad():
image_tensor = image.unsqueeze(0).to(torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'))
predictions = model(image_tensor)
boxes = predictions[0]['boxes'].cpu().numpy()
labels = predictions[0]['labels'].cpu().numpy() + (len(object_dict) - 1)
scores = predictions[0]['scores'].cpu().numpy()
keypoints = predictions[0]['keypoints'].cpu().numpy()
idx = np.where(scores > score_threshold)[0]
boxes = boxes[idx]
scores = scores[idx]
labels = labels[idx]
keypoints = keypoints[idx]
selected_boxes = non_maximum_suppression(boxes, scores, iou_threshold=iou_threshold)
boxes = boxes[selected_boxes]
scores = scores[selected_boxes]
labels = labels[selected_boxes]
keypoints = keypoints[selected_boxes]
keypoints = keypoint_correction(keypoints, boxes, labels, class_dict, distance_treshold=distance_treshold)
prediction = {
'boxes': boxes,
'scores': scores,
'labels': labels,
'keypoints': keypoints,
}
image = image.permute(1, 2, 0).cpu().numpy()
image = (image * 255).astype(np.uint8)
return image, prediction
def mix_predictions(objects_pred, arrow_pred):
# Initialize the list of lists for keypoints
object_keypoints = []
# Number of boxes
num_boxes = len(objects_pred['boxes'])
# Iterate over the number of boxes
for _ in range(num_boxes):
# Each box has 2 keypoints, both initialized to [0, 0, 0]
keypoints = [[0, 0, 0], [0, 0, 0]]
object_keypoints.append(keypoints)
#concatenate the two predictions
if len(arrow_pred['boxes']) == 0:
return objects_pred['boxes'], objects_pred['labels'], objects_pred['scores'], object_keypoints
boxes = np.concatenate((objects_pred['boxes'], arrow_pred['boxes']))
labels = np.concatenate((objects_pred['labels'], arrow_pred['labels']))
scores = np.concatenate((objects_pred['scores'], arrow_pred['scores']))
keypoints = np.concatenate((object_keypoints, arrow_pred['keypoints']))
return boxes, labels, scores, keypoints
def regroup_elements_by_pool(boxes, labels, scores, keypoints, class_dict, iou_threshold=0.3):
pool_dict = {}
# Filter out pools with IoU greater than the threshold
to_delete = []
for i in range(len(boxes)):
for j in range(i + 1, len(boxes)):
if labels[i] == labels[j] and labels[i] == list(class_dict.values()).index('pool'):
if iou(np.array(boxes[i]), np.array(boxes[j])) > iou_threshold:
to_delete.append(j)
boxes = np.delete(boxes, to_delete, axis=0)
labels = np.delete(labels, to_delete)
scores = np.delete(scores, to_delete)
keypoints = np.delete(keypoints, to_delete, axis=0)
pool_indices = [i for i, label in enumerate(labels) if class_dict[label.item()] == 'pool']
pool_boxes = [boxes[i] for i in pool_indices]
if pool_indices:
for pool_index in pool_indices:
pool_dict[pool_index] = []
elements_not_in_pool = []
for i, box in enumerate(boxes):
assigned_to_pool = False
if i in pool_indices or class_dict[labels[i]] in ['messageFlow', 'pool']:
continue
for j, pool_box in enumerate(pool_boxes):
if (box[0] >= pool_box[0] and box[1] >= pool_box[1] and
box[2] <= pool_box[2] and box[3] <= pool_box[3]):
pool_index = pool_indices[j]
pool_dict[pool_index].append(i)
assigned_to_pool = True
break
if not assigned_to_pool:
if class_dict[labels[i]] not in ['messageFlow', 'lane', 'pool']:
elements_not_in_pool.append(i)
if len(elements_not_in_pool) > 1:
new_elements_not_in_pool = [i for i in elements_not_in_pool if class_dict[labels[i]] not in ['messageFlow', 'lane', 'pool']]
# Indices of relevant classes
sequence_flow_index = list(class_dict.values()).index('sequenceFlow')
message_flow_index = list(class_dict.values()).index('messageFlow')
data_association_index = list(class_dict.values()).index('dataAssociation')
if all(labels[i] in {sequence_flow_index, message_flow_index, data_association_index} for i in new_elements_not_in_pool):
print('The new pool contains only sequenceFlow, messageFlow, or dataAssociation')
elif len(new_elements_not_in_pool) > 1:
new_pool_index = len(labels)
box = calculate_pool_bounds(boxes, labels, new_elements_not_in_pool, None)
boxes = np.append(boxes, [box], axis=0)
labels = np.append(labels, list(class_dict.values()).index('pool'))
scores = np.append(scores, 1.0)
keypoints = np.append(keypoints, np.zeros((1, 2, 3)), axis=0)
pool_dict[new_pool_index] = new_elements_not_in_pool
print(f"Created a new pool index {new_pool_index} with elements: {new_elements_not_in_pool}")
non_empty_pools = {k: v for k, v in pool_dict.items() if v}
empty_pools = {k: v for k, v in pool_dict.items() if not v}
pool_dict = {**non_empty_pools, **empty_pools}
return pool_dict, boxes, labels, scores, keypoints
def create_links(keypoints, boxes, labels, class_dict):
best_points = []
links = []
for i in range(len(labels)):
if labels[i]==list(class_dict.values()).index('sequenceFlow') or labels[i]==list(class_dict.values()).index('messageFlow'):
closest1, point_start = find_closest_object(keypoints[i][0], boxes, labels)
closest2, point_end = find_closest_object(keypoints[i][1], boxes, labels)
if closest1 is not None and closest2 is not None:
best_points.append([point_start, point_end])
links.append([closest1, closest2])
else:
best_points.append([None,None])
links.append([None,None])
for i in range(len(labels)):
if labels[i]==list(class_dict.values()).index('dataAssociation'):
closest1, point_start = find_closest_object(keypoints[i][0], boxes, labels)
closest2, point_end = find_closest_object(keypoints[i][1], boxes, labels)
if closest1 is not None and closest2 is not None:
best_points[i] = ([point_start, point_end])
links[i] = ([closest1, closest2])
return links, best_points
def correction_labels(boxes, labels, class_dict, pool_dict, flow_links):
sequence_flow_index = list(class_dict.values()).index('sequenceFlow')
message_flow_index = list(class_dict.values()).index('messageFlow')
data_association_index = list(class_dict.values()).index('dataAssociation')
data_object_index = list(class_dict.values()).index('dataObject')
data_store_index = list(class_dict.values()).index('dataStore')
message_event_index = list(class_dict.values()).index('messageEvent')
senquence_flow_indexx = list(class_dict.values()).index('sequenceFlow')
for pool_index, elements in pool_dict.items():
print(f"Pool {pool_index} contains elements: {elements}")
# Check if the label sequenceFlow or messageFlow is good
for i, (id1, id2) in enumerate(flow_links):
if labels[i] in {sequence_flow_index, message_flow_index}:
if id1 is not None and id2 is not None:
# Check if each link is in the same pool
if id1 in elements and id2 in elements:
# Check if the link is between a dataObject or a dataStore
if labels[id1] in {data_object_index, data_store_index} or labels[id2] in {data_object_index, data_store_index}:
print('Change the link from sequenceFlow/messageFlow to dataAssociation')
labels[i] = data_association_index
else:
continue
elif id1 not in elements and id2 not in elements:
continue
else:
print('Change the link from sequenceFlow to messageFlow')
labels[i] = message_flow_index
# Check if dataAssociation is connected to a dataObject
for i, (id1, id2) in enumerate(flow_links):
if labels[i] == data_association_index:
if id1 is not None and id2 is not None:
label1 = labels[id1]
label2 = labels[id2]
if data_object_index in {label1, label2} or data_store_index in {label1, label2}:
continue
elif message_event_index in {label1, label2}:
print('Change the link from dataAssociation to messageFlow')
labels[i] = message_flow_index
else:
print('Change the link from dataAssociation to sequenceFlow')
labels[i] = senquence_flow_indexx
return labels, flow_links
def last_correction(boxes, labels, scores, keypoints, bpmn_id, links, best_points, pool_dict, limit_area=10000):
#delete pool that are have only messageFlow on it
delete_pool = []
for pool_index, elements in pool_dict.items():
#find the position of the pool_index in the bpmn_id
if pool_index in bpmn_id:
position = bpmn_id.index(pool_index)
else:
continue
if all([labels[i] in [list(class_dict.values()).index('messageFlow'),
list(class_dict.values()).index('sequenceFlow'),
list(class_dict.values()).index('dataAssociation'),
list(class_dict.values()).index('lane')] for i in elements]):
if len(elements) > 0:
delete_pool.append(position)
print(f"Pool {pool_index} contains only arrow elements, deleting it")
#calcul the area of the pool$
if position < len(boxes):
pool = boxes[position]
area = (pool[2] - pool[0]) * (pool[3] - pool[1])
if len(pool_dict)>1 and area < limit_area:
delete_pool.append(position)
print(f"Pool {pool_index} is too small, deleting it")
if is_vertical(boxes[position]):
delete_pool.append(position)
print(f"Pool {position} is vertical, deleting it")
delete_elements = []
# Check if there is an arrow that has the same links
for i in range(len(labels)):
for j in range(i+1, len(labels)):
if labels[i] == list(class_dict.values()).index('sequenceFlow') and labels[j] == list(class_dict.values()).index('sequenceFlow'):
if links[i] == links[j]:
print(f'element {i} and {j} have the same links')
if scores[i] > scores[j]:
print('delete element', j)
delete_elements.append(j)
else:
print('delete element', i)
delete_elements.append(i)
#concatenate the delete_elements and the delete_pool
delete_elements = delete_elements + delete_pool
#delete double value in delete_elements
delete_elements = list(set(delete_elements))
boxes = np.delete(boxes, delete_elements, axis=0)
labels = np.delete(labels, delete_elements)
scores = np.delete(scores, delete_elements)
keypoints = np.delete(keypoints, delete_elements, axis=0)
bpmn_id = [point for i, point in enumerate(bpmn_id) if i not in delete_elements]
links = np.delete(links, delete_elements, axis=0)
best_points = [point for i, point in enumerate(best_points) if i not in delete_elements]
#also delete the element in the pool_dict
for pool_index, elements in pool_dict.items():
pool_dict[pool_index] = [i for i in elements if i not in delete_elements]
return boxes, labels, scores, keypoints, bpmn_id, links, best_points, pool_dict
def give_link_to_element(links, labels):
#give a link to event to allow the creation of the BPMN id with start, indermediate and end event
for i in range(len(links)):
if labels[i] == list(class_dict.values()).index('sequenceFlow'):
id1, id2 = links[i]
if (id1 and id2) is not None:
links[id1][1] = i
links[id2][0] = i
return links
def generate_data(image, boxes, labels, scores, keypoints, bpmn_id, flow_links, best_points, pool_dict):
idx = []
for i in range(len(labels)):
idx.append(i)
data = {
'image': image,
'idx': idx,
'boxes': boxes,
'labels': labels,
'scores': scores,
'keypoints': keypoints,
'links': flow_links,
'best_points': best_points,
'pool_dict': pool_dict,
'BPMN_id': bpmn_id,
}
return data
def develop_prediction(boxes, labels, scores, keypoints, class_dict, correction=True):
pool_dict, boxes, labels, scores, keypoints = regroup_elements_by_pool(boxes, labels, scores, keypoints, class_dict)
bpmn_id, pool_dict = create_BPMN_id(labels,pool_dict)
# Create links between elements
flow_links, best_points = create_links(keypoints, boxes, labels, class_dict)
#Correct the labels of some sequenceflow that cross multiple pool
if correction:
labels, flow_links = correction_labels(boxes, labels, class_dict, pool_dict, flow_links)
#give a link to event to allow the creation of the BPMN id with start, indermediate and end event
flow_links = give_link_to_element(flow_links, labels)
boxes,labels,scores,keypoints,bpmn_id, flow_links,best_points,pool_dict = last_correction(boxes,labels,scores,keypoints,bpmn_id,flow_links,best_points, pool_dict)
return boxes, labels, scores, keypoints, bpmn_id, flow_links, best_points, pool_dict
def full_prediction(model_object, model_arrow, image, score_threshold=0.5, iou_threshold=0.5, resize=True, distance_treshold=15):
model_object.eval() # Set the model to evaluation mode
model_arrow.eval() # Set the model to evaluation mode
# Load an image
with torch.no_grad(): # Disable gradient calculation for inference
_, objects_pred = object_prediction(model_object, image, score_threshold=score_threshold, iou_threshold=0.1)
_, arrow_pred = arrow_prediction(model_arrow, image, score_threshold=score_threshold, iou_threshold=iou_threshold, distance_treshold=distance_treshold)
st.session_state.arrow_pred = arrow_pred
boxes, labels, scores, keypoints = mix_predictions(objects_pred, arrow_pred)
boxes, labels, scores, keypoints, bpmn_id, flow_links, best_points, pool_dict = develop_prediction(boxes, labels, scores, keypoints, class_dict)
image = image.permute(1, 2, 0).cpu().numpy()
image = (image * 255).astype(np.uint8)
data = generate_data(image, boxes, labels, scores, keypoints, bpmn_id, flow_links, best_points, pool_dict)
return image, data
def evaluate_model_by_class(pred_boxes, true_boxes, pred_labels, true_labels, model_dict, iou_threshold=0.5):
# Initialize dictionaries to hold per-class counts
class_tp = {cls: 0 for cls in model_dict.values()}
class_fp = {cls: 0 for cls in model_dict.values()}
class_fn = {cls: 0 for cls in model_dict.values()}
# Track which true boxes have been matched
matched = [False] * len(true_boxes)
# Check each prediction against true boxes
for pred_box, pred_label in zip(pred_boxes, pred_labels):
match_found = False
for idx, (true_box, true_label) in enumerate(zip(true_boxes, true_labels)):
if not matched[idx] and pred_label == true_label:
if iou(np.array(pred_box), np.array(true_box)) >= iou_threshold:
class_tp[model_dict[pred_label]] += 1
matched[idx] = True
match_found = True
break
if not match_found:
class_fp[model_dict[pred_label]] += 1
# Count false negatives
for idx, (true_box, true_label) in enumerate(zip(true_boxes, true_labels)):
if not matched[idx]:
class_fn[model_dict[true_label]] += 1
# Calculate precision, recall, and F1-score per class
class_precision = {}
class_recall = {}
class_f1_score = {}
for cls in model_dict.values():
precision = class_tp[cls] / (class_tp[cls] + class_fp[cls]) if class_tp[cls] + class_fp[cls] > 0 else 0
recall = class_tp[cls] / (class_tp[cls] + class_fn[cls]) if class_tp[cls] + class_fn[cls] > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
class_precision[cls] = precision
class_recall[cls] = recall
class_f1_score[cls] = f1_score
return class_precision, class_recall, class_f1_score
def keypoints_mesure(pred_boxes, pred_box, true_boxes, true_box, pred_keypoints, true_keypoints, distance_threshold=5):
result = 0
reverted = False
#find the position of keypoints in the list
idx = np.where(pred_boxes == pred_box)[0][0]
idx2 = np.where(true_boxes == true_box)[0][0]
keypoint1_pred = pred_keypoints[idx][0]
keypoint1_true = true_keypoints[idx2][0]
keypoint2_pred = pred_keypoints[idx][1]
keypoint2_true = true_keypoints[idx2][1]
distance1 = np.linalg.norm(keypoint1_pred[:2] - keypoint1_true[:2])
distance2 = np.linalg.norm(keypoint2_pred[:2] - keypoint2_true[:2])
distance3 = np.linalg.norm(keypoint1_pred[:2] - keypoint2_true[:2])
distance4 = np.linalg.norm(keypoint2_pred[:2] - keypoint1_true[:2])
if distance1 < distance_threshold:
result += 1
if distance2 < distance_threshold:
result += 1
if distance3 < distance_threshold or distance4 < distance_threshold:
reverted = True
return result, reverted
def evaluate_single_image(pred_boxes, true_boxes, pred_labels, true_labels, pred_keypoints, true_keypoints, iou_threshold=0.5, distance_threshold=5):
tp, fp, fn = 0, 0, 0
key_t, key_f = 0, 0
labels_t, labels_f = 0, 0
reverted_tot = 0
matched_true_boxes = set()
for pred_idx, (pred_box, pred_label) in enumerate(zip(pred_boxes, pred_labels)):
match_found = False
for true_idx, true_box in enumerate(true_boxes):
if true_idx in matched_true_boxes:
continue
iou_val = iou(pred_box, true_box)
if iou_val >= iou_threshold:
if true_keypoints is not None and pred_keypoints is not None:
key_result, reverted = keypoints_mesure(pred_boxes, pred_box, true_boxes, true_box, pred_keypoints, true_keypoints, distance_threshold)
key_t += key_result
key_f += 2 - key_result
if reverted:
reverted_tot += 1
match_found = True
matched_true_boxes.add(true_idx)
if pred_label == true_labels[true_idx]:
labels_t += 1
else:
labels_f += 1
tp += 1
break
if not match_found:
fp += 1
fn = len(true_boxes) - tp
return tp, fp, fn, labels_t, labels_f, key_t, key_f, reverted_tot
def pred_4_evaluation(model, loader, score_threshold=0.5, iou_threshold=0.5, distance_threshold=5, key_correction=True, model_type='object'):
model.eval()
tp, fp, fn = 0, 0, 0
labels_t, labels_f = 0, 0
key_t, key_f = 0, 0
reverted = 0
with torch.no_grad():
for images, targets_im in tqdm(loader, desc="Testing... "): # Wrap the loader with tqdm
devices = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
images = [image.to(devices) for image in images]
targets = [{k: v.clone().detach().to(devices) for k, v in t.items()} for t in targets_im]
predictions = model(images)
for target, prediction in zip(targets, predictions):
true_boxes = target['boxes'].cpu().numpy()
true_labels = target['labels'].cpu().numpy()
if 'keypoints' in target:
true_keypoints = target['keypoints'].cpu().numpy()
pred_boxes = prediction['boxes'].cpu().numpy()
scores = prediction['scores'].cpu().numpy()
pred_labels = prediction['labels'].cpu().numpy()
if 'keypoints' in prediction:
pred_keypoints = prediction['keypoints'].cpu().numpy()
selected_boxes = non_maximum_suppression(pred_boxes, scores, iou_threshold=iou_threshold)
pred_boxes = pred_boxes[selected_boxes]
scores = scores[selected_boxes]
pred_labels = pred_labels[selected_boxes]
if 'keypoints' in prediction:
pred_keypoints = pred_keypoints[selected_boxes]
filtered_boxes = []
filtered_labels = []
filtered_keypoints = []
if 'keypoints' not in prediction:
#create a list of zeros of length equal to the number of boxes
pred_keypoints = [np.zeros((2, 3)) for _ in range(len(pred_boxes))]
for box, score, label, keypoints in zip(pred_boxes, scores, pred_labels, pred_keypoints):
if score >= score_threshold:
filtered_boxes.append(box)
filtered_labels.append(label)
if 'keypoints' in prediction:
filtered_keypoints.append(keypoints)
if key_correction and ('keypoints' in prediction):
filtered_keypoints = keypoint_correction(filtered_keypoints, filtered_boxes, filtered_labels)
if 'keypoints' not in target:
filtered_keypoints = None
true_keypoints = None
tp_img, fp_img, fn_img, labels_t_img, labels_f_img, key_t_img, key_f_img, reverted_img = evaluate_single_image(
filtered_boxes, true_boxes, filtered_labels, true_labels, filtered_keypoints, true_keypoints, iou_threshold, distance_threshold)
tp += tp_img
fp += fp_img
fn += fn_img
labels_t += labels_t_img
labels_f += labels_f_img
key_t += key_t_img
key_f += key_f_img
reverted += reverted_img
return tp, fp, fn, labels_t, labels_f, key_t, key_f, reverted
def main_evaluation(model, test_loader, score_threshold=0.5, iou_threshold=0.5, distance_threshold=5, key_correction=True, model_type = 'object'):
tp, fp, fn, labels_t, labels_f, key_t, key_f, reverted = pred_4_evaluation(model, test_loader, score_threshold, iou_threshold, distance_threshold, key_correction, model_type)
labels_precision = labels_t / (labels_t + labels_f) if (labels_t + labels_f) > 0 else 0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
if model_type == 'arrow':
key_accuracy = key_t / (key_t + key_f) if (key_t + key_f) > 0 else 0
reverted_accuracy = reverted / (key_t + key_f) if (key_t + key_f) > 0 else 0
else:
key_accuracy = 0
reverted_accuracy = 0
return labels_precision, precision, recall, f1_score, key_accuracy, reverted_accuracy
def evaluate_model_by_class_single_image(pred_boxes, true_boxes, pred_labels, true_labels, class_tp, class_fp, class_fn, model_dict, iou_threshold=0.5):
matched_true_boxes = set()
for pred_idx, (pred_box, pred_label) in enumerate(zip(pred_boxes, pred_labels)):
match_found = False
for true_idx, (true_box, true_label) in enumerate(zip(true_boxes, true_labels)):
if true_idx in matched_true_boxes:
continue
if pred_label == true_label and iou(np.array(pred_box), np.array(true_box)) >= iou_threshold:
class_tp[model_dict[pred_label]] += 1
matched_true_boxes.add(true_idx)
match_found = True
break
if not match_found:
class_fp[model_dict[pred_label]] += 1
for idx, true_label in enumerate(true_labels):
if idx not in matched_true_boxes:
class_fn[model_dict[true_label]] += 1
def pred_4_evaluation_per_class(model, loader, score_threshold=0.5, iou_threshold=0.5):
model.eval()
with torch.no_grad():
for images, targets_im in tqdm(loader, desc="Testing... "):
devices = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
images = [image.to(devices) for image in images]
targets = [{k: v.clone().detach().to(devices) for k, v in t.items()} for t in targets_im]
predictions = model(images)
for target, prediction in zip(targets, predictions):
true_boxes = target['boxes'].cpu().numpy()
true_labels = target['labels'].cpu().numpy()
pred_boxes = prediction['boxes'].cpu().numpy()
scores = prediction['scores'].cpu().numpy()
pred_labels = prediction['labels'].cpu().numpy()
idx = np.where(scores > score_threshold)[0]
pred_boxes = pred_boxes[idx]
scores = scores[idx]
pred_labels = pred_labels[idx]
selected_boxes = non_maximum_suppression(pred_boxes, scores, iou_threshold=iou_threshold)
pred_boxes = pred_boxes[selected_boxes]
scores = scores[selected_boxes]
pred_labels = pred_labels[selected_boxes]
yield pred_boxes, true_boxes, pred_labels, true_labels
def evaluate_model_by_class(model, test_loader, model_dict, score_threshold=0.5, iou_threshold=0.5):
class_tp = {cls: 0 for cls in model_dict.values()}
class_fp = {cls: 0 for cls in model_dict.values()}
class_fn = {cls: 0 for cls in model_dict.values()}
for pred_boxes, true_boxes, pred_labels, true_labels in pred_4_evaluation_per_class(model, test_loader, score_threshold, iou_threshold):
evaluate_model_by_class_single_image(pred_boxes, true_boxes, pred_labels, true_labels, class_tp, class_fp, class_fn, model_dict, iou_threshold)
class_precision = {}
class_recall = {}
class_f1_score = {}
for cls in model_dict.values():
precision = class_tp[cls] / (class_tp[cls] + class_fp[cls]) if class_tp[cls] + class_fp[cls] > 0 else 0
recall = class_tp[cls] / (class_tp[cls] + class_fn[cls]) if class_tp[cls] + class_fn[cls] > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
class_precision[cls] = precision
class_recall[cls] = recall
class_f1_score[cls] = f1_score
return class_precision, class_recall, class_f1_score