""" |
Contains the filters used to filter out images from the Mapillary API. |
""" |
import inspect |
import yaml |
from datetime import datetime |
from functools import partial |
import numpy as np |
import pandas as pd |
import shapely |
import shapely.geometry |
from shapely.prepared import prep |
from shapely import contains_xy |
from .. import logger |
def in_shape_filter(df: pd.DataFrame, geojson_shape):
    """
    Build a mask of the rows whose point lies inside a GeoJSON shape.

    Args:
        df: table with "geometry.long" and "geometry.lat" columns
        geojson_shape: GeoJSON-like mapping; only the geometry of the first
            entry under "features" is used

    Returns:
        The mask produced by shapely's contains_xy, one entry per row
    """
    # Only the first feature is considered; any further features are ignored.
    first_geometry = geojson_shape["features"][0]["geometry"]
    region = shapely.geometry.shape(first_geometry)
    return contains_xy(region, x=df["geometry.long"], y=df["geometry.lat"])
def value_range_filter(df: pd.DataFrame, key, from_v=None, to_v=None):
    """
    Build a mask of the rows whose `key` value lies within [from_v, to_v].

    Args:
        df: table to filter
        key: name of the column to compare
        from_v: inclusive lower bound, or None for no lower bound
        to_v: inclusive upper bound, or None for no upper bound

    Returns:
        Boolean mask aligned with the rows of `df`

    Raises:
        ValueError: if both `from_v` and `to_v` are None
    """
    column = df[key]

    if from_v is None and to_v is None:
        # ValueError is still an Exception, so existing broad handlers keep working.
        raise ValueError("from_v and to_v cannot both be None")
    if from_v is None:
        return column <= to_v
    if to_v is None:
        return column >= from_v
    if from_v == to_v:
        # Degenerate range: a single equality test is enough.
        return column == from_v
    return np.logical_and(column >= from_v, column <= to_v)
def value_in_list_filter(df: pd.DataFrame, key, lst, exclude=False):
    """
    Build a mask of the rows whose `key` value is (or is not) in `lst`.

    Args:
        df: table to filter
        key: name of the column to test
        lst: values to look for
        exclude: when True the mask is inverted, i.e. rows whose value is
            in `lst` are marked False
    """
    is_member = df[key].isin(lst)
    return ~is_member if exclude else is_member
def value_missing_filter(df: pd.DataFrame, keys):
    """
    Build a mask of the rows that have a value in every column of `keys`.

    Despite the name, True marks rows where nothing is missing.

    Args:
        df: table to filter
        keys: list of column names that must all be non-null
    """
    is_present = df[keys].notna()
    return is_present.all(axis=1)
def date_filter(df: pd.DataFrame, from_year=None, to_year=None):
    """
    Build a mask of the rows whose "captured_at" lies between two years.

    Each given year is turned into the timestamp of January 1st, 00:00 of
    that year, scaled by 1e3, and used as a bound for value_range_filter.
    NOTE(review): the scaling suggests "captured_at" holds milliseconds
    since the epoch -- confirm against the data source.

    Args:
        from_year: integer representing the year of the lower bound, or None
        to_year: integer representing the year of the upper bound, or None
    """
    def year_start(year):
        # The datetime is naive, so .timestamp() interprets it in the
        # local timezone of the machine running the filter.
        return int(datetime(year, 1, 1).timestamp()) * 1e3

    lower = None if from_year is None else year_start(from_year)
    upper = None if to_year is None else year_start(to_year)
    return value_range_filter(df, "captured_at", lower, upper)
def quality_score_filter(df: pd.DataFrame, from_score=None, to_score=None):
    """
    Build a mask of the rows whose "quality_score" lies in a range.

    Thin wrapper around value_range_filter.

    Args:
        from_score: lower bound forwarded as `from_v`, or None
        to_score: upper bound forwarded as `to_v`, or None
    """
    bounds = {"from_v": from_score, "to_v": to_score}
    return value_range_filter(df, "quality_score", **bounds)
def angle_dist(a1, a2):
    """
    Absolute difference between two angles given in degrees.

    The raw difference is wrapped into [-180, 180) before taking the
    absolute value, so the result is never larger than 180.
    """
    wrapped = np.mod(a1 - a2 + 180, 360) - 180
    return np.absolute(wrapped)
