+"""
+Cycler
+======
+
+Cycling through combinations of values, producing dictionaries.
+
+You can add cyclers::
+
+ from cycler import cycler
+ cc = (cycler(color=list('rgb')) +
+ cycler(linestyle=['-', '--', '-.']))
+ for d in cc:
+ print(d)
+
+Results in::
+
+ {'color': 'r', 'linestyle': '-'}
+ {'color': 'g', 'linestyle': '--'}
+ {'color': 'b', 'linestyle': '-.'}
+
+
+You can multiply cyclers::
+
+ from cycler import cycler
+ cc = (cycler(color=list('rgb')) *
+ cycler(linestyle=['-', '--', '-.']))
+ for d in cc:
+ print(d)
+
+Results in::
+
+ {'color': 'r', 'linestyle': '-'}
+ {'color': 'r', 'linestyle': '--'}
+ {'color': 'r', 'linestyle': '-.'}
+ {'color': 'g', 'linestyle': '-'}
+ {'color': 'g', 'linestyle': '--'}
+ {'color': 'g', 'linestyle': '-.'}
+ {'color': 'b', 'linestyle': '-'}
+ {'color': 'b', 'linestyle': '--'}
+ {'color': 'b', 'linestyle': '-.'}
+"""
+
+
+from __future__ import annotations
+
+from collections.abc import Hashable, Iterable, Generator
+import copy
+from functools import reduce
+from itertools import product, cycle
+from operator import mul, add
+# Dict, List, Union required for runtime cast calls
+from typing import TypeVar, Generic, Callable, Union, Dict, List, Any, overload, cast
+
+__version__ = "0.12.1"
+
+K = TypeVar("K", bound=Hashable)
+L = TypeVar("L", bound=Hashable)
+V = TypeVar("V")
+U = TypeVar("U")
+
+
+def _process_keys(
+ left: Cycler[K, V] | Iterable[dict[K, V]],
+ right: Cycler[K, V] | Iterable[dict[K, V]] | None,
+) -> set[K]:
+ """
+ Helper function to compose cycler keys.
+
+ Parameters
+ ----------
+ left, right : iterable of dictionaries or None
+ The cyclers to be composed.
+
+ Returns
+ -------
+ keys : set
+ The keys in the composition of the two cyclers.
+ """
+ l_peek: dict[K, V] = next(iter(left)) if left != [] else {}
+ r_peek: dict[K, V] = next(iter(right)) if right is not None else {}
+ l_key: set[K] = set(l_peek.keys())
+ r_key: set[K] = set(r_peek.keys())
+ if l_key & r_key:
+ raise ValueError("Can not compose overlapping cycles")
+ return l_key | r_key
+
+
+
+
[docs]
+
def concat(left: Cycler[K, V], right: Cycler[K, U]) -> Cycler[K, V | U]:
+
r"""
+
Concatenate `Cycler`\s, as if chained using `itertools.chain`.
+
+
The keys must match exactly.
+
+
Examples
+
--------
+
>>> num = cycler('a', range(3))
+
>>> let = cycler('a', 'abc')
+
>>> num.concat(let)
+
cycler('a', [0, 1, 2, 'a', 'b', 'c'])
+
+
Returns
+
-------
+
`Cycler`
+
The concatenated cycler.
+
"""
+
if left.keys != right.keys:
+
raise ValueError(
+
"Keys do not match:\n"
+
"\tIntersection: {both!r}\n"
+
"\tDisjoint: {just_one!r}".format(
+
both=left.keys & right.keys, just_one=left.keys ^ right.keys
+
)
+
)
+
_l = cast(Dict[K, List[Union[V, U]]], left.by_key())
+
_r = cast(Dict[K, List[Union[V, U]]], right.by_key())
+
return reduce(add, (_cycler(k, _l[k] + _r[k]) for k in left.keys))
+
+
+
+
+
[docs]
+
class Cycler(Generic[K, V]):
+
"""
+
Composable cycles.
+
+
This class has compositions methods:
+
+
``+``
+
for 'inner' products (zip)
+
+
``+=``
+
in-place ``+``
+
+
``*``
+
for outer products (`itertools.product`) and integer multiplication
+
+
``*=``
+
in-place ``*``
+
+
and supports basic slicing via ``[]``.
+
+
Parameters
+
----------
+
left, right : Cycler or None
+
The 'left' and 'right' cyclers.
+
op : func or None
+
Function which composes the 'left' and 'right' cyclers.
+
"""
+
+
def __call__(self):
+
return cycle(self)
+
+
+
[docs]
+
def __init__(
+
self,
+
left: Cycler[K, V] | Iterable[dict[K, V]] | None,
+
right: Cycler[K, V] | None = None,
+
op: Any = None,
+
):
+
"""
+
Semi-private init.
+
+
Do not use this directly, use `cycler` function instead.
+
"""
+
if isinstance(left, Cycler):
+
self._left: Cycler[K, V] | list[dict[K, V]] = Cycler(
+
left._left, left._right, left._op
+
)
+
elif left is not None:
+
# Need to copy the dictionary or else that will be a residual
+
# mutable that could lead to strange errors
+
self._left = [copy.copy(v) for v in left]
+
else:
+
self._left = []
+
+
if isinstance(right, Cycler):
+
self._right: Cycler[K, V] | None = Cycler(
+
right._left, right._right, right._op
+
)
+
else:
+
self._right = None
+
+
self._keys: set[K] = _process_keys(self._left, self._right)
+
self._op: Any = op
+
+
+
def __contains__(self, k):
+
return k in self._keys
+
+
@property
+
def keys(self) -> set[K]:
+
"""The keys this Cycler knows about."""
+
return set(self._keys)
+
+
+
[docs]
+
def change_key(self, old: K, new: K) -> None:
+
"""
+
Change a key in this cycler to a new name.
+
Modification is performed in-place.
+
+
Does nothing if the old key is the same as the new key.
+
Raises a ValueError if the new key is already a key.
+
Raises a KeyError if the old key isn't a key.
+
"""
+
if old == new:
+
return
+
if new in self._keys:
+
raise ValueError(
+
f"Can't replace {old} with {new}, {new} is already a key"
+
)
+
if old not in self._keys:
+
raise KeyError(
+
f"Can't replace {old} with {new}, {old} is not a key"
+
)
+
+
self._keys.remove(old)
+
self._keys.add(new)
+
+
if self._right is not None and old in self._right.keys:
+
self._right.change_key(old, new)
+
+
# self._left should always be non-None
+
# if self._keys is non-empty.
+
elif isinstance(self._left, Cycler):
+
self._left.change_key(old, new)
+
else:
+
# It should be completely safe at this point to
+
# assume that the old key can be found in each
+
# iteration.
+
self._left = [{new: entry[old]} for entry in self._left]
+
+
+
@classmethod
+
def _from_iter(cls, label: K, itr: Iterable[V]) -> Cycler[K, V]:
+
"""
+
Class method to create 'base' Cycler objects
+
that do not have a 'right' or 'op' and for which
+
the 'left' object is not another Cycler.
+
+
Parameters
+
----------
+
label : hashable
+
The property key.
+
+
itr : iterable
+
Finite length iterable of the property values.
+
+
Returns
+
-------
+
`Cycler`
+
New 'base' cycler.
+
"""
+
ret: Cycler[K, V] = cls(None)
+
ret._left = list({label: v} for v in itr)
+
ret._keys = {label}
+
return ret
+
+
def __getitem__(self, key: slice) -> Cycler[K, V]:
+
# TODO : maybe add numpy style fancy slicing
+
if isinstance(key, slice):
+
trans = self.by_key()
+
return reduce(add, (_cycler(k, v[key]) for k, v in trans.items()))
+
else:
+
raise ValueError("Can only use slices with Cycler.__getitem__")
+
+
def __iter__(self) -> Generator[dict[K, V], None, None]:
+
if self._right is None:
+
for left in self._left:
+
yield dict(left)
+
else:
+
if self._op is None:
+
raise TypeError(
+
"Operation cannot be None when both left and right are defined"
+
)
+
for a, b in self._op(self._left, self._right):
+
out = {}
+
out.update(a)
+
out.update(b)
+
yield out
+
+
def __add__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
+
"""
+
Pair-wise combine two equal length cyclers (zip).
+
+
Parameters
+
----------
+
other : Cycler
+
"""
+
if len(self) != len(other):
+
raise ValueError(
+
f"Can only add equal length cycles, not {len(self)} and {len(other)}"
+
)
+
return Cycler(
+
cast(Cycler[Union[K, L], Union[V, U]], self),
+
cast(Cycler[Union[K, L], Union[V, U]], other),
+
zip
+
)
+
+
@overload
+
def __mul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
+
...
+
+
@overload
+
def __mul__(self, other: int) -> Cycler[K, V]:
+
...
+
+
def __mul__(self, other):
+
"""
+
Outer product of two cyclers (`itertools.product`) or integer
+
multiplication.
+
+
Parameters
+
----------
+
other : Cycler or int
+
"""
+
if isinstance(other, Cycler):
+
return Cycler(
+
cast(Cycler[Union[K, L], Union[V, U]], self),
+
cast(Cycler[Union[K, L], Union[V, U]], other),
+
product
+
)
+
elif isinstance(other, int):
+
trans = self.by_key()
+
return reduce(
+
add, (_cycler(k, v * other) for k, v in trans.items())
+
)
+
else:
+
return NotImplemented
+
+
@overload
+
def __rmul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]:
+
...
+
+
@overload
+
def __rmul__(self, other: int) -> Cycler[K, V]:
+
...
+
+
def __rmul__(self, other):
+
return self * other
+
+
def __len__(self) -> int:
+
op_dict: dict[Callable, Callable[[int, int], int]] = {zip: min, product: mul}
+
if self._right is None:
+
return len(self._left)
+
l_len = len(self._left)
+
r_len = len(self._right)
+
return op_dict[self._op](l_len, r_len)
+
+
# iadd and imul do not exapand the the type as the returns must be consistent with
+
# self, thus they flag as inconsistent with add/mul
+
def __iadd__(self, other: Cycler[K, V]) -> Cycler[K, V]: # type: ignore[misc]
+
"""
+
In-place pair-wise combine two equal length cyclers (zip).
+
+
Parameters
+
----------
+
other : Cycler
+
"""
+
if not isinstance(other, Cycler):
+
raise TypeError("Cannot += with a non-Cycler object")
+
# True shallow copy of self is fine since this is in-place
+
old_self = copy.copy(self)
+
self._keys = _process_keys(old_self, other)
+
self._left = old_self
+
self._op = zip
+
self._right = Cycler(other._left, other._right, other._op)
+
return self
+
+
def __imul__(self, other: Cycler[K, V] | int) -> Cycler[K, V]: # type: ignore[misc]
+
"""
+
In-place outer product of two cyclers (`itertools.product`).
+
+
Parameters
+
----------
+
other : Cycler
+
"""
+
if not isinstance(other, Cycler):
+
raise TypeError("Cannot *= with a non-Cycler object")
+
# True shallow copy of self is fine since this is in-place
+
old_self = copy.copy(self)
+
self._keys = _process_keys(old_self, other)
+
self._left = old_self
+
self._op = product
+
self._right = Cycler(other._left, other._right, other._op)
+
return self
+
+
def __eq__(self, other: object) -> bool:
+
if not isinstance(other, Cycler):
+
return False
+
if len(self) != len(other):
+
return False
+
if self.keys ^ other.keys:
+
return False
+
return all(a == b for a, b in zip(self, other))
+
+
__hash__ = None # type: ignore
+
+
def __repr__(self) -> str:
+
op_map = {zip: "+", product: "*"}
+
if self._right is None:
+
lab = self.keys.pop()
+
itr = list(v[lab] for v in self)
+
return f"cycler({lab!r}, {itr!r})"
+
else:
+
op = op_map.get(self._op, "?")
+
msg = "({left!r} {op} {right!r})"
+
return msg.format(left=self._left, op=op, right=self._right)
+
+
def _repr_html_(self) -> str:
+
# an table showing the value of each key through a full cycle
+
output = "<table>"
+
sorted_keys = sorted(self.keys, key=repr)
+
for key in sorted_keys:
+
output += f"<th>{key!r}</th>"
+
for d in iter(self):
+
output += "<tr>"
+
for k in sorted_keys:
+
output += f"<td>{d[k]!r}</td>"
+
output += "</tr>"
+
output += "</table>"
+
return output
+
+
+
[docs]
+
def by_key(self) -> dict[K, list[V]]:
+
"""
+
Values by key.
+
+
This returns the transposed values of the cycler. Iterating
+
over a `Cycler` yields dicts with a single value for each key,
+
this method returns a `dict` of `list` which are the values
+
for the given key.
+
+
The returned value can be used to create an equivalent `Cycler`
+
using only `+`.
+
+
Returns
+
-------
+
transpose : dict
+
dict of lists of the values for each key.
+
"""
+
+
# TODO : sort out if this is a bottle neck, if there is a better way
+
# and if we care.
+
+
keys = self.keys
+
out: dict[K, list[V]] = {k: list() for k in keys}
+
+
for d in self:
+
for k in keys:
+
out[k].append(d[k])
+
return out
+
+
+
# for back compatibility
+
_transpose = by_key
+
+
+
[docs]
+
def simplify(self) -> Cycler[K, V]:
+
"""
+
Simplify the cycler into a sum (but no products) of cyclers.
+
+
Returns
+
-------
+
simple : Cycler
+
"""
+
# TODO: sort out if it is worth the effort to make sure this is
+
# balanced. Currently it is is
+
# (((a + b) + c) + d) vs
+
# ((a + b) + (c + d))
+
# I would believe that there is some performance implications
+
trans = self.by_key()
+
return reduce(add, (_cycler(k, v) for k, v in trans.items()))
+
+
+
concat = concat
+
+
+
+@overload
+def cycler(arg: Cycler[K, V]) -> Cycler[K, V]:
+ ...
+
+
+@overload
+def cycler(**kwargs: Iterable[V]) -> Cycler[str, V]:
+ ...
+
+
+@overload
+def cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
+ ...
+
+
+
+
[docs]
+
def cycler(*args, **kwargs):
+
"""
+
Create a new `Cycler` object from a single positional argument,
+
a pair of positional arguments, or the combination of keyword arguments.
+
+
cycler(arg)
+
cycler(label1=itr1[, label2=iter2[, ...]])
+
cycler(label, itr)
+
+
Form 1 simply copies a given `Cycler` object.
+
+
Form 2 composes a `Cycler` as an inner product of the
+
pairs of keyword arguments. In other words, all of the
+
iterables are cycled simultaneously, as if through zip().
+
+
Form 3 creates a `Cycler` from a label and an iterable.
+
This is useful for when the label cannot be a keyword argument
+
(e.g., an integer or a name that has a space in it).
+
+
Parameters
+
----------
+
arg : Cycler
+
Copy constructor for Cycler (does a shallow copy of iterables).
+
label : name
+
The property key. In the 2-arg form of the function,
+
the label can be any hashable object. In the keyword argument
+
form of the function, it must be a valid python identifier.
+
itr : iterable
+
Finite length iterable of the property values.
+
Can be a single-property `Cycler` that would
+
be like a key change, but as a shallow copy.
+
+
Returns
+
-------
+
cycler : Cycler
+
New `Cycler` for the given property
+
+
"""
+
if args and kwargs:
+
raise TypeError(
+
"cycler() can only accept positional OR keyword arguments -- not both."
+
)
+
+
if len(args) == 1:
+
if not isinstance(args[0], Cycler):
+
raise TypeError(
+
"If only one positional argument given, it must "
+
"be a Cycler instance."
+
)
+
return Cycler(args[0])
+
elif len(args) == 2:
+
return _cycler(*args)
+
elif len(args) > 2:
+
raise TypeError(
+
"Only a single Cycler can be accepted as the lone "
+
"positional argument. Use keyword arguments instead."
+
)
+
+
if kwargs:
+
return reduce(add, (_cycler(k, v) for k, v in kwargs.items()))
+
+
raise TypeError("Must have at least a positional OR keyword arguments")
+
+
+
+def _cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]:
+ """
+ Create a new `Cycler` object from a property name and iterable of values.
+
+ Parameters
+ ----------
+ label : hashable
+ The property key.
+ itr : iterable
+ Finite length iterable of the property values.
+
+ Returns
+ -------
+ cycler : Cycler
+ New `Cycler` for the given property
+ """
+ if isinstance(itr, Cycler):
+ keys = itr.keys
+ if len(keys) != 1:
+ msg = "Can not create Cycler from a multi-property Cycler"
+ raise ValueError(msg)
+
+ lab = keys.pop()
+ # Doesn't need to be a new list because
+ # _from_iter() will be creating that new list anyway.
+ itr = (v[lab] for v in itr)
+
+ return Cycler._from_iter(label, itr)
+