# Adapted from OrienterNet import json from pathlib import Path import numpy as np import httpx import asyncio from aiolimiter import AsyncLimiter import tqdm import requests import mercantile import geojson import turfpy.measurement from vt2geojson.tools import vt_bytes_to_geojson from opensfm.pygeometry import Camera, Pose from opensfm.pymap import Shot from .. import logger from .geo import Projection semaphore = asyncio.Semaphore(100) # number of parallel threads. image_filename = "{image_id}.jpg" info_filename = "{image_id}.json" class MapillaryDownloader: image_fields = ( "id", "height", "width", "camera_parameters", "camera_type", "captured_at", "compass_angle", "geometry", "altitude", "computed_compass_angle", "computed_geometry", "computed_altitude", "computed_rotation", "thumb_2048_url", "thumb_original_url", "sequence", "sfm_cluster", "creator", "make", "model", "is_pano", "quality_score", "exif_orientation" ) image_info_url = ( "https://graph.mapillary.com/{image_id}?access_token={token}&fields={fields}" ) seq_info_url = "https://graph.mapillary.com/image_ids?access_token={token}&sequence_id={seq_id}" tile_info_url = "https://tiles.mapillary.com/maps/vtp/mly1_public/2/{z}/{x}/{y}?access_token={token}" max_requests_per_minute = 50_000 def __init__(self, token: str): self.token = token self.client = httpx.AsyncClient( transport=httpx.AsyncHTTPTransport(retries=20), timeout=600 ) self.limiter = AsyncLimiter(self.max_requests_per_minute // 2, time_period=60) async def call_api(self, url: str): async with self.limiter: r = await self.client.get(url) if not r.is_success: logger.error("Error in API call: %s", r.text) return r async def get_tile_image_points(self, tile): url = self.tile_info_url.format( x=tile.x, y=tile.y, z=tile.z, token=self.token ) try : r = await self.call_api(url) if r.is_success: geo_d = vt_bytes_to_geojson( b_content=r._content, x=tile.x, y=tile.y, z=tile.z, layer="image", ) d = geo_d["features"] return tile, d except Exception as e: logger.error(f"{type(e).__name__}: {e}") return tile, None async def get_tiles_image_points(self, tiles, retries=3): tile_to_images = {} tasks = [self.get_tile_image_points(t) for t in tiles] for i in range(retries): failed_tiles = list() for task in tqdm.asyncio.tqdm.as_completed(tasks): tile, image_ids = await task if image_ids is not None: tile_to_images[f"z_{tile.z}_x{tile.x}_y{tile.y}"] = image_ids else: logger.error(f"Error when retrieving tile z_{tile.z}_x{tile.x}_y{tile.y}. Image_ids is None. Skipping.") failed_tiles.append(tile) if len(failed_tiles) == 0: break else: if i == retries-1: logger.error(f"Failed to retrieve {len(failed_tiles)} tiles in attempt {i}. Maxed out retries. Skipping those tiles.") else: logger.error(f"Failed to retrieve {len(failed_tiles)} tiles in attempt {i}. Trying again..") tasks = [self.get_tile_image_points(t) for t in failed_tiles] return tile_to_images async def get_image_info(self, image_id: int): url = self.image_info_url.format( image_id=image_id, token=self.token, fields=",".join(self.image_fields), ) r = await self.call_api(url) if r.is_success: return json.loads(r.text) async def get_sequence_info(self, seq_id: str): url = self.seq_info_url.format(seq_id=seq_id, token=self.token) r = await self.call_api(url) if r.is_success: return json.loads(r.text) async def download_image_pixels(self, url: str, path: Path): r = await self.call_api(url) if r.is_success: with open(path, "wb") as fid: fid.write(r.content) return r.is_success async def get_image_info_cached(self, image_id: int, path: Path): if path.exists(): info = json.loads(path.read_text()) else: info = await self.get_image_info(image_id) path.write_text(json.dumps(info)) return info async def download_image_pixels_cached(self, url: str, path: Path): if path.exists(): return True else: return await self.download_image_pixels(url, path) async def fetch_images_in_sequence(i, downloader): async with semaphore: info = await downloader.get_sequence_info(i) image_ids = [int(d["id"]) for d in info["data"]] return i, image_ids async def fetch_images_in_sequences(sequence_ids, downloader): seq_to_images_ids = {} tasks = [fetch_images_in_sequence(i, downloader) for i in sequence_ids] for task in tqdm.asyncio.tqdm.as_completed(tasks): i, image_ids = await task seq_to_images_ids[i] = image_ids return seq_to_images_ids async def fetch_image_info(i, downloader, dir_): async with semaphore: path = dir_ / info_filename.format(image_id=i) # info = await downloader.get_image_info_cached(i, path) info = await downloader.get_image_info(i) # FIXME: temporarily disable caching, takes too long to reads many (>1mil) files return i, info async def fetch_image_infos(image_ids, downloader, dir_): infos = {} num_fail = 0 tasks = [fetch_image_info(i, downloader, dir_) for i in image_ids] for task in tqdm.asyncio.tqdm.as_completed(tasks): i, info = await task if info is None: num_fail += 1 else: infos[i] = info return infos, num_fail async def fetch_image_pixels(i, url, downloader, dir_, overwrite=False): async with semaphore: path = dir_ / image_filename.format(image_id=i) if overwrite: path.unlink(missing_ok=True) success = await downloader.download_image_pixels_cached(url, path) return i, success async def fetch_images_pixels(image_urls, downloader, dir_): num_fail = 0 tasks = [fetch_image_pixels(*id_url, downloader, dir_) for id_url in image_urls] for task in tqdm.asyncio.tqdm.as_completed(tasks): i, success = await task num_fail += not success return num_fail def opensfm_camera_from_info(info: dict) -> Camera: cam_type = info["camera_type"] if cam_type == "perspective": camera = Camera.create_perspective(*info["camera_parameters"]) elif cam_type == "fisheye": camera = Camera.create_fisheye(*info["camera_parameters"]) elif Camera.is_panorama(cam_type): camera = Camera.create_spherical() else: raise ValueError(cam_type) camera.width = info["width"] camera.height = info["height"] camera.id = info["id"] return camera def opensfm_shot_from_info(info: dict, projection: Projection) -> Shot: latlong = info["computed_geometry.coordinates"][::-1] alt = info["computed_altitude"] xyz = projection.project(np.array([*latlong, alt]), return_z=True) c_rotvec_w = np.array(info["computed_rotation"]) pose = Pose() pose.set_from_cam_to_world(-c_rotvec_w, xyz) camera = opensfm_camera_from_info(info) return latlong, Shot(info["id"], camera, pose) def get_city_boundary(city, state=None, country=None, fetch_shape=False): # Use Nominatim API to get the boundary of the city base_url = "https://nominatim.openstreetmap.org/search" params = { 'city': city, 'state': state, 'country': country, 'format': 'json', 'limit': 1, 'polygon_geojson': 1 if fetch_shape else 0 } # Without a user-agent we may get blocked. This is an arbitrary user-agent and can be changed # Rotating between user-agents may circumvent blocks but may not be fair headers = { 'User-Agent': f'mapperceptionnet_{city}_{state}' } response = requests.get(base_url, params=params, headers=headers) if response.status_code != 200: logger.error(f"Nominatim error when fetching boundary data for {city}, {state}.\n" f"Status code: {response.status_code}. Content: {response.content}") return None data = response.json() if data is None: logger.warn(f"No data returned by Nominatim for {city}, {state}") return None # Extract bbox data from the API response bbox_data = data[0]['boundingbox'] bbox = { 'west': float(bbox_data[2]), 'south': float(bbox_data[0]), 'east': float(bbox_data[3]), 'north': float(bbox_data[1]) } if fetch_shape: # Extract GeoJSON boundary data from the API response boundary_geojson = data[0]['geojson'] boundary_geojson = { "type": "FeatureCollection", "features": [ {"type": "Feature", "properties": {}, "geometry": boundary_geojson}] } return bbox, boundary_geojson else: return bbox def get_tiles_from_boundary(boundary_info, zoom=14): if boundary_info["bound_type"] == "auto_shape": # TODO: Instead of tiles from the big bbox, return tiles that hug the shape geojson_shape = boundary_info["shape"] # FIXME What to do when boundary is defined by multiple polygons!! # Visualization tool https://geojson.tools/ coords = geojson_shape["features"][0]["geometry"]["coordinates"] try: polygon = geojson.Polygon(coords) coordinates = turfpy.measurement.bbox(polygon) except: logger.warn(f"Boundary is defined by {len(coords)} polygons. Choosing first polygon blindly") polygon = geojson.Polygon(coords[0]) coordinates = turfpy.measurement.bbox(polygon) coordinates = dict(zip(["west", "south", "east", "north"], coordinates)) else: coordinates = boundary_info["bbox"] tiles = list( mercantile.tiles( **coordinates, zooms=zoom, ) ) return tiles