def angle_discrip_filter(df: pd.DataFrame, thresh, less_than=True):
    """
    Build a mask from the discrepancy between the two compass angles of a row.

    The discrepancy is angle_dist("computed_compass_angle", "compass_angle").

    Args:
        thresh: Threshold in degrees
        less_than: when True mark rows strictly below `thresh`, otherwise
            rows strictly above it (rows equal to `thresh` are never marked)
    """
    discrepancy = angle_dist(df["computed_compass_angle"], df["compass_angle"])
    return (discrepancy < thresh) if less_than else (discrepancy > thresh)
def haversine_np(lon1, lat1, lon2, lat2):
    """
    Great circle distance between two points on the earth, in meters.

    Coordinates are given in decimal degrees. The arguments are combined
    element-wise, so array arguments must be of equal length.
    A sphere of radius 6378.137 km is used.
    """
    lon1_rad = np.radians(lon1)
    lat1_rad = np.radians(lat1)
    lon2_rad = np.radians(lon2)
    lat2_rad = np.radians(lat2)

    delta_lon = lon2_rad - lon1_rad
    delta_lat = lat2_rad - lat1_rad

    # Haversine formula: central angle from the half-angle sines.
    hav = (
        np.sin(delta_lat / 2.0) ** 2
        + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(delta_lon / 2.0) ** 2
    )
    central_angle = 2 * np.arcsin(np.sqrt(hav))

    kilometers = 6378.137 * central_angle
    return kilometers * 1e3
def loc_discrip_filter(df: pd.DataFrame, thresh, less_than=True):
    """
    Build a mask from the distance between the two locations of a row.

    The distance is the haversine_np distance between the
    "computed_geometry.*" and the "geometry.*" lat/long columns.

    Args:
        thresh: Threshold in meters
        less_than: when True mark rows strictly below `thresh`, otherwise
            rows strictly above it (rows equal to `thresh` are never marked)
    """
    computed_lat, computed_lon = df["computed_geometry.lat"], df["computed_geometry.long"]
    geom_lat, geom_lon = df["geometry.lat"], df["geometry.long"]

    distance = haversine_np(computed_lon, computed_lat, geom_lon, geom_lat)
    return (distance < thresh) if less_than else (distance > thresh)
def sequence_sparsity_filter(df: pd.DataFrame, dist_thresh):
    """
    This filter filters out images that are too close to each other within a sequence

    Not implemented yet.

    Args:
        df: table to filter
        dist_thresh: distance threshold (currently unused)

    Raises:
        NotImplementedError: always
    """
    # Fail loudly: returning None here is not a usable row mask and would
    # only surface later as an unrelated error inside the pipeline.
    raise NotImplementedError("sequence_sparsity_filter is not implemented yet")
class Filter():
    """
    A filter function bundled with the keyword arguments to call it with.

    Calling the instance with a DataFrame forwards to
    `filter_func(df, **kwargs)` and returns its result.
    """

    def __init__(self, filter_func, name=None, **kwargs):
        """
        Args:
            filter_func: callable taking the DataFrame as first argument
            name: optional label appended to the function name in str()
            **kwargs: keyword arguments stored and passed to `filter_func`
        """
        self.filter_func = filter_func
        self.name = name
        self.kwargs = kwargs

    def __call__(self, df: pd.DataFrame):
        return self.filter_func(df, **self.kwargs)

    def __str__(self) -> str:
        func_name = self.filter_func.__name__
        return func_name if self.name is None else f"{func_name}:{self.name}"

    def __repr__(self):
        rendered = [f"{key}={value}" for key, value in self.kwargs.items()]
        return f"{str(self)} | kwargs({', '.join(rendered)})"
