"""
Cycler
======

Cycling through combinations of values, producing dictionaries.

You can add cyclers::

    from cycler import cycler
    cc = (cycler(color=list('rgb')) +
          cycler(linestyle=['-', '--', '-.']))
    for d in cc:
        print(d)

Results in::

    {'color': 'r', 'linestyle': '-'}
    {'color': 'g', 'linestyle': '--'}
    {'color': 'b', 'linestyle': '-.'}


You can multiply cyclers::

    from cycler import cycler
    cc = (cycler(color=list('rgb')) *
          cycler(linestyle=['-', '--', '-.']))
    for d in cc:
        print(d)

Results in::

    {'color': 'r', 'linestyle': '-'}
    {'color': 'r', 'linestyle': '--'}
    {'color': 'r', 'linestyle': '-.'}
    {'color': 'g', 'linestyle': '-'}
    {'color': 'g', 'linestyle': '--'}
    {'color': 'g', 'linestyle': '-.'}
    {'color': 'b', 'linestyle': '-'}
    {'color': 'b', 'linestyle': '--'}
    {'color': 'b', 'linestyle': '-.'}
"""
|
|
|
|
|
from __future__ import annotations

import copy
from collections.abc import Generator, Hashable, Iterable
from functools import reduce
from itertools import cycle, product
from operator import add, mul
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    List,
    TypeVar,
    Union,
    cast,
    overload,
)
|
|
|
__version__ = "0.12.1"

# Type variables used by the generic ``Cycler[K, V]`` and its helpers.
K = TypeVar("K", bound=Hashable)  # key type of a cycler
L = TypeVar("L", bound=Hashable)  # key type of a second cycler in ``+`` / ``*``
V = TypeVar("V")  # value type of a cycler
U = TypeVar("U")  # value type of a second cycler in ``+`` / ``*`` / ``concat``
|
|
|
|
|
def _process_keys(
    left: Cycler[K, V] | Iterable[dict[K, V]],
    right: Cycler[K, V] | Iterable[dict[K, V]] | None,
) -> set[K]:
    """
    Helper function to compose cycler keys.

    The keys of each operand are read from its first entry only.  An
    operand that yields no entries (an empty list, an empty `Cycler`, an
    exhausted iterator, ...) contributes no keys instead of leaking a
    `StopIteration` to the caller.

    Parameters
    ----------
    left, right : iterable of dictionaries or None
        The cyclers to be composed.  ``right`` may be None, meaning that
        there is no right-hand operand.

    Returns
    -------
    keys : set
        The keys in the composition of the two cyclers.

    Raises
    ------
    ValueError
        If the two operands share at least one key.

    Notes
    -----
    Peeking calls ``next`` on ``iter(operand)``, so a one-shot iterator
    passed in directly loses its first element.
    """
    # ``next(..., {})`` makes every kind of empty operand behave like ``[]``.
    l_peek: dict[K, V] = next(iter(left), {})
    r_peek: dict[K, V] = next(iter(right), {}) if right is not None else {}
    l_key: set[K] = set(l_peek.keys())
    r_key: set[K] = set(r_peek.keys())
    if l_key & r_key:
        raise ValueError("Can not compose overlapping cycles")
    return l_key | r_key
|
|
|
|
|
def concat(left: Cycler[K, V], right: Cycler[K, U]) -> Cycler[K, V | U]:
    r"""
    Concatenate `Cycler`\s, as if chained using `itertools.chain`.

    The keys must match exactly.

    Examples
    --------
    >>> num = cycler('a', range(3))
    >>> let = cycler('a', 'abc')
    >>> num.concat(let)
    cycler('a', [0, 1, 2, 'a', 'b', 'c'])

    Parameters
    ----------
    left, right : Cycler
        The cyclers to join; the entries of ``left`` come first.

    Returns
    -------
    `Cycler`
        The concatenated cycler.

    Raises
    ------
    ValueError
        If the two cyclers do not have exactly the same keys.
    """
    # ``keys`` builds a new set on every access, so read each side once.
    left_keys = left.keys
    right_keys = right.keys
    if left_keys != right_keys:
        raise ValueError(
            "Keys do not match:\n"
            f"\tIntersection: {left_keys & right_keys!r}\n"
            f"\tDisjoint: {left_keys ^ right_keys!r}"
        )
    _l = cast(Dict[K, List[Union[V, U]]], left.by_key())
    _r = cast(Dict[K, List[Union[V, U]]], right.by_key())
    # Join the per-key value lists, then zip the single-key cyclers together.
    return reduce(add, (_cycler(k, _l[k] + _r[k]) for k in left_keys))
