Source code for sigmaepsilon.math.linalg.sparse.jaggedarray

from typing import Iterable, Union
import numpy as np
from numpy import concatenate as join, ndarray, array_repr
import awkward as ak
from awkward import unflatten, Array as akarray
from numpy.lib.mixins import NDArrayOperatorsMixin
from scipy.sparse import csr_matrix as csr_scipy

from sigmaepsilon.core import Wrapper
from ...arraysetops import unique2d
from .utils import count_cols
from .csr import csr_matrix


__all__ = ["JaggedArray"]


def shape(arr):
    return arr.shape[:2]


def cut(shp):
    return np.full(shp[0], shp[1], dtype=int)


def flatten(arr):
    return arr.flatten()


def get_data(
    data: Iterable, cuts: Iterable = None, fallback: bool = False
) -> Union[ndarray, akarray]:
    if isinstance(data, np.ndarray):
        nD = len(data.shape)
        assert nD <= 2, "Only 1 and 2 dimensional arrays are supported!"
        if nD == 1:
            assert isinstance(cuts, Iterable)
            cuts = np.array(cuts).astype(int)
            data = unflatten(data, cuts)
        return data
    elif isinstance(data, list):
        try:
            if fallback:
                raise NotImplementedError
            # list of lists
            return ak.Array(data)
        except Exception:
            # list of 2d NumPy arrays
            assert all(
                map(lambda arr: len(arr.shape) == 2, data)
            ), "Only 2 dimensional arrays are supported!"
            # NOTE - implementaion 1
            # > Through the back door, but this is probably the cleanest solution of all.
            # > It only requires to create one python list, without further operations on it.
            # NOTE This line is one of the most unreadable things I've ever done.
            data = unflatten(
                join(list(map(flatten, data))), join(list(map(cut, map(shape, data))))
            )
            # NOTE - implementaion 2
            # from operator import add
            # from functools import reduce
            # > This also works, but requires data to jump back and forth just to
            # > have a merged list of lists. It also requires to add nested python lists,
            # > which is probably not the quickest operation in the computational world.
            # data = ak.from_iter(reduce(add, map(lambda arr : ak.Array(arr).to_list(), data)))
            # NOTE - implementaion 3
            # > This also works at creation, but fails later at some operation due to
            # > the specific layout generated by ak.concatenate
            # data = ak.concatenate(list(map(lambda arr : ak.Array(arr), data)))
            return data
    elif isinstance(data, akarray):
        return data
    else:
        if data:
            raise TypeError(f"Invalid type {type(data)}")


HANDLED_FUNCTIONS = {}


