Commit 4c25b6e

Parallel load datasets
1 parent 2247a29 commit 4c25b6e

1 file changed: esmvalcore/dataset.py
67 additions & 9 deletions
@@ -12,6 +12,7 @@
 from pathlib import Path
 from typing import Any, Iterator, Sequence, Union
 
+import dask
 from iris.cube import Cube
 
 from esmvalcore import esgf, local
@@ -79,6 +80,10 @@ def _ismatch(facet_value: FacetValue, pattern: FacetValue) -> bool:
             and fnmatchcase(facet_value, pattern))
 
 
+def _first(elems):
+    return elems[0]
+
+
 class Dataset:
     """Define datasets, find the related files, and load them.
 
@@ -664,9 +669,19 @@ def files(self) -> Sequence[File]:
     def files(self, value):
         self._files = value
 
-    def load(self) -> Cube:
+    def load(self, compute=True) -> Cube:
         """Load dataset.
 
+        Parameters
+        ----------
+        compute:
+            If :obj:`True`, return the cube immediately. If :obj:`False`,
+            return a :class:`~dask.delayed.Delayed` object that can be used
+            to load the cube by calling its
+            :func:`~dask.delayed.Delayed.compute` method. Multiple datasets
+            can be loaded in parallel by passing a list of such delayeds
+            to :func:`dask.compute`.
+
         Raises
         ------
         InputFilesNotFound
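
A minimal usage sketch of the calling convention described in the docstring above, assuming a list of already-configured Dataset objects (session set and files found); the variable names are illustrative, not part of this commit:

import dask

# `datasets` is assumed to be a list of esmvalcore.dataset.Dataset objects
# whose session and files are already set up. With compute=False, each call
# only builds a lazy Delayed object; nothing is read from disk yet.
delayed_cubes = [dataset.load(compute=False) for dataset in datasets]

# dask.compute evaluates all Delayed objects together, so the datasets are
# loaded in parallel by the active dask scheduler.
cubes = dask.compute(*delayed_cubes)
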
@@ -689,7 +704,7 @@ def load(self) -> Cube:
             supplementary_cubes.append(supplementary_cube)
 
         output_file = _get_output_file(self.facets, self.session.preproc_dir)
-        cubes = preprocess(
+        cubes = dask.delayed(preprocess)(
             [cube],
             'add_supplementary_variables',
             input_files=input_files,
@@ -698,7 +713,10 @@ def load(self) -> Cube:
             supplementary_cubes=supplementary_cubes,
         )
 
-        return cubes[0]
+        cube = dask.delayed(_first)(cubes)
+        if compute:
+            return cube.compute()
+        return cube
 
     def _load(self) -> Cube:
         """Load self.files into an iris cube and return it."""
@@ -763,21 +781,61 @@ def _load(self) -> Cube:
             'short_name': self.facets['short_name'],
         }
 
-        result = [
+        input_files = [
            file.local_file(self.session['download_dir']) if isinstance(
                file, esgf.ESGFFile) else file for file in self.files
        ]
-        for step, kwargs in settings.items():
-            result = preprocess(
+
+        debug = self.session['save_intermediary_cubes']
+
+        result = []
+        for input_file in input_files:
+            files = dask.delayed(preprocess)(
+                [input_file],
+                'fix_file',
+                input_files=[input_file],
+                output_file=output_file,
+                debug=debug,
+                **settings['fix_file'],
+            )
+            cubes = dask.delayed(preprocess)(
+                files,
+                'load',
+                input_files=[input_file],
+                output_file=output_file,
+                debug=debug,
+                **settings['load'],
+            )
+            cubes = dask.delayed(preprocess)(
+                cubes,
+                'fix_metadata',
+                input_files=[input_file],
+                output_file=output_file,
+                debug=debug,
+                **settings['fix_metadata'],
+            )
+            cube = dask.delayed(_first)(cubes)
+            result.append(cube)
+
+        result = dask.delayed(preprocess)(
+            result,
+            'concatenate',
+            input_files=input_files,
+            output_file=output_file,
+            debug=debug,
+            **settings['concatenate'],
+        )
+        for step, kwargs in dict(tuple(settings.items())[4:]).items():
+            result = dask.delayed(preprocess)(
                 result,
                 step,
-                input_files=self.files,
+                input_files=input_files,
                 output_file=output_file,
-                debug=self.session['save_intermediary_cubes'],
+                debug=debug,
                 **kwargs,
             )
 
-        cube = result[0]
+        cube = dask.delayed(_first)(result)
         return cube
 
     def from_ranges(self) -> list['Dataset']:
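
In `_load`, the first three preprocessor steps (`fix_file`, `load`, `fix_metadata`) are now chained per input file, and only `concatenate` joins the per-file branches; the remaining steps (everything past the first four entries in `settings`) are then applied sequentially to the combined result. A minimal sketch of that fan-out/fan-in graph shape, using placeholder functions rather than the real `preprocess` signature:

import dask

def load_and_fix(path):
    # Placeholder for the per-file fix_file -> load -> fix_metadata chain.
    return f'cube({path})'

def concatenate(cubes):
    # Placeholder for the concatenate step that joins the per-file results.
    return ' + '.join(cubes)

paths = ['tas_2000.nc', 'tas_2001.nc', 'tas_2002.nc']  # illustrative names

# Fan-out: one independent Delayed per input file.
per_file = [dask.delayed(load_and_fix)(path) for path in paths]

# Fan-in: a single Delayed depending on all per-file tasks; dask resolves
# the Delayed objects inside the list argument automatically.
combined = dask.delayed(concatenate)(per_file)

print(combined.compute())  # per-file tasks can run in parallel, then join
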