|
|
|
|
|
class Cycler(Generic[K, V]):
    """
    Composable cycles.

    This class has composition methods:

    ``+``
      for 'inner' products (zip)

    ``+=``
      in-place ``+``

    ``*``
      for outer products (`itertools.product`) and integer multiplication

    ``*=``
      in-place ``*``

    and supports basic slicing via ``[]``.

    Parameters
    ----------
    left, right : Cycler or None
        The 'left' and 'right' cyclers.
    op : func or None
        Function which composes the 'left' and 'right' cyclers.
    """

    def __call__(self):
        """Return an `itertools.cycle` over the entries of this cycler."""
        return cycle(self)

    def __init__(
        self,
        left: Cycler[K, V] | Iterable[dict[K, V]] | None,
        right: Cycler[K, V] | None = None,
        op: Any = None,
    ):
        """
        Semi-private init.

        Do not use this directly, use `cycler` function instead.
        """
        if isinstance(left, Cycler):
            # Store a fresh Cycler built from the operand's parts rather
            # than the operand object itself.
            self._left: Cycler[K, V] | list[dict[K, V]] = Cycler(
                left._left, left._right, left._op
            )
        elif left is not None:
            # Shallow-copy every entry of the iterable into a new list.
            self._left = [copy.copy(v) for v in left]
        else:
            self._left = []

        if isinstance(right, Cycler):
            self._right: Cycler[K, V] | None = Cycler(
                right._left, right._right, right._op
            )
        else:
            # Anything that is not a Cycler counts as "no right operand".
            self._right = None

        self._keys: set[K] = _process_keys(self._left, self._right)
        self._op: Any = op

    def __contains__(self, k):
        """Return True if *k* is one of this cycler's keys."""
        return k in self._keys

    @property
    def keys(self) -> set[K]:
        """The keys this Cycler knows about."""
        # Hand out a copy so callers cannot mutate the internal set.
        return set(self._keys)

    def change_key(self, old: K, new: K) -> None:
        """
        Change a key in this cycler to a new name.
        Modification is performed in-place.

        Does nothing if the old key is the same as the new key.
        Raises a ValueError if the new key is already a key.
        Raises a KeyError if the old key isn't a key.
        """
        if old == new:
            return
        if new in self._keys:
            raise ValueError(
                f"Can't replace {old} with {new}, {new} is already a key"
            )
        if old not in self._keys:
            raise KeyError(
                f"Can't replace {old} with {new}, {old} is not a key"
            )

        self._keys.remove(old)
        self._keys.add(new)

        if self._right is not None and old in self._right.keys:
            self._right.change_key(old, new)
        elif isinstance(self._left, Cycler):
            # Keys of the two operands never overlap, so if the key is not
            # on the right it has to be renamed on the left.
            self._left.change_key(old, new)
        else:
            # Base case: ``_left`` is a plain list of dicts.  Rename the key
            # in place and keep any other entries of each dict untouched.
            self._left = [
                {(new if k == old else k): v for k, v in entry.items()}
                for entry in self._left
            ]

    @classmethod
    def _from_iter(cls, label: K, itr: Iterable[V]) -> Cycler[K, V]:
        """
        Class method to create 'base' Cycler objects
        that do not have a 'right' or 'op' and for which
        the 'left' object is not another Cycler.

        Parameters
        ----------
        label : hashable
            The property key.

        itr : iterable
            Finite length iterable of the property values.

        Returns
        -------
        `Cycler`
            New 'base' cycler.
        """
        ret: Cycler[K, V] = cls(None)
        ret._left = [{label: v} for v in itr]
        ret._keys = {label}
        return ret

    def __getitem__(self, key: slice) -> Cycler[K, V]:
        """Return a new cycler holding the slice *key* of every key's values."""
        if isinstance(key, slice):
            trans = self.by_key()
            return reduce(add, (_cycler(k, v[key]) for k, v in trans.items()))
        else:
            raise ValueError("Can only use slices with Cycler.__getitem__")

    def __iter__(self) -> Generator[dict[K, V], None, None]:
        """Yield one new dict per entry, merging left and right operands."""
        if self._right is None:
            for left in self._left:
                yield dict(left)
        else:
            if self._op is None:
                raise TypeError(
                    "Operation cannot be None when both left and right are defined"
                )
            for a, b in self._op(self._left, self._right):
                out = {}
                out.update(a)
                out.update(b)
                yield out

    def __add__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        """
        Pair-wise combine two equal length cyclers (zip).

        Parameters
        ----------
        other : Cycler
        """
        if len(self) != len(other):
            raise ValueError(
                f"Can only add equal length cycles, not {len(self)} and {len(other)}"
            )
        return Cycler(
            cast(Cycler[Union[K, L], Union[V, U]], self),
            cast(Cycler[Union[K, L], Union[V, U]], other),
            zip
        )

    @overload
    def __mul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        ...

    @overload
    def __mul__(self, other: int) -> Cycler[K, V]:
        ...

    def __mul__(self, other):
        """
        Outer product of two cyclers (`itertools.product`) or integer
        multiplication.

        Parameters
        ----------
        other : Cycler or int
        """
        if isinstance(other, Cycler):
            return Cycler(
                cast(Cycler[Union[K, L], Union[V, U]], self),
                cast(Cycler[Union[K, L], Union[V, U]], other),
                product
            )
        elif isinstance(other, int):
            # Repeat every key's value list ``other`` times.
            trans = self.by_key()
            return reduce(
                add, (_cycler(k, v * other) for k, v in trans.items())
            )
        else:
            return NotImplemented

    @overload
    def __rmul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
        ...

    @overload
    def __rmul__(self, other: int) -> Cycler[K, V]:
        ...

    def __rmul__(self, other):
        """Reflected ``*``; evaluated as ``self * other``."""
        return self * other

    def __len__(self) -> int:
        """Return the number of entries produced by one pass of iteration."""
        # zip stops at the shorter operand; product pairs every combination.
        op_dict: dict[Callable, Callable[[int, int], int]] = {zip: min, product: mul}
        if self._right is None:
            return len(self._left)
        l_len = len(self._left)
        r_len = len(self._right)
        return op_dict[self._op](l_len, r_len)

    def __iadd__(self, other: Cycler[K, V]) -> Cycler[K, V]:
        """
        In-place pair-wise combine two equal length cyclers (zip).

        Parameters
        ----------
        other : Cycler
        """
        if not isinstance(other, Cycler):
            raise TypeError("Cannot += with a non-Cycler object")

        # The current state becomes the left operand of the new composition.
        old_self = copy.copy(self)
        self._keys = _process_keys(old_self, other)
        self._left = old_self
        self._op = zip
        self._right = Cycler(other._left, other._right, other._op)
        return self

    def __imul__(self, other: Cycler[K, V] | int) -> Cycler[K, V]:
        """
        In-place outer product of two cyclers (`itertools.product`).

        Parameters
        ----------
        other : Cycler
        """
        if not isinstance(other, Cycler):
            raise TypeError("Cannot *= with a non-Cycler object")

        # The current state becomes the left operand of the new composition.
        old_self = copy.copy(self)
        self._keys = _process_keys(old_self, other)
        self._left = old_self
        self._op = product
        self._right = Cycler(other._left, other._right, other._op)
        return self

    def __eq__(self, other: object) -> bool:
        """Two cyclers are equal when they yield the same dicts in order."""
        if not isinstance(other, Cycler):
            return False
        if len(self) != len(other):
            return False
        if self.keys ^ other.keys:
            return False
        return all(a == b for a, b in zip(self, other))

    # Cyclers define ``__eq__`` and can be modified in place, so they are
    # explicitly unhashable.
    __hash__ = None

    def __repr__(self) -> str:
        """Return an expression-like description of this cycler."""
        op_map = {zip: "+", product: "*"}
        if self._right is None:
            if isinstance(self._left, Cycler):
                # A copy wrapper (left is a Cycler, no right operand) holds
                # all of its content in ``_left``.  Delegate to it so that
                # none of the keys are dropped from the output.
                return repr(self._left)
            lab = self.keys.pop()
            itr = [v[lab] for v in self]
            return f"cycler({lab!r}, {itr!r})"
        else:
            op = op_map.get(self._op, "?")
            msg = "({left!r} {op} {right!r})"
            return msg.format(left=self._left, op=op, right=self._right)

    def _repr_html_(self) -> str:
        """Return an HTML table with one column per key, sorted by repr."""
        sorted_keys = sorted(self.keys, key=repr)
        parts = ["<table>"]
        parts.extend(f"<th>{key!r}</th>" for key in sorted_keys)
        for d in self:
            parts.append("<tr>")
            parts.extend(f"<td>{d[k]!r}</td>" for k in sorted_keys)
            parts.append("</tr>")
        parts.append("</table>")
        return "".join(parts)

    def by_key(self) -> dict[K, list[V]]:
        """
        Values by key.

        This returns the transposed values of the cycler.  Iterating
        over a `Cycler` yields dicts with a single value for each key,
        this method returns a `dict` of `list` which are the values
        for the given key.

        The returned value can be used to create an equivalent `Cycler`
        using only `+`.

        Returns
        -------
        transpose : dict
            dict of lists of the values for each key.
        """
        # Read ``keys`` once: the property builds a new set on every access.
        keys = self.keys
        out: dict[K, list[V]] = {k: list() for k in keys}

        for d in self:
            for k in keys:
                out[k].append(d[k])
        return out

    # Alias of `by_key` under its private name.
    _transpose = by_key

    def simplify(self) -> Cycler[K, V]:
        """
        Simplify the cycler into a sum (but no products) of cyclers.

        Returns
        -------
        simple : Cycler
        """
        # Transpose to per-key value lists, then zip the single-key cyclers
        # back together with ``+``.
        trans = self.by_key()
        return reduce(add, (_cycler(k, v) for k, v in trans.items()))

    # Expose the module-level `concat` function as a method.
    concat = concat
