|
""" |
|
Contains the filters used to filter out images from the Mapillary API. |
|
""" |
|
|
|
import inspect |
|
import yaml |
|
from datetime import datetime |
|
from functools import partial |
|
|
|
import numpy as np |
|
import pandas as pd |
|
import shapely |
|
import shapely.geometry |
|
from shapely.prepared import prep |
|
from shapely import contains_xy |
|
|
|
from .. import logger |
|
|
|
def in_shape_filter(df: pd.DataFrame, geojson_shape): |
|
polygon = shapely.geometry.shape(geojson_shape["features"][0]["geometry"]) |
|
mask = contains_xy(polygon, x=df["geometry.long"], y=df["geometry.lat"]) |
|
return mask |
|
|
|
def value_range_filter(df: pd.DataFrame, key, from_v=None, to_v=None): |
|
c = df[key] |
|
if from_v is not None and to_v is not None: |
|
if from_v == to_v: |
|
return c == from_v |
|
else: |
|
return np.logical_and(c >= from_v, c <= to_v) |
|
elif from_v is not None: |
|
return c >= from_v |
|
elif to_v is not None: |
|
return c <= to_v |
|
else: |
|
raise Exception("from_v and to_v cannot both be None") |
|
|
|
def value_in_list_filter(df: pd.DataFrame, key, lst, exclude=False): |
|
mask = df[key].isin(lst) |
|
if exclude: |
|
mask = ~mask |
|
return mask |
|
|
|
|
|
def value_missing_filter(df: pd.DataFrame, keys): |
|
return np.all(df[keys].notna(), axis=1) |
|
|
|
|
|
def date_filter(df: pd.DataFrame, from_year=None, to_year=None): |
|
""" |
|
Args: |
|
before_year: integer representing the year |
|
after_year: integer representing the year |
|
""" |
|
if from_year is not None: |
|
from_year = int(datetime(from_year, 1, 1).timestamp())*1e3 |
|
if to_year is not None: |
|
to_year = int(datetime(to_year, 1, 1).timestamp())*1e3 |
|
return value_range_filter(df, "captured_at", from_year, to_year) |
|
|
|
def quality_score_filter(df: pd.DataFrame, from_score=None, to_score=None): |
|
return value_range_filter(df, "quality_score", from_v=from_score, to_v=to_score) |
|
|
|
def angle_dist(a1, a2): |
|
a = a1-a2 |
|
return np.abs((a + 180) % 360 - 180) |
|
|
|
def angle_discrip_filter(df: pd.DataFrame, thresh, less_than=True): |
|
""" |
|
Args: |
|
thresh: Threshold in degrees |
|
""" |
|
a1 = df["computed_compass_angle"] |
|
a2 = df["compass_angle"] |
|
|
|
diff = angle_dist(a1, a2) |
|
|
|
if less_than: |
|
return diff < thresh |
|
else: |
|
return diff > thresh |
|
|
|
def haversine_np(lon1, lat1, lon2, lat2): |
|
""" |
|
Calculate the great circle distance between two points |
|
on the earth (specified in decimal degrees) |
|
|
|
All args must be of equal length. |
|
|
|
""" |
|
lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2]) |
|
|
|
dlon = lon2 - lon1 |
|
dlat = lat2 - lat1 |
|
|
|
a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2 |
|
|
|
c = 2 * np.arcsin(np.sqrt(a)) |
|
km = 6378.137 * c |
|
return km*1e3 |
|
|
|
def loc_discrip_filter(df: pd.DataFrame, thresh, less_than=True): |
|
""" |
|
Args: |
|
thresh: Threshold in meters |
|
""" |
|
lat1 = df["computed_geometry.lat"] |
|
lon1 = df["computed_geometry.long"] |
|
lat2 = df["geometry.lat"] |
|
lon2 = df["geometry.long"] |
|
diff = haversine_np(lon1, lat1, lon2, lat2) |
|
if less_than: |
|
return diff < thresh |
|
else: |
|
return diff > thresh |
|
|
|
def sequence_sparsity_filter(df: pd.DataFrame, dist_thresh): |
|
""" |
|
TODO |
|
This filter filters out images that are too close to each other within a sequence |
|
""" |
|
pass |
|
|
|
|
|
class Filter(): |
|
def __init__(self, filter_func, name=None, **kwargs): |
|
self.filter_func = filter_func |
|
self.name = name |
|
self.kwargs = kwargs |
|
|
|
def __call__(self, df: pd.DataFrame): |
|
return self.filter_func(df, **self.kwargs) |
|
|
|
def __str__(self) -> str: |
|
if self.name is None: |
|
tag = self.filter_func.__name__ |
|
else: |
|
tag = f"{self.filter_func.__name__}:{self.name}" |
|
return tag |
|
|
|
def __repr__(self): |
|
kwargs_fmt = ", ".join([f"{k}={v}" for k,v in self.kwargs.items()]) |
|
return f"{self.__str__()} | kwargs({kwargs_fmt})" |
|
|
|
|
|
class FilterPipeline(): |
|
def __init__(self, filters: list, sequential=True, name=None, verbose=True): |
|
""" |
|
Args: |
|
sequential: Whether to apply filters sequentially or compute the masks |
|
for all of them then apply once at the end. |
|
verbose: Whether to log the effect of each filter or not |
|
""" |
|
self.filters = filters |
|
self.sequential = sequential |
|
self.name = name |
|
self.verbose = verbose |
|
|
|
def __call__(self, df: pd.DataFrame): |
|
N = df.shape[0] |
|
if not self.sequential: |
|
running_mask = np.full(df.shape[0], True, dtype=bool) |
|
|
|
for f in self.filters: |
|
mask = f(df) |
|
if self.verbose: |
|
s = np.sum(mask) |
|
logger.info(f"{f} keeps {s}/{mask.shape[0]} ({s/mask.shape[0]*100:.2f}%) of the images") |
|
|
|
if self.sequential: |
|
df = df[mask] |
|
if df.shape[0] == 0: |
|
logger.warn("No images left during filtering.. Stopping pipeline") |
|
return df |
|
else: |
|
running_mask = np.logical_and(running_mask, mask) |
|
|
|
if not self.sequential: |
|
df = df[running_mask] |
|
|
|
logger.info(f"Filter Pipeline {self.name} kept {df.shape[0]}/{N} ({df.shape[0]/N*100:.2f}%) of the images") |
|
return df |
|
|
|
def __str__(self): |
|
return f"Pipeline {self.name}: " + "\n".join([str(x) for x in self.filters]) |
|
|
|
def __repr__(self): |
|
return f"Pipeline {self.name}: " + "\n".join([repr(x) for x in self.filters]) |
|
|
|
@staticmethod |
|
def load_from_yaml(file_path): |
|
def is_primitive(x): |
|
return isinstance(x, (float, int, bool, str)) |
|
|
|
with open(file_path, 'r') as stream: |
|
pipeline_dict = yaml.safe_load(stream)["filter_pipeline"] |
|
|
|
sig = inspect.signature(FilterPipeline.__init__) |
|
init_args = dict() |
|
for param in sig.parameters.values(): |
|
if param.name in pipeline_dict and is_primitive(pipeline_dict[param.name]): |
|
init_args[param.name] = pipeline_dict[param.name] |
|
|
|
filter_dicts = pipeline_dict["filters"] |
|
filters = list() |
|
|
|
for filter_dict in filter_dicts: |
|
filter_func_name, kwargs = list(filter_dict.items())[0] |
|
filter_func = globals()[filter_func_name] |
|
filters.append(Filter(filter_func=filter_func, **kwargs)) |
|
|
|
pipeline = FilterPipeline(filters, **init_args) |
|
return pipeline |
|
|
|
if __name__ == "__main__": |
|
FilterPipeline.load_from_yaml("mia/fpv/filter_pipelines/mia.yaml") |