File size: 4,022 Bytes
719d0db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numpy as np
import torch
from torchmetrics.classification import ConfusionMatrix

def calc_tour_length(tour, coords):
    tour_length = []
    for i in range(len(tour) - 1):
        path_length = np.linalg.norm(coords[tour[i]] - coords[tour[i + 1]])
        tour_length.append(path_length)
    tour_length = np.sum(tour_length)
    return tour_length

class TemporalConfusionMatrix():
    def __init__(self, num_classes: int, seq_length: int, device: str):
        if num_classes == 2:
            task = "binary" 
        elif num_classes > 2:
            task = "multiclass"
        else:
            assert False, "Invalid num_classes. It should be more than 2"
        
        self.num_classes = num_classes
        self.seq_length = seq_length
        self.temp_confmat = [ConfusionMatrix(task=task, num_classes=num_classes).to(device) for _ in range(seq_length)]

    def update(self, 
               preds: torch.Tensor,
               labels: torch.Tensor,
               mask: torch.Tensor):
        """
        Parameters
        ----------
        preds: predicted labels [batch_size x max_seq_length]
        label: ground truth labels [batch_size x max_seq_length]
        mask:  mask of padding [batch_size x max_seq_length]
        """
        batch_mask = (mask.sum(-1) == self.seq_length) # [batch_size]
        preds = preds[batch_mask]   # [fixed_batch_size x max_seq_length]
        labels = labels[batch_mask] # [fixed_batch_size x max_seq_length]
        for i in range(self.seq_length):
            self.temp_confmat[i](preds[:, i], labels[:, i])

    def compute(self, device="cpu"):
        return [confmat.compute().to(device) for confmat in self.temp_confmat]

def calc_route_length(routes: list, coords: np.array):
    route_length = []
    num_vehicles = get_num_vehicles(routes)
    for vehicle_id in range(num_vehicles):
        route = routes[vehicle_id]
        for i in range(len(route) - 1):
            edge_length = np.linalg.norm(coords[route[i]] - coords[route[i + 1]])
            route_length.append(edge_length)
    route_length = np.sum(route_length)
    return route_length

def get_num_vehicles(routes: list):
    return len(routes)

def calc_class_ratio(labels: torch.Tensor, routes: list):
    """
    Parameters
    ----------
    lables: torch.Tensor [num_vehicles, max_seq_length]
    routes: 2d list [num_vehicles, seq_length]
    """
    if isinstance(labels, torch.Tensor):
        num_classes = 3 # labels.max().item() + 1
        class_list = [0 for _ in range(num_classes)]
        print(labels, routes)
        for vehicle_id in range(get_num_vehicles(routes)):
            for step in range(len(routes[vehicle_id])-1):
                class_list[labels[vehicle_id, step].item()] += 1
    else:
        if len(labels) > 1:
            num_classes = np.max(np.max(labels)) + 1
        else:    
            num_classes = np.max(labels) + 1
        class_list = [0 for _ in range(num_classes)]
        for vehicle_id in range(get_num_vehicles(routes)):
            for step in range(len(routes[vehicle_id])-1):
                class_list[labels[vehicle_id][step]] += 1
    class_list = np.array(class_list)
    return class_list / np.sum(class_list)

def calc_feasible_ratio(routes: list, coords: np.array):
    return len(np.unique(routes)) / len(coords)

def calc_total_prizes(routes: list, prizes: np.array):
    total_prizes = 0.
    num_vehicles = get_num_vehicles(routes)
    for vehicle_id in range(num_vehicles):
        route = routes[vehicle_id]
        for i in range(len(route) - 1):
            total_prizes += prizes[route[i]]
    return total_prizes

def calc_total_penlties(routes: list, penalities: np.array):
    total_penalites = 0.
    num_nodes = len(penalities)
    node_ids = np.arange(num_nodes)
    visited_node_ids = np.unique(routes)
    unvisited_node_ids = np.setdiff1d(node_ids, visited_node_ids)
    for unvisited_node_id in unvisited_node_ids:
        total_penalites += penalities[unvisited_node_id]
    return total_penalites