Skip to content

Commit 99f8446

Browse files
jhamman and dcherian
authored
Attempt to reproduce #7079 in CI (#7488)
* temporarily remove iris from ci, trying to reproduce #7079 * add parallel=True test when using dask cluster * lint * add local scheduler test * pin netcdf version >= 1.6.1 * Update ci/requirements/environment-py311.yml * Update ci/requirements/environment.yml * Update ci/requirements/environment.yml --------- Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
1 parent 2b444af commit 99f8446

File tree

1 file changed

+51
-4
lines changed

1 file changed

+51
-4
lines changed

xarray/tests/test_distributed.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ def test_dask_distributed_write_netcdf_with_dimensionless_variables(
126126

127127
@requires_cftime
128128
@requires_netCDF4
129-
def test_open_mfdataset_can_open_files_with_cftime_index(tmp_path):
129+
@pytest.mark.parametrize("parallel", (True, False))
130+
def test_open_mfdataset_can_open_files_with_cftime_index(parallel, tmp_path):
130131
T = xr.cftime_range("20010101", "20010501", calendar="360_day")
131132
Lon = np.arange(100)
132133
data = np.random.random((T.size, Lon.size))
@@ -135,9 +136,55 @@ def test_open_mfdataset_can_open_files_with_cftime_index(tmp_path):
135136
da.to_netcdf(file_path)
136137
with cluster() as (s, [a, b]):
137138
with Client(s["address"]):
138-
for parallel in (False, True):
139-
with xr.open_mfdataset(file_path, parallel=parallel) as tf:
140-
assert_identical(tf["test"], da)
139+
with xr.open_mfdataset(file_path, parallel=parallel) as tf:
140+
assert_identical(tf["test"], da)
141+
142+
143+
@requires_cftime
@requires_netCDF4
@pytest.mark.parametrize("parallel", (True, False))
def test_open_mfdataset_multiple_files_parallel_distributed(parallel, tmp_path):
    """Open a multi-file dataset through a distributed Client, with and
    without parallel (dask-delayed) file opening."""
    # Build a 100x100 test DataArray with a cftime time axis.
    lon = np.arange(100)
    time = xr.cftime_range("20010101", periods=100, calendar="360_day")
    values = np.random.random((time.size, lon.size))
    da = xr.DataArray(values, coords={"time": time, "lon": lon}, name="test")

    # Split the array into ten 10-step files along "time".
    starts = range(0, 100, 10)
    fnames = [tmp_path / f"test_{start}.nc" for start in starts]
    for start, path in zip(starts, fnames):
        da.isel(time=slice(start, start + 10)).to_netcdf(path)

    # Round-trip through open_mfdataset while a distributed scheduler
    # is active and check we get the original data back.
    with cluster() as (scheduler, [worker_a, worker_b]):
        with Client(scheduler["address"]):
            with xr.open_mfdataset(
                fnames, parallel=parallel, concat_dim="time", combine="nested"
            ) as tf:
                assert_identical(tf["test"], da)
164+
165+
166+
# TODO: move this to test_backends.py
@requires_cftime
@requires_netCDF4
@pytest.mark.parametrize("parallel", (True, False))
def test_open_mfdataset_multiple_files_parallel(parallel, tmp_path):
    """Open a multi-file dataset under each local dask scheduler, with and
    without parallel (dask-delayed) file opening."""
    # Build a 100x100 test DataArray with a cftime time axis.
    lon = np.arange(100)
    time = xr.cftime_range("20010101", periods=100, calendar="360_day")
    values = np.random.random((time.size, lon.size))
    da = xr.DataArray(values, coords={"time": time, "lon": lon}, name="test")

    # Split the array into ten 10-step files along "time".
    starts = range(0, 100, 10)
    fnames = [tmp_path / f"test_{start}.nc" for start in starts]
    for start, path in zip(starts, fnames):
        da.isel(time=slice(start, start + 10)).to_netcdf(path)

    # Exercise every local scheduler plus the default (None).
    schedulers = (
        dask.threaded.get,
        dask.multiprocessing.get,
        dask.local.get_sync,
        None,
    )
    for scheduler in schedulers:
        with dask.config.set(scheduler=scheduler):
            with xr.open_mfdataset(
                fnames, parallel=parallel, concat_dim="time", combine="nested"
            ) as tf:
                assert_identical(tf["test"], da)
141188

142189

143190
@pytest.mark.parametrize("engine,nc_format", ENGINES_AND_FORMATS)

0 commit comments

Comments
 (0)