|
|
|
|
|
@overload
def cycler(arg: Cycler[K, V]) -> Cycler[K, V]:
    ...


@overload
def cycler(**kwargs: Iterable[V]) -> Cycler[str, V]:
    ...


@overload
def cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
    ...


def cycler(*args, **kwargs):
    """
    Create a new `Cycler` object from a single positional argument,
    a pair of positional arguments, or the combination of keyword arguments.

    cycler(arg)
    cycler(label1=itr1[, label2=iter2[, ...]])
    cycler(label, itr)

    Form 1 simply copies a given `Cycler` object.

    Form 2 composes a `Cycler` as an inner product of the
    pairs of keyword arguments. In other words, all of the
    iterables are cycled simultaneously, as if through zip().

    Form 3 creates a `Cycler` from a label and an iterable.
    This is useful for when the label cannot be a keyword argument
    (e.g., an integer or a name that has a space in it).

    Parameters
    ----------
    arg : Cycler
        Copy constructor for Cycler (does a shallow copy of iterables).
    label : name
        The property key. In the 2-arg form of the function,
        the label can be any hashable object. In the keyword argument
        form of the function, it must be a valid python identifier.
    itr : iterable
        Finite length iterable of the property values.
        Can be a single-property `Cycler` that would
        be like a key change, but as a shallow copy.

    Returns
    -------
    cycler : Cycler
        New `Cycler` for the given property

    Raises
    ------
    TypeError
        If positional and keyword arguments are mixed, if the single
        positional argument is not a `Cycler`, if more than two positional
        arguments are given, or if no arguments are given at all.
    """
    if args and kwargs:
        raise TypeError(
            "cycler() can only accept positional OR keyword arguments -- not both."
        )

    n_positional = len(args)
    if n_positional > 2:
        raise TypeError(
            "Only a single Cycler can be accepted as the lone "
            "positional argument. Use keyword arguments instead."
        )
    if n_positional == 2:
        # Form 3: cycler(label, itr)
        label, values = args
        return _cycler(label, values)
    if n_positional == 1:
        # Form 1: copy constructor
        (source,) = args
        if not isinstance(source, Cycler):
            raise TypeError(
                "If only one positional argument given, it must "
                "be a Cycler instance."
            )
        return Cycler(source)

    if not kwargs:
        raise TypeError("Must have at least a positional OR keyword arguments")

    # Form 2: zip one single-key cycler per keyword together with ``+``.
    combined = None
    for name, values in kwargs.items():
        piece = _cycler(name, values)
        combined = piece if combined is None else combined + piece
    return combined
|
|
|
|
|
def _cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
    """
    Create a new `Cycler` object from a property name and iterable of values.

    Parameters
    ----------
    label : hashable
        The property key.
    itr : iterable
        Finite length iterable of the property values.  A single-key
        `Cycler` is also accepted; its values are re-labelled as *label*.

    Returns
    -------
    cycler : Cycler
        New `Cycler` for the given property

    Raises
    ------
    ValueError
        If *itr* is a `Cycler` that does not have exactly one key.
    """
    if not isinstance(itr, Cycler):
        return Cycler._from_iter(label, itr)

    source_keys = itr.keys
    if len(source_keys) != 1:
        msg = "Can not create Cycler from a multi-property Cycler"
        raise ValueError(msg)

    (only_key,) = source_keys
    # Pull the bare values out of the source cycler's single-key dicts so
    # they can be stored under the new label.
    values = (entry[only_key] for entry in itr)
    return Cycler._from_iter(label, values)
|
|