| import math |
| import random |
| from typing import Dict, List, Tuple |
|
|
| import matplotlib.pyplot as plt |
| import numpy as np |
| import pandas as pd |
|
|
|
|
| |
| |
| |
|
|
| def make_template_dataframe(): |
| """Blank template users can download/fill.""" |
| return pd.DataFrame( |
| { |
| "id": ["A", "B", "C"], |
| "x": [10, -5, 15], |
| "y": [4, -12, 8], |
| "demand": [1, 2, 1], |
| "tw_start": [0, 0, 0], |
| "tw_end": [9999, 9999, 9999], |
| "service": [0, 0, 0], |
| } |
| ) |
|
|
| def parse_uploaded_csv(file) -> pd.DataFrame: |
| df = pd.read_csv(file.name if hasattr(file, "name") else file) |
| required = {"id", "x", "y", "demand"} |
| missing = required - set(df.columns) |
| if missing: |
| raise ValueError(f"Missing required columns: {sorted(missing)}") |
|
|
| |
| if "tw_start" not in df.columns: |
| df["tw_start"] = 0 |
| if "tw_end" not in df.columns: |
| df["tw_end"] = 999999 |
| if "service" not in df.columns: |
| df["service"] = 0 |
|
|
| |
| df["id"] = df["id"].astype(str) |
| for col in ["x", "y", "demand", "tw_start", "tw_end", "service"]: |
| df[col] = pd.to_numeric(df[col], errors="coerce") |
| df = df.dropna() |
| df.reset_index(drop=True, inplace=True) |
| return df |
|
|
| def generate_random_instance( |
| n_clients=30, |
| n_vehicles=4, |
| capacity=10, |
| spread=50, |
| demand_min=1, |
| demand_max=3, |
| seed=42, |
| ) -> pd.DataFrame: |
| rng = np.random.default_rng(seed) |
| xs = rng.uniform(-spread, spread, size=n_clients) |
| ys = rng.uniform(-spread, spread, size=n_clients) |
| demands = rng.integers(demand_min, demand_max + 1, size=n_clients) |
|
|
| df = pd.DataFrame( |
| { |
| "id": [f"C{i+1}" for i in range(n_clients)], |
| "x": xs, |
| "y": ys, |
| "demand": demands, |
| "tw_start": np.zeros(n_clients, dtype=float), |
| "tw_end": np.full(n_clients, 999999.0), |
| "service": np.zeros(n_clients, dtype=float), |
| } |
| ) |
| return df |
|
|
|
|
| |
| |
| |
|
|
| def euclid(a: Tuple[float, float], b: Tuple[float, float]) -> float: |
| return float(math.hypot(a[0] - b[0], a[1] - b[1])) |
|
|
| def total_distance(points: List[Tuple[float, float]]) -> float: |
| return sum(euclid(points[i], points[i + 1]) for i in range(len(points) - 1)) |
|
|
|
|
| |
| |
| |
|
|
| def sweep_clusters( |
| df: pd.DataFrame, |
| depot: Tuple[float, float], |
| n_vehicles: int, |
| capacity: float, |
| ) -> List[List[int]]: |
| """ |
| Assign clients to vehicles by angular sweep around the depot, roughly balancing |
| capacity (sum of 'demand'). |
| Returns indices (row numbers) per cluster. |
| """ |
| dx = df["x"].values - depot[0] |
| dy = df["y"].values - depot[1] |
| ang = np.arctan2(dy, dx) |
| order = np.argsort(ang) |
|
|
| clusters: List[List[int]] = [[] for _ in range(n_vehicles)] |
| loads = [0.0] * n_vehicles |
| v = 0 |
| for idx in order: |
| d = float(df.loc[idx, "demand"]) |
| |
| if loads[v] + d > capacity and v < n_vehicles - 1: |
| v += 1 |
| clusters[v].append(int(idx)) |
| loads[v] += d |
|
|
| return clusters |
|
|
|
|
| |
| |
| |
|
|
| def nearest_neighbor_route( |
| pts: List[Tuple[float, float]], |
| start_idx: int = 0, |
| ) -> List[int]: |
| n = len(pts) |
| unvisited = set(range(n)) |
| route = [start_idx] |
| unvisited.remove(start_idx) |
| while unvisited: |
| last = route[-1] |
| nxt = min(unvisited, key=lambda j: euclid(pts[last], pts[j])) |
| route.append(nxt) |
| unvisited.remove(nxt) |
| return route |
|
|
| def two_opt(route: List[int], pts: List[Tuple[float, float]], max_iter=200) -> List[int]: |
| best = route[:] |
| best_len = total_distance([pts[i] for i in best]) |
| n = len(route) |
| improved = True |
| it = 0 |
| while improved and it < max_iter: |
| improved = False |
| it += 1 |
| for i in range(1, n - 2): |
| for k in range(i + 1, n - 1): |
| new_route = best[:i] + best[i:k + 1][::-1] + best[k + 1:] |
| new_len = total_distance([pts[i] for i in new_route]) |
| if new_len + 1e-9 < best_len: |
| best, best_len = new_route, new_len |
| improved = True |
| if improved is False: |
| break |
| return best |
|
|
| def build_route_for_cluster( |
| df: pd.DataFrame, |
| idxs: List[int], |
| depot: Tuple[float, float], |
| ) -> List[int]: |
| """ |
| Build a TSP tour over cluster points and return client indices in visiting order. |
| Returns client indices (not including the depot) but representing the order. |
| """ |
| |
| pts = [depot] + [(float(df.loc[i, "x"]), float(df.loc[i, "y"])) for i in idxs] |
| |
| rr = nearest_neighbor_route(pts, start_idx=0) |
| |
| |
| rr = two_opt(rr, pts) |
| |
| order = [idxs[i - 1] for i in rr if i != 0] |
| return order |
|
|
|
|
| |
| |
| |
|
|
| def solve_vrp( |
| df: pd.DataFrame, |
| depot: Tuple[float, float] = (0.0, 0.0), |
| n_vehicles: int = 4, |
| capacity: float = 10, |
| speed: float = 1.0, |
| ) -> Dict: |
| """ |
| Returns: |
| { |
| 'routes': List[List[int]] (row indices of df), |
| 'total_distance': float, |
| 'per_route_distance': List[float], |
| 'assignments_table': pd.DataFrame, |
| 'metrics': dict |
| } |
| """ |
| |
| clusters = sweep_clusters(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity) |
|
|
| |
| routes: List[List[int]] = [] |
| per_route_dist: List[float] = [] |
| soft_tw_violations = 0 |
| per_route_loads: List[float] = [] |
|
|
| for cl in clusters: |
| if len(cl) == 0: |
| routes.append([]) |
| per_route_dist.append(0.0) |
| per_route_loads.append(0.0) |
| continue |
| order = build_route_for_cluster(df, cl, depot) |
| routes.append(order) |
|
|
| |
| pts = [depot] + [(df.loc[i, "x"], df.loc[i, "y"]) for i in order] + [depot] |
| dist = total_distance(pts) |
| per_route_dist.append(dist) |
|
|
| |
| load = float(df.loc[order, "demand"].sum()) if len(order) else 0.0 |
| per_route_loads.append(load) |
|
|
| |
| t = 0.0 |
| prev = depot |
| for i in order: |
| cur = (df.loc[i, "x"], df.loc[i, "y"]) |
| t += euclid(prev, cur) / max(speed, 1e-9) |
| tw_s = float(df.loc[i, "tw_start"]) |
| tw_e = float(df.loc[i, "tw_end"]) |
| if t < tw_s: |
| t = tw_s |
| if t > tw_e: |
| soft_tw_violations += 1 |
| t += float(df.loc[i, "service"]) |
| prev = cur |
| |
|
|
| total_dist = float(sum(per_route_dist)) |
|
|
| |
| rows = [] |
| for v, route in enumerate(routes): |
| for seq, idx in enumerate(route, start=1): |
| rows.append( |
| { |
| "vehicle": v + 1, |
| "sequence": seq, |
| "id": df.loc[idx, "id"], |
| "x": float(df.loc[idx, "x"]), |
| "y": float(df.loc[idx, "y"]), |
| "demand": float(df.loc[idx, "demand"]), |
| } |
| ) |
| assign_df = pd.DataFrame(rows).sort_values(["vehicle", "sequence"]).reset_index(drop=True) |
|
|
| metrics = { |
| "vehicles_used": int(sum(1 for r in routes if len(r) > 0)), |
| "total_distance": round(total_dist, 3), |
| "per_route_distance": [round(d, 3) for d in per_route_dist], |
| "per_route_load": per_route_loads, |
| "capacity": capacity, |
| "soft_time_window_violations": int(soft_tw_violations), |
| "note": "Heuristic solution (sweep → greedy → 2-opt). TW are soft (informational).", |
| } |
|
|
| return { |
| "routes": routes, |
| "total_distance": total_dist, |
| "per_route_distance": per_route_dist, |
| "assignments_table": assign_df, |
| "metrics": metrics, |
| } |
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| from PIL import Image |
| import io |
|
|
| def plot_solution( |
| df: pd.DataFrame, |
| sol: Dict, |
| depot: Tuple[float, float] = (0.0, 0.0), |
| ): |
| routes = sol["routes"] |
|
|
| fig, ax = plt.subplots(figsize=(7.5, 6.5)) |
| ax.scatter([depot[0]], [depot[1]], s=120, marker="s", label="Depot", zorder=5) |
|
|
| |
| colors = plt.rcParams["axes.prop_cycle"].by_key().get( |
| "color", ["C0","C1","C2","C3","C4","C5"] |
| ) |
|
|
| for v, route in enumerate(routes): |
| if not route: |
| continue |
| c = colors[v % len(colors)] |
| xs = [depot[0]] + [float(df.loc[i, "x"]) for i in route] + [depot[0]] |
| ys = [depot[1]] + [float(df.loc[i, "y"]) for i in route] + [depot[1]] |
| ax.plot(xs, ys, "-", lw=2, color=c, alpha=0.9, label=f"Vehicle {v+1}") |
| ax.scatter(xs[1:-1], ys[1:-1], s=36, color=c, zorder=4) |
|
|
| |
| for k, idx in enumerate(route, start=1): |
| ax.text( |
| float(df.loc[idx, "x"]), |
| float(df.loc[idx, "y"]), |
| str(k), |
| fontsize=8, |
| ha="center", |
| va="center", |
| color="white", |
| bbox=dict(boxstyle="circle,pad=0.2", fc=c, ec="none", alpha=0.7), |
| ) |
|
|
| ax.set_title("Ride-Sharing / CVRP Routes (Heuristic)") |
| ax.set_xlabel("X") |
| ax.set_ylabel("Y") |
| ax.grid(True, alpha=0.25) |
| ax.legend(loc="best", fontsize=8, framealpha=0.9) |
| ax.set_aspect("equal", adjustable="box") |
|
|
| |
| buf = io.BytesIO() |
| fig.savefig(buf, format="png", bbox_inches="tight") |
| plt.close(fig) |
| buf.seek(0) |
| return Image.open(buf) |
|
|
|
|