-
Notifications
You must be signed in to change notification settings - Fork 53
/
format.py
340 lines (281 loc) · 11.4 KB
/
format.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
"""Hierarchy of format OME-Zarr implementations."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List, Optional
from zarr.storage import FSStore
# Module-level logger shared by the format implementations below.
LOGGER = logging.getLogger("ome_zarr.format")
def format_from_version(version: str) -> "Format":
    """
    Look up the format implementation whose version equals `version`.

    A float such as `0.2` is accepted as well and is compared by its
    string form.

    :param version: Version identifier, e.g. "0.4".
    :return: The first implementation reporting that version.
    :raises ValueError: If no implementation reports that version.
    """
    # Support floating-point versions like `0.2`
    requested = str(version) if isinstance(version, float) else version
    for candidate in format_implementations():
        if candidate.version == requested:
            return candidate
    raise ValueError(f"Version {requested} not recognized")
def format_implementations() -> Iterator["Format"]:
    """
    Lazily yield one fresh instance per format implementation,
    ordered from the newest version to the oldest.
    """
    for implementation in (FormatV04, FormatV03, FormatV02, FormatV01):
        yield implementation()
def detect_format(metadata: dict, default: "Format") -> "Format":
    """
    Pick the format implementation that claims the given metadata.

    Implementations are asked in the order produced by
    `format_implementations()`; the first one whose `matches()` returns
    true wins. Empty (falsy) metadata, or metadata nobody claims, yields
    `default`.
    """
    if not metadata:
        return default
    for candidate in format_implementations():
        if candidate.matches(metadata):
            return candidate
    return default
class Format(ABC):
    """
    Abstract base class of all OME-Zarr format versions.

    Concrete subclasses report their `version`, decide whether a metadata
    dict belongs to them (`matches`), create the store for a path
    (`init_store`) and generate / validate the version-specific plate-well
    and coordinate-transformation metadata.

    Two instances compare equal when they are of exactly the same class,
    and they hash accordingly.
    """

    @property
    @abstractmethod
    def version(self) -> str:  # pragma: no cover
        """Version string of this format."""
        raise NotImplementedError()

    @abstractmethod
    def matches(self, metadata: dict) -> bool:  # pragma: no cover
        """Return whether `metadata` belongs to this format."""
        raise NotImplementedError()

    @abstractmethod
    def init_store(self, path: str, mode: str = "r") -> FSStore:
        """Create the store used to access `path` with the given `mode`."""
        raise NotImplementedError()

    # NOTE(review): the decorator below is commented out in the original, so
    # subclasses are not forced to implement init_channels -- confirm intent.
    # @abstractmethod
    def init_channels(self) -> None:  # pragma: no cover
        raise NotImplementedError()

    def _get_metadata_version(self, metadata: dict) -> Optional[str]:
        """
        Checks the metadata dict for a version

        Returns the version of the first object found in the metadata,
        checking 'multiscales' first, then 'plate', 'well' and
        'image-label'. Returns None when none of them is present (or the
        object found carries no 'version' key).
        """
        multiscales = metadata.get("multiscales", [])
        if multiscales:
            # Only the first multiscales entry is consulted.
            dataset = multiscales[0]
            return dataset.get("version", None)
        for name in ["plate", "well", "image-label"]:
            obj = metadata.get(name, None)
            if obj:
                return obj.get("version", None)
        return None

    def __repr__(self) -> str:
        return self.__class__.__name__

    def __eq__(self, other: object) -> bool:
        return self.__class__ == other.__class__

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None and makes instances
        # unhashable. Hash by class so that equal instances hash equally and
        # formats can be used in sets and as dict keys.
        return hash(self.__class__)

    @abstractmethod
    def generate_well_dict(
        self, well: str, rows: List[str], columns: List[str]
    ) -> dict:  # pragma: no cover
        """Build the plate-level dict describing `well`."""
        raise NotImplementedError()

    @abstractmethod
    def validate_well_dict(
        self, well: dict, rows: List[str], columns: List[str]
    ) -> None:  # pragma: no cover
        """Check a plate-level well dict against `rows` and `columns`."""
        raise NotImplementedError()

    @abstractmethod
    def generate_coordinate_transformations(
        self, shapes: List[tuple]
    ) -> Optional[List[List[Dict[str, Any]]]]:  # pragma: no cover
        """Build coordinate transformations from the given `shapes`."""
        raise NotImplementedError()

    @abstractmethod
    def validate_coordinate_transformations(
        self,
        ndim: int,
        nlevels: int,
        coordinate_transformations: Optional[List[List[Dict[str, Any]]]] = None,
    ) -> Optional[List[List[Dict[str, Any]]]]:  # pragma: no cover
        """Check `coordinate_transformations` against `ndim` and `nlevels`."""
        raise NotImplementedError()
class FormatV01(Format):
    """
    Initial format. (2020)

    Uses a flat store layout ("." dimension separator), requires only a
    "path" key in plate-well dicts and has no coordinate transformations.
    """

    # Keys every plate-well dict must contain, mapped to their required type.
    REQUIRED_PLATE_WELL_KEYS: Dict[str, type] = {"path": str}

    @property
    def version(self) -> str:
        """Version string of this format."""
        return "0.1"

    def matches(self, metadata: dict) -> bool:
        """Return whether the version found in `metadata` is this version."""
        version = self._get_metadata_version(metadata)
        LOGGER.debug("%s matches %s?", self.version, version)
        return version == self.version

    def init_store(self, path: str, mode: str = "r") -> FSStore:
        """Create a flat FSStore (keys separated by ".") for `path`."""
        store = FSStore(path, mode=mode, dimension_separator=".")
        LOGGER.debug("Created legacy flat FSStore(%s, %s)", path, mode)
        return store

    def generate_well_dict(
        self, well: str, rows: List[str], columns: List[str]
    ) -> dict:
        """Return the well dict, which only holds the well path here."""
        return {"path": str(well)}

    def validate_well_dict(
        self, well: dict, rows: List[str], columns: List[str]
    ) -> None:
        """
        Check that `well` holds every required key with the required type.

        Keys outside REQUIRED_PLATE_WELL_KEYS are tolerated and only logged
        at debug level.

        :raises ValueError: If a required key is missing or has the wrong
            type.
        """
        if any(key not in self.REQUIRED_PLATE_WELL_KEYS for key in well):
            LOGGER.debug("%s contains unspecified keys", well)
        for key, key_type in self.REQUIRED_PLATE_WELL_KEYS.items():
            if key not in well:
                # Exceptions do not %-interpolate their arguments the way
                # logging calls do, so the message is built explicitly.
                raise ValueError(
                    f"{well} must contain a {key} key of type {key_type}"
                )
            if not isinstance(well[key], key_type):
                # Name the offending key rather than always saying "path".
                raise ValueError(f"{well} {key} must be of {key_type} type")

    def generate_coordinate_transformations(
        self, shapes: List[tuple]
    ) -> Optional[List[List[Dict[str, Any]]]]:
        """Return None: this version generates no transformations."""
        return None

    def validate_coordinate_transformations(
        self,
        ndim: int,
        nlevels: int,
        coordinate_transformations: Optional[List[List[Dict[str, Any]]]] = None,
    ) -> None:
        """Accept anything: this version performs no validation."""
        return None
class FormatV02(FormatV01):
    """
    Changelog: move to nested storage (April 2021)
    """

    @property
    def version(self) -> str:
        """Version string of this format."""
        return "0.2"

    def init_store(self, path: str, mode: str = "r") -> FSStore:
        """
        Create a nested FSStore (keys separated by "/") for `path`.

        Directories are auto-created only when `mode` contains no "r" and
        `path` does not start with "http" or "s3".

        Not ideal. Stores should remain hidden
        TODO: could also check dimension_separator
        """
        # Could be simplified on the fsspec side
        skip_mkdir = "r" in mode or path.startswith(("http", "s3"))

        options: Dict[str, Any] = {
            "dimension_separator": "/",
            "normalize_keys": False,
        }
        if not skip_mkdir:
            options["auto_mkdir"] = True

        # TODO: open issue for using Path
        nested_store = FSStore(path, mode=mode, **options)
        LOGGER.debug("Created nested FSStore(%s, %s, %s)", path, mode, options)
        return nested_store
class FormatV03(FormatV02):  # inherits from V02 to avoid code duplication
    """
    Changelog: variable number of dimensions (up to 5),
    introduce axes field in multiscales (June 2021)

    Only the reported version differs from FormatV02 in this class;
    everything else is inherited.
    """

    @property
    def version(self) -> str:
        """Version string of this format."""
        return "0.3"
class FormatV04(FormatV03):
    """
    Changelog: axes is list of dicts,
    introduce coordinate_transformations in multiscales (Nov 2021)
    """

    # Plate-well dicts additionally carry their row and column indices.
    REQUIRED_PLATE_WELL_KEYS = {"path": str, "rowIndex": int, "columnIndex": int}

    @property
    def version(self) -> str:
        """Version string of this format."""
        return "0.4"

    def generate_well_dict(
        self, well: str, rows: List[str], columns: List[str]
    ) -> dict:
        """
        Build the well dict for a "row/column" well path.

        :param well: Well path made of a row name and a column name
            separated by "/".
        :param rows: Row names; the position of the row becomes "rowIndex".
        :param columns: Column names; the position of the column becomes
            "columnIndex".
        :raises ValueError: If `well` does not split into exactly two parts
            or the row / column is not listed.
        """
        row, column = well.split("/")
        if row not in rows:
            raise ValueError(f"{row} is not defined in the list of rows")
        if column not in columns:
            raise ValueError(f"{column} is not defined in the list of columns")
        return {
            "path": str(well),
            "rowIndex": rows.index(row),
            "columnIndex": columns.index(column),
        }

    def validate_well_dict(
        self, well: dict, rows: List[str], columns: List[str]
    ) -> None:
        """
        Check required keys/types, then that path and indices agree.

        :raises ValueError: If a required key is missing or mistyped, the
            path is not of the form "row/column", the row / column is not
            listed, or "rowIndex" / "columnIndex" differ from the positions
            of the row / column in `rows` / `columns`.
        """
        super().validate_well_dict(well, rows, columns)
        path_parts = well["path"].split("/")
        if len(path_parts) != 2:
            raise ValueError(f"{well} path must exactly be composed of 2 groups")
        row, column = path_parts
        if row not in rows:
            raise ValueError(f"{row} is not defined in the plate rows")
        if well["rowIndex"] != rows.index(row):
            raise ValueError(f"Mismatching row index for {well}")
        if column not in columns:
            raise ValueError(f"{column} is not defined in the plate columns")
        if well["columnIndex"] != columns.index(column):
            raise ValueError(f"Mismatching column index for {well}")

    def generate_coordinate_transformations(
        self, shapes: List[tuple]
    ) -> Optional[List[List[Dict[str, Any]]]]:
        """
        Build one minimal 'scale' transformation per pyramid level.

        The first entry of `shapes` is the reference: each level's scale is
        the reference size divided by the level size, per dimension.

        :param shapes: Shape of every pyramid level, reference level first.
        :return: For each level, a list holding a single
            `{"type": "scale", "scale": [...]}` dict.
        """
        data_shape = shapes[0]
        coordinate_transformations: List[List[Dict[str, Any]]] = []
        # calculate minimal 'scale' transform based on pyramid dims
        for shape in shapes:
            # Kept as an assert so the raised exception type is unchanged.
            assert len(shape) == len(data_shape)
            scale = [full / level for full, level in zip(data_shape, shape)]
            coordinate_transformations.append([{"type": "scale", "scale": scale}])
        return coordinate_transformations

    def validate_coordinate_transformations(
        self,
        ndim: int,
        nlevels: int,
        coordinate_transformations: Optional[List[List[Dict[str, Any]]]] = None,
    ) -> None:
        """
        Validates that a list of dicts contains a 'scale' transformation

        Every level must hold exactly one 'scale' transformation, placed
        first, and at most one 'translation'; both must list `ndim` numbers.

        :param ndim: Number of image dimensions
        :param nlevels: Number of datasets; must equal the number of
            entries in `coordinate_transformations`.
        :param coordinate_transformations: One list of transformation dicts
            per dataset.
        :raises ValueError: If the argument is None, the counts differ, a
            'type' is missing, or a 'scale' / 'translation' is missing,
            duplicated, misplaced, of the wrong length or non-numeric.
        """
        if coordinate_transformations is None:
            raise ValueError("coordinate_transformations must be provided")
        ct_count = len(coordinate_transformations)
        if ct_count != nlevels:
            raise ValueError(
                f"coordinate_transformations count: {ct_count} must match "
                f"datasets {nlevels}"
            )
        for transformations in coordinate_transformations:
            # Kept as an assert so the raised exception type is unchanged.
            assert isinstance(transformations, list)
            types = [t.get("type", None) for t in transformations]
            if any(t is None for t in types):
                raise ValueError(f"Missing type in: {transformations}")

            # validate scales...
            if types.count("scale") != 1:
                raise ValueError(
                    "Must supply 1 'scale' item in coordinate_transformations"
                )
            # first transformation must be scale
            if types[0] != "scale":
                raise ValueError("First coordinate_transformations must be 'scale'")
            first = transformations[0]
            if "scale" not in first:
                raise ValueError(f"Missing scale argument in: {first}")
            scale = first["scale"]
            if len(scale) != ndim:
                raise ValueError(
                    f"'scale' list {scale} must match number of image "
                    f"dimensions: {ndim}"
                )
            for value in scale:
                if not isinstance(value, (float, int)):
                    raise ValueError(f"'scale' values must all be numbers: {scale}")

            # validate translations...
            translation_count = types.count("translation")
            if translation_count > 1:
                raise ValueError(
                    "Must supply 0 or 1 'translation' item in "
                    "coordinate_transformations"
                )
            if translation_count == 1:
                transformation = transformations[types.index("translation")]
                if "translation" not in transformation:
                    # Report the translation dict itself, not the scale one.
                    raise ValueError(
                        f"Missing translation argument in: {transformation}"
                    )
                translation = transformation["translation"]
                if len(translation) != ndim:
                    raise ValueError(
                        f"'translation' list {translation} must match image "
                        f"dimensions count: {ndim}"
                    )
                for value in translation:
                    if not isinstance(value, (float, int)):
                        raise ValueError(
                            f"'translation' values must all be numbers: {translation}"
                        )
# Alias for the highest format version implemented in this module.
CurrentFormat = FormatV04