Spaces:
Sleeping
Sleeping
import torch.nn as nn | |
import numpy as np | |
class CFTourGenerator(nn.Module):
    """Generate counterfactual (CF) tours by re-solving with a forced visit.

    The wrapped solver is asked to solve the instance again while a prefix of
    one vehicle's factual tour, followed by the counterfactual node, is kept
    fixed.
    """

    def __init__(self, cf_solver):
        """
        Parameters
        ----------
        cf_solver:
            solver object exposing a ``problem`` attribute and a
            ``solve(node_feats, fixed_paths, dist_matrix=...)`` method
        """
        super().__init__()
        self.solver = cf_solver
        self.problem = cf_solver.problem

    def forward(self, factual_tour, vehicle_id, cf_step, cf_next_node_id, node_feats, dist_matrix=None):
        """
        solve an input instance with visited edges fixed

        Parameters
        ----------
        factual_tour: sequence of tours, indexed as factual_tour[vehicle][step]
        vehicle_id: int
            index of the tour whose prefix is kept fixed
        cf_step: int
            number of leading nodes of that tour to keep
        cf_next_node_id: int
            node forced to be visited right after the kept prefix
        node_feats:
            instance features, passed to the solver unchanged
        dist_matrix:
            optional distance matrix, passed to the solver unchanged

        Returns
        -------
        cf_tours: the solver's list of tours (modified in place when one tour
            has to be reversed), or None when the solver returns None
        """
        fixed_paths = self.get_fixed_paths(factual_tour, vehicle_id, cf_step, cf_next_node_id)
        cf_tours = self.solver.solve(node_feats, fixed_paths, dist_matrix=dist_matrix)
        if cf_tours is None:
            return None

        if cf_step > 0:
            # Only the first tour that contains the counterfactual node is
            # inspected; it is reversed when it runs in the opposite direction.
            # A separate loop index is used so the ``vehicle_id`` argument is
            # not overwritten.
            for tour_idx, cf_tour in enumerate(cf_tours):
                if cf_next_node_id not in cf_tour:
                    continue
                if cf_step == 1:
                    # The fixed path is [start, cf node], so the cf node must
                    # sit at position 1.
                    needs_flip = cf_tour[1] != cf_next_node_id
                else:
                    # make direction of the cf tour the same as factual one
                    needs_flip = factual_tour[tour_idx][1] != cf_tour[1]
                if needs_flip:
                    cf_tours[tour_idx] = np.flipud(cf_tour)
                break
        return cf_tours

    def get_fixed_paths(self, factual_tour, vehicle_id, cf_step, cf_next_node_id):
        """Return the first ``cf_step`` nodes of the selected tour followed by the cf node.

        Returns
        -------
        visited_paths: np.array [cf_step + 1]
        """
        visited_paths = np.append(factual_tour[vehicle_id][:cf_step], cf_next_node_id)
        return visited_paths
