# Spaces: Sleeping
# (Hugging Face Space status banner captured together with the source;
#  kept as a comment so the module parses.)
import io
import math
import random
import time
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.cluster import KMeans
# ---------------------------
# Data utils
# ---------------------------
def make_template_dataframe():
    """Blank example template users can download and fill in."""
    # One tuple per example client; column meanings documented below.
    example_rows = [
        ("A", 10, 4, 1, 0, 9999, 0),
        ("B", -5, -12, 2, 0, 9999, 0),
        ("C", 15, 8, 1, 0, 9999, 0),
    ]
    columns = [
        "id",        # client label
        "x",         # coordinates
        "y",
        "demand",    # load picked up at the stop
        "tw_start",  # optional: earliest arrival (soft)
        "tw_end",    # optional: latest arrival (soft)
        "service",   # optional: service time at stop
    ]
    return pd.DataFrame(example_rows, columns=columns)
def parse_uploaded_csv(file) -> pd.DataFrame:
    """
    Read a client CSV (path, file-like, or upload object with a .name) and
    normalize it: required columns checked, optional columns defaulted,
    numeric columns coerced, rows with unparseable values dropped.
    """
    source = getattr(file, "name", file)  # upload widgets expose .name
    df = pd.read_csv(source)

    missing = {"id", "x", "y", "demand"} - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    # Default the optional columns when absent.
    for col, default in (("tw_start", 0), ("tw_end", 999999), ("service", 0)):
        if col not in df.columns:
            df[col] = default

    # Normalize types: ids as strings, everything else numeric.
    df["id"] = df["id"].astype(str)
    for col in ("x", "y", "demand", "tw_start", "tw_end", "service"):
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Coercion failures became NaN; drop those rows and re-number.
    return df.dropna().reset_index(drop=True)
def generate_random_instance(
    n_clients=30,
    n_vehicles=4,
    capacity=10,
    spread=50,
    demand_min=1,
    demand_max=3,
    seed=42,
) -> pd.DataFrame:
    """
    Create a synthetic instance: uniform coordinates in [-spread, spread]
    and integer demands in [demand_min, demand_max], reproducible via seed.
    (n_vehicles/capacity are accepted for interface parity but unused here.)
    """
    rng = np.random.default_rng(seed)
    # Keep the RNG call order (x, y, demand) so a given seed always yields
    # the same instance.
    coord_x = rng.uniform(-spread, spread, size=n_clients)
    coord_y = rng.uniform(-spread, spread, size=n_clients)
    loads = rng.integers(demand_min, demand_max + 1, size=n_clients)
    return pd.DataFrame(
        {
            "id": [f"C{k + 1}" for k in range(n_clients)],
            "x": coord_x,
            "y": coord_y,
            "demand": loads,
            "tw_start": np.zeros(n_clients, dtype=float),
            "tw_end": np.full(n_clients, 999999.0),
            "service": np.zeros(n_clients, dtype=float),
        }
    )
