6
6
import re
7
7
import textwrap
8
8
import uuid
9
+ from collections .abc import Iterable
9
10
from copy import deepcopy
10
11
from fnmatch import fnmatchcase
11
12
from itertools import groupby
12
13
from pathlib import Path
13
- from typing import Any , Iterator , Sequence , Union
14
+ from typing import Any , Iterator , Sequence , TypeVar , Union
14
15
15
16
import dask
17
+ from dask .delayed import Delayed
16
18
from iris .cube import Cube
17
19
18
20
from esmvalcore import esgf , local
@@ -80,8 +82,12 @@ def _ismatch(facet_value: FacetValue, pattern: FacetValue) -> bool:
80
82
and fnmatchcase (facet_value , pattern ))
81
83
82
84
83
- def _first (elems ):
84
- return elems [0 ]
85
+ T = TypeVar ('T' )
86
+
87
+
88
+ def _first (elems : Iterable [T ]) -> T :
89
+ """Return the first element."""
90
+ return next (iter (elems ))
85
91
86
92
87
93
class Dataset :
@@ -669,16 +675,16 @@ def files(self) -> Sequence[File]:
669
675
def files(self, value):
    """Set the files from which this dataset's data is loaded."""
    self._files = value
671
677
672
- def load (self , compute = True ) -> Cube :
678
+ def load (self , compute = True ) -> Cube | Delayed :
673
679
"""Load dataset.
674
680
675
681
Parameters
676
682
----------
677
683
compute:
678
- If :obj:`True`, return the cube immediately. If :obj:`False`,
679
- return a :class:`~dask.delayed.Delayed` object that can be used
680
- to load the cube by calling its
681
- :func :`~dask.delayed.Delayed.compute` method. Multiple datasets
684
+ If :obj:`True`, return the :class:`~iris.cube.Cube` immediately.
685
+ If :obj:`False`, return a :class:`~dask.delayed.Delayed` object
686
+ that can be used to load the cube by calling its
687
+ :meth :`~dask.delayed.Delayed.compute` method. Multiple datasets
682
688
can be loaded in parallel by passing a list of such delayeds
683
689
to :func:`dask.compute`.
684
690
@@ -731,7 +737,14 @@ def _load(self) -> Cube:
731
737
msg = "\n " .join (lines )
732
738
raise InputFilesNotFound (msg )
733
739
740
+ input_files = [
741
+ file .local_file (self .session ['download_dir' ]) if isinstance (
742
+ file , esgf .ESGFFile ) else file for file in self .files
743
+ ]
734
744
output_file = _get_output_file (self .facets , self .session .preproc_dir )
745
+ debug = self .session ['save_intermediary_cubes' ]
746
+
747
+ # Load all input files and concatenate them.
735
748
fix_dir_prefix = Path (
736
749
self .session ._fixed_file_dir ,
737
750
self ._get_joined_summary_facets ('_' , join_lists = True ) + '_' ,
@@ -757,36 +770,6 @@ def _load(self) -> Cube:
757
770
settings ['concatenate' ] = {
758
771
'check_level' : self .session ['check_level' ]
759
772
}
760
- settings ['cmor_check_metadata' ] = {
761
- 'check_level' : self .session ['check_level' ],
762
- 'cmor_table' : self .facets ['project' ],
763
- 'mip' : self .facets ['mip' ],
764
- 'frequency' : self .facets ['frequency' ],
765
- 'short_name' : self .facets ['short_name' ],
766
- }
767
- if 'timerange' in self .facets :
768
- settings ['clip_timerange' ] = {
769
- 'timerange' : self .facets ['timerange' ],
770
- }
771
- settings ['fix_data' ] = {
772
- 'check_level' : self .session ['check_level' ],
773
- 'session' : self .session ,
774
- ** self .facets ,
775
- }
776
- settings ['cmor_check_data' ] = {
777
- 'check_level' : self .session ['check_level' ],
778
- 'cmor_table' : self .facets ['project' ],
779
- 'mip' : self .facets ['mip' ],
780
- 'frequency' : self .facets ['frequency' ],
781
- 'short_name' : self .facets ['short_name' ],
782
- }
783
-
784
- input_files = [
785
- file .local_file (self .session ['download_dir' ]) if isinstance (
786
- file , esgf .ESGFFile ) else file for file in self .files
787
- ]
788
-
789
- debug = self .session ['save_intermediary_cubes' ]
790
773
791
774
result = []
792
775
for input_file in input_files :
@@ -798,6 +781,7 @@ def _load(self) -> Cube:
798
781
debug = debug ,
799
782
** settings ['fix_file' ],
800
783
)
784
+ # Multiple cubes may be present in a file.
801
785
cubes = dask .delayed (preprocess )(
802
786
files ,
803
787
'load' ,
@@ -806,6 +790,7 @@ def _load(self) -> Cube:
806
790
debug = debug ,
807
791
** settings ['load' ],
808
792
)
793
+ # Combine the cubes into a single cube per file.
809
794
cubes = dask .delayed (preprocess )(
810
795
cubes ,
811
796
'fix_metadata' ,
@@ -817,6 +802,7 @@ def _load(self) -> Cube:
817
802
cube = dask .delayed (_first )(cubes )
818
803
result .append (cube )
819
804
805
+ # Concatenate the cubes from all files.
820
806
result = dask .delayed (preprocess )(
821
807
result ,
822
808
'concatenate' ,
@@ -825,7 +811,34 @@ def _load(self) -> Cube:
825
811
debug = debug ,
826
812
** settings ['concatenate' ],
827
813
)
828
- for step , kwargs in dict (tuple (settings .items ())[4 :]).items ():
814
+
815
+ # At this point `result` is a list containing a single cube. Apply the
816
+ # remaining preprocessor functions to this cube.
817
+ settings .clear ()
818
+ settings ['cmor_check_metadata' ] = {
819
+ 'check_level' : self .session ['check_level' ],
820
+ 'cmor_table' : self .facets ['project' ],
821
+ 'mip' : self .facets ['mip' ],
822
+ 'frequency' : self .facets ['frequency' ],
823
+ 'short_name' : self .facets ['short_name' ],
824
+ }
825
+ if 'timerange' in self .facets :
826
+ settings ['clip_timerange' ] = {
827
+ 'timerange' : self .facets ['timerange' ],
828
+ }
829
+ settings ['fix_data' ] = {
830
+ 'check_level' : self .session ['check_level' ],
831
+ 'session' : self .session ,
832
+ ** self .facets ,
833
+ }
834
+ settings ['cmor_check_data' ] = {
835
+ 'check_level' : self .session ['check_level' ],
836
+ 'cmor_table' : self .facets ['project' ],
837
+ 'mip' : self .facets ['mip' ],
838
+ 'frequency' : self .facets ['frequency' ],
839
+ 'short_name' : self .facets ['short_name' ],
840
+ }
841
+ for step , kwargs in settings .items ():
829
842
result = dask .delayed (preprocess )(
830
843
result ,
831
844
step ,
0 commit comments