Added cumulative sum preprocessor #2642

Merged
merged 7 commits into from Jan 24, 2025
44 changes: 41 additions & 3 deletions doc/recipe/preprocessor.rst
@@ -2870,8 +2870,46 @@ Other

Miscellaneous functions that do not belong to any of the other categories.

Clip
----
.. _cumulative_sum:

``cumulative_sum``
------------------

This function calculates cumulative sums along a given coordinate.

The ``cumulative_sum`` preprocessor supports the following arguments in the
recipe:

* ``coord`` (:obj:`str`): Coordinate over which the cumulative sum is
calculated.
Must be 0D or 1D.
* ``weights`` (array-like, :obj:`bool`, or ``None``, default: ``None``):
Weights for the calculation of the cumulative sum.
Each element in the data is multiplied by the corresponding weight before
summing.
Can be an array of the same shape as the input data, ``False`` or ``None``
(no weighting), or ``True`` (calculate the weights from the coordinate
bounds; only works if each coordinate point has exactly 2 bounds).
* ``method`` (:obj:`str`, default: ``"sequential"``): Method used to perform
the cumulative sum.
Only relevant if the cube has `lazy data
<https://scitools-iris.readthedocs.io/en/stable/userguide/real_and_lazy_data.html>`__.
See :func:`dask.array.cumsum` for details.

Example:

.. code-block:: yaml

preprocessors:
preproc_cumulative_sum:
cumulative_sum:
coord: time
weights: true

See also :func:`esmvalcore.preprocessor.cumulative_sum`.

``clip``
--------

This function clips data values to a certain minimum, maximum or range. The function takes two
arguments:
@@ -2892,7 +2930,7 @@ The example below shows how to set all values below zero to zero.
.. _histogram:

``histogram``
-------------------
-------------

This function calculates histograms.

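The ``cumulative_sum`` recipe example documented above maps directly onto the Python API added in this PR. Below is a minimal sketch of calling `esmvalcore.preprocessor.cumulative_sum` on a synthetic cube; the depth coordinate, variable name, and data values are invented for illustration and are not taken from the PR.

```python
import numpy as np
from iris.coords import DimCoord
from iris.cube import Cube

from esmvalcore.preprocessor import cumulative_sum

# Illustrative 1D cube: a concentration profile over depth with 1 m thick
# layers (each point has exactly 2 bounds, as required for weights=True).
depth = DimCoord(
    np.array([0.5, 1.5, 2.5, 3.5]),
    bounds=np.array([[0.0, 1.0], [1.0, 2.0], [2.0, 3.0], [3.0, 4.0]]),
    standard_name="depth",
    units="m",
)
cube = Cube(
    np.array([1.0, 2.0, 3.0, 4.0]),
    var_name="conc",
    units="kg m-3",
    dim_coords_and_dims=[(depth, 0)],
)

# weights=True multiplies each element by the width of its bounds before
# summing, so the units become kg m-3 * m; the var_name is prefixed with
# "cumulative_" regardless of the weighting.
result = cumulative_sum(cube, coord="depth", weights=True)
print(result.var_name)  # cumulative_conc
print(result.data)      # running sum [1., 3., 6., 10.]
```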
2 changes: 1 addition & 1 deletion esmvalcore/iris_helpers.py
@@ -39,7 +39,7 @@ def add_leading_dim_to_cube(cube, dim_coord):

Raises
------
CoordinateMultiDimError
iris.exceptions.CoordinateMultiDimError
``dim_coord`` is not 1D.

"""
5 changes: 3 additions & 2 deletions esmvalcore/preprocessor/__init__.py
@@ -49,7 +49,7 @@
mask_outside_range,
)
from ._multimodel import ensemble_statistics, multi_model_statistics
from ._other import clip, histogram
from ._other import clip, cumulative_sum, histogram
from ._regrid import (
extract_coordinate_points,
extract_levels,
@@ -146,7 +146,8 @@
# Other
"clip",
"rolling_window_statistics",
# Region selection
"cumulative_sum",
# Region operations
"extract_region",
"extract_shape",
"extract_volume",
86 changes: 84 additions & 2 deletions esmvalcore/preprocessor/_other.py
@@ -13,12 +13,14 @@
import numpy as np
from iris.coords import Coord, DimCoord
from iris.cube import Cube
from iris.exceptions import CoordinateMultiDimError

from esmvalcore.iris_helpers import rechunk_cube
from esmvalcore.preprocessor._shared import (
get_all_coord_dims,
get_all_coords,
get_array_module,
get_coord_weights,
get_weights,
preserve_float_dtype,
)
@@ -58,6 +60,84 @@ def clip(cube, minimum=None, maximum=None):
return cube


@preserve_float_dtype
def cumulative_sum(
cube: Cube,
coord: Coord | str,
weights: np.ndarray | da.Array | bool | None = None,
method: Literal["sequential", "blelloch"] = "sequential",
) -> Cube:
"""Calculate cumulative sum of the elements along a given coordinate.

Parameters
----------
cube:
Input cube.
coord:
Coordinate over which the cumulative sum is calculated. Must be 0D or
1D.
weights:
Weights for the calculation of the cumulative sum. Each element in the
data is multiplied by the corresponding weight before summing. Can be
an array of the same shape as the input data, ``False`` or ``None`` (no
weighting), or ``True`` (calculate the weights from the coordinate
bounds; only works if each coordinate point has exactly 2 bounds).
method:
Method used to perform the cumulative sum. Only relevant if the cube
has `lazy data
<https://scitools-iris.readthedocs.io/en/stable/userguide/
real_and_lazy_data.html>`__. See :func:`dask.array.cumsum` for details.

Returns
-------
Cube
Cube of cumulative sums. Has the same dimensions and coordinates as the
input cube.

Raises
------
iris.exceptions.CoordinateMultiDimError
``coord`` is not 0D or 1D.
iris.exceptions.CoordinateNotFoundError
``coord`` is not found in ``cube``.

"""
cube = cube.copy()

# Only 0D and 1D coordinates are supported
coord = cube.coord(coord)
if coord.ndim > 1:
raise CoordinateMultiDimError(coord)

# Weighting, make sure to adapt cube standard name and units in this case
if weights is True:
weights = get_coord_weights(cube, coord, broadcast=True)
if isinstance(weights, (np.ndarray, da.Array)):
cube.data = cube.core_data() * weights
cube.standard_name = None
cube.units = cube.units * coord.units

axes = get_all_coord_dims(cube, [coord])

# For 0D coordinates, cumulative_sum is a no-op (this aligns with
# numpy's/dask's behavior)
if axes:
if cube.has_lazy_data():
cube.data = da.cumsum(
cube.core_data(), axis=axes[0], method=method
)
else:
cube.data = np.cumsum(cube.core_data(), axis=axes[0])

# Adapt cube metadata
if cube.var_name is not None:
cube.var_name = f"cumulative_{cube.var_name}"
if cube.long_name is not None:
cube.long_name = f"Cumulative {cube.long_name}"

return cube


@preserve_float_dtype
def histogram(
cube: Cube,
@@ -133,8 +213,10 @@ def histogram(
Invalid `normalization` or `bin_range` given or `bin_range` is ``None``
and data is fully masked.
iris.exceptions.CoordinateNotFoundError
`longitude` is not found in cube if `weights=True`, `latitude` is in
`coords`, and no `cell_area` is given as
A given coordinate of ``coords`` is not found in ``cube``.
iris.exceptions.CoordinateNotFoundError
`longitude` is not found in cube if ``weights=True``, `latitude` is in
``coords``, and no `cell_area` is given as
:ref:`supplementary_variables`.

"""
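The ``method`` argument in the implementation above is simply forwarded to `dask.array.cumsum` and only matters for lazy data. A small standalone sketch comparing the two dask methods on chunked data; the array size and chunking are arbitrary, chosen for illustration only.

```python
import dask.array as da
import numpy as np

# Chunked 1D array standing in for a cube's lazy data.
data = da.arange(1_000_000, chunks=100_000).astype(float)

# "sequential" scans the chunks one after the other, while "blelloch"
# uses a two-pass parallel prefix scan; both yield the same values.
seq = da.cumsum(data, axis=0, method="sequential")
par = da.cumsum(data, axis=0, method="blelloch")

np.testing.assert_allclose(seq.compute(), par.compute())
```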
88 changes: 57 additions & 31 deletions esmvalcore/preprocessor/_shared.py
@@ -301,15 +301,11 @@ def get_weights(
"""Calculate suitable weights for given coordinates."""
npx = get_array_module(cube.core_data())
weights = npx.ones_like(cube.core_data())
coords = [c.name() if hasattr(c, "name") else c for c in coords]

# Time weights: lengths of time interval
if "time" in coords:
weights = weights * broadcast_to_shape(
npx.array(get_time_weights(cube)),
cube.shape,
cube.coord_dims("time"),
chunks=cube.lazy_data().chunks if cube.has_lazy_data() else None,
)
weights = weights * get_coord_weights(cube, "time", broadcast=True)

# Latitude weights: cell areas
if "latitude" in coords:
@@ -319,9 +315,8 @@
):
raise CoordinateNotFoundError(
f"Cube {cube.summary(shorten=True)} needs a `longitude` "
f"coordinate to calculate cell area weights for weighted "
f"distance metric over coordinates {coords} (alternatively, "
f"a `cell_area` can be given to the cube as supplementary "
f"coordinate to calculate cell area weights (alternatively, a "
f"`cell_area` can be given to the cube as supplementary "
f"variable)"
)
try_adding_calculated_cell_area(cube)
@@ -341,43 +336,74 @@
return weights


def get_time_weights(cube: Cube) -> np.ndarray | da.core.Array:
"""Compute the weighting of the time axis.
def get_coord_weights(
cube: Cube,
coord: str | Coord,
broadcast: bool = False,
) -> np.ndarray | da.core.Array:
"""Compute weighting for an arbitrary coordinate.

Weights are calculated as the difference between the upper and lower
bounds.

Parameters
----------
cube:
Input cube.
coord:
Coordinate which is used to calculate the weights. Must have bounds
array with 2 bounds per point.
broadcast:
If ``False``, the weights have the same shape as ``coord``. If ``True``,
they are broadcast to the shape of the cube.

Returns
-------
np.ndarray or da.Array
Array of time weights for averaging. Returns a
:class:`dask.array.Array` if the input cube has lazy data; a
:class:`numpy.ndarray` otherwise.
Array of coordinate weights. Returns a :class:`dask.array.Array` if the
input cube has lazy data; a :class:`numpy.ndarray` otherwise.

"""
time = cube.coord("time")
coord_dims = cube.coord_dims("time")
coord = cube.coord(coord)
coord_dims = cube.coord_dims(coord)

# Multidimensional time coordinates are not supported: In this case,
# weights cannot be simply calculated as difference between the bounds
if len(coord_dims) > 1:
# Coordinate needs bounds of size 2
if not coord.has_bounds():
raise ValueError(
f"Cannot calculate weights for coordinate '{coord.name()}' "
f"without bounds"
)
if coord.core_bounds().shape[-1] != 2:
raise ValueError(
f"Weighted statistical operations are not supported for "
f"{len(coord_dims):d}D time coordinates, expected 0D or 1D"
f"Cannot calculate weights for coordinate '{coord.name()}' "
f"with {coord.core_bounds().shape[-1]} bounds per point, expected "
f"2 bounds per point"
)

# Extract 1D time weights (= lengths of time intervals)
time_weights = time.lazy_bounds()[:, 1] - time.lazy_bounds()[:, 0]
if cube.has_lazy_data():
# Align the weight chunks with the data chunks to avoid excessively
# large chunks as a result of broadcasting.
time_chunks = cube.lazy_data().chunks[coord_dims[0]]
time_weights = time_weights.rechunk(time_chunks)
else:
time_weights = time_weights.compute()
return time_weights
# Calculate weights with the same shape as the coordinate and make sure to
# use the same chunks as the parent cube for non-scalar lazy data
weights = np.abs(coord.lazy_bounds()[:, 1] - coord.lazy_bounds()[:, 0])
if cube.has_lazy_data() and coord_dims:
coord_chunks = tuple(cube.lazy_data().chunks[d] for d in coord_dims)
weights = weights.rechunk(coord_chunks)
if not cube.has_lazy_data():
weights = weights.compute()

# Broadcast to cube shape if desired; scalar arrays need special treatment
# since iris.broadcast_to_shape cannot handle this
if broadcast:
chunks = cube.lazy_data().chunks if cube.has_lazy_data() else None
if coord_dims:
weights = broadcast_to_shape(
weights, cube.shape, coord_dims, chunks=chunks
)
else:
if cube.has_lazy_data():
weights = da.broadcast_to(weights, cube.shape, chunks=chunks)
else:
weights = np.broadcast_to(weights, cube.shape)

return weights


def try_adding_calculated_cell_area(cube: Cube) -> None:
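A short sketch of what the generalized `get_coord_weights` helper returns, using a made-up 2 x 3 cube with an irregularly spaced time coordinate. Note that `esmvalcore.preprocessor._shared` is a private module, so this import is for illustration only and may change.

```python
import numpy as np
from iris.coords import DimCoord
from iris.cube import Cube

from esmvalcore.preprocessor._shared import get_coord_weights

# Time intervals of length 1, 2 and 3 days (derived from the bounds).
time = DimCoord(
    np.array([0.5, 2.0, 4.5]),
    bounds=np.array([[0.0, 1.0], [1.0, 3.0], [3.0, 6.0]]),
    standard_name="time",
    units="days since 2000-01-01",
)
lat = DimCoord(
    np.array([-45.0, 45.0]), standard_name="latitude", units="degrees"
)
cube = Cube(np.ones((2, 3)), dim_coords_and_dims=[(lat, 0), (time, 1)])

# 1D weights: the widths of the time bounds.
print(get_coord_weights(cube, "time"))  # [1. 2. 3.]

# Broadcast along the time dimension to the full cube shape.
print(get_coord_weights(cube, "time", broadcast=True).shape)  # (2, 3)
```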
4 changes: 2 additions & 2 deletions esmvalcore/preprocessor/_time.py
@@ -34,8 +34,8 @@
from esmvalcore.cmor.fixes import get_next_month, get_time_bounds
from esmvalcore.iris_helpers import date2num, rechunk_cube
from esmvalcore.preprocessor._shared import (
get_coord_weights,
get_iris_aggregator,
get_time_weights,
preserve_float_dtype,
update_weights_kwargs,
)
@@ -867,7 +867,7 @@ def climate_statistics(
def _add_time_weights_coord(cube):
"""Add time weight coordinate to cube (in-place)."""
time_weights_coord = AuxCoord(
get_time_weights(cube),
get_coord_weights(cube, "time"),
long_name="_time_weights_",
units=cube.coord("time").units,
)
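The only behavioural contract in this hunk is that `_add_time_weights_coord` keeps attaching time-interval lengths as weights, now computed via `get_coord_weights`. A hedged sketch of the user-facing effect through `climate_statistics`, with invented numbers (interval lengths of 1 and 3 days, so the weighted full-period mean differs from the plain mean):

```python
import numpy as np
from iris.coords import DimCoord
from iris.cube import Cube

from esmvalcore.preprocessor import climate_statistics

# Two time steps covering intervals of 1 and 3 days.
time = DimCoord(
    np.array([0.5, 2.5]),
    bounds=np.array([[0.0, 1.0], [1.0, 4.0]]),
    standard_name="time",
    units="days since 2000-01-01",
)
cube = Cube(
    np.array([0.0, 4.0]),
    var_name="tas",
    units="K",
    dim_coords_and_dims=[(time, 0)],
)

# Full-period mean weighted by interval length:
# (0 * 1 + 4 * 3) / (1 + 3) = 3.0 (an unweighted mean would give 2.0).
mean = climate_statistics(cube, operator="mean", period="full")
print(float(mean.data))  # 3.0
```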
18 changes: 8 additions & 10 deletions esmvalcore/preprocessor/_volume.py
@@ -17,14 +17,17 @@
from iris.cube import Cube
from iris.util import broadcast_to_shape

from ._shared import (
from esmvalcore.preprocessor._shared import (
get_coord_weights,
get_iris_aggregator,
get_normalized_cube,
preserve_float_dtype,
try_adding_calculated_cell_area,
update_weights_kwargs,
)
from ._supplementary_vars import register_supplementaries
from esmvalcore.preprocessor._supplementary_vars import (
register_supplementaries,
)

logger = logging.getLogger(__name__)

@@ -379,7 +382,6 @@ def axis_statistics(
cube,
_add_axis_stats_weights_coord,
coord=coord,
coord_dims=coord_dims,
)

with warnings.catch_warnings():
@@ -406,19 +408,15 @@
return result


def _add_axis_stats_weights_coord(cube, coord, coord_dims):
def _add_axis_stats_weights_coord(cube, coord):
"""Add weights for axis_statistics to cube (in-place)."""
weights = np.abs(coord.lazy_bounds()[:, 1] - coord.lazy_bounds()[:, 0])
if cube.has_lazy_data():
coord_chunks = tuple(cube.lazy_data().chunks[d] for d in coord_dims)
weights = weights.rechunk(coord_chunks)
else:
weights = weights.compute()
weights = get_coord_weights(cube, coord)
weights_coord = AuxCoord(
weights,
long_name="_axis_statistics_weights_",
units=coord.units,
)
coord_dims = cube.coord_dims(coord)
cube.add_aux_coord(weights_coord, coord_dims)


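Because `_add_axis_stats_weights_coord` now delegates to `get_coord_weights`, `axis_statistics` keeps weighting by the coordinate bound widths. A sketch with invented layer thicknesses (10, 20 and 70 m), assuming the bound-weighted behaviour described in the preprocessor documentation:

```python
import numpy as np
from iris.coords import DimCoord
from iris.cube import Cube

from esmvalcore.preprocessor import axis_statistics

# Three layers with thicknesses 10, 20 and 70 m (from the depth bounds).
depth = DimCoord(
    np.array([5.0, 20.0, 65.0]),
    bounds=np.array([[0.0, 10.0], [10.0, 30.0], [30.0, 100.0]]),
    standard_name="depth",
    units="m",
)
cube = Cube(
    np.array([1.0, 2.0, 3.0]),
    var_name="conc",
    units="kg m-3",
    dim_coords_and_dims=[(depth, 0)],
)

# Weighted sum over the vertical axis, i.e. an approximate vertical
# integral: 1*10 + 2*20 + 3*70 = 260.
column = axis_statistics(cube, axis="z", operator="sum")
print(float(column.data))  # 260.0
```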