from __future__ import annotations
from itertools import count
import logging
from typing import TYPE_CHECKING
from toolz import unique, concat, pluck, get, memoize
from numba import literal_unroll
import numpy as np
import xarray as xr
from .antialias import AntialiasCombination
from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary
from .utils import (isnull, ngjit,
nanmax_in_place, nanmin_in_place, nansum_in_place, nanfirst_in_place, nanlast_in_place,
nanmax_n_in_place_3d, nanmax_n_in_place_4d, nanmin_n_in_place_3d, nanmin_n_in_place_4d,
nanfirst_n_in_place_3d, nanfirst_n_in_place_4d, nanlast_n_in_place_3d, nanlast_n_in_place_4d,
row_min_in_place, row_min_n_in_place_3d, row_min_n_in_place_4d,
row_max_in_place, row_max_n_in_place_3d, row_max_n_in_place_4d,
)

try:
from datashader.transfer_functions._cuda_utils import cuda_mutex_lock, cuda_mutex_unlock
except ImportError:
cuda_mutex_lock, cuda_mutex_unlock = None, None

if TYPE_CHECKING:
    from datashader.antialias import UnzippedAntialiasStage2

__all__ = ['compile_components']

logger = logging.getLogger(__name__)


@memoize
def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, partitioned=False):
"""Given an ``Aggregation`` object and a schema, return 5 sub-functions
and information on how to perform the second stage aggregation if
antialiasing is requested,
Parameters
----------
agg : Aggregation
The expression describing the aggregation(s) to be computed.
schema : DataShape
Columns and dtypes in the source dataset.
glyph : Glyph
The glyph to render.
antialias : bool
Whether to render using antialiasing.
cuda : bool
Whether to render using CUDA (on the GPU) or CPU.
partitioned : bool
Whether the source dataset is partitioned using dask.

    Returns
-------
A tuple of the following:
``create(shape)``
Function that takes the aggregate shape, and returns a tuple of
initialized numpy arrays.
``info(df, canvas_shape)``
        Function that takes a dataframe and the canvas shape, and returns
        preprocessed 1D numpy arrays of the needed columns.
``append(i, x, y, *aggs_and_cols)``
Function that appends the ``i``th row of the table to the ``(x, y)``
bin, given the base arrays and columns in ``aggs_and_cols``. This does
the bulk of the work.
``combine(base_tuples)``
Function that combines a list of base tuples into a single base tuple.
This forms the reducing step in a reduction tree.
``finalize(aggs, cuda)``
Function that is given a tuple of base numpy arrays and returns the
finalized ``DataArray`` or ``Dataset``.
``antialias_stage_2``
If using antialiased lines this is a tuple of the ``AntialiasCombination``
values corresponding to the aggs. If not using antialiased lines then
this is ``False``.
``antialias_stage_2_funcs``
If using antialiased lines which require a second stage combine, this
is a tuple of the three combine functions which are the accumulate,
clear and copy_back functions. If not using antialiased lines then this
is ``None``.
``column_names``
Names of DataFrame columns or DataArray variables that are used by the
agg.
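
    Examples
    --------
    A minimal sketch of how the returned pieces fit together (hypothetical
    ``df``, ``height`` and ``width``; the real call sites live in the glyph
    dispatch code, which invokes ``append`` from generated per-glyph kernels
    and passes coordinate-related keyword arguments to ``finalize``)::

        create, info, append, combine, finalize, *_ = compile_components(
            agg, schema, glyph)
        aggs = create((height, width))     # one zero-initialized array per base
        cols = info(df, (height, width))   # preprocessed 1D column arrays
        # ...per-point rendering then calls append(i, x, y, *(aggs + cols))...
        result = finalize(combine([aggs]), cuda=False)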
"""
reds = list(traverse_aggregation(agg))
# List of base reductions (actually computed)
bases = list(unique(concat(r._build_bases(cuda, partitioned) for r in reds)))
dshapes = [b.out_dshape(schema, antialias, cuda, partitioned) for b in bases]
    # Information on how to perform second stage aggregation of antialiased lines,
    # including whether antialiased lines self-intersect or not, as we need a single
    # value for this even for a compound reduction. This defaults to True, but
    # is False if any constituent reduction requests it.
if antialias:
self_intersect, antialias_stage_2 = make_antialias_stage_2(reds, bases)
if cuda:
import cupy
array_module = cupy
else:
array_module = np
antialias_stage_2 = antialias_stage_2(array_module)
antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2, bases, cuda,
partitioned)
else:
self_intersect = False
antialias_stage_2 = False
antialias_stage_2_funcs = None
    # List of tuples of
    # (append, base, input columns, nan_check_column, temps, combine temps,
    #  uses cuda mutex, is_categorical)
calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect, partitioned)
for (b, d) in zip(bases, dshapes)]
# List of unique column names needed, including nan_check_columns
cols = list(concat(pluck(2, calls)))
nan_check_cols = list(c[3] for c in calls if c[3] is not None)
cols = list(unique(cols + nan_check_cols))
# List of temps needed
temps = list(pluck(4, calls))
combine_temps = list(pluck(5, calls))
create = make_create(bases, dshapes, cuda)
append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, cuda, any_uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)
column_names = [c.column for c in cols if c.column != SpecialColumn.RowIndex]
return create, info, append, combine, finalize, antialias_stage_2, antialias_stage_2_funcs, \
column_names


def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero: float,
n_reduction: bool, categorical: bool):
if n_reduction:
if zero == -1:
if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
return row_max_n_in_place_4d if categorical else row_max_n_in_place_3d
elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
return row_min_n_in_place_4d if categorical else row_min_n_in_place_3d
else:
raise NotImplementedError
else:
if combination == AntialiasCombination.MAX:
return nanmax_n_in_place_4d if categorical else nanmax_n_in_place_3d
elif combination == AntialiasCombination.MIN:
return nanmin_n_in_place_4d if categorical else nanmin_n_in_place_3d
elif combination == AntialiasCombination.FIRST:
return nanfirst_n_in_place_4d if categorical else nanfirst_n_in_place_3d
elif combination == AntialiasCombination.LAST:
return nanlast_n_in_place_4d if categorical else nanlast_n_in_place_3d
else:
raise NotImplementedError
else:
        # The aggs to combine here are either 3D (ny, nx, ncat) if categorical is True or
        # 2D (ny, nx) if categorical is False. The same combination functions can be used
        # for both as all elements are independent.
if zero == -1:
if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST):
return row_max_in_place
elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST):
return row_min_in_place
else:
raise NotImplementedError
else:
if combination == AntialiasCombination.MAX:
return nanmax_in_place
elif combination == AntialiasCombination.MIN:
return nanmin_in_place
elif combination == AntialiasCombination.FIRST:
return nanfirst_in_place
elif combination == AntialiasCombination.LAST:
return nanlast_in_place
else:
return nansum_in_place


def make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned):
aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical = antialias_stage_2
# Accumulate functions.
funcs = [_get_antialias_stage_2_combine_func(comb, zero, n_red, cat) for comb, zero, n_red, cat
in zip(aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical)]
base_is_where = [b.is_where() for b in bases]
next_base_is_where = base_is_where[1:] + [False]
namespace = {}
namespace["literal_unroll"] = literal_unroll
for func in set(funcs):
namespace[func.__name__] = func
# Generator of unique names for combine functions
names = (f"combine{i}" for i in count())
# aa_stage_2_accumulate
lines = [
"def aa_stage_2_accumulate(aggs_and_copies, first_pass):",
# Don't need to accumulate if first_pass, just copy (opposite of aa_stage_2_copy_back)
" if first_pass:",
" for a in literal_unroll(aggs_and_copies):",
" a[1][:] = a[0][:]",
" else:",
]
for i, (func, is_where, next_is_where) in enumerate(zip(funcs, base_is_where,
next_base_is_where)):
if is_where:
where_reduction = bases[i]
if isinstance(where_reduction, by):
where_reduction = where_reduction.reduction
combine = where_reduction._combine_callback(cuda, partitioned, aa_categorical[i])
name = next(names) # Unique name
namespace[name] = combine
lines.append(
f" {name}(aggs_and_copies[{i}][::-1], aggs_and_copies[{i-1}][::-1])")
elif next_is_where:
            # This is dealt with as part of the following base, which is a where reduction.
pass
else:
lines.append(
f" {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])")
code = "\n".join(lines)
logger.debug(code)
exec(code, namespace)
aa_stage_2_accumulate = ngjit(namespace["aa_stage_2_accumulate"])
# aa_stage_2_clear
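    # An aa_zero of NaN renders as the bare name "nan" in the generated source,
    # so it must be bound in the exec namespace.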
if np.any(np.isnan(aa_zeroes)):
namespace["nan"] = np.nan
lines = ["def aa_stage_2_clear(aggs_and_copies):"]
for i, aa_zero in enumerate(aa_zeroes):
lines.append(f" aggs_and_copies[{i}][0].fill({aa_zero})")
code = "\n".join(lines)
logger.debug(code)
exec(code, namespace)
aa_stage_2_clear = ngjit(namespace["aa_stage_2_clear"])
# aa_stage_2_copy_back
@ngjit
def aa_stage_2_copy_back(aggs_and_copies):
# Numba access to heterogeneous tuples is only permitted using literal_unroll.
for agg_and_copy in literal_unroll(aggs_and_copies):
agg_and_copy[0][:] = agg_and_copy[1][:]
return aa_stage_2_accumulate, aa_stage_2_clear, aa_stage_2_copy_back


def traverse_aggregation(agg):
    """Yield a depth-first, left-to-right traversal of an aggregation."""
if isinstance(agg, summary):
        for a in agg.values:
            yield from traverse_aggregation(a)
else:
yield agg


def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, partitioned):
# Comments refer to usage in make_append()
return (
base._build_append(dshape, schema, cuda, antialias, self_intersect), # func
(base,), # bases
base.inputs, # cols, arrays of these are passed to reduction append functions
base.nan_check_column, # column used to check for NaNs in some where reductions
base._build_temps(cuda), # temps
base._build_combine_temps(cuda, partitioned), # combine temps
base.uses_cuda_mutex() if cuda else UsesCudaMutex.No, # uses cuda mutex
base.is_categorical(),
)


def make_create(bases, dshapes, cuda):
creators = [b._build_create(d) for (b, d) in zip(bases, dshapes)]
if cuda:
import cupy
array_module = cupy
else:
array_module = np
return lambda shape: tuple(c(shape, array_module) for c in creators)


def make_info(cols, cuda, uses_cuda_mutex: bool):
def info(df, canvas_shape):
ret = tuple(c.apply(df, cuda) for c in cols)
if uses_cuda_mutex:
import cupy # Guaranteed to be available if uses_cuda_mutex is True
import numba
from packaging.version import Version
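            # numba >= 0.57 supports a per-pixel mutex (locked at (y, x) in the
            # generated append function); older versions fall back to a single
            # shared lock for the whole canvas.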
if Version(numba.__version__) >= Version("0.57"):
mutex_array = cupy.zeros(canvas_shape, dtype=np.uint32)
else:
mutex_array = cupy.zeros((1,), dtype=np.uint32)
ret += (mutex_array,)
return ret
return info


def make_append(bases, cols, calls, glyph, antialias):
names = ('_{0}'.format(i) for i in count())
inputs = list(bases) + list(cols)
namespace = {}
need_isnull = any(call[3] for call in calls)
if need_isnull:
namespace["isnull"] = isnull
global_cuda_mutex = any(call[6] == UsesCudaMutex.Global for call in calls)
any_uses_cuda_mutex = any(call[6] != UsesCudaMutex.No for call in calls)
if any_uses_cuda_mutex:
# This adds an argument to the append() function that is the cuda mutex
# generated in make_info.
inputs += ["_cuda_mutex"]
namespace["cuda_mutex_lock"] = cuda_mutex_lock
namespace["cuda_mutex_unlock"] = cuda_mutex_unlock
signature = [next(names) for i in inputs]
arg_lk = dict(zip(inputs, signature))
local_lk = {}
head = []
body = []
ndims = glyph.ndims
if ndims is not None:
subscript = ', '.join(['i' + str(n) for n in range(ndims)])
else:
subscript = None
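    # e.g. a 2D glyph gives subscript == 'i0, i1', used below to index the
    # per-row column arrays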
prev_local_cuda_mutex = False
categorical_args = {} # Reuse categorical arguments if used in more than one reduction
where_selectors = {} # Reuse where.selector if used more than once in a summary reduction
    if logger.isEnabledFor(logging.DEBUG):  # avoid building debug strings unnecessarily
logger.debug(f"global_cuda_mutex {global_cuda_mutex}")
logger.debug(f"any_uses_cuda_mutex {any_uses_cuda_mutex}")
for k, v in arg_lk.items():
logger.debug(f"arg_lk {v} {k} {getattr(k, 'column', None)}")
def get_cuda_mutex_call(lock: bool) -> str:
func = "cuda_mutex_lock" if lock else "cuda_mutex_unlock"
return f'{func}({arg_lk["_cuda_mutex"]}, (y, x))'
for index, (func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex, categorical) \
in enumerate(calls):
local_cuda_mutex = not global_cuda_mutex and uses_cuda_mutex == UsesCudaMutex.Local
local_lk.update(zip(temps, (next(names) for i in temps)))
func_name = next(names)
logger.debug(f"func {func_name} {func}")
namespace[func_name] = func
args = [arg_lk[i] for i in bases]
if categorical and isinstance(cols[0], category_codes):
args.extend('{0}[{1}]'.format(arg_lk[col], subscript) for col in cols[1:])
elif ndims is None:
args.extend('{0}'.format(arg_lk[i]) for i in cols)
elif categorical:
args.extend('{0}[{1}][1]'.format(arg_lk[i], subscript)
for i in cols)
else:
args.extend('{0}[{1}]'.format(arg_lk[i], subscript)
for i in cols)
if categorical:
# Categorical aggregate arrays need to be unpacked
categorical_arg = arg_lk[cols[0]]
cat_name = categorical_args.get(categorical_arg, None)
if cat_name is None:
# Each categorical column only needs to be unpacked once
col_index = '' if isinstance(cols[0], category_codes) else '[0]'
cat_name = f'cat{next(names)}'
categorical_args[categorical_arg] = cat_name
head.append(f'{cat_name} = int({categorical_arg}[{subscript}]{col_index})')
arg = signature[index]
head.append(f'{arg} = {arg}[:, :, {cat_name}]')
args.extend([local_lk[i] for i in temps])
if antialias:
args += ["aa_factor", "prev_aa_factor"]
if local_cuda_mutex and prev_local_cuda_mutex:
# Avoid unnecessary mutex unlock and lock cycle
body.pop()
is_where = len(bases) == 1 and bases[0].is_where()
if is_where:
where_reduction = bases[0]
if isinstance(where_reduction, by):
where_reduction = where_reduction.reduction
selector_hash = hash(where_reduction.selector)
update_index_arg_name = where_selectors.get(selector_hash, None)
new_selector = update_index_arg_name is None
if new_selector:
update_index_arg_name = next(names)
where_selectors[selector_hash] = update_index_arg_name
args.append(update_index_arg_name)
            # The where reduction needs access to the return value of the
            # contained reduction, which is the preceding call here.
prev_body = body.pop()
if local_cuda_mutex and not prev_local_cuda_mutex:
body.append(get_cuda_mutex_call(True))
if new_selector:
body.append(f'{update_index_arg_name} = {prev_body}')
else:
body.append(prev_body)
            # If nan_check_column is defined then we need to check whether the
            # value of the corresponding row in that column is NaN and, if so,
            # do nothing. This check must occur before the where.selector is called.
if nan_check_column is None:
whitespace = ''
else:
var = f"{arg_lk[nan_check_column]}[{subscript}]"
prev_body = body[-1]
body[-1] = f'if not isnull({var}):'
body.append(f' {prev_body}')
whitespace = ' '
body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
body.append(f' {whitespace}{func_name}(x, y, {", ".join(args)})')
else:
if local_cuda_mutex and not prev_local_cuda_mutex:
body.append(get_cuda_mutex_call(True))
if nan_check_column:
var = f"{arg_lk[nan_check_column]}[{subscript}]"
body.append(f'if not isnull({var}):')
body.append(f' {func_name}(x, y, {", ".join(args)})')
else:
body.append(f'{func_name}(x, y, {", ".join(args)})')
if local_cuda_mutex:
body.append(get_cuda_mutex_call(False))
prev_local_cuda_mutex = local_cuda_mutex
body = head + ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
for agg, name in local_lk.items()] + body
if global_cuda_mutex:
body = [get_cuda_mutex_call(True)] + body + [get_cuda_mutex_call(False)]
if antialias:
signature = ["aa_factor", "prev_aa_factor"] + signature
if ndims is None:
code = ('def append(x, y, {0}):\n'
' {1}').format(', '.join(signature), '\n '.join(body))
else:
code = ('def append({0}, x, y, {1}):\n'
' {2}'
).format(subscript, ', '.join(signature), '\n '.join(body))
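    # Illustrative example (generated names vary): for a single simple
    # reduction over a 1D glyph the assembled source is roughly
    #     def append(i0, x, y, _0, _1):
    #         _2(x, y, _0, _1[i0])
    # where _0 is the base aggregate array, _1 the input column and _2 the
    # reduction's append function bound into `namespace`.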
logger.debug(code)
exec(code, namespace)
return ngjit(namespace['append']), any_uses_cuda_mutex


def make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned):
# Lookup of base Reduction to argument index.
arg_lk = dict((k, v) for (v, k) in enumerate(bases))
# Also need lookup of by.reduction as the contained reduction is not aware of its wrapper.
arg_lk.update(dict((k.reduction, v) for (v, k) in enumerate(bases) if isinstance(k, by)))
# where._combine() deals with combine of preceding reduction so exclude
# it from explicit combine calls.
base_is_where = [b.is_where() for b in bases]
next_base_is_where = base_is_where[1:] + [False]
calls = [(None if n else b._build_combine(d, antialias, cuda, partitioned),
[arg_lk[i] for i in (b,) + t + ct])
for (b, d, t, ct, n) in zip(bases, dshapes, temps, combine_temps, next_base_is_where)]
def combine(base_tuples):
bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
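        # bases[k] now has shape (len(base_tuples), *agg_shape): the k-th
        # reduction's aggregates stacked across partitions/chunks.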
ret = []
for is_where, (func, inds) in zip(base_is_where, calls):
if func is None:
continue
call = func(*get(inds, bases))
if is_where:
                # Separate the aggs of the where reduction and its selector;
                # the selector's go first to match the order of bases.
ret.extend(call[::-1])
else:
ret.append(call)
return tuple(ret)
return combine


def make_finalize(bases, agg, schema, cuda, partitioned):
arg_lk = dict((k, v) for (v, k) in enumerate(bases))
if isinstance(agg, summary):
calls = []
for key, val in zip(agg.keys, agg.values):
f = make_finalize(bases, val, schema, cuda, partitioned)
try:
# Override bases if possible
bases = val._build_bases(cuda, partitioned)
except AttributeError:
pass
inds = [arg_lk[b] for b in bases]
calls.append((key, f, inds))
def finalize(bases, cuda=False, **kwargs):
data = {key: finalizer(get(inds, bases), cuda, **kwargs)
for (key, finalizer, inds) in calls}
# Copy x and y range attrs from any DataArray (their ranges are all the same)
# to set on parent Dataset
name = agg.keys[0] # Name of first DataArray.
attrs = {attr: data[name].attrs[attr] for attr in ('x_range', 'y_range')}
return xr.Dataset(data, attrs=attrs)
return finalize
else:
return agg._build_finalize(schema)


def make_antialias_stage_2(reds, bases):
# Only called if antialias is True.
# Prefer a single-stage antialiased aggregation, but if any requested
# reduction requires two stages then force use of two for all reductions.
self_intersect = True
for red in reds:
if red._antialias_requires_2_stages():
self_intersect = False
break
def antialias_stage_2(array_module) -> UnzippedAntialiasStage2:
return tuple(zip(*concat(b._antialias_stage_2(self_intersect, array_module)
for b in bases)))
return self_intersect, antialias_stage_2