""" Example usage: python3.9 -m mapper.data.debug.calc_stats -d /ocean/projects/cis220039p/shared/map_perception/dataset_v0 """ import datetime from datetime import datetime, timezone, timedelta import time import argparse import os from pathlib import Path import json from astral import LocationInfo from astral.sun import sun from timezonefinder import TimezoneFinder import numpy as np import pandas as pd import geopandas as gpd from pyproj.transformer import Transformer from matplotlib import pyplot as plt from matplotlib.backends.backend_pdf import PdfPages import tqdm from ..fpv import filters from .. import logger def is_daytime(timestamp, latitude, longitude): # Create a LocationInfo object for the given latitude and longitude tz_str = TimezoneFinder().timezone_at(lng=longitude, lat=latitude) location = LocationInfo(name="", region="", timezone=tz_str, latitude=latitude, longitude=longitude) # Convert the timestamp to a datetime object dt = datetime.fromtimestamp(timestamp, tz=timezone.utc) # We query one day before and one day after to avoid timezone ambiguities # Our query timestamp is guaranteed to fall into one of those 3 dates. # Astral sometimes returns sunrise or sunsets that are not from the same query date # Refer to this https://github.com/sffjunkie/astral/issues/83 d0 = (dt - timedelta(days=1)).date() d1 = dt.date() d2 = (dt + timedelta(days=1)).date() # Calculate sunrise and sunset times times = list() for d in [d0, d1, d2]: s = sun(location.observer, date=d) sunrise = s['sunrise'] sunset = s['sunset'] times.append((sunrise, "sunrise")) times.append((sunset, 'sunset')) # Need to sort because there is no particular order # where sunrise is always before sunset or vice versa times = sorted(times, key=lambda x: x[0]) assert times[-1][0] > dt > times[0][0] for i in range(1, len(times)): if dt < times[i][0]: prev_event = times[i-1][1] break return prev_event == "sunrise" def calculate_occupancy_map(df: pd.DataFrame, bev_meter_coverage=112, meters_per_pixel=112): """ Args: bev_meter_coverage: How much did the BEVs in the dataframe cover in meters meters_per_pixel: At what resolution should we initialize the occupancy map. This need not be the same resolution as the BEV. That would be unnecessarilly slow but most accurate. """ # convert pandas dataframe to geopandas dataframe gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy( df['computed_geometry.long'], df['computed_geometry.lat']), crs=4326) utm_crs = gdf.estimate_utm_crs() gdf_utm = gdf.to_crs(utm_crs) left = gdf_utm.geometry.x.min() - bev_meter_coverage right = gdf_utm.geometry.x.max() + bev_meter_coverage bottom = gdf_utm.geometry.y.min() - bev_meter_coverage top = gdf_utm.geometry.y.max() + bev_meter_coverage width = right - left height = top - bottom width_pixels = int(width // meters_per_pixel) height_pixels = int(height // meters_per_pixel) if bev_meter_coverage % meters_per_pixel != 0: logger.warn(f"bev_meter_coverage {bev_meter_coverage} is not divisble by meters_per_pixel " f"{meters_per_pixel}. Occupancy may be overestimated.") bev_pixels = int(np.ceil(bev_meter_coverage / meters_per_pixel)) logger.info(f"Initializing {height_pixels}x{width_pixels} occupancy map. Using {bev_pixels}x{bev_pixels} pixels for each BEV.") map = np.zeros((height_pixels, width_pixels), dtype=bool) for row in gdf_utm.itertuples(): utm_x = row.geometry.x utm_y = row.geometry.y img_x = int((utm_x - left) // meters_per_pixel) img_y = int((utm_y - bottom) // meters_per_pixel) bev_pixels_left = bev_pixels // 2 bev_pixels_right = bev_pixels - bev_pixels_left map[img_y - bev_pixels_left: img_y + bev_pixels_right, img_x - bev_pixels_left: img_x + bev_pixels_right] = True return map if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--dataset_dir", '-d', type=str, required=True, help="Dataset directory") parser.add_argument("--locations", '-l', type=str, default="all", help="Location names in CSV format. Set to 'all' to traverse all locations.") parser.add_argument("--plot", action="store_true", help="Store plots per location in PDFs") parser.add_argument("--output", "-o", default=None, type=str, help="output json file to store statistics") args = parser.parse_args() locations = list() if args.locations.lower() == "all": locations = os.listdir(args.dataset_dir) locations = [l for l in locations if os.path.isdir(os.path.join(args.dataset_dir, l))] else: locations = args.locations.split(",") logger.info(f"Parsing {len(locations)} locations..") all_locs_stats = dict() for location in tqdm.tqdm(locations): dataset_dir = Path(args.dataset_dir) location_dir = dataset_dir / location bev_dir = location_dir / "bev_raw" semantic_mask_dir = location_dir / "semantic_masks" osm_cache_dir = location_dir / "osm_cache" pq_name = 'image_metadata_filtered_processed.parquet' df = pd.read_parquet(location_dir / pq_name) df = df[df["computed_geometry.lat"].notna()] df = df[df["computed_geometry.long"].notna()] logger.info(f"Loaded {df.shape[0]} image metadata from {location}") # Calc derrivative attributes tqdm.tqdm.pandas() df["loc_descrip"] = filters.haversine_np( lon1=df["geometry.long"], lat1=df["geometry.lat"], lon2=df["computed_geometry.long"], lat2=df["computed_geometry.lat"] ) df["angle_descrip"] = filters.angle_dist( df["compass_angle"], df["computed_compass_angle"] ) # FIXME: Super slow # df["is_daytime"] = df.progress_apply(lambda x: is_daytime(x["captured_at"]*1e-3, # x["computed_geometry.lat"], # x["computed_geometry.long"]), # axis="columns", raw=False, engine="python") meters_per_pixel = 7 map = calculate_occupancy_map(df, bev_meter_coverage=112, meters_per_pixel=meters_per_pixel) # Calc aggregate stats loc_stats = dict() loc_stats["num_images"] = len(df) loc_stats["area_covered_km2"] = np.sum(map) * meters_per_pixel ** 2 * 1e-6 loc_stats["camera_types"] = set(df["camera_type"].unique()) loc_stats["camera_makes"] = set(df["make"].unique()) loc_stats["camera_model"] = set(df["model"].unique()) all_locs_stats[location] = loc_stats # Plot if requested if args.plot: with PdfPages(location_dir / "stats.pdf") as pdf: plt.figure() plt.imshow(map) plt.title(f"{location} occupancy map") pdf.savefig() plt.close() for k in ["make", "model", "camera_type", "loc_descrip", "angle_descrip"]: plt.figure() df[k].hist() plt.title(k) plt.xlabel(k) plt.xticks(rotation=90) plt.ylabel("Count") plt.tight_layout() pdf.savefig() plt.close() # Aggregate all stats aggregated_stats = dict() for loc, loc_stats in all_locs_stats.items(): for k,v in loc_stats.items(): if isinstance(v, float) or isinstance(v, int): if k not in aggregated_stats.keys(): aggregated_stats[k] = v else: aggregated_stats[k] += v elif isinstance(v, set): if k not in aggregated_stats.keys(): aggregated_stats[k] = v else: aggregated_stats[k] = aggregated_stats[k].union(v) aggregated_stats[f"{k}_count"] = len(aggregated_stats[k]) else: raise Exception(f"{v} is not supported !") all_locs_stats["aggregated"] = aggregated_stats print(all_locs_stats) # Store for json for loc, loc_stats in all_locs_stats.items(): for k,v in loc_stats.items(): if isinstance(v, set): loc_stats[k] = list(v) if args.output: with open(args.output, "w") as f: json.dump(all_locs_stats, f, indent=2)