| import os |
| import math |
| import logging |
| import json |
| import numpy as np |
| import pandas as pd |
| from werkzeug.utils import secure_filename |
| from werkzeug.exceptions import RequestEntityTooLarge |
| from flask import Flask, render_template, request, jsonify, send_file |
| import io |
|
|
| |
# Emit INFO and above, tagging every record with timestamp, logger name and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


app = Flask(__name__)
# Prefer a key supplied through the SECRET_KEY environment variable so that
# signed data stays valid across restarts and between worker processes.
# Without it, fall back to a random key that only lives as long as this process.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(24)


# Upper bound on the size of an incoming request body: 50 MiB.
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024
# Upload extensions accepted by allowed_file(); compared in lower case.
ALLOWED_EXTENSIONS = {'json', 'xlsx', 'xls', 'csv'}
|
|
| |
|
|
def allowed_file(filename):
    """Return True when *filename* ends in an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared case-insensitively.
    A name without any dot is never allowed.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
def has_null_byte(content):
    """Report whether a ``bytes`` payload contains a NUL byte.

    Anything that is not a ``bytes`` instance (str, bytearray, None, ...)
    is reported as False without being inspected.
    """
    if not isinstance(content, bytes):
        return False
    return b'\0' in content
|
|
| |
|
|
def haversine_distance(coord1, coord2, radius=6371):
    """
    Calculate the great circle distance between two points
    on a sphere (specified in decimal degrees).

    Args:
        coord1: ``(lat, lon)`` pair of the first point, in decimal degrees.
        coord2: ``(lat, lon)`` pair of the second point, in decimal degrees.
        radius: Radius of the sphere; the result is expressed in the same
            unit. Defaults to 6371, the Earth's radius in kilometres.

    Returns:
        The distance along the surface of the sphere.
    """
    lat1, lon1 = coord1
    lat2, lon2 = coord2

    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = (
        math.sin(delta_phi / 2.0) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2.0) ** 2
    )
    # For (nearly) antipodal points rounding can leave `a` a hair above 1,
    # which would make sqrt(1 - a) raise a domain error. Only the upper bound
    # is clamped, and with a comparison rather than min(), so that NaN input
    # still propagates as NaN instead of being silently turned into a number.
    if a > 1.0:
        a = 1.0

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return radius * c
|
|
def calculate_distance_matrix(locations):
    """
    Build the symmetric N x N table of pairwise haversine distances.

    locations: list of dicts with 'lat', 'lng'

    Returns a NumPy array whose diagonal is zero and whose [i][j] and [j][i]
    entries both hold the distance between locations i and j.
    """
    size = len(locations)
    distances = np.zeros((size, size))
    points = [(entry['lat'], entry['lng']) for entry in locations]

    # Only the upper triangle is computed; each value is mirrored below the diagonal.
    for row, origin in enumerate(points):
        for col in range(row + 1, size):
            separation = haversine_distance(origin, points[col])
            distances[row][col] = separation
            distances[col][row] = separation
    return distances
|
|
def solve_tsp_nearest_neighbor(dist_matrix, start_index=0):
    """
    Construct a closed tour greedily with the Nearest Neighbor heuristic.

    Starting at start_index, repeatedly move to the closest node not yet
    visited, then return to the start. The result is a list of node indices
    whose first and last entries are both start_index.
    """
    remaining = set(range(len(dist_matrix)))
    remaining.remove(start_index)
    tour = [start_index]

    while remaining:
        here = tour[-1]
        closest = min(remaining, key=lambda node: dist_matrix[here][node])
        remaining.remove(closest)
        tour.append(closest)

    # Close the loop by heading back to where the tour began.
    tour.append(start_index)
    return tour
|
|
def two_opt_improvement(route, dist_matrix):
    """
    2-Opt local search optimization.

    Repeatedly reverses interior segments of a closed tour and keeps every
    reversal that makes the tour strictly shorter, until a full sweep finds
    no further improvement.

    Args:
        route: Sequence of node indices describing a closed tour; the first
            and last positions (the start/end node) are never moved.
        dist_matrix: Square table where dist_matrix[a][b] is the cost of
            travelling from node a to node b.

    Returns:
        A ``(best_route, best_distance)`` tuple. ``best_route`` is a new list;
        the ``route`` argument is left untouched.
    """
    def get_route_distance(r):
        # Sum of every consecutive leg of the tour.
        return sum(dist_matrix[a][b] for a, b in zip(r, r[1:]))

    best_route = list(route)
    best_distance = get_route_distance(best_route)
    n = len(best_route)

    improved = True
    while improved:
        improved = False

        # i and j bound the segment to reverse; positions 0 and n-1 stay fixed.
        for i in range(1, n - 2):
            for j in range(i + 1, n - 1):
                # Two-node segments (j == i + 1) are tried as well: reversing
                # them replaces edges (i-1, i) and (j, j+1), which is a
                # genuine 2-opt move and can shorten the tour.
                new_route = best_route[:i] + best_route[i:j + 1][::-1] + best_route[j + 1:]
                new_distance = get_route_distance(new_route)

                # Strict improvement only, so the search always terminates.
                if new_distance < best_distance:
                    best_distance = new_distance
                    best_route = new_route
                    improved = True

    return best_route, best_distance
|
|
| |
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
@app.route('/health')
def health():
    """Answer health probes with a fixed JSON body and HTTP 200."""
    payload = {"status": "healthy"}
    return jsonify(payload), 200
|
|
@app.route('/api/optimize', methods=['POST'])
def optimize_route():
    """
    Compute a round trip over the posted locations.

    Expects a JSON object whose 'locations' key holds between 2 and 100
    objects, each carrying finite numeric 'lat' and 'lng' values. The first
    location is the depot: the tour starts and ends there. A nearest-neighbour
    tour is built first and then refined with 2-opt.

    Returns:
        200 with 'status', 'total_distance_km' (rounded to 2 decimals),
        'stops_count', 'optimized_path' (copies of the input locations in
        visiting order, each tagged with 'type' = 'depot' or 'stop') and
        'original_indices' (the visiting order as input indices);
        400 when the request body is malformed;
        500 when the optimization itself fails unexpectedly.
    """
    # silent=True turns a missing, non-JSON or unparsable body into None
    # instead of raising, so it can be reported as a client error below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object with a 'locations' list"}), 400

    locations = payload.get('locations', [])
    if not isinstance(locations, list) or len(locations) < 2:
        return jsonify({"error": "Need at least 2 locations (Depot + 1 Stop)"}), 400

    if len(locations) > 100:
        return jsonify({"error": "Max 100 stops allowed for demo performance"}), 400

    # Validate every coordinate up front so bad input is answered with 400
    # rather than surfacing later as an exception inside the solver.
    coords = []
    for position, loc in enumerate(locations):
        if not isinstance(loc, dict):
            return jsonify({"error": f"Location {position} must be an object with 'lat' and 'lng'"}), 400
        try:
            lat = float(loc['lat'])
            lng = float(loc['lng'])
        except (KeyError, TypeError, ValueError):
            return jsonify({"error": f"Location {position} must have numeric 'lat' and 'lng'"}), 400
        # NaN/inf would poison every distance and yield invalid JSON output.
        if not (math.isfinite(lat) and math.isfinite(lng)):
            return jsonify({"error": f"Location {position} must have finite 'lat' and 'lng'"}), 400
        coords.append({'lat': lat, 'lng': lng})

    try:
        matrix = calculate_distance_matrix(coords)
        initial_route_indices = solve_tsp_nearest_neighbor(matrix, start_index=0)
        optimized_indices, total_distance = two_opt_improvement(initial_route_indices, matrix)

        optimized_path = []
        for idx in optimized_indices:
            # Copy so the caller's extra fields are echoed back untouched.
            loc = locations[idx].copy()
            loc['type'] = 'depot' if idx == 0 else 'stop'
            optimized_path.append(loc)

        return jsonify({
            "status": "success",
            "total_distance_km": round(float(total_distance), 2),
            "stops_count": len(locations),
            "optimized_path": optimized_path,
            "original_indices": optimized_indices
        })

    except Exception:
        # Keep the traceback in the log; do not leak internals to the client.
        logger.exception("Optimization error")
        return jsonify({"error": "Internal Server Error"}), 500
|
|
def _is_nan(value):
    """Return True when *value* is a float NaN (how pandas reports an empty cell)."""
    return isinstance(value, float) and math.isnan(value)


def _load_uploaded_records(file, extension):
    """Parse an uploaded file into raw location records.

    Returns a ``(records, error)`` pair: ``records`` is a list on success and
    ``error`` is a client-facing message (with ``records`` None) on rejection.
    Parser exceptions are left to propagate to the caller.
    """
    if extension == 'json':
        data = json.load(file)
        if isinstance(data, dict):
            data = data.get('locations', [])
        if not isinstance(data, list):
            return None, "JSON must be a list of locations or an object with a 'locations' list"
        return data, None

    if extension in ('xlsx', 'xls'):
        df, label = pd.read_excel(file), 'Excel'
    elif extension == 'csv':
        df, label = pd.read_csv(file), 'CSV'
    else:
        return None, "Unsupported file format"

    if 'lat' not in df.columns or 'lng' not in df.columns:
        return None, f"{label} must have 'lat' and 'lng' columns"
    return df.to_dict(orient='records'), None


def _clean_locations(records):
    """Keep only records with finite numeric 'lat'/'lng', normalised to id/lat/lng/type."""
    cleaned = []
    for idx, loc in enumerate(records):
        if not isinstance(loc, dict):
            continue
        try:
            lat = float(loc['lat'])
            lng = float(loc['lng'])
        except (KeyError, TypeError, ValueError):
            continue
        # Empty spreadsheet cells arrive as NaN, which float() accepts but
        # which cannot be represented in valid JSON.
        if not (math.isfinite(lat) and math.isfinite(lng)):
            continue

        loc_id = loc.get('id', idx)
        loc_type = loc.get('type', 'stop')
        cleaned.append({
            "id": idx if _is_nan(loc_id) else loc_id,
            "lat": lat,
            "lng": lng,
            "type": 'stop' if _is_nan(loc_type) else loc_type
        })
    return cleaned


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """
    Parse an uploaded JSON, Excel or CSV file into a list of locations.

    The file is read from the multipart field 'file' and is never written to
    disk. JSON may be a list of locations or an object with a 'locations'
    list; Excel and CSV need 'lat' and 'lng' columns. Rows without usable
    coordinates are skipped.

    Returns:
        200 with 'status', 'locations' and 'message' on success;
        400 for a missing, disallowed, unparsable or empty file;
        413 when the request exceeds the configured size limit;
        500 on an unexpected failure.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400

        file = request.files['file']

        if not file.filename:
            return jsonify({"error": "No selected file"}), 400

        # Checked on the raw name: a sanitised name can no longer contain it.
        if '\0' in file.filename:
            return jsonify({"error": "Invalid filename (null byte detected)"}), 400

        if not allowed_file(file.filename):
            return jsonify({"error": "File type not allowed"}), 400

        # Derive the extension the same way allowed_file() does (lower-cased),
        # so that e.g. "DATA.JSON" is dispatched to the JSON parser.
        extension = file.filename.rsplit('.', 1)[1].lower()

        try:
            records, problem = _load_uploaded_records(file, extension)
        except ValueError as e:
            # Covers malformed JSON, undecodable text and pandas parse errors:
            # the file is at fault, not the server.
            logger.warning("Rejected upload: %s", e)
            return jsonify({"error": f"Failed to process file: {str(e)}"}), 400
        except Exception:
            logger.exception("File processing error")
            return jsonify({"error": "Failed to process file"}), 500

        if problem is not None:
            return jsonify({"error": problem}), 400

        cleaned_locations = _clean_locations(records)
        if not cleaned_locations:
            return jsonify({"error": "No valid locations found in file"}), 400

        return jsonify({
            "status": "success",
            "locations": cleaned_locations,
            "message": f"Successfully loaded {len(cleaned_locations)} locations"
        })

    except RequestEntityTooLarge:
        return jsonify({"error": "File too large (Max 50MB)"}), 413
    except Exception:
        logger.exception("Upload error")
        return jsonify({"error": "Internal Server Error"}), 500
|
|
@app.errorhandler(413)
def request_entity_too_large(error):
    """Translate an HTTP 413 error into the API's JSON error shape."""
    body = jsonify({"error": "File too large (Max 50MB)"})
    return body, 413
|
|
@app.errorhandler(500)
def internal_error(error):
    """Translate an HTTP 500 error into the API's JSON error shape."""
    body = jsonify({"error": "Internal Server Error"})
    return body, 500
|
|
@app.errorhandler(404)
def not_found(error):
    """Translate an HTTP 404 error into the API's JSON error shape."""
    body = jsonify({"error": "Resource Not Found"})
    return body, 404
|
|
if __name__ == '__main__':
    # Listen on every interface; the port comes from $PORT, defaulting to 7860.
    listen_port = int(os.getenv('PORT', 7860))
    app.run(port=listen_port, host='0.0.0.0')
|
|