
pysteps.io with xarray #219

Merged · 17 commits · Aug 4, 2021
1 change: 1 addition & 0 deletions environment.yml
@@ -13,3 +13,4 @@ dependencies:
- pillow
- pyproj
- scipy
- xarray
1 change: 1 addition & 0 deletions environment_dev.yml
@@ -30,3 +30,4 @@ dependencies:
- cartopy>=0.18
- scikit-image
- pandas
- xarray
74 changes: 62 additions & 12 deletions pysteps/decorators.py
@@ -16,6 +16,7 @@
"""
import inspect
import uuid
import xarray as xr
from collections import defaultdict
from functools import wraps

@@ -67,7 +68,11 @@ def _postprocess_import(importer):
@wraps(importer)
def _import_with_postprocessing(*args, **kwargs):

precip, *other_args = importer(*args, **kwargs)
data_array = importer(*args, **kwargs)

if not isinstance(data_array, xr.DataArray):
array, _, metadata = data_array
data_array = _to_xarray(array, metadata)

_dtype = kwargs.get("dtype", dtype)

@@ -78,18 +83,13 @@ def _import_with_postprocessing(*args, **kwargs):
"The accepted values are: " + str(accepted_precisions)
)

if isinstance(precip, np.ma.MaskedArray):
invalid_mask = np.ma.getmaskarray(precip)
precip.data[invalid_mask] = fillna
else:
# If plain numpy arrays are used, the importers should indicate
# the invalid values with np.nan.
_fillna = kwargs.get("fillna", fillna)
if _fillna is not np.nan:
mask = ~np.isfinite(precip)
precip[mask] = _fillna
_fillna = kwargs.get("fillna", fillna)
if _fillna is not np.nan:
data_array = data_array.fillna(_fillna)

return (precip.astype(_dtype),) + tuple(other_args)
data_array = data_array.astype(_dtype)

return data_array

extra_kwargs_doc = """
Other Parameters
@@ -283,3 +283,53 @@ def _func_with_cache(*args, **kwargs):
return _func_with_cache

return _memoize


def _to_xarray(array, metadata):
"""Convert to xarray DataArray."""
x1, x2 = metadata["x1"], metadata["x2"]
y1, y2 = metadata["y1"], metadata["y2"]
xsize, ysize = metadata["xpixelsize"], metadata["ypixelsize"]
# x_coords = np.arange(x1, x2, xsize) + xsize / 2
# y_coords = np.arange(y1, y2, ysize) + ysize / 2
x_coords = np.arange(x1, x1 + xsize * array.shape[1], xsize) + xsize / 2
Review thread on the x_coords line:

Contributor: This gives the same result as the commented line in line 293, right? We could remove one of the options in that case. :) In addition, just to check, this line is meant to point to the cell centers? (In that case everything is fine!)

Member (@cvelascof, Jul 30, 2021): Perhaps we should stay with DataArray for the moment for the core routines. That said, xarray opens netCDF files as Datasets, so eventually the user may need to select the variable from a Dataset to run our core routines.

Member: About 'projection', the CF conventions state that "A grid mapping variable may be referenced by a data variable in order to explicitly declare the coordinate reference system (CRS) used for the horizontal spatial coordinate values" (https://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#grid-mappings-and-projections), so if the outputs are to be netCDF, it would be useful to have Datasets with both the variable and the 'grid mapping variable' (aka the projection).

Member Author: It should give the same result, but at the moment the commented lines do not work, because some importers wrongly report the x1, x2, y1, y2 coordinates as the coordinates of the corner pixels instead of the corners themselves. We should eventually use the commented code (it's safer), which is why I would prefer to leave it there for the time being. And yes, this line is meant to point to the cell centers, exactly!

Member Author: OK, let's stick to DataArrays for now; we can easily switch to Datasets at any moment before releasing v2.

Member Author: Certainly very important for the netCDF exporters, perhaps less so internally to the code itself?
y_coords = np.arange(y1, y1 + ysize * array.shape[0], ysize) + ysize / 2

data_array = xr.DataArray(
data=array,
dims=("y", "x"),
coords=dict(
x=("x", x_coords),
y=("y", y_coords),
),
)

data_array.attrs.update(
{
"unit": metadata["unit"],
"accutime": metadata["accutime"],
"transform": metadata["transform"],
"zerovalue": metadata["zerovalue"],
"threshold": metadata["threshold"],
"zr_a": metadata.get("zr_a", None),
"zr_b": metadata.get("zr_b", None),
"institution": metadata["institution"],
"projection": metadata["projection"],
}
)

data_array.x.attrs.update(
{
"standard_name": "projection_x_coordinate",
"units": metadata["cartesian_unit"],
}
)

data_array.y.attrs.update(
{
"standard_name": "projection_y_coordinate",
"units": metadata["cartesian_unit"],
}
)

return data_array
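An editorial sketch of the Dataset option discussed in the thread above, not part of this diff: the DataArray built by _to_xarray could later be promoted to a CF-style Dataset that carries the projection as a grid mapping variable. The names make_cf_dataset, "precip" and "crs", and the proj4_params attribute, are hypothetical.

import numpy as np
import xarray as xr

def make_cf_dataset(data_array):
    """Promote a pysteps DataArray to a Dataset with a CF grid mapping variable."""
    ds = data_array.to_dataset(name="precip")
    # A dataless variable that only carries the CRS description, as CF suggests.
    ds["crs"] = xr.DataArray(
        0, attrs={"proj4_params": data_array.attrs.get("projection", "")}
    )
    ds["precip"].attrs["grid_mapping"] = "crs"
    return ds

da = xr.DataArray(
    np.zeros((4, 6)),
    dims=("y", "x"),
    attrs={"projection": "+proj=stere +lat_0=90"},
)
ds = make_cf_dataset(da)

This would keep the core routines on DataArrays, as agreed above, while giving a netCDF exporter the grid mapping variable that CF asks for.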
18 changes: 12 additions & 6 deletions pysteps/io/importers.py
@@ -90,6 +90,7 @@
from functools import partial

import numpy as np
import xarray as xr
from matplotlib.pyplot import imread

from pysteps.decorators import postprocess_import
@@ -392,22 +393,27 @@ def import_mrms_grib(filename, extent=None, window_size=4, **kwargs):
pr = pyproj.Proj(proj_params)
proj_def = " ".join([f"+{key}={value} " for key, value in proj_params.items()])

xsize = grib_msg["iDirectionIncrementInDegrees"] * window_size[0]
ysize = grib_msg["jDirectionIncrementInDegrees"] * window_size[1]

x1, y1 = pr(ul_lon, lr_lat)
x2, y2 = pr(lr_lon, ul_lat)

metadata = dict(
xpixelsize=grib_msg["iDirectionIncrementInDegrees"] * window_size[0],
ypixelsize=grib_msg["jDirectionIncrementInDegrees"] * window_size[1],
institution="NOAA National Severe Storms Laboratory",
xpixelsize=xsize,
ypixelsize=ysize,
unit="mm/h",
accutime=2.0,
transform=None,
zerovalue=0,
projection=proj_def.strip(),
yorigin="upper",
threshold=_get_threshold_value(precip),
x1=x1,
x2=x2,
y1=y1,
y2=y2,
x1=x1 - xsize / 2,
x2=x2 + xsize / 2,
y1=y1 - ysize / 2,
y2=y2 + ysize / 2,
cartesian_unit="degrees",
)

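A note on the corner shifts above: pr() yields the coordinates of the outermost pixel centers, while the metadata convention expects the outer grid corners, hence the half-pixel adjustments. A small self-contained check with invented numbers (not actual MRMS values):

import numpy as np

xsize = 0.04                             # degrees per window-averaged pixel
centers = 230.02 + xsize * np.arange(5)  # pixel-center x coordinates
x1 = centers[0] - xsize / 2              # west grid corner
x2 = centers[-1] + xsize / 2             # east grid corner

# Cell-center rule used in _to_xarray: corner plus half a pixel.
reconstructed = x1 + xsize / 2 + xsize * np.arange(centers.size)
assert np.allclose(reconstructed, centers)
assert np.isclose(x2 - x1, xsize * centers.size)  # corners span the full grid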
58 changes: 25 additions & 33 deletions pysteps/io/readers.py
@@ -12,16 +12,17 @@
"""

import numpy as np
import xarray as xr


def read_timeseries(inputfns, importer, **kwargs):
def read_timeseries(input_fns, importer, **kwargs):
"""Read a time series of input files using the methods implemented in the
:py:mod:`pysteps.io.importers` module and stack them into a 3d array of
shape (num_timesteps, height, width).
:py:mod:`pysteps.io.importers` module and concatenate them into a 3d array
of shape (t, y, x).

Parameters
----------
inputfns: tuple
input_fns: tuple
Input files returned by a function implemented in the
:py:mod:`pysteps.io.archive` module.
importer: function
@@ -31,50 +32,41 @@ def read_timeseries(inputfns, importer, **kwargs):

Returns
-------
out: tuple
A three-element tuple containing the read data and quality rasters and
out: xr.DataArray
An xarray DataArray containing the read data and
associated metadata. If an input file name is None, the corresponding
precipitation and quality fields are filled with nan values. If all
input file names are None or if the length of the file name list is
zero, a three-element tuple containing None values is returned.

fields are filled with nan values.
If all input file names are None or if the length of the file name list
is zero, None is returned.
"""

# check for missing data
precip_ref = None
if all(ifn is None for ifn in inputfns):
return None, None, None
if all(ifn is None for ifn in input_fns):
return None
else:
if len(inputfns[0]) == 0:
return None, None, None
for ifn in inputfns[0]:
if len(input_fns[0]) == 0:
return None
for ifn in input_fns[0]:
if ifn is not None:
precip_ref, quality_ref, metadata = importer(ifn, **kwargs)
precip_ref = importer(ifn, **kwargs)
break

if precip_ref is None:
return None, None, None
return None

precip = []
quality = []
timestamps = []
for i, ifn in enumerate(inputfns[0]):
for i, ifn in enumerate(input_fns[0]):
if ifn is not None:
precip_, quality_, _ = importer(ifn, **kwargs)
precip_ = importer(ifn, **kwargs)
precip.append(precip_)
quality.append(quality_)
timestamps.append(inputfns[1][i])
timestamps.append(input_fns[1][i])
else:
precip.append(precip_ref * np.nan)
if quality_ref is not None:
quality.append(quality_ref * np.nan)
else:
quality.append(None)
timestamps.append(inputfns[1][i])
precip.append(xr.full_like(precip_ref, np.nan))
timestamps.append(input_fns[1][i])

# Replace this with stack?
precip = np.concatenate([precip_[None, :, :] for precip_ in precip])
# TODO: Q should be organized as R, but this is not trivial as Q_ can be also None or a scalar
metadata["timestamps"] = np.array(timestamps)
precip = xr.concat(precip, "t")
precip = precip.assign_coords({"t": timestamps})

return precip, quality, metadata
return precip
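For reference, a hedged usage sketch of the new return type, assuming the "mch" example data source is configured in the user's pystepsrc:

from datetime import datetime

import pysteps

date = datetime(2015, 5, 15, 16, 30)
source = pysteps.rcparams.data_sources["mch"]
fns = pysteps.io.archive.find_by_date(
    date,
    source["root_path"],
    source["path_fmt"],
    source["fn_pattern"],
    source["fn_ext"],
    timestep=source["timestep"],
    num_prev_files=2,
)
importer = pysteps.io.get_method(source["importer"], "importer")
precip = pysteps.io.read_timeseries(fns, importer, **source["importer_kwargs"])
print(precip.dims)  # ('t', 'y', 'x') with this PR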
33 changes: 8 additions & 25 deletions pysteps/tests/helpers.py
@@ -27,7 +27,6 @@ def get_precipitation_fields(
num_prev_files=0,
num_next_files=0,
return_raw=False,
metadata=False,
upscale=None,
source="mch",
**importer_kwargs,
@@ -72,9 +71,6 @@
The pre-processing steps are: 1) Convert to mm/h,
2) Mask invalid values, 3) Log-transform the data [dBR].

metadata: bool, optional
If True, also return file metadata.

upscale: float or None, optional
Upscale fields in space during the pre-processing steps.
If it is None, the precipitation field is not
@@ -93,9 +89,7 @@

Returns
-------
reference_field : array

metadata : dict
data_array : xr.DataArray
"""

if source == "bom":
@@ -153,42 +147,31 @@
# Read the radar composites
importer = io.get_method(importer_name, "importer")

reference_field, __, ref_metadata = io.read_timeseries(
fns, importer, **_importer_kwargs
)
data_array = io.read_timeseries(fns, importer, **_importer_kwargs)

if not return_raw:

if (num_prev_files == 0) and (num_next_files == 0):
# Remove time dimension
reference_field = np.squeeze(reference_field)
data_array = np.squeeze(data_array)

# Convert to mm/h
reference_field, ref_metadata = stp.utils.to_rainrate(
reference_field, ref_metadata
)
data_array = stp.utils.to_rainrate(data_array)

# Upscale data to 2 km
reference_field, ref_metadata = aggregate_fields_space(
reference_field, ref_metadata, upscale
)
data_array = aggregate_fields_space(data_array, upscale)

# Mask invalid values
reference_field = np.ma.masked_invalid(reference_field)
data_array = data_array.where(np.isfinite(data_array))

# Log-transform the data [dBR]
reference_field, ref_metadata = stp.utils.dB_transform(
reference_field, ref_metadata, threshold=0.1, zerovalue=-15.0
)
data_array = stp.utils.dB_transform(data_array, threshold=0.1, zerovalue=-15.0)

# Set missing values with the fill value
data_array = data_array.fillna(-15.0)

if metadata:
return reference_field, ref_metadata

return reference_field
return data_array


def smart_assert(actual_value, expected, tolerance=None):
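And a sketch of the updated helper contract, again assuming the pysteps example data is installed: a single DataArray replaces the old (field, quality, metadata) triple.

from pysteps.tests.helpers import get_precipitation_fields

data_array = get_precipitation_fields(num_prev_files=1, return_raw=True, source="mch")
print(data_array.dims)           # ('t', 'y', 'x')
print(data_array.attrs["unit"])  # carried over from the importer metadata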
35 changes: 35 additions & 0 deletions pysteps/tests/test_io_archive.py
@@ -0,0 +1,35 @@
from datetime import datetime

import pytest

import pysteps


def test_find_by_date_mch():

pytest.importorskip("PIL")

date = datetime.strptime("201505151630", "%Y%m%d%H%M")
data_source = pysteps.rcparams.data_sources["mch"]
root_path = data_source["root_path"]
path_fmt = data_source["path_fmt"]
fn_pattern = data_source["fn_pattern"]
fn_ext = data_source["fn_ext"]
timestep = data_source["timestep"]

fns = pysteps.io.archive.find_by_date(
date,
root_path,
path_fmt,
fn_pattern,
fn_ext,
timestep=timestep,
num_prev_files=1,
num_next_files=1,
)

assert len(fns) == 2
assert len(fns[0]) == 3
assert len(fns[1]) == 3
assert isinstance(fns[0][0], str)
assert isinstance(fns[1][0], datetime)