from typing import Iterable, Union
import numpy as np
from numpy import concatenate as join, ndarray, array_repr
import awkward as ak
from awkward import unflatten, Array as akarray
from numpy.lib.mixins import NDArrayOperatorsMixin
from scipy.sparse import csr_matrix as csr_scipy
from sigmaepsilon.core import Wrapper
from ...arraysetops import unique2d
from .utils import count_cols
from .csr import csr_matrix
__all__ = ["JaggedArray"]
def shape(arr):
return arr.shape[:2]
def cut(shp):
return np.full(shp[0], shp[1], dtype=int)
def flatten(arr):
return arr.flatten()
def get_data(
data: Iterable, cuts: Iterable = None, fallback: bool = False
) -> Union[ndarray, akarray]:
if isinstance(data, np.ndarray):
nD = len(data.shape)
assert nD <= 2, "Only 1 and 2 dimensional arrays are supported!"
if nD == 1:
assert isinstance(cuts, Iterable)
cuts = np.array(cuts).astype(int)
data = unflatten(data, cuts)
return data
elif isinstance(data, list):
try:
if fallback:
raise NotImplementedError
# list of lists
return ak.Array(data)
except Exception:
# list of 2d NumPy arrays
assert all(
map(lambda arr: len(arr.shape) == 2, data)
), "Only 2 dimensional arrays are supported!"
# NOTE - implementaion 1
# > Through the back door, but this is probably the cleanest solution of all.
# > It only requires to create one python list, without further operations on it.
# NOTE This line is one of the most unreadable things I've ever done.
data = unflatten(
join(list(map(flatten, data))), join(list(map(cut, map(shape, data))))
)
# NOTE - implementaion 2
# from operator import add
# from functools import reduce
# > This also works, but requires data to jump back and forth just to
# > have a merged list of lists. It also requires to add nested python lists,
# > which is probably not the quickest operation in the computational world.
# data = ak.from_iter(reduce(add, map(lambda arr : ak.Array(arr).to_list(), data)))
# NOTE - implementaion 3
# > This also works at creation, but fails later at some operation due to
# > the specific layout generated by ak.concatenate
# data = ak.concatenate(list(map(lambda arr : ak.Array(arr), data)))
return data
elif isinstance(data, akarray):
return data
else:
if data:
raise TypeError(f"Invalid type {type(data)}")
HANDLED_FUNCTIONS = {}
[docs]
class JaggedArray(NDArrayOperatorsMixin, Wrapper):
"""
A NumPy-compliant class that handles 2d matrices with a variable
number of columns per row.
The class is actually an interface to `awkward.Array`,
with some additional features, specific to 2d jagged arrays.
At the moment a JaggedArray can be constructed from
* a flattened 2d jagged array with 'cuts' through unflatteing (we use Awkward here)
* a list of 1d lists
* a list of 2d NumPy arrays
Parameters
----------
data : Iterable
A 2d numpy array or a list of them.
cuts : Iterable, Optional
An iterable that tells how to unflatten an 1d array into a
2d jagged shape. Only if 'data' is an 1d array. Default is None.
force_numpy : bool, Optional
Forces dense inputs to be NumPy arrays in the background.
Default is True.
Examples
--------
The following defines a dense matrix from a flattened shape:
>>> import numpy as np
>>> from sigmaepsilon.math.linalg import JaggedArray
>>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> JaggedArray(data, cuts=[3, 3, 3])
array([[1, 2, 3],
[4, 5, 6],
[7, 8, 9]])
Since we use the Awkward library to deflatten arrays, if we provide
the argument `force_numpy=False`, which is the default behaviour, the data
is stored as an Awkward array:
>>> JaggedArray(data, cuts=[3, 3, 3], force_numpy=False)
JaggedArray([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
The next example defines a jagged array. In this case the data is inevitably
stored as an Awkward array.
>>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> JaggedArray(data, cuts=[3, 3, 4])
JaggedArray([[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]])
>>> JaggedArray([[1, 2], [3, 4, 5]])
JaggedArray([[1, 2], [3, 4, 5]])
>>> JaggedArray([np.eye(2), np.eye(3)])
JaggedArray([[[1, 0], [0, 1]], [[1, 0, 0], [0, 1, 0], [0, 0, 1]]])
See also
--------
:class:`~sigmaepsilon.math.linalg.sparse.csr.csr_matrix`
:class:`awkward.Array`
"""
def __init__(
self,
data: Iterable = None,
*,
cuts: Iterable = None,
force_numpy: bool = True,
**kwargs,
):
wrap = kwargs.get("wrap", None)
if wrap is None:
assert isinstance(data, Iterable)
wrap = get_data(data, cuts)
if isinstance(wrap, akarray) and force_numpy:
N = len(np.unique(count_cols(wrap)))
if N == 1:
wrap = wrap.to_numpy()
super(JaggedArray, self).__init__(wrap=wrap)
def __repr__(self):
if isinstance(self._wrapped, ndarray):
return array_repr(self._wrapped)
else:
return f"{self.__class__.__name__}({self._wrapped})"
def __array__(self):
return self._wrapped
def __getitem__(self, key):
return self._wrapped.__getitem__(key)
def __setitem__(self, key, value):
return self._wrapped.__setitem__(key, value)
def __len__(self):
return len(self._wrapped)
[docs]
def to_csr(self) -> csr_matrix:
"""
Returns the topology as a csr_matrix.
See also
--------
:class:`~sigmaepsilon.math.linalg.sparse.csr.csr_matrix`
"""
return csr_matrix(self._wrapped)
[docs]
def to_scipy(self) -> csr_scipy:
"""
Returns the array as a sparse SciPy CSR matrix.
See also
--------
:class:`scipy.sparse.csr_matrix`
"""
return self.to_csr().to_scipy()
[docs]
def to_ak(self) -> akarray:
"""
Returns underlying data as an Awkward array.
See also
--------
:class:`awkward.Array`
"""
if isinstance(self._wrapped, akarray):
return self._wrapped
else:
return akarray(self._wrapped)
[docs]
def to_numpy(self) -> ndarray:
"""
Returns underlying data as a NumPy array. This is only possible
for regular topologies.
"""
if isinstance(self._wrapped, ndarray):
return self.__array__()
else:
return self.to_csr().to_numpy()
[docs]
def to_array(self) -> Union[akarray, ndarray]:
"""
Returns the underlying data, which either an Awkward or a NumPy array.
"""
return self.__array__()
[docs]
def to_list(self):
"""
Returns underlying data as lists.
.. versionadded:: 0.0.8
"""
if isinstance(self._wrapped, ndarray):
return self.__array__().tolist()
else:
return self.__array__().to_list()
[docs]
def unique(self, *args, **kwargs):
"""
Returns unique elements, by generalizing the functionality provided
by :func:`numpy.unique`, see its documentation for the details.
"""
return np.unique(self, *args, **kwargs)
[docs]
def is_jagged(self) -> bool:
"""
Returns True if the topology is jagged, False otherwise.
"""
widths = self.widths()
return not np.all(widths == widths[0])
[docs]
def widths(self) -> ndarray:
"""
Returns the number of columns for each row.
"""
return count_cols(self._wrapped)
@property
def size(self) -> int:
"""
Number of elements in the array.
"""
return np.sum(self.widths())
[docs]
def flatten(self, return_cuts: bool = False):
"""
Returns the flattened equivalent of the array.
"""
if isinstance(self._wrapped, akarray):
if return_cuts:
topo = ak.flatten(self._wrapped).to_numpy()
return self.widths(), topo
else:
topo = ak.flatten(self._wrapped).to_numpy()
return topo
else:
if return_cuts:
return self.widths(), self.to_numpy().flatten()
else:
return self.to_numpy().flatten()
@property
def shape(self):
"""
Returns the shape of the data as a tuple. If the topology is
jagged, the second item is an iterable.
.. versionadded:: 0.0.8
"""
if isinstance(self._wrapped, ndarray):
return self._wrapped.shape
else:
return len(self), self.widths()
def __array_function__(self, func, types, args, kwargs):
if func not in HANDLED_FUNCTIONS:
arrs = [arg.to_ak() for arg in args]
return func(*arrs, **kwargs)
if len(args) == 1 and isinstance(args[0], (list, tuple)):
if not all(isinstance(x, self.__class__) for x in args[0]):
raise TypeError("All object must be instances of JaggedArray.")
else:
if not all(isinstance(x, self.__class__) for x in args):
raise TypeError("All object must be instances of JaggedArray.")
return HANDLED_FUNCTIONS[func](*args, **kwargs)
def implements(numpy_function):
"""
Register an __array_function__ implementation for JaggedArray
objects.
"""
def decorator(func):
HANDLED_FUNCTIONS[numpy_function] = func
return func
return decorator
@implements(np.unique)
def unique(*args, **kwargs):
return unique2d(args[0]._wrapped, **kwargs)
@implements(np.vstack)
def vstack(*args, **kwargs):
data = np.concatenate(list(t.flatten() for t in args[0]))
cuts = np.concatenate(list(t.widths() for t in args[0]))
return JaggedArray(data, cuts=cuts)
@implements(np.concatenate)
def concatenate(*args, **kwargs):
data = np.concatenate(list(t.flatten() for t in args[0]))
cuts = np.concatenate(list(t.widths() for t in args[0]))
return JaggedArray(data, cuts=cuts)