import osmnx as ox import pandas as pd import networkx as nx import matplotlib.pyplot as plt from ortools.constraint_solver import pywrapcp from ortools.constraint_solver import routing_enums_pb2 class pub_crawl: def __init__(self, df, G): '''Initialise a pub_crawl instance df: pd.Dataframe containing pubs 'name' and coordinates ('latitude', 'longitude') G: nx.MultiDiGraph of the map area ''' self.df = df self.G = G self.pub_names = df['name'].to_list() self.initial_route = self.pub_names self.optimal_route = None self.optimal_distance = None self.pub_nodes = self.create_pub_nodes() def create_pub_nodes(self): # Dictionary of pub names and coordinates pubs_dict = self.df.drop('address', axis=1).set_index('name').T.to_dict('list') # Get graph nodes for each pub pub_nodes = {} for k, v in pubs_dict.items(): pub_nodes[k] = ox.nearest_nodes(self.G, X = v[1], Y = v[0]) return pub_nodes def get_route_length(self, p0, p1): # Find length in meters of shortest path from pub p0 to pub p1 route = nx.shortest_path(self.G, self.pub_nodes[p0], self.pub_nodes[p1], weight='length') route_lengths = ox.utils_graph.get_route_edge_attributes(self.G, route, attribute = 'length') route_length_total = sum(route_lengths) return route_length_total def create_distance_matrix(self, pubs_considered): distance_matrix = [] for i in range(len(pubs_considered)): row = [] for j in range(len(pubs_considered)): distance = self.get_route_length(pubs_considered[i], pubs_considered[j]) row.append(round(distance*1000)) # avoids rounding error in Google's OR-Tools package distance_matrix.append(row) return distance_matrix def plot_map(self): node_colours = ['#FF0000' if i in list(self.pub_nodes.values()) else '#999999' for i in self.G.nodes] fig, ax = ox.plot_graph(self.G, bgcolor='#FFFFFF', node_color=node_colours, show=False, close=False) for _, node in ox.graph_to_gdfs(self.G, nodes=True, edges=False).fillna("").iterrows(): for k, v in self.pub_nodes.items(): if node.name == v: c = node["geometry"].centroid ax.annotate(k, xy=(c.x, c.y), xycoords='data', xytext=(3, -2), textcoords='offset points', size=8) plt.show() def get_route_nodes(self, route): # Find intermediary route nodes for plotting route_nodes = [] for i in range(len(route)-1): path = nx.shortest_path(self.G, self.pub_nodes[route[i]], self.pub_nodes[route[i+1]], weight='length') route_nodes.append(path) return route_nodes def plot_route(self, route): route_nodes = self.get_route_nodes(route) fig, ax = ox.plot_graph_routes(self.G, route_nodes, bgcolor='#FFFFFF', show=False, close=False, figsize=(12, 12)) for _, node in ox.graph_to_gdfs(self.G, nodes=True, edges=False).fillna("").iterrows(): for i, k in enumerate(route): if node.name == self.pub_nodes[k]: c = node["geometry"].centroid ax.annotate(f'{i}: {k}', xy=(c.x, c.y), xycoords='data', xytext=(3, -2), textcoords='offset points', size=8) return fig def create_data(self, start, pubs_considered): data = {} start_index = pubs_considered.index(start) data['distance_matrix'] = self.create_distance_matrix(pubs_considered) data['num_vehicles'] = 1 data['depot'] = start_index return data def format_solution(self, manager, routing, solution, pubs_considered): """Formats solution for osmnx plotting""" index = routing.Start(0) route = [index] leg_distances = [] route_distance = 0 while not routing.IsEnd(index): previous_index = index index = solution.Value(routing.NextVar(index)) if route[0] == manager.IndexToNode(index): break route.append(manager.IndexToNode(index)) leg_distance = routing.GetArcCostForVehicle(previous_index, index, 0) leg_distances.append(leg_distance) route_distance += leg_distance self.optimal_route = [pubs_considered[i] for i in route] self.optimal_distance = round(route_distance/1000) def optimise(self, start_point, pubs_considered): def distance_callback(from_index, to_index): """Returns the distance between the two nodes.""" # Convert from routing variable Index to distance matrix NodeIndex from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) return data['distance_matrix'][from_node][to_node] data = self.create_data(start_point, pubs_considered) # Create the routing index manager manager = pywrapcp.RoutingIndexManager(len(data['distance_matrix']), data['num_vehicles'], data['depot']) # Create Routing Model routing = pywrapcp.RoutingModel(manager) transit_callback_index = routing.RegisterTransitCallback(distance_callback) # Define cost of each arc routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) # Settings for simualted annealing search_parameters = pywrapcp.DefaultRoutingSearchParameters() search_parameters.local_search_metaheuristic = ( routing_enums_pb2.LocalSearchMetaheuristic.SIMULATED_ANNEALING) search_parameters.time_limit.seconds = 5 search_parameters.log_search = True # Solve the problem solution = routing.SolveWithParameters(search_parameters) # Format solution if solution: self.format_solution(manager, routing, solution, pubs_considered) if __name__=='main': df = pd.read_csv('galway_pubs.csv') G = ox.io.load_graphml('galway.graphml') crawler = pub_crawl(df, G) print('Locations of pubs in Galway to consider:') crawler.plot_map() initial_route = crawler.initial_route print('Initial route before optimisation (default pub ordering):') crawler.plot_route(initial_route) start_pub = 'The Sliding Rock' crawler.optimise(start_pub, ['The Sliding Rock', 'The Salt House', 'Caribou']) optimal_route = crawler.optimal_route print('Optimised route:') print(f'Route distance: {crawler.optimal_distance} meters') crawler.plot_route(optimal_route)