import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.cluster import DBSCAN, KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import silhouette_score
from scipy.spatial.distance import pdist, squareform
import json
import warnings

warnings.filterwarnings('ignore')


class AdvancedGeoTrackAnalyzer:
    def __init__(self, data_path_or_df, sample_size=400000):
        """
        Initialize the analyzer with a data path or DataFrame.

        Parameters:
            data_path_or_df: str or pandas.DataFrame - Path to CSV file or DataFrame
            sample_size: int - Maximum number of rows to use for analysis (default 400k)
        """
        if isinstance(data_path_or_df, str):
            print(f"Loading data from {data_path_or_df}")
            self.df = pd.read_csv(data_path_or_df)
        else:
            self.df = data_path_or_df.copy()

        print(f"Original dataset size: {len(self.df):,} rows")
        print(f"Available columns: {list(self.df.columns)}")

        # Sample the data if it is too large
        if len(self.df) > sample_size:
            print(f"Sampling {sample_size:,} rows from {len(self.df):,} total rows")
            self.df = self.df.sample(n=sample_size, random_state=42).reset_index(drop=True)
            print(f"Using sampled dataset of {len(self.df):,} rows")

        self.processed_df = None
        self.routes = None
        self.tight_places = None

    def preprocess_data(self):
        """Preprocess the geo-tracking data."""
        print("Preprocessing data...")

        # Make a copy for processing and reset the index to avoid ambiguity issues
        self.processed_df = self.df.copy().reset_index(drop=True)

        # Check for required columns
        required_cols = ['randomized_id', 'lat', 'lng']
        missing_cols = [col for col in required_cols if col not in self.processed_df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Check for optional columns
        has_speed = 'spd' in self.processed_df.columns
        has_azimuth = 'azm' in self.processed_df.columns
        print(f"Speed data available: {has_speed}")
        print(f"Azimuth data available: {has_azimuth}")

        # Sort by randomized_id for trajectory analysis
        self.processed_df = self.processed_df.sort_values(['randomized_id']).reset_index(drop=True)

        # Feature engineering
        print("Creating derived features...")

        # Group by randomized_id to calculate per-trajectory features
        grouped = self.processed_df.groupby('randomized_id')

        def haversine_distance(lat1, lon1, lat2, lon2):
            """Calculate the great-circle distance between two points on Earth, in meters."""
            # Convert decimal degrees to radians
            lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
            # Haversine formula
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
            c = 2 * np.arcsin(np.sqrt(a))
            r = 6371  # Radius of Earth in kilometers
            return c * r * 1000  # Convert to meters

        # Distance between consecutive points within each trajectory
        lat_prev = grouped['lat'].shift(1)
        lng_prev = grouped['lng'].shift(1)
        self.processed_df['distance_to_prev'] = haversine_distance(
            lat_prev, lng_prev,
            self.processed_df['lat'], self.processed_df['lng']
        ).fillna(0)

        # Speed-related features
        if has_speed:
            self.processed_df['speed_change'] = grouped['spd'].diff().fillna(0)
        else:
            # Estimate speed from distance (assuming 1-second intervals): m/s -> km/h
            self.processed_df['estimated_speed'] = self.processed_df['distance_to_prev'] * 3.6
            # Re-group so the newly added column is visible to the groupby
            self.processed_df['speed_change'] = (
                self.processed_df.groupby('randomized_id')['estimated_speed'].diff().fillna(0)
            )

        # Direction features
        if has_azimuth:
            self.processed_df['direction_change'] = grouped['azm'].diff().fillna(0)
        else:
            # Calculate the bearing between consecutive points
            def calculate_bearing(lat1, lon1, lat2, lon2):
                lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
                dlon = lon2 - lon1
                y = np.sin(dlon) * np.cos(lat2)
                x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
                bearing = np.degrees(np.arctan2(y, x))
                return (bearing + 360) % 360

            self.processed_df['calculated_bearing'] = calculate_bearing(
                lat_prev, lng_prev,
                self.processed_df['lat'], self.processed_df['lng']
            )
            # Re-group so the newly added column is visible to the groupby
            self.processed_df['direction_change'] = (
                self.processed_df.groupby('randomized_id')['calculated_bearing'].diff().fillna(0)
            )

        # Remove rows with invalid coordinates
        self.processed_df = self.processed_df[
            (self.processed_df['lat'].between(-90, 90)) &
            (self.processed_df['lng'].between(-180, 180))
        ].reset_index(drop=True)

        print(f"Preprocessing complete. Final dataset: {len(self.processed_df):,} rows")

    def identify_popular_routes(self, eps_route=0.01, min_samples_route=5):
        """Identify popular routes by clustering start-end point pairs (compatible with generate_report)."""
        print("Identifying popular routes...")
        if self.processed_df is None:
            raise ValueError("Data must be preprocessed first")

        # Extract start and end points for each trajectory
        print("Extracting trajectory start and end points...")
        trajectory_summary = self.processed_df.groupby('randomized_id').agg({
            'lat': ['first', 'last', 'count'],
            'lng': ['first', 'last']
        }).reset_index()

        # Flatten column names
        trajectory_summary.columns = [
            'randomized_id', 'start_lat', 'end_lat', 'point_count', 'start_lng', 'end_lng'
        ]
        print(f"Total trajectories: {len(trajectory_summary)}")

        # Keep trajectories with at least 3 points
        valid_trajectories = trajectory_summary[trajectory_summary['point_count'] >= 3].copy()
        print(f"Trajectories with ≥3 points: {len(valid_trajectories)}")

        if len(valid_trajectories) == 0:
            print("No valid trajectories found")
            self.routes = {}
            return {}

        # Calculate route distances (in degrees) to filter out very short routes
        valid_trajectories['route_distance_deg'] = np.sqrt(
            (valid_trajectories['end_lat'] - valid_trajectories['start_lat']) ** 2 +
            (valid_trajectories['end_lng'] - valid_trajectories['start_lng']) ** 2
        )

        # Use a lenient distance threshold (drop the bottom 10%)
        distance_threshold = valid_trajectories['route_distance_deg'].quantile(0.1)
        print(f"Distance threshold: {distance_threshold:.6f} degrees")

        meaningful_routes = valid_trajectories[
            valid_trajectories['route_distance_deg'] > distance_threshold
        ].copy()
        print(f"Routes after distance filtering: {len(meaningful_routes)}")

        if len(meaningful_routes) < min_samples_route:
            print(f"Not enough meaningful routes ({len(meaningful_routes)}) for clustering "
                  f"(need at least {min_samples_route})")
            # Lower the minimum samples requirement
            min_samples_route = max(2, len(meaningful_routes) // 5)
            print(f"Adjusting min_samples_route to: {min_samples_route}")

        if len(meaningful_routes) < 2:
            print("Not enough routes for any clustering")
            self.routes = {}
            return {}

        # Create route vectors for clustering
        route_vectors = meaningful_routes[['start_lat', 'start_lng', 'end_lat', 'end_lng']].values
        print(f"Route vectors shape: {route_vectors.shape}")

        # Initialize the routes dictionary
        self.routes = {}

        # Method 1: DBSCAN on scaled start/end coordinates
        print("\nTrying DBSCAN clustering...")
        try:
            scaler = StandardScaler()
            scaled_routes = scaler.fit_transform(route_vectors)

            # Try different eps values
            eps_values = [0.1, 0.2, 0.5, 1.0, 1.5, 2.0]
            best_eps = None
            best_clusters = None
            max_clusters = 0

            for eps in eps_values:
                clustering = DBSCAN(eps=eps, min_samples=min_samples_route)
                cluster_labels = clustering.fit_predict(scaled_routes)
                n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
                n_noise = list(cluster_labels).count(-1)
                print(f"  eps={eps}: {n_clusters} clusters, {n_noise} noise points")

                if n_clusters > max_clusters and n_clusters <= len(meaningful_routes) // 2:
                    max_clusters = n_clusters
                    best_eps = eps
                    best_clusters = cluster_labels

            if best_clusters is not None and max_clusters > 0:
                print(f"Best DBSCAN result: eps={best_eps}, {max_clusters} clusters")
                unique_clusters = np.unique(best_clusters[best_clusters != -1])

                for cluster_id in unique_clusters:
                    cluster_mask = best_clusters == cluster_id
                    cluster_routes = route_vectors[cluster_mask]
                    cluster_trajectory_ids = meaningful_routes.loc[
                        meaningful_routes.index[cluster_mask], 'randomized_id'
                    ].values

                    # Cluster statistics
                    avg_start_lat = np.mean(cluster_routes[:, 0])
                    avg_start_lng = np.mean(cluster_routes[:, 1])
                    avg_end_lat = np.mean(cluster_routes[:, 2])
                    avg_end_lng = np.mean(cluster_routes[:, 3])

                    # Average route length in meters (for compatibility with generate_report)
                    route_length_m = np.mean([
                        self.haversine_distance_m(route[0], route[1], route[2], route[3])
                        for route in cluster_routes
                    ])

                    self.routes[f"dbscan_{cluster_id}"] = {
                        'route_count': len(cluster_routes),
                        'trajectory_ids': cluster_trajectory_ids.tolist(),
                        'avg_start_point': {'lat': avg_start_lat, 'lng': avg_start_lng},
                        'avg_end_point': {'lat': avg_end_lat, 'lng': avg_end_lng},
                        'avg_route_length_m': route_length_m,
                        'popularity_score': len(cluster_routes) / len(meaningful_routes) * 100,
                        'method': 'DBSCAN'
                    }
        except Exception as e:
            print(f"DBSCAN failed: {e}")

        # Method 2: KMeans clustering if DBSCAN did not produce clusters
        if len(self.routes) == 0:
            print("\nTrying KMeans clustering...")
            try:
                # Try different numbers of clusters
                max_k = min(10, len(meaningful_routes) // 3)
                if max_k >= 2:
                    scaler = StandardScaler()
                    scaled_routes = scaler.fit_transform(route_vectors)

                    best_k = 2
                    best_score = -1
                    for k in range(2, max_k + 1):
                        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
                        cluster_labels = kmeans.fit_predict(scaled_routes)
                        # Score each k by silhouette
                        try:
                            score = silhouette_score(scaled_routes, cluster_labels)
                            print(f"  k={k}: silhouette score = {score:.3f}")
                            if score > best_score:
                                best_score = score
                                best_k = k
                        except Exception:
                            continue

                    # Use the best k
                    print(f"Using k={best_k} (best silhouette score: {best_score:.3f})")
                    kmeans = KMeans(n_clusters=best_k, random_state=42, n_init=10)
                    cluster_labels = kmeans.fit_predict(scaled_routes)

                    for cluster_id in range(best_k):
                        cluster_mask = cluster_labels == cluster_id
                        cluster_routes = route_vectors[cluster_mask]
                        cluster_trajectory_ids = meaningful_routes.loc[
                            meaningful_routes.index[cluster_mask], 'randomized_id'
                        ].values

                        if len(cluster_routes) >= 2:  # Require at least 2 routes per cluster
                            # Cluster statistics
                            avg_start_lat = np.mean(cluster_routes[:, 0])
                            avg_start_lng = np.mean(cluster_routes[:, 1])
                            avg_end_lat = np.mean(cluster_routes[:, 2])
                            avg_end_lng = np.mean(cluster_routes[:, 3])

                            # Average route length in meters
                            route_length_m = np.mean([
                                self.haversine_distance_m(route[0], route[1], route[2], route[3])
                                for route in cluster_routes
                            ])

                            self.routes[f"kmeans_{cluster_id}"] = {
                                'route_count': len(cluster_routes),
                                'trajectory_ids': cluster_trajectory_ids.tolist(),
                                'avg_start_point': {'lat': avg_start_lat, 'lng': avg_start_lng},
                                'avg_end_point': {'lat': avg_end_lat, 'lng': avg_end_lng},
                                'avg_route_length_m': route_length_m,
                                'popularity_score': len(cluster_routes) / len(meaningful_routes) * 100,
                                'method': 'KMeans'
                            }
            except Exception as e:
                print(f"KMeans failed: {e}")

        # Method 3: Simple grid-based clustering if both methods fail
        if len(self.routes) == 0:
            print("\nTrying grid-based clustering...")
            try:
                lat_bins = 20
                lng_bins = 20

                # Bin the start and end points
                start_lat_bins = pd.cut(meaningful_routes['start_lat'], bins=lat_bins, labels=False)
                start_lng_bins = pd.cut(meaningful_routes['start_lng'], bins=lng_bins, labels=False)
                end_lat_bins = pd.cut(meaningful_routes['end_lat'], bins=lat_bins, labels=False)
                end_lng_bins = pd.cut(meaningful_routes['end_lng'], bins=lng_bins, labels=False)

                # Create route signatures
                meaningful_routes['route_signature'] = (
                    start_lat_bins.astype(str) + '_' + start_lng_bins.astype(str) + '_' +
                    end_lat_bins.astype(str) + '_' + end_lng_bins.astype(str)
                )

                # Count routes by signature
                signature_counts = meaningful_routes['route_signature'].value_counts()
                popular_signatures = signature_counts[signature_counts >= 2]  # At least 2 routes
                print(f"Found {len(popular_signatures)} popular route patterns")

                for i, (signature, count) in enumerate(popular_signatures.head(10).items()):
                    cluster_routes_df = meaningful_routes[meaningful_routes['route_signature'] == signature]

                    # Average route length in meters
                    route_length_m = np.mean([
                        self.haversine_distance_m(row['start_lat'], row['start_lng'],
                                                  row['end_lat'], row['end_lng'])
                        for _, row in cluster_routes_df.iterrows()
                    ])

                    self.routes[f"grid_{i}"] = {
                        'route_count': count,
                        'trajectory_ids': cluster_routes_df['randomized_id'].tolist(),
                        'avg_start_point': {
                            'lat': cluster_routes_df['start_lat'].mean(),
                            'lng': cluster_routes_df['start_lng'].mean()
                        },
                        'avg_end_point': {
                            'lat': cluster_routes_df['end_lat'].mean(),
                            'lng': cluster_routes_df['end_lng'].mean()
                        },
                        'avg_route_length_m': route_length_m,
                        'popularity_score': count / len(meaningful_routes) * 100,
                        'method': 'Grid-based'
                    }
            except Exception as e:
                print(f"Grid-based clustering failed: {e}")

        # Sort routes by popularity
        if self.routes:
            self.routes = dict(sorted(
                self.routes.items(),
                key=lambda x: x[1]['route_count'],
                reverse=True
            ))
            print(f"\nSuccessfully identified {len(self.routes)} popular route clusters!")
            for route_id, route_info in list(self.routes.items())[:5]:
                print(f"  {route_id}: {route_info['route_count']} trips ({route_info['popularity_score']:.1f}%)")
        else:
            print("No popular routes could be identified")
            self.routes = {}

        return self.routes

    def haversine_distance_m(self, lat1, lon1, lat2, lon2):
        """Calculate the haversine distance in meters (for compatibility with generate_report)."""
        # Convert decimal degrees to radians
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        # Haversine formula
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        r = 6371  # Radius of Earth in kilometers
        return c * r * 1000  # Return meters

    def identify_tight_places(self, eps_tight=0.0005, min_samples_tight=50, density_threshold=0.8):
        """Identify tight places (congestion areas) based on point density and movement patterns."""
        print("Identifying tight places (congestion areas)...")
        if self.processed_df is None:
            raise ValueError("Data must be preprocessed first")

        # Use all GPS points for the density analysis
        coords = self.processed_df[['lat', 'lng']].values

        # Apply DBSCAN clustering to find high-density areas
        clustering = DBSCAN(eps=eps_tight, min_samples=min_samples_tight)
        clusters = clustering.fit_predict(coords)

        # Add cluster labels to the dataframe
        self.processed_df['density_cluster'] = clusters

        # Analyze each cluster to identify tight places
        unique_clusters = np.unique(clusters[clusters != -1])
        self.tight_places = {}

        # Pre-compute the benchmarks used by the density, speed, and severity criteria
        cluster_densities = [
            len(coords[clusters == c]) / max(self.calculate_cluster_area(coords[clusters == c]), 0.001)
            for c in unique_clusters
        ]
        mean_cluster_density = np.mean(cluster_densities) if cluster_densities else 0.0

        if 'spd' in self.processed_df.columns:
            speed_series = self.processed_df['spd']
        elif 'estimated_speed' in self.processed_df.columns:
            speed_series = self.processed_df['estimated_speed']
        else:
            speed_series = pd.Series([30])
        low_speed_threshold = np.percentile(speed_series, 25)

        cluster_score_benchmarks = []
        for c in unique_clusters:
            c_data = self.processed_df[self.processed_df['density_cluster'] == c]
            c_speed_col = 'spd' if 'spd' in c_data.columns else 'estimated_speed'
            cluster_score_benchmarks.append(
                (len(c_data) * c_data['randomized_id'].nunique()) / max(c_data[c_speed_col].mean(), 1)
            )

        for cluster_id in unique_clusters:
            cluster_mask = clusters == cluster_id
            cluster_points = coords[cluster_mask]
            cluster_data = self.processed_df[self.processed_df['density_cluster'] == cluster_id]

            # Density metrics
            cluster_area_km2 = self.calculate_cluster_area(cluster_points)
            point_density = len(cluster_points) / max(cluster_area_km2, 0.001)  # points per km²

            # Movement characteristics
            if 'spd' in cluster_data.columns:
                avg_speed = cluster_data['spd'].mean()
                speed_variance = cluster_data['spd'].var()
            else:
                avg_speed = cluster_data['estimated_speed'].mean()
                speed_variance = cluster_data['estimated_speed'].var()

            # How many unique vehicles pass through this area
            unique_vehicles = cluster_data['randomized_id'].nunique()

            # Congestion indicator: low speed + high density + many vehicles = congestion
            congestion_score = (point_density * unique_vehicles) / max(avg_speed, 1)

            # Flag as a tight place if density exceeds the benchmark and speed is in the bottom quartile
            is_tight_place = (
                point_density > density_threshold * mean_cluster_density and
                avg_speed < low_speed_threshold
            )

            # Severity relative to the other clusters
            if congestion_score > np.percentile(cluster_score_benchmarks, 75):
                severity = 'High'
            elif congestion_score > np.percentile(cluster_score_benchmarks, 50):
                severity = 'Medium'
            else:
                severity = 'Low'

            self.tight_places[cluster_id] = {
                'center_lat': np.mean(cluster_points[:, 0]),
                'center_lng': np.mean(cluster_points[:, 1]),
                'point_count': len(cluster_points),
                'unique_vehicles': unique_vehicles,
                'area_km2': cluster_area_km2,
                'point_density_per_km2': point_density,
                'avg_speed_kmh': avg_speed,
                'speed_variance': speed_variance,
                'congestion_score': congestion_score,
                'is_tight_place': is_tight_place,
                'severity': severity
            }

        # Keep only the clusters flagged as tight places
        self.tight_places = {
            k: v for k, v in self.tight_places.items()
            if v['is_tight_place']
        }

        # Sort by congestion score
        self.tight_places = dict(sorted(
            self.tight_places.items(),
            key=lambda x: x[1]['congestion_score'],
            reverse=True
        ))

        print(f"Identified {len(self.tight_places)} tight places (congestion areas)")
        return self.tight_places

    def calculate_cluster_area(self, points):
        """Calculate the approximate area of a cluster in km²."""
        if len(points) < 3:
            return 0.001  # Minimum area for small clusters

        # Use a convex hull for the area calculation
        from scipy.spatial import ConvexHull
        try:
            # Convert degrees to meters using a rough local approximation
            lat_to_m = 111000  # meters per degree latitude
            lng_to_m = 111000 * np.cos(np.radians(np.mean(points[:, 0])))  # adjust for longitude

            points_m = points.copy()
            points_m[:, 0] *= lat_to_m
            points_m[:, 1] *= lng_to_m

            hull_m = ConvexHull(points_m)
            area_m2 = hull_m.volume  # In 2D, `volume` is the enclosed area
            area_km2 = area_m2 / 1_000_000  # Convert to km²
            return max(area_km2, 0.001)  # Enforce a minimum area
        except Exception:
            # Fallback: bounding-box area
            lat_range = np.max(points[:, 0]) - np.min(points[:, 0])
            lng_range = np.max(points[:, 1]) - np.min(points[:, 1])
            area_deg2 = lat_range * lng_range
            area_km2 = area_deg2 * 111 * 111  # rough conversion
            return max(area_km2, 0.001)

    def analyze_route_efficiency(self):
        """Analyze route efficiency and suggest optimizations."""
        print("Analyzing route efficiency...")
        if not self.routes:
            print("No routes identified. Run identify_popular_routes() first.")
            return {}

        efficiency_analysis = {}
        for route_id, route_info in self.routes.items():
            trajectory_ids = route_info['trajectory_ids']

            # Get all trajectories belonging to this route
            route_trajectories = self.processed_df[
                self.processed_df['randomized_id'].isin(trajectory_ids)
            ]

            # Per-trajectory efficiency metrics
            total_distances = []
            avg_speeds = []
            for traj_id in trajectory_ids:
                traj_data = route_trajectories[route_trajectories['randomized_id'] == traj_id]
                if len(traj_data) > 1:
                    total_distances.append(traj_data['distance_to_prev'].sum())
                    if 'spd' in traj_data.columns:
                        avg_speeds.append(traj_data['spd'].mean())
                    else:
                        avg_speeds.append(traj_data['estimated_speed'].mean())

            if total_distances and avg_speeds:
                efficiency_analysis[route_id] = {
                    'avg_distance_m': np.mean(total_distances),
                    'distance_variance': np.var(total_distances),
                    'avg_speed_kmh': np.mean(avg_speeds),
                    'speed_consistency': 1 / (1 + np.var(avg_speeds)),  # Higher means more consistent
                    'efficiency_score': np.mean(avg_speeds) / max(np.mean(total_distances) / 1000, 0.1),  # Speed per km
                    'route_optimization_potential': 'High' if np.var(total_distances) > np.mean(total_distances) * 0.3 else 'Low'
                }

        return efficiency_analysis

    def create_visualizations_for_gradio(self):
        """Create visualizations and return figures for Gradio (Plotly for routes, matplotlib for the rest)."""
        import plotly.express as px
        import plotly.graph_objects as go
        from plotly.subplots import make_subplots

        print("Creating visualizations for Gradio...")

        # Set up the matplotlib plotting style
        plt.style.use('default')
        sns.set_palette("husl")

        figures = {}

        # 1. Popular routes visualization using Plotly (real map)
        if self.routes:
            # Debug: print coordinate ranges
            print(f"Coordinate ranges: Lat {self.processed_df['lat'].min():.4f} to {self.processed_df['lat'].max():.4f}, "
                  f"Lng {self.processed_df['lng'].min():.4f} to {self.processed_df['lng'].max():.4f}")

            try:
                # Method 1: Scattermapbox
                fig1 = go.Figure()

                # Add base GPS points (sampled for performance)
                sample_points = self.processed_df.sample(min(3000, len(self.processed_df)))
                fig1.add_trace(go.Scattermapbox(
                    lat=sample_points['lat'],
                    lon=sample_points['lng'],
                    mode='markers',
                    marker=dict(size=3, color='lightgray', opacity=0.4),
                    name='GPS Points',
                    hoverinfo='skip'
                ))

                # Add popular routes with different colors
                colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink', 'olive', 'cyan', 'magenta']
                for i, (route_id, route_info) in enumerate(list(self.routes.items())[:10]):
                    color = colors[i % len(colors)]
                    start_point = route_info['avg_start_point']
                    end_point = route_info['avg_end_point']

                    # Start point
                    fig1.add_trace(go.Scattermapbox(
                        lat=[start_point['lat']],
                        lon=[start_point['lng']],
                        mode='markers',
                        marker=dict(size=12, color=color, symbol='circle'),
                        name=f'Route {route_id} Start ({route_info["route_count"]} trips)',
                        hovertemplate=f'<b>Route {route_id} - Start</b><br>' +
                                      f'Trips: {route_info["route_count"]}<br>' +
                                      f'Lat: {start_point["lat"]:.4f}<br>' +
                                      f'Lng: {start_point["lng"]:.4f}<extra></extra>'
                    ))

                    # End point
                    fig1.add_trace(go.Scattermapbox(
                        lat=[end_point['lat']],
                        lon=[end_point['lng']],
                        mode='markers',
                        marker=dict(size=12, color=color, symbol='square'),
                        name=f'Route {route_id} End',
                        hovertemplate=f'<b>Route {route_id} - End</b><br>' +
                                      f'Avg Length: {route_info["avg_route_length_m"]/1000:.2f} km<br>' +
                                      f'Lat: {end_point["lat"]:.4f}<br>' +
                                      f'Lng: {end_point["lng"]:.4f}<extra></extra>'
                    ))

                    # Route line
                    fig1.add_trace(go.Scattermapbox(
                        lat=[start_point['lat'], end_point['lat']],
                        lon=[start_point['lng'], end_point['lng']],
                        mode='lines',
                        line=dict(width=3, color=color),
                        name=f'Route {route_id} Path',
                        hoverinfo='skip'
                    ))

                # Compute map center and zoom
                center_lat = self.processed_df['lat'].mean()
                center_lng = self.processed_df['lng'].mean()
                lat_range = self.processed_df['lat'].max() - self.processed_df['lat'].min()
                lng_range = self.processed_df['lng'].max() - self.processed_df['lng'].min()
                max_range = max(lat_range, lng_range)
                if max_range > 1:
                    zoom_level = 8
                elif max_range > 0.1:
                    zoom_level = 10
                elif max_range > 0.01:
                    zoom_level = 12
                else:
                    zoom_level = 14

                fig1.update_layout(
                    title='Popular Routes on Real Map<br><sub>Circle=Start, Square=End</sub>',
                    mapbox=dict(
                        style='carto-positron',
                        center=dict(lat=center_lat, lon=center_lng),
                        zoom=zoom_level
                    ),
                    showlegend=True,
                    height=600,
                    margin=dict(l=0, r=0, t=50, b=0)
                )
                figures['popular_routes'] = fig1
                print("✅ Created Scattermapbox visualization")
            except Exception as e:
                print(f"⚠️ Scattermapbox failed: {e}, trying Scattergeo...")
                # Method 2: fall back to Scattergeo
                try:
                    fig1 = go.Figure()

                    # Add base GPS points
                    sample_points = self.processed_df.sample(min(3000, len(self.processed_df)))
                    fig1.add_trace(go.Scattergeo(
                        lat=sample_points['lat'],
                        lon=sample_points['lng'],
                        mode='markers',
                        marker=dict(size=3, color='lightgray', opacity=0.4),
                        name='GPS Points',
                        hoverinfo='skip'
                    ))

                    colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink', 'olive', 'cyan', 'magenta']
                    for i, (route_id, route_info) in enumerate(list(self.routes.items())[:10]):
                        color = colors[i % len(colors)]
                        start_point = route_info['avg_start_point']
                        end_point = route_info['avg_end_point']

                        # Start point
                        fig1.add_trace(go.Scattergeo(
                            lat=[start_point['lat']],
                            lon=[start_point['lng']],
                            mode='markers',
                            marker=dict(size=12, color=color, symbol='circle'),
                            name=f'Route {route_id} Start ({route_info["route_count"]} trips)',
                            hovertemplate=f'<b>Route {route_id} - Start</b><br>' +
                                          f'Trips: {route_info["route_count"]}<br>' +
                                          f'Lat: {start_point["lat"]:.4f}<br>' +
                                          f'Lng: {start_point["lng"]:.4f}<extra></extra>'
                        ))

                        # End point
                        fig1.add_trace(go.Scattergeo(
                            lat=[end_point['lat']],
                            lon=[end_point['lng']],
                            mode='markers',
                            marker=dict(size=12, color=color, symbol='square'),
                            name=f'Route {route_id} End',
                            hovertemplate=f'<b>Route {route_id} - End</b><br>' +
                                          f'Avg Length: {route_info["avg_route_length_m"]/1000:.2f} km<br>' +
                                          f'Lat: {end_point["lat"]:.4f}<br>' +
                                          f'Lng: {end_point["lng"]:.4f}<extra></extra>'
                        ))

                        # Route line
                        fig1.add_trace(go.Scattergeo(
                            lat=[start_point['lat'], end_point['lat']],
                            lon=[start_point['lng'], end_point['lng']],
                            mode='lines',
                            line=dict(width=3, color=color),
                            name=f'Route {route_id} Path',
                            hoverinfo='skip'
                        ))

                    center_lat = self.processed_df['lat'].mean()
                    center_lng = self.processed_df['lng'].mean()
                    fig1.update_layout(
                        title='Popular Routes on World Map<br><sub>Circle=Start, Square=End</sub>',
                        geo=dict(
                            projection_type='natural earth',
                            showland=True,
                            landcolor='rgb(243, 243, 243)',
                            coastlinecolor='rgb(204, 204, 204)',
                            center=dict(lat=center_lat, lon=center_lng),
                            projection_scale=1
                        ),
                        showlegend=True,
                        height=600,
                        margin=dict(l=0, r=0, t=50, b=0)
                    )
                    figures['popular_routes'] = fig1
                    print("✅ Created Scattergeo visualization")
                except Exception as e2:
                    print(f"⚠️ Scattergeo also failed: {e2}, using matplotlib fallback...")
                    # Method 3: matplotlib fallback
                    fig1 = plt.figure(figsize=(15, 10))

                    # Plot all points in light gray
                    plt.scatter(self.processed_df['lng'], self.processed_df['lat'],
                                c='lightgray', alpha=0.1, s=0.5, label='All GPS Points')

                    # Plot popular routes
                    colors_mpl = plt.cm.Set1(np.linspace(0, 1, len(self.routes)))
                    for i, (route_id, route_info) in enumerate(list(self.routes.items())[:10]):
                        start_point = route_info['avg_start_point']
                        end_point = route_info['avg_end_point']

                        # Start and end points
                        plt.scatter(start_point['lng'], start_point['lat'],
                                    c=[colors_mpl[i]], s=100, marker='o',
                                    label=f'Route {route_id} Start ({route_info["route_count"]} trips)')
                        plt.scatter(end_point['lng'], end_point['lat'],
                                    c=[colors_mpl[i]], s=100, marker='s')

                        # Line between start and end
                        plt.plot([start_point['lng'], end_point['lng']],
                                 [start_point['lat'], end_point['lat']],
                                 c=colors_mpl[i], linewidth=2, alpha=0.7)

                    plt.xlabel('Longitude')
                    plt.ylabel('Latitude')
                    plt.title('Popular Routes Identification\n(Circle=Start, Square=End)')
                    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
                    plt.grid(True, alpha=0.3)
                    plt.tight_layout()
                    figures['popular_routes'] = fig1
                    print("✅ Created matplotlib fallback visualization")

        # 2. Tight places (congestion areas) visualization - matplotlib
        if self.tight_places:
            fig2 = plt.figure(figsize=(15, 10))

            # Plot all points
            plt.scatter(self.processed_df['lng'], self.processed_df['lat'],
                        c='lightblue', alpha=0.1, s=0.5, label='All GPS Points')

            # Plot tight places, sized by congestion score
            for place_id, place_info in self.tight_places.items():
                size = min(place_info['congestion_score'] * 10, 500)
                color = {'High': 'red', 'Medium': 'orange', 'Low': 'yellow'}[place_info['severity']]
                plt.scatter(place_info['center_lng'], place_info['center_lat'],
                            s=size, c=color, alpha=0.7, edgecolors='black',
                            label=f'{place_info["severity"]} Congestion ({place_info["unique_vehicles"]} vehicles)')

            plt.xlabel('Longitude')
            plt.ylabel('Latitude')
            plt.title('Tight Places (Congestion Areas) Identification\n(Size = Congestion Score)')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            figures['tight_places'] = fig2

        # 3. Combined analysis map
        fig3 = plt.figure(figsize=(15, 10))

        # Base map
        plt.scatter(self.processed_df['lng'], self.processed_df['lat'],
                    c='lightgray', alpha=0.05, s=0.3)

        # Popular routes
        if self.routes:
            route_colors = plt.cm.Blues(np.linspace(0.4, 1, len(self.routes)))
            for i, (route_id, route_info) in enumerate(list(self.routes.items())[:5]):
                start_point = route_info['avg_start_point']
                end_point = route_info['avg_end_point']
                plt.plot([start_point['lng'], end_point['lng']],
                         [start_point['lat'], end_point['lat']],
                         c=route_colors[i], linewidth=3, alpha=0.8,
                         label=f'Popular Route {route_id}')

        # Tight places
        if self.tight_places:
            for place_id, place_info in self.tight_places.items():
                size = min(place_info['congestion_score'] * 15, 300)
                plt.scatter(place_info['center_lng'], place_info['center_lat'],
                            s=size, c='red', alpha=0.8, marker='X', edgecolors='darkred',
                            label='Congestion Area' if place_id == list(self.tight_places.keys())[0] else "")

        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        plt.title('Combined Analysis: Popular Routes & Congestion Areas')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        figures['combined_analysis'] = fig3

        # 4. Statistics dashboard
        fig4, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Route popularity distribution
        if self.routes:
            route_counts = [info['route_count'] for info in self.routes.values()]
            axes[0, 0].bar(range(len(route_counts)), route_counts, color='skyblue')
            axes[0, 0].set_xlabel('Route Cluster ID')
            axes[0, 0].set_ylabel('Number of Trips')
            axes[0, 0].set_title('Route Popularity Distribution')
            axes[0, 0].grid(True, alpha=0.3)

        # Congestion severity distribution
        if self.tight_places:
            severity_counts = {}
            for place_info in self.tight_places.values():
                severity = place_info['severity']
                severity_counts[severity] = severity_counts.get(severity, 0) + 1
            axes[0, 1].pie(list(severity_counts.values()), labels=list(severity_counts.keys()),
                           autopct='%1.1f%%', colors=['red', 'orange', 'yellow'])
            axes[0, 1].set_title('Congestion Severity Distribution')

        # Speed distribution
        speed_col = 'spd' if 'spd' in self.processed_df.columns else 'estimated_speed'
        if speed_col in self.processed_df.columns:
            axes[1, 0].hist(self.processed_df[speed_col], bins=50, alpha=0.7, color='green')
            axes[1, 0].set_xlabel('Speed (km/h)')
            axes[1, 0].set_ylabel('Frequency')
            axes[1, 0].set_title('Speed Distribution')
            axes[1, 0].grid(True, alpha=0.3)

        # Vehicle count by area (requires the density clusters from identify_tight_places)
        if 'density_cluster' in self.processed_df.columns:
            unique_vehicles_per_cluster = self.processed_df.groupby('density_cluster')['randomized_id'].nunique()
            axes[1, 1].bar(range(len(unique_vehicles_per_cluster)),
                           unique_vehicles_per_cluster.values, color='purple', alpha=0.7)
            axes[1, 1].set_xlabel('Area Cluster')
            axes[1, 1].set_ylabel('Unique Vehicles')
            axes[1, 1].set_title('Vehicle Distribution by Area')
            axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        figures['statistics_dashboard'] = fig4

        print("Visualizations created for Gradio!")
        return figures

    def create_visualizations(self, output_dir='./geo_analysis_output'):
        """Create comprehensive visualizations and save them to files (legacy method)."""
        import os
        os.makedirs(output_dir, exist_ok=True)

        # Get figures from the Gradio-oriented method
        figures = self.create_visualizations_for_gradio()

        # Save each figure
        for name, fig in figures.items():
            if hasattr(fig, 'write_image'):  # Plotly figure
                fig.write_image(f'{output_dir}/{name}.png', width=1500, height=600, scale=2)
            else:  # Matplotlib figure
                fig.savefig(f'{output_dir}/{name}.png', dpi=300, bbox_inches='tight')
                plt.close(fig)

        print(f"Visualizations saved to {output_dir}/")

    def generate_report(self):
        """Generate a comprehensive analysis report."""
        print("Generating analysis report...")

        report = {
            'data_summary': {
                'total_records': len(self.processed_df),
                'unique_vehicles': self.processed_df['randomized_id'].nunique(),
                'geographic_bounds': {
                    'lat_min': self.processed_df['lat'].min(),
                    'lat_max': self.processed_df['lat'].max(),
                    'lng_min': self.processed_df['lng'].min(),
                    'lng_max': self.processed_df['lng'].max()
                }
            },
            'popular_routes': {
                'total_route_clusters': len(self.routes) if self.routes else 0,
                'top_5_routes': []
            },
            'tight_places': {
                'total_congestion_areas': len(self.tight_places) if self.tight_places else 0,
                'severity_breakdown': {},
                'top_5_congestion_areas': []
            }
        }

        # Popular routes details
        if self.routes:
            for i, (route_id, route_info) in enumerate(list(self.routes.items())[:5]):
                report['popular_routes']['top_5_routes'].append({
                    'route_id': route_id,
                    'trip_count': route_info['route_count'],
                    'popularity_percentage': route_info['popularity_score'],
                    'avg_length_km': route_info['avg_route_length_m'] / 1000,
                    'start_location': route_info['avg_start_point'],
                    'end_location': route_info['avg_end_point']
                })

        # Tight places details
        if self.tight_places:
            severity_counts = {'High': 0, 'Medium': 0, 'Low': 0}
            for place_info in self.tight_places.values():
                severity_counts[place_info['severity']] += 1
            report['tight_places']['severity_breakdown'] = severity_counts

            for i, (place_id, place_info) in enumerate(list(self.tight_places.items())[:5]):
                report['tight_places']['top_5_congestion_areas'].append({
                    'area_id': place_id,
                    'congestion_score': place_info['congestion_score'],
                    'severity': place_info['severity'],
                    'unique_vehicles': place_info['unique_vehicles'],
                    'avg_speed_kmh': place_info['avg_speed_kmh'],
                    'location': {
                        'lat': place_info['center_lat'],
                        'lng': place_info['center_lng']
                    }
                })

        return report
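

# Expected input schema for AdvancedGeoTrackAnalyzer, as implied by the code above
# (the unit interpretations are assumptions consistent with how the columns are used):
#   randomized_id : anonymized vehicle/trip identifier (required)
#   lat, lng      : GPS coordinates in decimal degrees (required)
#   spd           : speed, treated as km/h by the plots and reports (optional;
#                   estimated from point spacing if absent)
#   azm           : heading/azimuth in degrees (optional; a bearing is computed if absent)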


def run_complete_analysis(data_path_or_df, output_dir='./geo_analysis_output', sample_size=400000):
    """Run the complete geo-tracking analysis pipeline focused on routes and congestion."""
    print("=" * 60)
    print("ADVANCED GEO-TRACKING ANALYSIS")
    print("FOCUS: Popular Routes & Congestion Areas")
    print("=" * 60)

    # Initialize the analyzer with sampling
    analyzer = AdvancedGeoTrackAnalyzer(data_path_or_df, sample_size=sample_size)

    # 1. Preprocess data
    analyzer.preprocess_data()

    # 2. Identify popular routes
    print("\n" + "=" * 40)
    print("IDENTIFYING POPULAR ROUTES")
    print("=" * 40)
    routes = analyzer.identify_popular_routes()

    # 3. Identify tight places (congestion areas)
    print("\n" + "=" * 40)
    print("IDENTIFYING CONGESTION AREAS")
    print("=" * 40)
    tight_places = analyzer.identify_tight_places()

    # 4. Analyze route efficiency
    print("\n" + "=" * 40)
    print("ANALYZING ROUTE EFFICIENCY")
    print("=" * 40)
    efficiency = analyzer.analyze_route_efficiency()

    # 5. Create visualizations
    print("\n" + "=" * 40)
    print("CREATING VISUALIZATIONS")
    print("=" * 40)
    analyzer.create_visualizations(output_dir)

    # 6. Generate report
    report = analyzer.generate_report()

    print("\n" + "=" * 60)
    print("ANALYSIS COMPLETE!")
    print("=" * 60)
    print(f"Results saved to: {output_dir}")
    print(f"Total records processed: {len(analyzer.processed_df):,}")
    print(f"Unique vehicles: {analyzer.processed_df['randomized_id'].nunique():,}")
    print(f"Popular routes identified: {len(routes)}")
    print(f"Congestion areas identified: {len(tight_places)}")

    def convert_numpy_types(obj):
        """Convert numpy types to native Python types for JSON serialization."""
        if isinstance(obj, dict):
            return {str(k): convert_numpy_types(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_numpy_types(item) for item in obj]
        elif isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return obj

    if routes:
        print("\nTop 3 Popular Routes:")
        for i, (route_id, route_info) in enumerate(list(routes.items())[:3]):
            print(f"  Route {route_id}: {route_info['route_count']} trips "
                  f"({route_info['popularity_score']:.1f}% of all routes)")
        with open(f'{output_dir}/popular_routes.json', 'w') as f:
            json.dump(convert_numpy_types(routes), f, indent=2, default=str)
        print(f"Popular routes saved to {output_dir}/popular_routes.json")

    if tight_places:
        print("\nTop 3 Congestion Areas:")
        for i, (place_id, place_info) in enumerate(list(tight_places.items())[:3]):
            print(f"  Area {place_id}: {place_info['severity']} severity, "
                  f"{place_info['unique_vehicles']} vehicles, "
                  f"avg speed {place_info['avg_speed_kmh']:.1f} km/h")
        with open(f'{output_dir}/tight_places.json', 'w') as f:
            json.dump(convert_numpy_types(tight_places), f, indent=2, default=str)
        print(f"Tight places saved to {output_dir}/tight_places.json")

    return analyzer, report
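

# Illustrative only: the JSON artifacts written by run_complete_analysis can be reloaded
# for downstream use, e.g. (assuming the default output directory):
#
#   with open('./geo_analysis_output/popular_routes.json') as f:
#       popular_routes = json.load(f)
#   with open('./geo_analysis_output/tight_places.json') as f:
#       tight_places = json.load(f)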


def predict_traffic_patterns_with_plots(df, sample_size=500000):
    """
    Analyze traffic patterns from a DataFrame and return JSON-serializable predictions
    plus figures for Gradio display.

    Parameters:
        df: pandas.DataFrame - Input DataFrame with geo-tracking data
        sample_size: int - Maximum number of rows to use for analysis (default 500k)

    Returns:
        tuple: (predictions_dict, figures_dict) where:
            - predictions_dict: JSON-serializable predictions
            - figures_dict: dictionary of Plotly/matplotlib figures for Gradio display
    """
    def convert_numpy_types(obj):
        """Convert numpy types to native Python types for JSON serialization."""
        if isinstance(obj, dict):
            return {str(k): convert_numpy_types(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [convert_numpy_types(item) for item in obj]
        elif isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return obj

    try:
        # Initialize the analyzer with sampling
        analyzer = AdvancedGeoTrackAnalyzer(df, sample_size=sample_size)

        # Run the analysis steps
        analyzer.preprocess_data()
        routes = analyzer.identify_popular_routes()
        tight_places = analyzer.identify_tight_places()
        efficiency = analyzer.analyze_route_efficiency()

        # Generate visualizations for Gradio
        figures = analyzer.create_visualizations_for_gradio()

        # Generate the report and convert it to a JSON-serializable format
        report = analyzer.generate_report()
        json_predictions = convert_numpy_types(report)

        predictions = {
            'status': 'success',
            'analysis_summary': json_predictions,
            'popular_routes': {
                'total_clusters': len(analyzer.routes) if analyzer.routes else 0,
                'routes': convert_numpy_types(analyzer.routes) if analyzer.routes else {}
            },
            'congestion_areas': {
                'total_areas': len(analyzer.tight_places) if analyzer.tight_places else 0,
                'areas': convert_numpy_types(analyzer.tight_places) if analyzer.tight_places else {}
            },
            'metadata': {
                'sample_size_used': len(analyzer.processed_df),
                'unique_vehicles': analyzer.processed_df['randomized_id'].nunique(),
                'analysis_date': pd.Timestamp.now().isoformat()
            }
        }
        return predictions, figures

    except Exception as e:
        error_predictions = {
            'status': 'error',
            'error_message': str(e),
            'analysis_summary': {},
            'popular_routes': {'total_clusters': 0, 'routes': {}},
            'congestion_areas': {'total_areas': 0, 'areas': {}},
            'metadata': {'error_date': pd.Timestamp.now().isoformat()}
        }
        return error_predictions, {}
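

# A minimal, self-contained usage sketch (illustrative only): the data below is synthetic,
# the coordinates and parameters are arbitrary choices, and nothing here is part of the
# analyzer's API. It builds a small random walk per vehicle and runs the pipeline end to end.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n_vehicles, points_per_vehicle = 20, 50
    records = []
    for vid in range(n_vehicles):
        # Start each synthetic trajectory near an arbitrary city-scale location
        lat = 43.24 + rng.normal(0, 0.01)
        lng = 76.91 + rng.normal(0, 0.01)
        for _ in range(points_per_vehicle):
            lat += rng.normal(0, 0.0005)
            lng += rng.normal(0, 0.0005)
            records.append({'randomized_id': vid, 'lat': lat, 'lng': lng,
                            'spd': abs(rng.normal(30, 10))})
    demo_df = pd.DataFrame(records)

    # Option 1: full pipeline, writing figures and JSON to ./geo_analysis_output
    # analyzer, report = run_complete_analysis(demo_df, sample_size=100000)

    # Option 2: in-memory analysis, e.g. for embedding in a Gradio app
    predictions, figures = predict_traffic_patterns_with_plots(demo_df, sample_size=100000)
    print(json.dumps(predictions['analysis_summary'], indent=2, default=str))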