""" Contains the filters used to filter out images from the Mapillary API. """ import inspect import yaml from datetime import datetime from functools import partial import numpy as np import pandas as pd import shapely import shapely.geometry from shapely.prepared import prep from shapely import contains_xy from .. import logger def in_shape_filter(df: pd.DataFrame, geojson_shape): polygon = shapely.geometry.shape(geojson_shape["features"][0]["geometry"]) mask = contains_xy(polygon, x=df["geometry.long"], y=df["geometry.lat"]) return mask def value_range_filter(df: pd.DataFrame, key, from_v=None, to_v=None): c = df[key] if from_v is not None and to_v is not None: if from_v == to_v: return c == from_v else: return np.logical_and(c >= from_v, c <= to_v) elif from_v is not None: return c >= from_v elif to_v is not None: return c <= to_v else: raise Exception("from_v and to_v cannot both be None") def value_in_list_filter(df: pd.DataFrame, key, lst, exclude=False): mask = df[key].isin(lst) if exclude: mask = ~mask return mask def value_missing_filter(df: pd.DataFrame, keys): return np.all(df[keys].notna(), axis=1) def date_filter(df: pd.DataFrame, from_year=None, to_year=None): """ Args: before_year: integer representing the year after_year: integer representing the year """ if from_year is not None: from_year = int(datetime(from_year, 1, 1).timestamp())*1e3 if to_year is not None: to_year = int(datetime(to_year, 1, 1).timestamp())*1e3 return value_range_filter(df, "captured_at", from_year, to_year) def quality_score_filter(df: pd.DataFrame, from_score=None, to_score=None): return value_range_filter(df, "quality_score", from_v=from_score, to_v=to_score) def angle_dist(a1, a2): a = a1-a2 return np.abs((a + 180) % 360 - 180) def angle_discrip_filter(df: pd.DataFrame, thresh, less_than=True): """ Args: thresh: Threshold in degrees """ a1 = df["computed_compass_angle"] a2 = df["compass_angle"] diff = angle_dist(a1, a2) if less_than: return diff < thresh else: return diff > thresh def haversine_np(lon1, lat1, lon2, lat2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) All args must be of equal length. """ lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2]) dlon = lon2 - lon1 dlat = lat2 - lat1 a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2 c = 2 * np.arcsin(np.sqrt(a)) km = 6378.137 * c return km*1e3 def loc_discrip_filter(df: pd.DataFrame, thresh, less_than=True): """ Args: thresh: Threshold in meters """ lat1 = df["computed_geometry.lat"] lon1 = df["computed_geometry.long"] lat2 = df["geometry.lat"] lon2 = df["geometry.long"] diff = haversine_np(lon1, lat1, lon2, lat2) if less_than: return diff < thresh else: return diff > thresh def sequence_sparsity_filter(df: pd.DataFrame, dist_thresh): """ TODO This filter filters out images that are too close to each other within a sequence """ pass class Filter(): def __init__(self, filter_func, name=None, **kwargs): self.filter_func = filter_func self.name = name self.kwargs = kwargs def __call__(self, df: pd.DataFrame): return self.filter_func(df, **self.kwargs) def __str__(self) -> str: if self.name is None: tag = self.filter_func.__name__ else: tag = f"{self.filter_func.__name__}:{self.name}" return tag def __repr__(self): kwargs_fmt = ", ".join([f"{k}={v}" for k,v in self.kwargs.items()]) return f"{self.__str__()} | kwargs({kwargs_fmt})" class FilterPipeline(): def __init__(self, filters: list, sequential=True, name=None, verbose=True): """ Args: sequential: Whether to apply filters sequentially or compute the masks for all of them then apply once at the end. verbose: Whether to log the effect of each filter or not """ self.filters = filters self.sequential = sequential self.name = name self.verbose = verbose def __call__(self, df: pd.DataFrame): N = df.shape[0] if not self.sequential: running_mask = np.full(df.shape[0], True, dtype=bool) for f in self.filters: mask = f(df) if self.verbose: s = np.sum(mask) logger.info(f"{f} keeps {s}/{mask.shape[0]} ({s/mask.shape[0]*100:.2f}%) of the images") if self.sequential: df = df[mask] if df.shape[0] == 0: logger.warn("No images left during filtering.. Stopping pipeline") return df else: running_mask = np.logical_and(running_mask, mask) if not self.sequential: df = df[running_mask] logger.info(f"Filter Pipeline {self.name} kept {df.shape[0]}/{N} ({df.shape[0]/N*100:.2f}%) of the images") return df def __str__(self): return f"Pipeline {self.name}: " + "\n".join([str(x) for x in self.filters]) def __repr__(self): return f"Pipeline {self.name}: " + "\n".join([repr(x) for x in self.filters]) @staticmethod def load_from_yaml(file_path): def is_primitive(x): return isinstance(x, (float, int, bool, str)) with open(file_path, 'r') as stream: pipeline_dict = yaml.safe_load(stream)["filter_pipeline"] sig = inspect.signature(FilterPipeline.__init__) init_args = dict() for param in sig.parameters.values(): if param.name in pipeline_dict and is_primitive(pipeline_dict[param.name]): init_args[param.name] = pipeline_dict[param.name] filter_dicts = pipeline_dict["filters"] filters = list() for filter_dict in filter_dicts: filter_func_name, kwargs = list(filter_dict.items())[0] filter_func = globals()[filter_func_name] filters.append(Filter(filter_func=filter_func, **kwargs)) pipeline = FilterPipeline(filters, **init_args) return pipeline if __name__ == "__main__": FilterPipeline.load_from_yaml("mia/fpv/filter_pipelines/mia.yaml")