from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import itertools
import logging
import time
import traceback
import threading
from collections import Mapping
from distutils.version import StrictVersion

from ..conventions import cf_encoder
from ..core.utils import FrozenOrderedDict
from ..core.pycompat import iteritems, dask_array_type, OrderedDict

# Create a logger object, but don't add any handlers. Leave that to user code.
logger = logging.getLogger(__name__)


NONE_VAR_NAME = '__values__'


def _encode_variable_name(name):
    if name is None:
        name = NONE_VAR_NAME
    return name


def _decode_variable_name(name):
    if name == NONE_VAR_NAME:
        name = None
    return name


def robust_getitem(array, key, catch=Exception, max_retries=6,
                   initial_delay=500):
    """
    Robustly index an array, using retry logic with exponential backoff if any
    of the errors ``catch`` are raised. ``initial_delay`` is measured in
    milliseconds.

    With the default settings, the worst-case total delay across all retries
    is in the range of 32-64 seconds.
    """
    assert max_retries >= 0

    for n in range(max_retries + 1):
        try:
            return array[key]
        except catch:
            if n == max_retries:
                raise
            base_delay = initial_delay * 2 ** n
            # add random jitter so concurrent clients do not retry in lockstep
            next_delay = base_delay + np.random.randint(base_delay)
            msg = ('getitem failed, waiting %s ms before trying again '
                   '(%s tries remaining). Full traceback: %s' %
                   (next_delay, max_retries - n, traceback.format_exc()))
            logger.debug(msg)
            time.sleep(1e-3 * next_delay)
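

# A minimal usage sketch for ``robust_getitem`` (the ``remote_array`` name and
# the slice key below are hypothetical; any indexable object works, and the
# exception type should match whatever the flaky backend actually raises):
#
#     data = robust_getitem(remote_array, (slice(0, 10), slice(None)),
#                           catch=RuntimeError, max_retries=3)
#
# With ``max_retries=3`` the base delays are 0.5, 1 and 2 seconds, so the
# worst-case total wait is roughly twice that (about 7 seconds) once the
# random jitter is included.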


class AbstractDataStore(Mapping):
    def __iter__(self):
        return iter(self.variables)

    def __getitem__(self, key):
        return self.variables[key]

    def __len__(self):
        return len(self.variables)

    def get_attrs(self):  # pragma: no cover
        raise NotImplementedError

    def get_variables(self):  # pragma: no cover
        raise NotImplementedError

    def load(self):
        """
        This loads the variables and attributes simultaneously.
        A centralized loading function makes it easier to create
        data stores that do automatic encoding/decoding.

        For example::

            class SuffixAppendingDataStore(AbstractDataStore):

                def load(self):
                    variables, attributes = AbstractDataStore.load(self)
                    variables = {'%s_suffix' % k: v
                                 for k, v in iteritems(variables)}
                    attributes = {'%s_suffix' % k: v
                                  for k, v in iteritems(attributes)}
                    return variables, attributes

        This function will be called anytime variables or attributes
        are requested, so care should be taken to make sure it is fast.
        """
        variables = FrozenOrderedDict((_decode_variable_name(k), v)
                                      for k, v in
                                      iteritems(self.get_variables()))
        attributes = FrozenOrderedDict(self.get_attrs())
        return variables, attributes

    @property
    def variables(self):
        # Because encoding/decoding might require both the attributes and the
        # variables, and because the store may have been updated, we reload
        # both any time either one is requested.
        variables, _ = self.load()
        return variables

    @property
    def attrs(self):
        # Because encoding/decoding might require both the attributes and the
        # variables, and because the store may have been updated, we reload
        # both any time either one is requested.
        _, attributes = self.load()
        return attributes

    @property
    def dimensions(self):
        return self.get_dimensions()

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()
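

# A minimal sketch of a concrete read-only store built on the interface above
# (the ``InMemoryDataStore`` name and its contents are hypothetical and only
# illustrate the two methods a subclass must provide):
#
#     class InMemoryDataStore(AbstractDataStore):
#         def __init__(self, variables, attrs):
#             self._variables = variables
#             self._attrs = attrs
#
#         def get_variables(self):
#             return self._variables
#
#         def get_attrs(self):
#             return self._attrs
#
# Because AbstractDataStore is a Mapping and defines __enter__/__exit__, such
# a store supports iteration, ``len()``, key lookup and use as a context
# manager (``with store: ...``) for free.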


class ArrayWriter(object):
    def __init__(self):
        self.sources = []
        self.targets = []

    def add(self, source, target):
        if isinstance(source, dask_array_type):
            # defer writing dask arrays until sync() so they can all be
            # stored in a single da.store pass
            self.sources.append(source)
            self.targets.append(target)
        else:
            target[...] = source

    def sync(self):
        if self.sources:
            import dask.array as da
            import dask
            # on newer dask releases, pass an explicit lock so that writes to
            # the targets are serialized across threads
            if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
                da.store(self.sources, self.targets, lock=threading.Lock())
            else:
                da.store(self.sources, self.targets)
            self.sources = []
            self.targets = []
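

# A small sketch of the intended ArrayWriter flow (the ``numpy_source``,
# ``dask_source`` and ``on_disk_*`` names are hypothetical): in-memory arrays
# are written eagerly by ``add``, while dask arrays are queued and only
# written when ``sync`` runs a single ``da.store`` pass over all of them:
#
#     writer = ArrayWriter()
#     writer.add(numpy_source, on_disk_a)   # written immediately
#     writer.add(dask_source, on_disk_b)    # queued until sync()
#     writer.sync()                         # computes and writes queued arrays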


class AbstractWritableDataStore(AbstractDataStore):
    def __init__(self, writer=None):
        if writer is None:
            writer = ArrayWriter()
        self.writer = writer

    def set_dimension(self, d, l):  # pragma: no cover
        raise NotImplementedError

    def set_attribute(self, k, v):  # pragma: no cover
        raise NotImplementedError

    def set_variable(self, k, v):  # pragma: no cover
        raise NotImplementedError

    def sync(self):
        self.writer.sync()

    def store_dataset(self, dataset):
        # In a data store, "variables" means all variables AND coordinates;
        # in an xarray.Dataset, "variables" means data variables, NOT
        # coordinates. So here we pass the whole dataset in instead of
        # dataset.variables.
        self.store(dataset, dataset.attrs)

    def store(self, variables, attributes, check_encoding_set=frozenset()):
        self.set_attributes(attributes)
        self.set_variables(variables, check_encoding_set)

    def set_attributes(self, attributes):
        for k, v in iteritems(attributes):
            self.set_attribute(k, v)

    def set_variables(self, variables, check_encoding_set):
        for vn, v in iteritems(variables):
            name = _encode_variable_name(vn)
            check = vn in check_encoding_set
            target, source = self.prepare_variable(name, v, check)
            self.writer.add(source, target)

    def set_necessary_dimensions(self, variable):
        for d, l in zip(variable.dims, variable.shape):
            if d not in self.dimensions:
                self.set_dimension(d, l)
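

# Rough write sequence for a concrete writable store (``store`` and
# ``dataset`` are placeholder names; ``prepare_variable`` is left to
# subclasses and returns a ``(target, source)`` pair for the writer):
#
#     store.store_dataset(dataset)   # set attributes, then queue variables
#     store.sync()                   # flush any queued dask arrays to disk
#     store.close()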


class WritableCFDataStore(AbstractWritableDataStore):
    def store(self, variables, attributes, check_encoding_set=frozenset()):
        # All NetCDF files get CF encoded by default; without this, attempting
        # to write times, for example, would fail.
        cf_variables, cf_attrs = cf_encoder(variables, attributes)
        AbstractWritableDataStore.store(self, cf_variables, cf_attrs,
                                        check_encoding_set)


class DataStorePickleMixin(object):
    """Subclasses must define ``ds``, ``_opener`` and ``_mode`` attributes.

    Do not subclass this class: it is not part of xarray's external API.
    """

    def __getstate__(self):
        state = self.__dict__.copy()
        # the open file handle is not picklable; it is reopened on unpickling
        del state['ds']
        if self._mode == 'w':
            # the file has already been created, so don't overwrite it when
            # restoring
            state['_mode'] = 'a'
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.ds = self._opener(mode=self._mode)
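

# A minimal sketch of the contract described above (the ``MyFileStore`` and
# ``my_open`` names are hypothetical): a subclass sets the attributes the
# mixin expects, and pickling then drops the open file handle and reopens it
# when the object is unpickled.
#
#     class MyFileStore(DataStorePickleMixin, AbstractDataStore):
#         def __init__(self, filename, mode='r'):
#             self._opener = functools.partial(my_open, filename)
#             self._mode = mode
#             self.ds = self._opener(mode=self._mode)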