# Validation
def validate_instance(df: pd.DataFrame, n_vehicles: int, capacity: float) -> None:
    """
    Validate that the VRP instance is feasible and properly formatted.

    Checks, in order: solver parameters, required columns, non-empty data,
    NaN/inf values, sign constraints, time-window consistency, and aggregate
    capacity feasibility.  Raises ValueError on the first failure; returns
    None when the instance is valid.
    """
    # Parameter sanity first.  The original checked these last, so capacity=0
    # surfaced as a confusing "Infeasible instance" error instead.
    if capacity <= 0:
        raise ValueError("Vehicle capacity must be positive")
    if n_vehicles <= 0:
        raise ValueError("Number of vehicles must be positive")
    # Check required columns
    required = {'x', 'y', 'demand', 'tw_start', 'tw_end', 'service'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    # Check for empty dataframe
    if len(df) == 0:
        raise ValueError("DataFrame is empty - no clients to route")
    # NaN/inf must be rejected BEFORE value comparisons: NaN compares False
    # against everything, so corrupt rows would silently pass the checks below.
    numeric_cols = ['x', 'y', 'demand', 'tw_start', 'tw_end', 'service']
    if df[numeric_cols].isna().any().any():
        raise ValueError("NaN values detected in numeric columns")
    if np.isinf(df[numeric_cols].values).any():
        raise ValueError("Infinite values detected in numeric columns")
    # Check for negative values in key fields
    if (df['demand'] < 0).any():
        raise ValueError("Negative demand values detected")
    if (df['service'] < 0).any():
        raise ValueError("Negative service time values detected")
    # Check time window consistency
    if (df['tw_start'] > df['tw_end']).any():
        invalid_rows = df[df['tw_start'] > df['tw_end']]
        raise ValueError(f"Invalid time windows (start > end) for {len(invalid_rows)} stops")
    # Check feasibility: total demand vs total capacity
    total_demand = df['demand'].sum()
    total_capacity = n_vehicles * capacity
    if total_demand > total_capacity:
        raise ValueError(
            f"Infeasible instance: total demand ({total_demand:.1f}) "
            f"exceeds total capacity ({total_capacity:.1f})"
        )
# Geometry / distance helpers
def euclid(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Euclidean distance between two 2-D points."""
    ax, ay = a
    bx, by = b
    return float(math.hypot(ax - bx, ay - by))
def total_distance(points: List[Tuple[float, float]]) -> float:
    """Sum of Euclidean leg lengths along an open polyline of points."""
    consecutive = zip(points, points[1:])
    return sum(math.hypot(p[0] - q[0], p[1] - q[1]) for p, q in consecutive)
def build_distance_matrix(df: pd.DataFrame, depot: Tuple[float, float]) -> np.ndarray:
    """
    Build a symmetric Euclidean distance matrix over depot + clients.

    Node 0 is the depot; node i (1-based) is row i-1 of ``df``.
    Returns an (n+1) x (n+1) float ndarray with a zero diagonal.

    Improvements over the original:
    - coordinates are read positionally via ``to_numpy`` (the original used
      ``df.loc[i, ...]``, which mis-addresses rows on a non-RangeIndex);
    - the O(n^2) Python loop is replaced by numpy broadcasting.
    """
    xs = np.concatenate(([float(depot[0])], df["x"].to_numpy(dtype=float)))
    ys = np.concatenate(([float(depot[1])], df["y"].to_numpy(dtype=float)))
    # Pairwise coordinate differences via broadcasting, then hypot.
    dx = xs[:, None] - xs[None, :]
    dy = ys[:, None] - ys[None, :]
    return np.hypot(dx, dy)
# Clustering algorithms
def sweep_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Assign clients to vehicles by angular sweep around the depot, roughly
    balancing capacity (sum of 'demand').

    Returns positional row indices per cluster (always ``n_vehicles`` lists;
    the last vehicle absorbs any overflow and may exceed ``capacity``).

    Fix: demands and coordinates are read positionally via ``to_numpy`` —
    the original used ``df.loc[idx, ...]`` with positions produced by
    ``argsort``, which addresses the wrong rows whenever ``df`` does not
    have a default RangeIndex.
    """
    dx = df["x"].to_numpy(dtype=float) - depot[0]
    dy = df["y"].to_numpy(dtype=float) - depot[1]
    demands = df["demand"].to_numpy(dtype=float)
    # Sweep clients in increasing polar angle around the depot.
    order = np.argsort(np.arctan2(dy, dx))
    clusters: List[List[int]] = [[] for _ in range(n_vehicles)]
    loads = [0.0] * n_vehicles
    v = 0
    for idx in order:
        d = float(demands[idx])
        # Move to the next vehicle once this one would overflow (the last
        # vehicle keeps accepting clients regardless).
        if loads[v] + d > capacity and v < n_vehicles - 1:
            v += 1
        clusters[v].append(int(idx))
        loads[v] += d
    return clusters
def kmeans_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    random_state: int = 42,
) -> List[List[int]]:
    """
    Assign clients via k-means on (x, y), then split any over-capacity
    cluster into capacity-respecting chunks ordered by angle from the depot.

    Returns positional row indices per cluster, padded/truncated so exactly
    ``n_vehicles`` lists come back.
    """
    if len(df) == 0:
        return [[] for _ in range(n_vehicles)]

    coords = df[['x', 'y']].values
    model = KMeans(n_clusters=min(n_vehicles, len(df)), random_state=random_state, n_init=10)
    labels = model.fit_predict(coords)

    # Bucket rows by their k-means label.
    by_label: List[List[int]] = [[] for _ in range(n_vehicles)]
    for row, lab in enumerate(labels):
        by_label[lab].append(row)

    def angle_from_depot(row):
        # Deterministic ordering key used when splitting a cluster.
        return np.arctan2(df.loc[row, 'y'] - depot[1], df.loc[row, 'x'] - depot[0])

    balanced: List[List[int]] = []
    for members in by_label:
        if not members:
            continue
        chunk: List[int] = []
        chunk_load = 0.0
        for row in sorted(members, key=angle_from_depot):
            need = float(df.loc[row, 'demand'])
            if chunk_load + need > capacity and chunk:
                # Close the full chunk and start a fresh one with this row.
                balanced.append(chunk)
                chunk, chunk_load = [row], need
            else:
                chunk.append(row)
                chunk_load += need
        if chunk:
            balanced.append(chunk)

    # Pad with empty clusters if needed, then cap at n_vehicles.
    while len(balanced) < n_vehicles:
        balanced.append([])
    return balanced[:n_vehicles]
def clarke_wright_savings(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Clarke-Wright Savings algorithm for VRP clustering.

    Starts with one route per client, then repeatedly merges the route pair
    with the largest saving s(i,j) = d(0,i) + d(0,j) - d(i,j), subject to
    capacity and only at route endpoints.  If more than ``n_vehicles`` routes
    remain, small routes are merged afterwards (the last-resort merge may
    exceed capacity).

    Returns positional row indices per route, padded with empty lists to
    ``n_vehicles`` entries.
    NOTE(review): row access uses ``df.loc`` with positional indices — this
    assumes a default RangeIndex on ``df``; confirm against callers.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]
    # Build distance matrix (node 0 = depot, node i = client row i-1)
    dist_matrix = build_distance_matrix(df, depot)
    # Calculate savings: s(i,j) = d(0,i) + d(0,j) - d(i,j)
    savings = []
    for i in range(1, n + 1):
        for j in range(i + 1, n + 1):
            s = dist_matrix[0][i] + dist_matrix[0][j] - dist_matrix[i][j]
            savings.append((s, i - 1, j - 1))  # Convert to 0-indexed client ids
    # Sort by savings (descending); equal savings fall back to client-id order
    savings.sort(reverse=True)
    # Initialize: each client in its own route
    routes: List[List[int]] = [[i] for i in range(n)]
    route_loads = [float(df.loc[i, 'demand']) for i in range(n)]
    # Track which route each client belongs to
    client_to_route = {i: i for i in range(n)}
    # Merge routes based on savings
    for saving, i, j in savings:
        route_i = client_to_route[i]
        route_j = client_to_route[j]
        # Skip if already in same route
        if route_i == route_j:
            continue
        # Check capacity constraint
        if route_loads[route_i] + route_loads[route_j] > capacity:
            continue
        # Check if i and j are at ends of their respective routes
        ri = routes[route_i]
        rj = routes[route_j]
        # Only merge if they're at route ends (an interior join would split a route)
        i_at_end = (i == ri[0] or i == ri[-1])
        j_at_end = (j == rj[0] or j == rj[-1])
        if not (i_at_end and j_at_end):
            continue
        # Merge routes, orienting both pieces so that i and j end up adjacent
        if i == ri[-1] and j == rj[0]:
            new_route = ri + rj
        elif i == ri[0] and j == rj[-1]:
            new_route = rj + ri
        elif i == ri[-1] and j == rj[-1]:
            new_route = ri + rj[::-1]
        elif i == ri[0] and j == rj[0]:
            new_route = ri[::-1] + rj
        else:
            continue
        # Update routes: absorb route_j into route_i, leave route_j empty
        routes[route_i] = new_route
        route_loads[route_i] += route_loads[route_j]
        routes[route_j] = []
        route_loads[route_j] = 0.0
        # Update client mapping so future lookups resolve to the merged route
        for client in rj:
            client_to_route[client] = route_i
    # Filter out empty routes and limit to n_vehicles
    final_routes = [r for r in routes if r]
    # If too many routes, merge smallest ones
    while len(final_routes) > n_vehicles:
        # Find two smallest routes that can be merged within capacity
        route_sizes = [(sum(df.loc[r, 'demand']), idx) for idx, r in enumerate(final_routes)]
        route_sizes.sort()
        merged = False
        for i in range(len(route_sizes)):
            for j in range(i + 1, len(route_sizes)):
                idx1, idx2 = route_sizes[i][1], route_sizes[j][1]
                if route_sizes[i][0] + route_sizes[j][0] <= capacity:
                    final_routes[idx1].extend(final_routes[idx2])
                    final_routes.pop(idx2)
                    merged = True
                    break
            if merged:
                break
        if not merged:
            # Force merge smallest two even if over capacity (guarantees progress)
            idx1, idx2 = route_sizes[0][1], route_sizes[1][1]
            final_routes[idx1].extend(final_routes[idx2])
            final_routes.pop(idx2)
    # Pad with empty routes if needed
    while len(final_routes) < n_vehicles:
        final_routes.append([])
    return final_routes
def greedy_nearest_neighbor(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Greedy Nearest Neighbor construction for the VRP.

    Each vehicle repeatedly visits the nearest unvisited client that still
    fits its remaining capacity.  Leftover clients (vehicles exhausted, or a
    client's demand alone exceeds ``capacity``) are force-assigned
    round-robin so every client ends up routed, possibly over capacity.

    Fixes vs. the original:
    - no infinite loop when no client fits an empty vehicle (the original
      kept iterating because ``unvisited`` never shrank and no route was
      ever appended);
    - no ZeroDivisionError when force-assigning with zero built routes;
    - positional coordinate/demand access via ``to_numpy`` instead of
      ``df.loc`` with positional indices.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]
    xs = df["x"].to_numpy(dtype=float)
    ys = df["y"].to_numpy(dtype=float)
    demands = df["demand"].to_numpy(dtype=float)

    unvisited = set(range(n))
    routes: List[List[int]] = []
    while unvisited and len(routes) < n_vehicles:
        route: List[int] = []
        load = 0.0
        pos = depot
        while unvisited:
            # Find nearest unvisited client that fits in the current vehicle.
            best_client = None
            best_distance = float("inf")
            for c in unvisited:
                if load + demands[c] <= capacity:
                    d = math.hypot(pos[0] - xs[c], pos[1] - ys[c])
                    if d < best_distance:
                        best_distance = d
                        best_client = c
            if best_client is None:
                break  # nothing else fits this vehicle
            route.append(best_client)
            unvisited.remove(best_client)
            load += demands[best_client]
            pos = (xs[best_client], ys[best_client])
        if route:
            routes.append(route)
        else:
            # No unvisited client fits even an EMPTY vehicle: opening more
            # vehicles cannot help, so stop (this was the infinite loop).
            break
    # Force remaining clients into existing routes (may violate capacity).
    if unvisited and not routes:
        routes.append([])  # guard: never take % len([]) below
    route_idx = 0
    for c in sorted(unvisited):  # sorted for deterministic assignment
        routes[route_idx].append(c)
        route_idx = (route_idx + 1) % len(routes)
    # Pad with empty routes if needed.
    while len(routes) < n_vehicles:
        routes.append([])
    return routes[:n_vehicles]
def genetic_algorithm_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    population_size: int = 50,
    generations: int = 100,
    mutation_rate: float = 0.1,
    elite_size: int = 10,
) -> List[List[int]]:
    """
    Genetic Algorithm for VRP.

    Evolves a population of candidate solutions over ``generations``
    generations using order crossover, swap mutation, tournament selection
    and elitism.  Capacity overruns are allowed but heavily penalized in the
    fitness function, so the best survivor is usually feasible.

    Returns positional row indices per vehicle (exactly ``n_vehicles`` lists).
    NOTE(review): uses the global ``random`` module without seeding, so
    results vary between calls; row access via ``df.loc`` assumes a default
    RangeIndex — confirm against callers.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]
    # Individual representation: flat list of client indices with vehicle
    # separators interleaved.  -1 marks a separator (at most n_vehicles - 1).
    def create_individual():
        """Create a random individual (solution)."""
        clients = list(range(n))
        random.shuffle(clients)
        # Insert vehicle separators randomly
        individual = []
        separators_added = 0
        for i, client in enumerate(clients):
            individual.append(client)
            # Add separator with some probability, but never more than
            # n_vehicles-1 of them and never after the final client
            if (separators_added < n_vehicles - 1 and
                    i < len(clients) - 1 and
                    random.random() < 0.3):
                individual.append(-1)
                separators_added += 1
        return individual
    def individual_to_routes(individual):
        """Convert an individual to route format (one list per vehicle)."""
        routes = []
        current_route = []
        for gene in individual:
            if gene == -1:  # Vehicle separator
                if current_route:
                    routes.append(current_route)
                    current_route = []
            else:
                current_route.append(gene)
        if current_route:
            routes.append(current_route)
        # Pad with empty routes so the caller always sees n_vehicles lists
        while len(routes) < n_vehicles:
            routes.append([])
        return routes[:n_vehicles]
    def calculate_fitness(individual):
        """Fitness = -(total distance + capacity penalty); higher is better."""
        routes = individual_to_routes(individual)
        total_distance = 0.0
        penalty = 0.0
        for route in routes:
            if not route:
                continue
            # Capacity overruns are permitted but heavily penalized
            route_load = sum(df.loc[i, 'demand'] for i in route)
            if route_load > capacity:
                penalty += (route_load - capacity) * 1000  # Heavy penalty
            # Route distance: depot -> stops -> depot
            points = [depot] + [(df.loc[i, 'x'], df.loc[i, 'y']) for i in route] + [depot]
            route_distance = total_distance_func(points)
            total_distance += route_distance
        return -(total_distance + penalty)  # Negative because we want to minimize
    def total_distance_func(points):
        """Local polyline-length helper (the name 'total_distance' is shadowed
        by a local variable in calculate_fitness, hence the distinct name)."""
        return sum(euclid(points[i], points[i + 1]) for i in range(len(points) - 1))
    def crossover(parent1, parent2):
        """Order crossover (OX) on the client permutations; separators are
        stripped first and re-inserted at random positions afterwards."""
        # Remove separators for crossover
        p1_clients = [g for g in parent1 if g != -1]
        p2_clients = [g for g in parent2 if g != -1]
        if len(p1_clients) < 2:
            return parent1[:], parent2[:]
        # Standard order crossover
        size = len(p1_clients)
        start, end = sorted(random.sample(range(size), 2))
        child1 = [None] * size
        child2 = [None] * size
        # Copy segments
        child1[start:end] = p1_clients[start:end]
        child2[start:end] = p2_clients[start:end]
        # Fill remaining positions in the other parent's order
        def fill_child(child, other_parent):
            # NOTE(review): 'x not in child' is O(n) per gene -> O(n^2) total
            remaining = [x for x in other_parent if x not in child]
            j = 0
            for i in range(size):
                if child[i] is None:
                    child[i] = remaining[j]
                    j += 1
        fill_child(child1, p2_clients)
        fill_child(child2, p1_clients)
        # Add separators back at random interior positions
        def add_separators(child):
            result = []
            separators_to_add = min(n_vehicles - 1, len(child) // 3)
            separator_positions = random.sample(range(1, len(child)), separators_to_add)
            for i, gene in enumerate(child):
                if i in separator_positions:
                    result.append(-1)
                result.append(gene)
            return result
        return add_separators(child1), add_separators(child2)
    def mutate(individual):
        """Mutation: swap two random clients (applied with probability mutation_rate)."""
        if random.random() > mutation_rate:
            return individual
        clients = [i for i, g in enumerate(individual) if g != -1]
        if len(clients) < 2:
            return individual
        # Swap two random clients on a copy; never mutate a (possibly shared) parent
        idx1, idx2 = random.sample(clients, 2)
        individual = individual[:]
        individual[idx1], individual[idx2] = individual[idx2], individual[idx1]
        return individual
    # Initialize population
    population = [create_individual() for _ in range(population_size)]
    # Evolution loop
    for generation in range(generations):
        # Score and rank the whole population (best first)
        fitness_scores = [(individual, calculate_fitness(individual)) for individual in population]
        fitness_scores.sort(key=lambda x: x[1], reverse=True)  # Higher fitness is better
        # Elitism: the top individuals survive unchanged
        elite = [individual for individual, _ in fitness_scores[:elite_size]]
        # Breed the remainder of the next generation
        new_population = elite[:]
        while len(new_population) < population_size:
            # Tournament selection (best of 5 random candidates)
            tournament_size = 5
            parent1 = max(random.sample(fitness_scores, min(tournament_size, len(fitness_scores))),
                          key=lambda x: x[1])[0]
            parent2 = max(random.sample(fitness_scores, min(tournament_size, len(fitness_scores))),
                          key=lambda x: x[1])[0]
            # Crossover
            child1, child2 = crossover(parent1, parent2)
            # Mutation
            child1 = mutate(child1)
            child2 = mutate(child2)
            new_population.extend([child1, child2])
        population = new_population[:population_size]
    # Return the best solution found in the final population
    final_fitness = [(individual, calculate_fitness(individual)) for individual in population]
    best_individual = max(final_fitness, key=lambda x: x[1])[0]
    return individual_to_routes(best_individual)
# Performance and Quality Analysis
def get_algorithm_complexity(algorithm: str, n_clients: int) -> str:
    """
    Return a human-readable theoretical complexity estimate for an algorithm.

    Fix: the original knew only 'greedy', 'clarke_wright' and 'genetic', so
    the module's other strategies ('sweep', 'kmeans') reported "Unknown".
    Existing outputs are unchanged; unrecognized names still yield "Unknown".
    """
    if algorithm == 'greedy':
        return f"O(n²) ≈ {n_clients**2:.0f} ops"
    elif algorithm == 'clarke_wright':
        return f"O(n³) ≈ {n_clients**3:.0f} ops"
    elif algorithm == 'genetic':
        return f"O(g×p×n) ≈ {100 * 50 * n_clients:.0f} ops (100 gen, 50 pop)"
    elif algorithm == 'sweep':
        # dominated by the angular sort of the clients
        return f"O(n log n) ≈ {n_clients * max(1, math.ceil(math.log2(max(n_clients, 2)))):.0f} ops"
    elif algorithm == 'kmeans':
        # iterations × clusters × points, times the restarts (n_init=10)
        return f"O(i×k×n) ≈ {10 * 10 * n_clients:.0f} ops (10 iter, 10 init)"
    else:
        return "Unknown"
def calculate_solution_quality_score(total_dist: float, dist_balance: float, load_balance: float, capacity_util: float) -> float:
    """
    Composite solution quality in [0, 100] (higher is better).

    Combines distance balance and load balance (coefficients of variation,
    lower = better) with capacity utilization (fraction, higher = better)
    using weights 0.3 / 0.3 / 0.4.  ``total_dist`` is accepted for interface
    compatibility but does not enter the score.
    """
    # (score, weight) pairs; CVs are inverted so that lower CV scores higher.
    components = (
        (max(0, 100 - (dist_balance * 100)), 0.3),
        (max(0, 100 - (load_balance * 100)), 0.3),
        (capacity_util * 100, 0.4),  # utilization expressed as a percentage
    )
    weighted = sum(score * weight for score, weight in components)
    return round(weighted, 1)
def compare_algorithms(df: pd.DataFrame, depot: Tuple[float, float], n_vehicles: int, capacity: float) -> Dict:
    """
    Run every solver backend on the same instance and summarize the results.

    Returns {'results': per-algorithm metric dicts (or {'error': msg}),
    plus the winning algorithm name per category}.
    """
    results: Dict = {}
    for alg in ('greedy', 'clarke_wright', 'genetic'):
        try:
            sol = solve_vrp(df, depot, n_vehicles, capacity, algorithm=alg)
            metrics, perf = sol['metrics'], sol['performance']
            results[alg] = {
                'total_distance': metrics['total_distance'],
                'vehicles_used': metrics['vehicles_used'],
                'execution_time_ms': perf['total_execution_time_ms'],
                'distance_balance_cv': metrics['distance_balance_cv'],
                'load_balance_cv': metrics['load_balance_cv'],
                'capacity_utilization': metrics['capacity_utilization'],
                'quality_score': metrics['solution_quality_score'],
                'clients_per_second': perf['clients_per_second'],
            }
        except Exception as e:
            # Record the failure and keep comparing the others.
            results[alg] = {'error': str(e)}

    inf = float('inf')

    def smallest(metric):
        # Errored algorithms sort last (infinite key).
        return min(results, key=lambda k: results[k].get(metric, inf) if 'error' not in results[k] else inf)

    def balance_key(k):
        if 'error' in results[k]:
            return inf
        return results[k].get('distance_balance_cv', inf) + results[k].get('load_balance_cv', inf)

    return {
        'results': results,
        'best_distance': smallest('total_distance'),
        'fastest': smallest('execution_time_ms'),
        'best_balance': min(results, key=balance_key),
        'best_quality': max(results, key=lambda k: results[k].get('quality_score', 0) if 'error' not in results[k] else 0),
    }
def create_performance_chart(comparison_data: Dict) -> Image.Image:
    """
    Render a 2x2 bar-chart comparison of the algorithms as a PIL image.

    Expects the dict produced by ``compare_algorithms``.  Algorithms whose
    entry contains an 'error' key are skipped; if none succeeded a
    placeholder chart is returned.  On each panel (except utilization) the
    category winner's bar is recolored red.
    """
    algorithms = list(comparison_data['results'].keys())
    valid_algs = [alg for alg in algorithms if 'error' not in comparison_data['results'][alg]]
    if not valid_algs:
        # Placeholder chart when every algorithm failed
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.text(0.5, 0.5, 'No valid results to display', ha='center', va='center', transform=ax.transAxes)
        ax.set_title('Algorithm Comparison - No Data')
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        plt.close(fig)
        buf.seek(0)
        return Image.open(buf)
    # One panel per metric
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))
    # Distance comparison (lower is better; winner highlighted red)
    distances = [comparison_data['results'][alg]['total_distance'] for alg in valid_algs]
    bars1 = ax1.bar(valid_algs, distances, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax1.set_title('Total Distance')
    ax1.set_ylabel('Distance')
    best_dist_alg = comparison_data['best_distance']
    if best_dist_alg in valid_algs:
        bars1[valid_algs.index(best_dist_alg)].set_color('#d62728')
    # Execution time comparison (lower is better)
    times = [comparison_data['results'][alg]['execution_time_ms'] for alg in valid_algs]
    bars2 = ax2.bar(valid_algs, times, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax2.set_title('Execution Time')
    ax2.set_ylabel('Time (ms)')
    fastest_alg = comparison_data['fastest']
    if fastest_alg in valid_algs:
        bars2[valid_algs.index(fastest_alg)].set_color('#d62728')
    # Quality score comparison (higher is better)
    quality_scores = [comparison_data['results'][alg]['quality_score'] for alg in valid_algs]
    bars3 = ax3.bar(valid_algs, quality_scores, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax3.set_title('Solution Quality Score')
    ax3.set_ylabel('Score (0-100)')
    best_quality_alg = comparison_data['best_quality']
    if best_quality_alg in valid_algs:
        bars3[valid_algs.index(best_quality_alg)].set_color('#d62728')
    # Capacity utilization comparison (no winner highlight on this panel)
    utilizations = [comparison_data['results'][alg]['capacity_utilization'] for alg in valid_algs]
    bars4 = ax4.bar(valid_algs, utilizations, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax4.set_title('Capacity Utilization')
    ax4.set_ylabel('Utilization (%)')
    plt.tight_layout()
    # Rasterize the figure to PNG in memory and hand back a PIL image
    buf = io.BytesIO()
    fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)
| # Route construction + 2-opt | |
| def nearest_neighbor_route( | |
| pts: List[Tuple[float, float]], | |
| start_idx: int = 0, | |
| ) -> List[int]: | |
| n = len(pts) | |
| unvisited = set(range(n)) | |
| route = [start_idx] | |
| unvisited.remove(start_idx) | |
| while unvisited: | |
| last = route[-1] | |
| nxt = min(unvisited, key=lambda j: euclid(pts[last], pts[j])) | |
| route.append(nxt) | |
| unvisited.remove(nxt) | |
| return route | |
def two_opt(route: List[int], pts: List[Tuple[float, float]], max_iter=200) -> List[int]:
    """
    Improve an open tour with 2-opt segment reversals.

    Repeats full (i, k) scans, keeping any reversal that strictly shortens
    the path, until a whole scan yields no improvement or ``max_iter``
    passes are exhausted.  The first and last stops stay fixed.

    Bug fix: the original contained ``if improved is False: break`` inside
    the ``i`` loop, which aborted the entire scan as soon as the first ``i``
    produced no improvement — most candidate reversals were never examined.
    """

    def path_length(seq: List[int]) -> float:
        # Open-path length over the point indices in seq.
        return sum(
            math.hypot(pts[a][0] - pts[b][0], pts[a][1] - pts[b][1])
            for a, b in zip(seq, seq[1:])
        )

    best = route[:]
    best_len = path_length(best)
    n = len(best)
    improved = True
    passes = 0
    while improved and passes < max_iter:
        improved = False
        passes += 1
        for i in range(1, n - 2):
            for k in range(i + 1, n - 1):
                candidate = best[:i] + best[i:k + 1][::-1] + best[k + 1:]
                cand_len = path_length(candidate)
                # Strict improvement with a small tolerance to avoid cycling
                # on floating-point ties.
                if cand_len + 1e-9 < best_len:
                    best, best_len = candidate, cand_len
                    improved = True
    return best
def build_route_for_cluster(
    df: pd.DataFrame,
    idxs: List[int],
    depot: Tuple[float, float],
) -> List[int]:
    """
    Order the clients of one cluster into a visiting sequence.

    Builds a local point list with the depot at position 0, runs a greedy
    nearest-neighbor tour followed by 2-opt, then maps local positions back
    to the caller's row indices with the depot dropped.
    """
    # Local point list: depot first, then the cluster's clients in idxs order.
    local_pts = [depot]
    for i in idxs:
        local_pts.append((float(df.loc[i, "x"]), float(df.loc[i, "y"])))
    # Greedy construction from the depot, then 2-opt refinement.
    tour = nearest_neighbor_route(local_pts, start_idx=0)
    tour = two_opt(tour, local_pts)
    # Local index 0 is the depot; local j >= 1 maps back to idxs[j - 1].
    return [idxs[j - 1] for j in tour if j != 0]
# Solve wrapper
# ---------------------------
def solve_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float] = (0.0, 0.0),
    n_vehicles: int = 4,
    capacity: float = 10,
    speed: float = 1.0,
    algorithm: str = 'sweep',
) -> Dict:
    """
    Solve VRP heuristically: cluster clients, build a 2-opt-improved tour
    per cluster, and report timing plus quality metrics.

    Args:
        df: Client data (id, x, y, demand, tw_start, tw_end, service)
        depot: Depot coordinates
        n_vehicles: Number of vehicles
        capacity: Vehicle capacity
        speed: Travel speed (distance units per time unit) for the soft
            time-window simulation
        algorithm: Clustering algorithm: 'sweep', 'kmeans', 'greedy',
            'clarke_wright' or 'genetic'
    Returns:
        {
            'routes': List[List[int]] (row indices of df),
            'total_distance': float,
            'per_route_distance': List[float],
            'assignments_table': pd.DataFrame,
            'metrics': dict,
            'performance': dict
        }
    Raises:
        ValueError: if validation fails or the algorithm name is unknown.
    """
    start_time = time.time()
    # Validate instance
    validate_instance(df, n_vehicles, capacity)
    validation_time = time.time() - start_time
    # 1) cluster using selected algorithm.
    # Bug fix: the original dispatched only greedy/clarke_wright/genetic,
    # so calling with the documented default algorithm='sweep' raised.
    clustering_start = time.time()
    dispatch = {
        'sweep': sweep_clusters,
        'kmeans': kmeans_clusters,
        'greedy': greedy_nearest_neighbor,
        'clarke_wright': clarke_wright_savings,
        'genetic': genetic_algorithm_vrp,
    }
    try:
        cluster_fn = dispatch[algorithm]
    except KeyError:
        raise ValueError(
            f"Unknown algorithm: {algorithm}. Use 'sweep', 'kmeans', "
            f"'greedy', 'clarke_wright', or 'genetic'"
        ) from None
    clusters = cluster_fn(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    clustering_time = time.time() - clustering_start
    # 2) route per cluster + soft time-window simulation
    routing_start = time.time()
    routes: List[List[int]] = []
    per_route_dist: List[float] = []
    soft_tw_violations = 0
    per_route_loads: List[float] = []
    for cl in clusters:
        if len(cl) == 0:
            # Unused vehicle: keep placeholder entries so lists stay aligned.
            routes.append([])
            per_route_dist.append(0.0)
            per_route_loads.append(0.0)
            continue
        order = build_route_for_cluster(df, cl, depot)
        routes.append(order)
        # compute distance with depot as start/end
        pts = [depot] + [(df.loc[i, "x"], df.loc[i, "y"]) for i in order] + [depot]
        dist = total_distance(pts)
        per_route_dist.append(dist)
        # capacity + soft TW check
        load = float(df.loc[order, "demand"].sum()) if len(order) else 0.0
        per_route_loads.append(load)
        # simple arrival time simulation (speed distance units per time)
        t = 0.0
        prev = depot
        for i in order:
            cur = (df.loc[i, "x"], df.loc[i, "y"])
            t += euclid(prev, cur) / max(speed, 1e-9)  # guard speed == 0
            tw_s = float(df.loc[i, "tw_start"])
            tw_e = float(df.loc[i, "tw_end"])
            if t < tw_s:
                t = tw_s  # wait for the window to open
            if t > tw_e:
                soft_tw_violations += 1  # soft window: count, don't reject
            t += float(df.loc[i, "service"])
            prev = cur
        # back to depot time is irrelevant for TW in this simple model
    routing_time = time.time() - routing_start
    total_dist = float(sum(per_route_dist))
    # Build assignment table (one row per visited client)
    rows = []
    for v, route in enumerate(routes):
        for seq, idx in enumerate(route, start=1):
            rows.append(
                {
                    "vehicle": v + 1,
                    "sequence": seq,
                    "id": df.loc[idx, "id"],
                    "x": float(df.loc[idx, "x"]),
                    "y": float(df.loc[idx, "y"]),
                    "demand": float(df.loc[idx, "demand"]),
                }
            )
    assign_df = pd.DataFrame(rows).sort_values(["vehicle", "sequence"]).reset_index(drop=True)
    # Enhanced metrics over the non-empty routes only
    active_routes = [d for d in per_route_dist if d > 0]
    active_loads = [l for l in per_route_loads if l > 0]
    # Load balancing: coefficient of variation (std / mean; lower = better)
    load_balance = 0.0
    if len(active_loads) > 1:
        load_std = np.std(active_loads)
        load_mean = np.mean(active_loads)
        load_balance = load_std / load_mean if load_mean > 0 else 0.0
    # Distance balancing (same CV construction)
    dist_balance = 0.0
    if len(active_routes) > 1:
        dist_std = np.std(active_routes)
        dist_mean = np.mean(active_routes)
        dist_balance = dist_std / dist_mean if dist_mean > 0 else 0.0
    # Capacity utilization across the vehicles actually used
    total_capacity_used = sum(active_loads)
    total_capacity_available = capacity * len(active_loads)
    capacity_utilization = total_capacity_used / total_capacity_available if total_capacity_available > 0 else 0.0
    total_time = time.time() - start_time
    # Performance metrics
    performance = {
        "total_execution_time_ms": round(total_time * 1000, 2),
        "validation_time_ms": round(validation_time * 1000, 2),
        "clustering_time_ms": round(clustering_time * 1000, 2),
        "routing_time_ms": round(routing_time * 1000, 2),
        "clients_per_second": round(len(df) / total_time, 1) if total_time > 0 else 0,
        "algorithm_complexity": get_algorithm_complexity(algorithm, len(df)),
    }
    metrics = {
        "algorithm": algorithm,
        "vehicles_used": int(sum(1 for r in routes if len(r) > 0)),
        "vehicles_available": n_vehicles,
        "total_distance": round(total_dist, 3),
        "avg_distance_per_vehicle": round(np.mean(active_routes), 3) if active_routes else 0.0,
        "max_distance": round(max(active_routes), 3) if active_routes else 0.0,
        "min_distance": round(min(active_routes), 3) if active_routes else 0.0,
        "distance_balance_cv": round(dist_balance, 3),
        "per_route_distance": [round(d, 3) for d in per_route_dist],
        "per_route_load": [round(l, 3) for l in per_route_loads],
        "avg_load_per_vehicle": round(np.mean(active_loads), 3) if active_loads else 0.0,
        "load_balance_cv": round(load_balance, 3),
        "capacity_utilization": round(capacity_utilization * 100, 1),
        "capacity": capacity,
        "soft_time_window_violations": int(soft_tw_violations),
        "total_clients": len(df),
        "solution_quality_score": calculate_solution_quality_score(total_dist, dist_balance, load_balance, capacity_utilization),
        "note": f"Heuristic solution ({algorithm} → greedy → 2-opt). TW are soft. Lower CV = better balance.",
    }
    return {
        "routes": routes,
        "total_distance": total_dist,
        "per_route_distance": per_route_dist,
        "assignments_table": assign_df,
        "metrics": metrics,
        "performance": performance,
    }
# ---------------------------
# Visualization
# ---------------------------
def plot_solution(
    df: pd.DataFrame,
    sol: Dict,
    depot: Tuple[float, float] = (0.0, 0.0),
):
    """
    Render the routes of a ``solve_vrp`` solution and return a PIL image.

    Draws the depot, one colored polyline per vehicle (closed at the depot)
    and a sequence-number badge on each stop, then rasterizes the figure to
    PNG in memory so the UI layer can display it directly.
    """
    routes = sol["routes"]
    fig, ax = plt.subplots(figsize=(7.5, 6.5))
    ax.scatter([depot[0]], [depot[1]], s=120, marker="s", label="Depot", zorder=5)
    # color cycle (falls back to the default C0..C5 names if unset)
    colors = plt.rcParams["axes.prop_cycle"].by_key().get(
        "color", ["C0","C1","C2","C3","C4","C5"]
    )
    for v, route in enumerate(routes):
        if not route:
            continue  # unused vehicle — nothing to draw
        c = colors[v % len(colors)]
        # closed polyline: depot -> stops in visiting order -> depot
        xs = [depot[0]] + [float(df.loc[i, "x"]) for i in route] + [depot[0]]
        ys = [depot[1]] + [float(df.loc[i, "y"]) for i in route] + [depot[1]]
        ax.plot(xs, ys, "-", lw=2, color=c, alpha=0.9, label=f"Vehicle {v+1}")
        ax.scatter(xs[1:-1], ys[1:-1], s=36, color=c, zorder=4)
        # label sequence numbers lightly (white digit in a colored circle)
        for k, idx in enumerate(route, start=1):
            ax.text(
                float(df.loc[idx, "x"]),
                float(df.loc[idx, "y"]),
                str(k),
                fontsize=8,
                ha="center",
                va="center",
                color="white",
                bbox=dict(boxstyle="circle,pad=0.2", fc=c, ec="none", alpha=0.7),
            )
    ax.set_title("Ride-Sharing / CVRP Routes (Heuristic)")
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.grid(True, alpha=0.25)
    ax.legend(loc="best", fontsize=8, framealpha=0.9)
    ax.set_aspect("equal", adjustable="box")
    # Convert Matplotlib figure → PIL.Image (what the Gradio UI expects)
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight")
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)