Spaces:
Running
Running
| import torch.nn as nn | |
| import numpy as np | |
| class CFTourGenerator(nn.Module): | |
| def __init__(self, cf_solver): | |
| super().__init__() | |
| self.solver = cf_solver | |
| self.problem = cf_solver.problem | |
| def forward(self, factual_tour, vehicle_id, cf_step, cf_next_node_id, node_feats, dist_matrix=None): | |
| """ | |
| solve an input instance with visited edges fixed | |
| Parameters | |
| ---------- | |
| factual_tour: list [seq_length] | |
| cf_step: int | |
| cf_next_node_id: int | |
| node_feats: | |
| Returns | |
| ------- | |
| cf_tour: np.array [seq_length] | |
| """ | |
| fixed_paths = self.get_fixed_paths(factual_tour, vehicle_id, cf_step, cf_next_node_id) | |
| cf_tours = self.solver.solve(node_feats, fixed_paths, dist_matrix=dist_matrix) | |
| if cf_tours is None: | |
| return | |
| if (cf_step > 0): | |
| for vehicle_id, cf_tour in enumerate(cf_tours): | |
| if cf_next_node_id in cf_tour: | |
| if cf_step == 1: | |
| if cf_tour[1] != cf_next_node_id: | |
| cf_tours[vehicle_id] = np.flipud(cf_tour) | |
| break | |
| else: | |
| if (factual_tour[vehicle_id][1] != cf_tour[1]): | |
| cf_tours[vehicle_id] = np.flipud(cf_tour) # make direction of the cf tour the same as factual one | |
| break | |
| print("aaaa", cf_tours) | |
| return cf_tours | |
| def get_fixed_paths(self, factual_tour, vehicle_id, cf_step, cf_next_node_id): | |
| visited_paths = np.append(factual_tour[vehicle_id][:cf_step], cf_next_node_id) | |
| return visited_paths | |
| # def get_avail_edges(self, factual_tour, cf_step, cf_next_node_id): | |
| # visited_paths = np.append(factual_tour[:cf_step], cf_next_node_id) | |
| # avail_edges = [] | |
| # # add fixed edges | |
| # for i in range(len(visited_paths) - 1): | |
| # avail_edges.append([visited_paths[i], visited_paths[i + 1]]) | |
| # print(avail_edges) | |
| # # add rest avaialbel edges | |
| # num_nodes = np.max(factual_tour) + 1 | |
| # visited = np.array([0] * num_nodes) | |
| # for id in visited_paths: | |
| # visited[id] = 1 | |
| # visited[factual_tour[0]] = 0 | |
| # visited[cf_next_node_id] = 0 | |
| # mask = visited < 1 | |
| # node_id = np.arange(num_nodes) | |
| # feasible_node_id = node_id[mask] | |
| # for j in range(len(feasible_node_id) - 1): | |
| # for i in range(j + 1, len(feasible_node_id)): | |
| # if ((feasible_node_id[j] == factual_tour[0]) and (feasible_node_id[i] == cf_next_node_id)) or ((feasible_node_id[i] == factual_tour[0]) and (feasible_node_id[j] == cf_next_node_id)): | |
| # continue | |
| # avail_edges.append([feasible_node_id[j], feasible_node_id[i]]) | |
| # return np.array(avail_edges) | |
| #----------- | |
| # unit test | |
| #----------- | |
| if __name__ == "__main__": | |
| import argparse | |
| import random | |
| import matplotlib.pyplot as plt | |
| # FYI: | |
| # - https://yu-nix.com/archives/python-path-get/ | |
| # - https://www.delftstack.com/ja/howto/python/python-get-parent-directory/ | |
| # - https://stackoverflow.com/questions/2817264/how-to-get-the-parent-dir-location | |
| import os | |
| import sys | |
| CURR_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| PARENT_DIR = os.path.abspath(os.path.join(CURR_DIR, os.pardir)) | |
| sys.path.append(PARENT_DIR) | |
| from utils.util_vis import visualize_factual_and_cf_tours | |
| from lkh.lkh import LKH | |
| from models.ortools.ortools import ORTools | |
| from data_generator.tsptw.tsptw_dataset import generate_tsptw_instance | |
| parser = argparse.ArgumentParser(description='') | |
| # general settings | |
| parser.add_argument("--problem", type=str, default="tsptw") | |
| parser.add_argument("--random_seed", type=int, default=1234) | |
| parser.add_argument("--num_samples", type=int, default=5) | |
| parser.add_argument("--num_nodes", type=int, default=100) | |
| parser.add_argument("--coord_dim", type=int, default=2) | |
| # LKH settings | |
| parser.add_argument("--max_trials", type=int, default=1000) | |
| parser.add_argument("--lkh_dir", type=str, default="lkh", help="Path to the binary of LKH") | |
| parser.add_argument("--io_dir", type=str, default="lkh_io_files") | |
| args = parser.parse_args() | |
| # models | |
| # cf_solver = LKH(args.problem, args.max_trials, args.random_seed, lkh_dir=args.lkh_dir, io_dir=args.io_dir) | |
| cf_solver = ORTools(args.problem) | |
| cf_generator = CFTourGenerator(cf_solver) | |
| # dataset | |
| if args.problem == "tsp": | |
| np.random.seed(args.random_seed) | |
| node_feats = np.random.uniform(size=[args.num_samples, args.num_nodes, args.coord_dim]) | |
| elif args.problem == "tsptw": | |
| coords, time_window, grid_size = generate_tsptw_instance(num_nodes=args.num_nodes, grid_size=100, max_tw_gap=10, max_tw_size=1000, is_integer_instance=True, da_silva_style=True) | |
| node_feats = np.concatenate([coords, time_window], -1) | |
| node_feats = node_feats[None, :, :] | |
| # function ot automatically generate couterfactual visit | |
| def get_random_cf_visit(factual_tour, random_seed=1234): | |
| # random.seed(random_seed) | |
| num_nodes = np.max(factual_tour) + 1 | |
| step = random.randrange(len(factual_tour) - 2) # remove the last step (returning to the start-point) | |
| visited = np.array([0] * num_nodes) | |
| for i in range(step+1): | |
| visited[factual_tour[i]] = 1 | |
| mask = visited < 1 | |
| node_id = np.arange(num_nodes) | |
| feasible_node_id = node_id[mask] | |
| cf_next_id = random.choice(feasible_node_id) # select counterfactual | |
| return step, cf_next_id | |
| for i in range(len(node_feats)): | |
| # obtain a factual tour | |
| factual_tour = cf_solver.solve(node_feats[i]) | |
| # counterfactual visit | |
| cf_step, cf_next_node_id = get_random_cf_visit(factual_tour, random_seed=args.random_seed) | |
| print(cf_step, cf_next_node_id) | |
| # obtain a counterfactual tour | |
| cf_tour = cf_generator(factual_tour, cf_step, cf_next_node_id, node_feats[i]) | |
| print(factual_tour) | |
| print(cf_tour) | |
| # visualize the factual and counterfactual tours | |
| if args.problem == "tsp": | |
| coords = node_feats[i] | |
| elif args.problem == "tsptw": | |
| coord_dim = 2 | |
| coords = node_feats[i, :, :coord_dim] | |
| visualize_factual_and_cf_tours(factual_tour, cf_tour, coords, cf_step, f"test{i}.png") | |
| break |