# def get_avail_edges(self, factual_tour, cf_step, cf_next_node_id): | |
# visited_paths = np.append(factual_tour[:cf_step], cf_next_node_id) | |
# avail_edges = [] | |
# # add fixed edges | |
# for i in range(len(visited_paths) - 1): | |
# avail_edges.append([visited_paths[i], visited_paths[i + 1]]) | |
# print(avail_edges) | |
# # add the remaining available edges
# num_nodes = np.max(factual_tour) + 1 | |
# visited = np.array([0] * num_nodes) | |
# for id in visited_paths: | |
# visited[id] = 1 | |
# visited[factual_tour[0]] = 0 | |
# visited[cf_next_node_id] = 0 | |
# mask = visited < 1 | |
# node_id = np.arange(num_nodes) | |
# feasible_node_id = node_id[mask] | |
# for j in range(len(feasible_node_id) - 1): | |
# for i in range(j + 1, len(feasible_node_id)): | |
# if ((feasible_node_id[j] == factual_tour[0]) and (feasible_node_id[i] == cf_next_node_id)) or ((feasible_node_id[i] == factual_tour[0]) and (feasible_node_id[j] == cf_next_node_id)): | |
# continue | |
# avail_edges.append([feasible_node_id[j], feasible_node_id[i]]) | |
# return np.array(avail_edges) | |
#----------- | |
# unit test | |
#----------- | |
if __name__ == "__main__":
    # Manual smoke test: solve one random instance, force a random
    # counterfactual visit, re-solve, and plot both tours.
    import argparse
    import random
    import matplotlib.pyplot as plt
    # FYI:
    # - https://yu-nix.com/archives/python-path-get/
    # - https://www.delftstack.com/ja/howto/python/python-get-parent-directory/
    # - https://stackoverflow.com/questions/2817264/how-to-get-the-parent-dir-location
    import os
    import sys
    # The parent directory must be on sys.path before the project imports below.
    CURR_DIR = os.path.dirname(os.path.abspath(__file__))
    PARENT_DIR = os.path.abspath(os.path.join(CURR_DIR, os.pardir))
    sys.path.append(PARENT_DIR)
    from utils.util_vis import visualize_factual_and_cf_tours
    from lkh.lkh import LKH
    from models.ortools.ortools import ORTools
    from data_generator.tsptw.tsptw_dataset import generate_tsptw_instance

    parser = argparse.ArgumentParser(description='')
    # general settings
    parser.add_argument("--problem", type=str, default="tsptw")
    parser.add_argument("--random_seed", type=int, default=1234)
    parser.add_argument("--num_samples", type=int, default=5)
    parser.add_argument("--num_nodes", type=int, default=100)
    parser.add_argument("--coord_dim", type=int, default=2)
    # LKH settings
    parser.add_argument("--max_trials", type=int, default=1000)
    parser.add_argument("--lkh_dir", type=str, default="lkh", help="Path to the binary of LKH")
    parser.add_argument("--io_dir", type=str, default="lkh_io_files")
    args = parser.parse_args()

    # Seed both generators once so that --random_seed makes the whole run
    # repeatable (the counterfactual visit below is drawn with `random`).
    random.seed(args.random_seed)
    np.random.seed(args.random_seed)

    # models
    # cf_solver = LKH(args.problem, args.max_trials, args.random_seed, lkh_dir=args.lkh_dir, io_dir=args.io_dir)
    cf_solver = ORTools(args.problem)
    cf_generator = CFTourGenerator(cf_solver)

    # dataset
    if args.problem == "tsp":
        node_feats = np.random.uniform(size=[args.num_samples, args.num_nodes, args.coord_dim])
    elif args.problem == "tsptw":
        coords, time_window, grid_size = generate_tsptw_instance(num_nodes=args.num_nodes, grid_size=100, max_tw_gap=10, max_tw_size=1000, is_integer_instance=True, da_silva_style=True)
        node_feats = np.concatenate([coords, time_window], -1)
        node_feats = node_feats[None, :, :]  # add a leading sample axis
    else:
        # Without this, node_feats would be undefined further down.
        parser.error(f"unsupported --problem for this test: {args.problem}")

    # function to automatically generate a counterfactual visit
    def get_random_cf_visit(factual_tour, random_seed=1234):
        """Pick a random step of a single tour and a random node not visited up to that step.

        ``random_seed`` is accepted for backward compatibility but not used;
        the global ``random`` generator is seeded once at start-up instead.

        Returns
        -------
        step: int
        cf_next_id: int
        """
        route = np.asarray(factual_tour)
        num_nodes = int(route.max()) + 1
        step = random.randrange(len(route) - 2)  # remove the last step (returning to the start-point)
        visited = np.zeros(num_nodes, dtype=bool)
        visited[route[:step + 1]] = True
        feasible_node_id = np.flatnonzero(~visited)
        cf_next_id = int(random.choice(feasible_node_id))  # select counterfactual
        return step, cf_next_id

    for i, feats in enumerate(node_feats):
        # obtain a factual tour
        # presumably a list with one tour per vehicle, which is how
        # CFTourGenerator.forward indexes it -- TODO confirm against the solver
        factual_tour = cf_solver.solve(feats)

        # counterfactual visit, sampled from the first vehicle's tour
        vehicle_id = 0
        cf_step, cf_next_node_id = get_random_cf_visit(factual_tour[vehicle_id], random_seed=args.random_seed)
        print(cf_step, cf_next_node_id)

        # obtain a counterfactual tour
        cf_tour = cf_generator(factual_tour, vehicle_id, cf_step, cf_next_node_id, feats)
        print(factual_tour)
        print(cf_tour)

        # visualize the factual and counterfactual tours
        if args.problem == "tsp":
            coords = feats
        elif args.problem == "tsptw":
            coord_dim = 2
            coords = feats[:, :coord_dim]
        if cf_tour is not None:
            # NOTE(review): the signature of visualize_factual_and_cf_tours is
            # not visible here; confirm it accepts per-vehicle tour lists.
            visualize_factual_and_cf_tours(factual_tour, cf_tour, coords, cf_step, f"test{i}.png")
        break  # only the first sample is processed