class FilterPipeline():
    """
    Ordered collection of filters applied to a DataFrame.

    Every filter is a callable that takes the DataFrame and returns a
    boolean mask over its rows; rows whose mask entry is True are kept.
    """

    def __init__(self, filters: list, sequential=True, name=None, verbose=True):
        """
        Args:
            filters: callables mapping a DataFrame to a boolean row mask
            sequential: Whether to apply filters sequentially or compute the masks
                for all of them then apply once at the end.
            name: label used in log messages and in str()/repr()
            verbose: Whether to log the effect of each filter or not
        """
        self.filters = filters
        self.sequential = sequential
        self.name = name
        self.verbose = verbose

    @staticmethod
    def _percent(part, total):
        """Return part/total as a percentage, or 0.0 when total is zero."""
        return part / total * 100 if total else 0.0

    def __call__(self, df: pd.DataFrame):
        """
        Run every filter and return the rows of `df` that survive.

        In sequential mode each filter only sees the rows kept so far, and the
        pipeline stops (with a warning, skipping the summary log) as soon as no
        rows are left. Otherwise every filter sees the full input and the
        masks are AND-ed together before being applied once.
        The final summary is logged regardless of `verbose`.
        """
        n_input = df.shape[0]
        running_mask = None
        if not self.sequential:
            running_mask = np.full(n_input, True, dtype=bool)

        for f in self.filters:
            mask = f(df)
            if self.verbose:
                s = np.sum(mask)
                total = mask.shape[0]
                # _percent guards against an empty mask (0/0).
                logger.info(f"{f} keeps {s}/{total} ({self._percent(s, total):.2f}%) of the images")
            if self.sequential:
                df = df[mask]
                if df.shape[0] == 0:
                    logger.warning("No images left during filtering.. Stopping pipeline")
                    return df
            else:
                running_mask = np.logical_and(running_mask, mask)

        if not self.sequential:
            df = df[running_mask]

        n_kept = df.shape[0]
        # An empty input used to raise ZeroDivisionError here.
        logger.info(
            f"Filter Pipeline {self.name} kept {n_kept}/{n_input} "
            f"({self._percent(n_kept, n_input):.2f}%) of the images"
        )
        return df

    def __str__(self):
        return f"Pipeline {self.name}: " + "\n".join([str(x) for x in self.filters])

    def __repr__(self):
        return f"Pipeline {self.name}: " + "\n".join([repr(x) for x in self.filters])

    @staticmethod
    def load_from_yaml(file_path):
        """
        Build a pipeline from the "filter_pipeline" section of a YAML file.

        Constructor arguments are taken from the section's entries whose key
        matches a parameter of __init__ and whose value is a float, int, bool
        or str. The "filters" entry is a list of single-entry mappings
        `{function_name: kwargs}`; only the first item of each mapping is
        used, and `function_name` is resolved among this module's globals.

        Args:
            file_path: path of the YAML file to read

        Returns:
            The configured FilterPipeline

        Raises:
            KeyError: if a required section is missing or a filter function
                name is not defined in this module
        """
        def is_primitive(x):
            return isinstance(x, (float, int, bool, str))

        with open(file_path, 'r') as stream:
            pipeline_dict = yaml.safe_load(stream)["filter_pipeline"]

        sig = inspect.signature(FilterPipeline.__init__)
        init_args = {
            param.name: pipeline_dict[param.name]
            for param in sig.parameters.values()
            if param.name in pipeline_dict and is_primitive(pipeline_dict[param.name])
        }

        filters = []
        for filter_dict in pipeline_dict["filters"]:
            filter_func_name, kwargs = list(filter_dict.items())[0]
            # NOTE(review): the name comes from the config file and is looked up
            # among ALL module globals, not just the filter functions -- only
            # load configuration files you trust.
            try:
                filter_func = globals()[filter_func_name]
            except KeyError:
                raise KeyError(
                    f"Unknown filter function '{filter_func_name}' in {file_path}"
                ) from None
            # A YAML entry without arguments (`- some_filter:`) parses to None.
            filters.append(Filter(filter_func=filter_func, **(kwargs or {})))

        return FilterPipeline(filters, **init_args)
# Manual smoke test: parses the mia.yaml pipeline definition; the resulting
# pipeline is discarded.
# NOTE(review): the path is relative, and this module uses a relative import
# (`from .. import logger`), so this presumably has to be launched as a module
# (`python -m ...`) from the repository root -- confirm.
if __name__ == "__main__":
    FilterPipeline.load_from_yaml("mia/fpv/filter_pipelines/mia.yaml")