[docs] class JaggedArray(NDArrayOperatorsMixin, Wrapper): """ A NumPy-compliant class that handles 2d matrices with a variable number of columns per row. The class is actually an interface to `awkward.Array`, with some additional features, specific to 2d jagged arrays. At the moment a JaggedArray can be constructed from * a flattened 2d jagged array with 'cuts' through unflatteing (we use Awkward here) * a list of 1d lists * a list of 2d NumPy arrays Parameters ---------- data : Iterable A 2d numpy array or a list of them. cuts : Iterable, Optional An iterable that tells how to unflatten an 1d array into a 2d jagged shape. Only if 'data' is an 1d array. Default is None. force_numpy : bool, Optional Forces dense inputs to be NumPy arrays in the background. Default is True. Examples -------- The following defines a dense matrix from a flattened shape: >>> import numpy as np >>> from sigmaepsilon.math.linalg import JaggedArray >>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) >>> JaggedArray(data, cuts=[3, 3, 3]) array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) Since we use the Awkward library to deflatten arrays, if we provide the argument `force_numpy=False`, which is the default behaviour, the data is stored as an Awkward array: >>> JaggedArray(data, cuts=[3, 3, 3], force_numpy=False) JaggedArray([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) The next example defines a jagged array. In this case the data is inevitably stored as an Awkward array. >>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) >>> JaggedArray(data, cuts=[3, 3, 4]) JaggedArray([[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]) >>> JaggedArray([[1, 2], [3, 4, 5]]) JaggedArray([[1, 2], [3, 4, 5]]) >>> JaggedArray([np.eye(2), np.eye(3)]) JaggedArray([[[1, 0], [0, 1]], [[1, 0, 0], [0, 1, 0], [0, 0, 1]]]) See also -------- :class:`~sigmaepsilon.math.linalg.sparse.csr.csr_matrix` :class:`awkward.Array` """ def __init__( self, data: Iterable = None, *, cuts: Iterable = None, force_numpy: bool = True, **kwargs, ): wrap = kwargs.get("wrap", None) if wrap is None: assert isinstance(data, Iterable) wrap = get_data(data, cuts) if isinstance(wrap, akarray) and force_numpy: N = len(np.unique(count_cols(wrap))) if N == 1: wrap = wrap.to_numpy() super(JaggedArray, self).__init__(wrap=wrap) def __repr__(self): if isinstance(self._wrapped, ndarray): return array_repr(self._wrapped) else: return f"{self.__class__.__name__}({self._wrapped})" def __array__(self): return self._wrapped def __getitem__(self, key): return self._wrapped.__getitem__(key) def __setitem__(self, key, value): return self._wrapped.__setitem__(key, value) def __len__(self): return len(self._wrapped)
[docs] def to_csr(self) -> csr_matrix: """ Returns the topology as a csr_matrix. See also -------- :class:`~sigmaepsilon.math.linalg.sparse.csr.csr_matrix` """ return csr_matrix(self._wrapped)
[docs] def to_scipy(self) -> csr_scipy: """ Returns the array as a sparse SciPy CSR matrix. See also -------- :class:`scipy.sparse.csr_matrix` """ return self.to_csr().to_scipy()
[docs] def to_ak(self) -> akarray: """ Returns underlying data as an Awkward array. See also -------- :class:`awkward.Array` """ if isinstance(self._wrapped, akarray): return self._wrapped else: return akarray(self._wrapped)
[docs] def to_numpy(self) -> ndarray: """ Returns underlying data as a NumPy array. This is only possible for regular topologies. """ if isinstance(self._wrapped, ndarray): return self.__array__() else: return self.to_csr().to_numpy()
[docs] def to_array(self) -> Union[akarray, ndarray]: """ Returns the underlying data, which either an Awkward or a NumPy array. """ return self.__array__()
[docs] def to_list(self): """ Returns underlying data as lists. .. versionadded:: 0.0.8 """ if isinstance(self._wrapped, ndarray): return self.__array__().tolist() else: return self.__array__().to_list()
[docs] def unique(self, *args, **kwargs): """ Returns unique elements, by generalizing the functionality provided by :func:`numpy.unique`, see its documentation for the details. """ return np.unique(self, *args, **kwargs)
[docs] def is_jagged(self) -> bool: """ Returns True if the topology is jagged, False otherwise. """ widths = self.widths() return not np.all(widths == widths[0])
[docs] def widths(self) -> ndarray: """ Returns the number of columns for each row. """ return count_cols(self._wrapped)
@property def size(self) -> int: """ Number of elements in the array. """ return np.sum(self.widths())
[docs] def flatten(self, return_cuts: bool = False): """ Returns the flattened equivalent of the array. """ if isinstance(self._wrapped, akarray): if return_cuts: topo = ak.flatten(self._wrapped).to_numpy() return self.widths(), topo else: topo = ak.flatten(self._wrapped).to_numpy() return topo else: if return_cuts: return self.widths(), self.to_numpy().flatten() else: return self.to_numpy().flatten()
@property def shape(self): """ Returns the shape of the data as a tuple. If the topology is jagged, the second item is an iterable. .. versionadded:: 0.0.8 """ if isinstance(self._wrapped, ndarray): return self._wrapped.shape else: return len(self), self.widths() def __array_function__(self, func, types, args, kwargs): if func not in HANDLED_FUNCTIONS: arrs = [arg.to_ak() for arg in args] return func(*arrs, **kwargs) if len(args) == 1 and isinstance(args[0], (list, tuple)): if not all(isinstance(x, self.__class__) for x in args[0]): raise TypeError("All object must be instances of JaggedArray.") else: if not all(isinstance(x, self.__class__) for x in args): raise TypeError("All object must be instances of JaggedArray.") return HANDLED_FUNCTIONS[func](*args, **kwargs)
def implements(numpy_function): """ Register an __array_function__ implementation for JaggedArray objects. """ def decorator(func): HANDLED_FUNCTIONS[numpy_function] = func return func return decorator @implements(np.unique) def unique(*args, **kwargs): return unique2d(args[0]._wrapped, **kwargs) @implements(np.vstack) def vstack(*args, **kwargs): data = np.concatenate(list(t.flatten() for t in args[0])) cuts = np.concatenate(list(t.widths() for t in args[0])) return JaggedArray(data, cuts=cuts) @implements(np.concatenate) def concatenate(*args, **kwargs): data = np.concatenate(list(t.flatten() for t in args[0])) cuts = np.concatenate(list(t.widths() for t in args[0])) return JaggedArray(data, cuts=cuts)