diff --git a/README.md b/README.md
index effe9d9..d6b5374 100644
--- a/README.md
+++ b/README.md
@@ -37,14 +37,25 @@ or from conda:
conda install -c conda-forge nd2
```
-### extras
+### Legacy nd2 file support
-Legacy nd2 (JPEG2000) files are also supported, but require `imagecodecs`. To install with support for these files use:
+Legacy nd2 (JPEG2000) files are also supported, but require `imagecodecs`. To
+install with support for these files use the `legacy` extra:
```sh
pip install nd2[legacy]
```
+### Faster XML parsing
+
+Much of the metadata in the file is stored as XML. If found in the environment,
+`nd2` will use [`lxml`](https://pypi.org/project/lxml/) which is much faster
+than the built-in `xml` module. To install with support for `lxml` use:
+
+```sh
+pip install nd2 lxml
+```
+
## usage and API
```python
diff --git a/pyproject.toml b/pyproject.toml
index f857fb5..84ffc94 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,7 +9,7 @@ description = "Yet another nd2 (Nikon NIS Elements) file reader"
readme = "README.md"
requires-python = ">=3.7"
license = { text = "BSD 3-Clause License" }
-authors = [{ email = "talley.lambert@gmail.com" }, { name = "Talley Lambert" }]
+authors = [{ email = "talley.lambert@gmail.com", name = "Talley Lambert" }]
classifiers = [
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: BSD License",
@@ -71,35 +71,27 @@ version-file = "src/nd2/_version.py"
only-include = ["src"]
sources = ["src"]
-# https://pycqa.github.io/isort/docs/configuration/options.html
-[tool.isort]
-profile = "black"
-src_paths = ["src/nd2", "tests"]
-
-# https://github.com/charliermarsh/ruff
+# https://beta.ruff.rs/docs/rules/
[tool.ruff]
line-length = 88
target-version = "py37"
src = ["src/nd2", "tests"]
select = [
- "E", # style errors
- "F", # flakes
- "D", # pydocstyle
- "I", # isort
- "UP", # pyupgrade
- "S", # bandit
- "C", # flake8-comprehensions
- "B", # flake8-bugbear
- "A001", # flake8-builtins
- "RUF", # ruff-specific rules
- "TCH", # flake8-type-checking
+ "E", # style errors
+ "F", # flakes
+ "D", # pydocstyle
+ "I", # isort
+ "UP", # pyupgrade
+ "S", # bandit
+ "C4", # flake8-comprehensions
+ "B", # flake8-bugbear
+ "A001", # flake8-builtins
+ "RUF", # ruff-specific rules
+ "SIM105", # contextlib.suppress
+ "TID", # tidy imports
+ "TCH", # flake8-type-checking
]
ignore = [
- # these should be fixed
- "D101",
- "D105",
- "D103",
- ###
"D100", # Missing docstring in public module
"D107", # Missing docstring in __init__
"D203", # 1 blank line required before class docstring
@@ -113,13 +105,14 @@ ignore = [
]
[tool.ruff.per-file-ignores]
+"src/nd2/structures.py" = ["D101", "D105"] # Fix someday
"tests/*.py" = ["D", "S"]
"scripts/*.py" = ["D", "S"]
# https://docs.pytest.org/en/6.2.x/customize.html
[tool.pytest.ini_options]
minversion = "6.0"
-addopts = '--color=yes'
+addopts = '--color=yes --cov-config=pyproject.toml'
testpaths = ["tests"]
filterwarnings = [
"error",
@@ -143,6 +136,7 @@ ignore_missing_imports = true
# https://coverage.readthedocs.io/en/6.4/config.html
[tool.coverage.report]
+show_missing = true
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
@@ -153,7 +147,7 @@ exclude_lines = [
]
[tool.coverage.run]
-omit = ["tests"]
+source = ["src"]
# https://github.com/mgedmin/check-manifest#configuration
[tool.check-manifest]
diff --git a/scripts/gather.py b/scripts/gather.py
index d35c137..b7eaccc 100644
--- a/scripts/gather.py
+++ b/scripts/gather.py
@@ -1,4 +1,5 @@
"""gather metadata from all files in test/data with all nd readers."""
+import contextlib
import json
from pathlib import Path
@@ -19,10 +20,9 @@ def get_nd2_stats(file) -> dict:
d["pixel_size"] = m.channels[0].volume.axesCalibration
d["shape"] = fh.shape
d["axes"] = fh.axes
- try:
+ with contextlib.suppress(Exception):
d["dtype"] = str(fh.dtype)
- except Exception:
- pass
+
fh.close()
return d
diff --git a/src/nd2/_binary.py b/src/nd2/_binary.py
index 2829eab..411aa26 100644
--- a/src/nd2/_binary.py
+++ b/src/nd2/_binary.py
@@ -157,7 +157,7 @@ def asarray(self) -> np.ndarray:
@classmethod
def from_nd2file(cls, nd2file: ND2File) -> BinaryLayers | None:
"""Extract binary layers from an ND2 file."""
- if nd2file.is_legacy:
+ if nd2file.is_legacy: # pragma: no cover
warnings.warn(
"`binary_data` is not supported for legacy ND2 files",
UserWarning,
@@ -166,15 +166,16 @@ def from_nd2file(cls, nd2file: ND2File) -> BinaryLayers | None:
return None
rdr = cast("LatestSDKReader", nd2file._rdr)
- binary_meta = rdr._decoded_custom_data_chunk(
- b"BinaryMetadata_v1!", strip_prefix=True
- )
-
- if not binary_meta:
+ try:
+ binary_meta = rdr._decode_chunk(
+ b"CustomDataVar|BinaryMetadata_v1!", strip_prefix=True
+ )
+ except KeyError:
return None
+
try:
items: dict = binary_meta["BinaryMetadata_v1"]
- except KeyError:
+ except KeyError: # pragma: no cover
warnings.warn(
"Could not find 'BinaryMetadata_v1' tag, please open an "
"issue with this file at https://github.com/tlambert03/nd2/issues/new",
@@ -182,14 +183,14 @@ def from_nd2file(cls, nd2file: ND2File) -> BinaryLayers | None:
)
return None
- binseqs = sorted(x for x in rdr._meta_map if "RleZipBinarySequence" in x)
+ binseqs = sorted(x for x in rdr.chunkmap if b"RleZipBinarySequence" in x)
mask_items = []
for _, item in sorted(items.items()):
- key = item["FileTag"]
+ key = item["FileTag"].encode()
_masks: list[np.ndarray | None] = []
for bs in binseqs:
if key in bs:
- data = rdr._load_chunk(f"{bs}!".encode())[4:]
+ data = rdr._load_chunk(bs)[4:]
_masks.append(_decode_binary_mask(data) if data else None)
mask_items.append(
BinaryLayer(
@@ -216,7 +217,6 @@ def _unpack(stream: io.BufferedIOBase, strct: struct.Struct) -> tuple:
def _decode_binary_mask(data: bytes, dtype: DTypeLike = "uint16") -> np.ndarray:
# this receives data as would be extracted from a
# `CustomDataSeq|RleZipBinarySequence...` section in the metadata
- # data = f._rdr._get_meta_chunk('CustomDataSeq|RleZipBinarySequence_1_v1|0')[:4]
# NOTE it is up to ND2File to strip the first 4 bytes... and not call this if there
# is no data (i.e. if the chunk is just '\x00')
diff --git a/src/nd2/_clx_lite.py b/src/nd2/_clx_lite.py
index 52f7f98..e1ec6b9 100644
--- a/src/nd2/_clx_lite.py
+++ b/src/nd2/_clx_lite.py
@@ -115,6 +115,8 @@ def _chunk_name_and_dtype(
data_type, name_length = strctBB.unpack(header)
if data_type == ELxLiteVariantType.COMPRESS:
+ # NOTE: the rois.nd2 test file has compressed metadata
+ # in b'CustomData|CustomDescriptionV1_0!'
raise NotImplementedError("Compressed metadata not yet implemented.")
if data_type in (ELxLiteVariantType.DEPRECATED, ELxLiteVariantType.UNKNOWN):
raise ValueError(f"Unknown data type in metadata header: {data_type}")
diff --git a/src/nd2/_clx_xml.py b/src/nd2/_clx_xml.py
index b949915..1109b3b 100644
--- a/src/nd2/_clx_xml.py
+++ b/src/nd2/_clx_xml.py
@@ -10,10 +10,11 @@
import lxml.etree
Element = Union[xml.etree.ElementTree.Element, lxml.etree._Element]
- Parser = Callable[[bytes], Element]
+ Parser = Callable[[bytes | str], Element]
Scalar = Union[float, str, int, bytearray, bool]
JsonValue = Union[Scalar, dict[str, "JsonValue"]]
XML: Parser
+ ParseError: Exception
else:
try:
@@ -73,7 +74,15 @@ def json_from_clx_variant(
on the XML structure. (A ... is the most
likely case where a scalar is returned.)
"""
- node = parser(bxml.split(b"?>", 1)[-1]) # strip xml header
+    if bxml.startswith(b"<?xml"):
+        bxml = bxml.split(b"?>", 1)[-1]  # strip xml header
+
+ try:
+ node = parser(bxml)
+ except SyntaxError: # when there are invalid characters in the XML
+ # could go straight to this ... not sure if it's slower
+ node = parser(bxml.decode(encoding="utf-8", errors="ignore"))
+
is_legacy = node.attrib.get("_VERSION") == "1.000000"
name, val = _node_name_value(node, strip_prefix, include_attrs=is_legacy)
@@ -123,7 +132,17 @@ def _node_name_value(
# NOTE: "no_name" is the standard name for a list-type node
# "BinaryItem" is a special case found in the BinaryMetadata_v1 tag...
# without special handling, you would only get the last item in the list
- if cname in ("no_name", None, "", "BinaryItem", "TextInfoItem"):
+ # FIXME: handle the special cases below "" better.
+ if cname in (
+ "no_name",
+ None,
+ "",
+ "BinaryItem",
+ "TextInfoItem",
+ "Wavelength",
+ "MinSrc",
+ "MaxSrc",
+ ):
if not cval:
# skip empty nodes ... the sdk does this too
continue
diff --git a/src/nd2/_legacy/_legacy.py b/src/nd2/_legacy/_legacy.py
index 5f67df7..6f523bc 100644
--- a/src/nd2/_legacy/_legacy.py
+++ b/src/nd2/_legacy/_legacy.py
@@ -13,8 +13,9 @@
import numpy as np
-from .. import structures as strct
-from .._util import AXIS, VoxelSize
+from nd2 import structures as strct
+from nd2._util import AXIS, VoxelSize
+
from ._legacy_xml import parse_xml_block
if TYPE_CHECKING:
diff --git a/src/nd2/_pysdk/_parse.py b/src/nd2/_pysdk/_parse.py
index 6c7118f..0fa710f 100644
--- a/src/nd2/_pysdk/_parse.py
+++ b/src/nd2/_pysdk/_parse.py
@@ -26,6 +26,7 @@
RawAttributesDict,
RawExperimentDict,
RawMetaDict,
+ RawTextInfoDict,
SpectLoopPars,
SpectrumDict,
TimeLoopPars,
@@ -237,7 +238,7 @@ def _load_single_experiment_loop(
count = loop_params.get("pPlanes", {}).get("uiCount", count)
return strct.SpectLoop(count=count)
- raise NotImplementedError(
+ raise NotImplementedError( # pragma: no cover
f"We've never seen a file like this! (loop_type={loop_type!r}). We'd "
"appreciate it if you would submit this file at "
"https://github.com/tlambert03/nd2/issues/new",
@@ -347,28 +348,29 @@ def _get_spectrum_max(item: SpectrumDict | None) -> float:
return max(spectrum, key=lambda x: x[0])[1] if spectrum else 0.0
-def load_text_info(src: dict) -> strct.TextInfo:
- # we only want keys that are present in the src
+def load_text_info(raw_txt_info: RawTextInfoDict) -> strct.TextInfo:
+ # we only want keys that are present in the raw_txt_info
+
out = {
- key: src[lookup]
+ key: raw_txt_info.get(lookup)
for key, lookup in (
- ("appVersion", "TextInfoItem_14"),
+ ("imageId", "TextInfoItem_0"),
+ ("type", "TextInfoItem_1"),
+ ("group", "TextInfoItem_2"),
+ ("sampleId", "TextInfoItem_3"),
("author", "TextInfoItem_4"),
+ ("description", "TextInfoItem_5"),
("capturing", "TextInfoItem_6"),
- ("conclusion", "TextInfoItem_10"),
+ ("sampling", "TextInfoItem_7"),
+ ("location", "TextInfoItem_8"),
("date", "TextInfoItem_9"),
- ("description", "TextInfoItem_5"),
- ("group", "TextInfoItem_2"),
- ("imageId", "TextInfoItem_0"),
+ ("conclusion", "TextInfoItem_10"),
("info1", "TextInfoItem_11"),
("info2", "TextInfoItem_12"),
- ("location", "TextInfoItem_8"),
("optics", "TextInfoItem_13"),
- ("sampleId", "TextInfoItem_3"),
- ("sampling", "TextInfoItem_7"),
- ("type", "TextInfoItem_1"),
+ ("appVersion", "TextInfoItem_14"),
)
- if src.get(lookup)
+ if raw_txt_info.get(lookup)
}
return cast(strct.TextInfo, out)
diff --git a/src/nd2/_pysdk/_pysdk.py b/src/nd2/_pysdk/_pysdk.py
index fb15136..206ccdc 100644
--- a/src/nd2/_pysdk/_pysdk.py
+++ b/src/nd2/_pysdk/_pysdk.py
@@ -39,6 +39,7 @@
RawAttributesDict,
RawExperimentDict,
RawMetaDict,
+ RawTextInfoDict,
)
StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes]
@@ -53,6 +54,7 @@ def __init__(
self._fh: BufferedReader | None = None
self._mmap: mmap.mmap | None = None
self._chunkmap: ChunkMap = {}
+ self._cached_decoded_chunks: dict[bytes, Any] = {}
self._error_radius: int | None = (
search_window * 1000 if validate_frames else None
)
@@ -62,16 +64,18 @@ def __init__(
self._experiment: list[structures.ExpLoop] | None = None
self._text_info: structures.TextInfo | None = None
self._metadata: structures.Metadata | None = None
- self._raw_attributes: RawAttributesDict | None = None
- self._raw_experiment: RawExperimentDict | None = None
- self._raw_text_info: dict | None = None
- self._raw_image_metadata: RawMetaDict | None = None
+
self._global_metadata: GlobalMetadata | None = None
- self._frame_offsets_: dict[int, int] | None = None
+ self._cached_frame_offsets: dict[int, int] | None = None
self._raw_frame_shape_: tuple[int, ...] | None = None
self._dtype_: np.dtype | None = None
self._strides_: tuple[int, ...] | None = None
self._frame_times: list[float] | None = None
+ # these caches could be removed... they aren't really used
+ self._raw_attributes: RawAttributesDict | None = None
+ self._raw_experiment: RawExperimentDict | None = None
+ self._raw_text_info: RawTextInfoDict | None = None
+ self._raw_image_metadata: RawMetaDict | None = None
self.open()
@@ -97,28 +101,25 @@ def __exit__(self, *_: Any) -> None:
@property
def chunkmap(self) -> ChunkMap:
+ """Load and return the chunkmap.
+
+        a Chunkmap is a mapping of chunk names (bytes) to (offset, size) pairs.
+ {
+ b'ImageTextInfoLV!': (13041664, 2128),
+ b'ImageTextInfo!': (13037568, 1884),
+ b'ImageMetadataSeq|0!': (237568, 33412),
+ ...
+ }
+ """
if not self._chunkmap:
if self._fh is None:
raise OSError("File not open")
self._chunkmap = get_chunkmap(self._fh, error_radius=self._error_radius)
return self._chunkmap
- @property
- def _frame_offsets(self) -> dict[int, int]:
- if self._frame_offsets_ is None:
- DEFAULT_SHIFT = 4072
- offsets = [
- (int(key[13:-1]), pos)
- for key, (pos, _) in sorted(self.chunkmap.items())
- if key.startswith(b"ImageDataSeq|")
- ]
- # if validate_frames:
- # return _validate_frames(fh, image_map, kbrange=search_window), meta_ma
- self._frame_offsets_ = {f: int(o + 24 + DEFAULT_SHIFT) for f, o in offsets}
- return self._frame_offsets_
-
@property
def attributes(self) -> structures.Attributes:
+ """Load and return the image attributes."""
if self._attributes is None:
k = b"ImageAttributesLV!" if self.version >= (3, 0) else b"ImageAttributes!"
attrs = self._decode_chunk(k, strip_prefix=False)
@@ -130,24 +131,51 @@ def attributes(self) -> structures.Attributes:
return self._attributes
def _load_chunk(self, name: bytes) -> bytes:
+ """Load raw bytes from a specific chunk in the chunkmap.
+
+ `name` must be a valid key in the chunkmap.
+ """
if self._fh is None:
raise OSError("File not open")
- offset, _ = self.chunkmap[name]
- # TODO: there's a possibility of speed up here since we're rereading the header
+
+ try:
+ offset = self.chunkmap[name][0]
+ except KeyError as e:
+ raise KeyError(
+ f"Chunk key {name!r} not found in chunkmap: {set(self.chunkmap)}"
+ ) from e
+
if self._error_radius is None:
return read_nd2_chunk(self._fh, offset)
return _robustly_read_named_chunk(
self._fh, offset, expect_name=name, search_radius=self._error_radius
)
- def _decode_chunk(self, name: bytes, strip_prefix: bool = True) -> dict:
- data = self._load_chunk(name)
- if self.version < (3, 0):
- return cast("dict", json_from_clx_variant(data, strip_prefix=strip_prefix))
- return json_from_clx_lite_variant(data, strip_prefix=strip_prefix)
+ def _decode_chunk(self, name: bytes, strip_prefix: bool = True) -> dict | Any:
+ """Convert raw chunk bytes to a Python object.
+
+ Parameters
+ ----------
+ name : bytes
+ The name of the chunk to load. Must be a valid key in the chunkmap.
+ strip_prefix : bool, optional
+        If True, strip the lowercase "type" prefix from the tag names, by default
+        True.
+ """
+ if name not in self._cached_decoded_chunks:
+ data = self._load_chunk(name)
+ if data.startswith(b"<"):
+ decoded: Any = json_from_clx_variant(data, strip_prefix=strip_prefix)
+ elif self.version < (3, 0):
+ decoded = json_from_clx_variant(data, strip_prefix=strip_prefix)
+ else:
+ decoded = json_from_clx_lite_variant(data, strip_prefix=strip_prefix)
+ self._cached_decoded_chunks[name] = decoded
+ return self._cached_decoded_chunks[name]
@property
def version(self) -> tuple[int, int]:
+ """Return the file format version as a tuple of ints."""
if self._version is None:
try:
self._version = get_version(self._fh or self._filename)
@@ -208,8 +236,8 @@ def text_info(self) -> structures.TextInfo:
else:
info = self._decode_chunk(k, strip_prefix=False)
info = info.get("SLxImageTextInfo", info) # for v3 only
- self._raw_text_info = info
- self._text_info = load_text_info(info)
+ self._raw_text_info = cast("RawTextInfoDict", info)
+ self._text_info = load_text_info(self._raw_text_info)
return self._text_info
def experiment(self) -> list[structures.ExpLoop]:
@@ -248,8 +276,6 @@ def voxel_size(self) -> tuple[float, float, float]:
def channel_names(self) -> list[str]:
return [c.channel.name for c in self.metadata().channels or []]
- # -----------
-
def _coords_from_seq_index(self, seq_index: int) -> tuple[int, ...]:
"""Convert a sequence index to a coordinate tuple."""
coords: list[int] = []
@@ -268,6 +294,20 @@ def _coord_info(self) -> list[tuple[int, str, int]]:
def _seq_count(self) -> int:
return int(np.prod([x.count for x in self.experiment()]))
+ @property
+ def _frame_offsets(self) -> dict[int, int]:
+ """Return map of frame number to offset in the file."""
+ if self._cached_frame_offsets is None:
+            # image frames are stored in the chunkmap as "ImageDataSeq|<frame>!"
+ # and their data is stored 24 + 4072 bytes after the chunkmap offset
+ data_offset = 24 + 4072
+ self._cached_frame_offsets = {
+ int(chunk_key[13:-1]): int(offset + data_offset)
+ for chunk_key, (offset, _) in sorted(self.chunkmap.items())
+ if chunk_key.startswith(b"ImageDataSeq|")
+ }
+ return self._cached_frame_offsets
+
def _read_image(self, index: int) -> np.ndarray:
"""Read a chunk directly without using SDK."""
if index > self._seq_count():
@@ -363,62 +403,13 @@ def _actual_frame_shape(self) -> tuple[int, ...]:
attr.componentCount // (attr.channelCount or 1),
)
- def _get_meta_chunk(self, key: str) -> bytes:
- # deprecated
- return self._load_chunk(f"{key}!".encode())
-
- @property
- def _meta_map(self) -> dict[str, int]:
- # deprecated
- return {k.decode()[:-1]: v for k, (v, _) in self.chunkmap.items()}
-
def _custom_data(self) -> dict[str, Any]:
return {
- k.decode()[14:-1]: json_from_clx_variant(self._load_chunk(k))
+ k.decode()[14:-1]: self._decode_chunk(k)
for k in self.chunkmap
if k.startswith(b"CustomDataVar|")
}
- # probably a temporary method, for testing
- def _raw_meta(self) -> dict:
- k = b"ImageAttributesLV!" if self.version >= (3, 0) else b"ImageAttributes!"
- attrs = self._decode_chunk(k, strip_prefix=False) if k in self.chunkmap else {}
- attrs = attrs.get("SLxImageAttributes", attrs)
-
- k = b"ImageTextInfoLV!" if self.version >= (3, 0) else b"ImageTextInfo!"
- ti = self._decode_chunk(k, strip_prefix=False) if k in self.chunkmap else {}
- ti = ti.get("SLxImageTextInfo", ti)
-
- k = b"ImageMetadataLV!" if self.version >= (3, 0) else b"ImageMetadata!"
- exp = self._decode_chunk(k, strip_prefix=False) if k in self.chunkmap else {}
- exp = exp.get("SLxExperiment", exp)
-
- k = (
- b"ImageMetadataSeqLV|0!"
- if self.version >= (3, 0)
- else b"ImageMetadataSeq|0!"
- )
- meta = self._decode_chunk(k, strip_prefix=False) if k in self.chunkmap else {}
- meta = meta.get("SLxPictureMetadata", meta)
-
- return {
- "Attributes": attrs,
- "Experiment": exp,
- "Metadata": meta,
- "TextInfo": ti,
- }
-
- # TODO: merge with decode_chunk
- def _decoded_custom_data_chunk(
- self, key: bytes, strip_prefix: bool = False
- ) -> dict:
- k = b"CustomDataVar|" + key
- if k not in self.chunkmap:
- return {}
-
- bytes_ = self._load_chunk(k)
- return cast("dict", json_from_clx_variant(bytes_, strip_prefix=strip_prefix))
-
def recorded_data(self) -> dict[str, np.ndarray | Sequence]:
"""Return tabular data recorded for each frame of the experiment.
@@ -431,7 +422,11 @@ def recorded_data(self) -> dict[str, np.ndarray | Sequence]:
Legacy ND2 files are not supported.
"""
- cd = self._decoded_custom_data_chunk(b"CustomDataV2_0!")
+ try:
+ cd = self._decode_chunk(b"CustomDataVar|CustomDataV2_0!")
+ except KeyError:
+ return {}
+
if not cd:
return {}
diff --git a/src/nd2/_pysdk/_sdk_types.py b/src/nd2/_pysdk/_sdk_types.py
index ad6e76e..88fa981 100644
--- a/src/nd2/_pysdk/_sdk_types.py
+++ b/src/nd2/_pysdk/_sdk_types.py
@@ -83,6 +83,22 @@ class RawExperimentDict(TypedDict):
vectStimulationConfigurationsSize: NotRequired[int]
wsCameraName: NotRequired[str]
+ class RawTextInfoDict(TypedDict):
+ TextInfoItem_0: str
+ TextInfoItem_1: str
+ TextInfoItem_2: str
+ TextInfoItem_3: str
+ TextInfoItem_4: str
+ TextInfoItem_5: str
+ TextInfoItem_6: str
+ TextInfoItem_7: str
+ TextInfoItem_8: str
+ TextInfoItem_9: str
+ TextInfoItem_10: str
+ TextInfoItem_11: str
+ TextInfoItem_12: str
+ TextInfoItem_13: str
+
class TimeLoopPars(TypedDict):
bDurationPref: NotRequired[bool]
dAvgPeriodDiff: float
diff --git a/src/nd2/nd2file.py b/src/nd2/nd2file.py
index b3c86e8..f6289c5 100644
--- a/src/nd2/nd2file.py
+++ b/src/nd2/nd2file.py
@@ -4,10 +4,11 @@
import warnings
from itertools import product
from pathlib import Path
-from typing import TYPE_CHECKING, cast, no_type_check, overload
+from typing import TYPE_CHECKING, cast, overload
import numpy as np
+from ._pysdk._chunk_decode import ND2_FILE_SIGNATURE
from ._util import AXIS, VoxelSize, get_reader, is_supported_file
from .structures import ROI
@@ -33,7 +34,6 @@
ExpLoop,
FrameMetadata,
Metadata,
- Position,
TextInfo,
XYPosLoop,
)
@@ -42,6 +42,28 @@
class ND2File:
+    """Main object for opening and extracting data from an nd2 file.
+
+ Parameters
+ ----------
+ path : Path | str
+ Filename of an nd2 file.
+ validate_frames : bool
+ Whether to verify (and attempt to fix) frames whose positions have been
+ shifted relative to the predicted offset (i.e. in a corrupted file).
+ This comes at a slight performance penalty at file open, but may "rescue"
+ some corrupt files. by default False.
+ search_window : int
+ When validate_frames is true, this is the search window (in KB) that will
+ be used to try to find the actual chunk position. by default 100 KB
+ read_using_sdk : Optional[bool]
+ DEPRECATED. No longer does anything.
+ If `True`, use the SDK to read the file. If `False`, inspects the chunkmap
+ and reads from a `numpy.memmap`. If `None` (the default), uses the SDK if
+ the file is compressed, otherwise uses the memmap. Note: using
+ `read_using_sdk=False` on a compressed file will result in a ValueError.
+ """
+
_memmap: mmap.mmap
_is_legacy: bool
@@ -53,28 +75,6 @@ def __init__(
search_window: int = 100,
read_using_sdk: bool | None = None,
) -> None:
- """Open an nd2 file.
-
- Parameters
- ----------
- path : Union[Path, str]
- Filename of an nd2 file.
- validate_frames : bool
- Whether to verify (and attempt to fix) frames whose positions have been
- shifted relative to the predicted offset (i.e. in a corrupted file).
- This comes at a slight performance penalty at file open, but may "rescue"
- some corrupt files. by default False.
- search_window : int
- When validate_frames is true, this is the search window (in KB) that will
- be used to try to find the actual chunk position. by default 100 KB
- read_using_sdk : Optional[bool]
- DEPRECATED. No longer does anything.
- If `True`, use the SDK to read the file. If `False`, inspects the chunkmap
- and reads from a `numpy.memmap`. If `None` (the default), uses the SDK if
- the file is compressed, otherwise uses the memmap. Note: using
- `read_using_sdk=False` on a compressed file will result in a ValueError.
-
- """
if read_using_sdk is not None:
warnings.warn(
"The `read_using_sdk` argument is deprecated and will be removed in "
@@ -125,6 +125,7 @@ def closed(self) -> bool:
return self._closed
def __enter__(self) -> ND2File:
+ """Open file for reading."""
self.open()
return self
@@ -139,15 +140,18 @@ def __del__(self) -> None:
self._rdr.close()
def __exit__(self, *_: Any) -> None:
+ """Exit context manager and close file."""
self.close()
def __getstate__(self) -> dict[str, Any]:
+ """Return state for pickling."""
state = self.__dict__.copy()
del state["_rdr"]
del state["_lock"]
return state
def __setstate__(self, d: dict[str, Any]) -> None:
+ """Load state from pickling."""
self.__dict__ = d
self._lock = threading.RLock()
self._rdr = get_reader(self._path)
@@ -167,18 +171,18 @@ def text_info(self) -> TextInfo | dict:
@cached_property
def rois(self) -> dict[int, ROI]:
"""Return dict of {id: ROI} for all ROIs found in the metadata."""
- ROI_METADATA = "CustomData|RoiMetadata_v1"
- if self.is_legacy or ROI_METADATA not in self._rdr._meta_map: # type: ignore
- return {}
- data = self.unstructured_metadata(include={ROI_METADATA})
- data = data.get(ROI_METADATA, {}).get("RoiMetadata_v1", {})
+ key = b"CustomData|RoiMetadata_v1!"
+ if self.is_legacy or key not in self._rdr.chunkmap: # type: ignore
+ return {} # pragma: no cover
+
+ data = cast("LatestSDKReader", self._rdr)._decode_chunk(key)
+ data = data.get("RoiMetadata_v1", {}).copy()
data.pop("Global_Size", None)
try:
- _rois = (ROI._from_meta_dict(d) for d in data.values())
- rois = {r.id: r for r in _rois}
- except Exception as e:
+ _rois = [ROI._from_meta_dict(d) for d in data.values()]
+ except Exception as e: # pragma: no cover
raise ValueError(f"Could not parse ROI metadata: {e}") from e
- return rois
+ return {r.id: r for r in _rois}
@cached_property
def experiment(self) -> list[ExpLoop]:
@@ -226,22 +230,25 @@ def unstructured_metadata(
metadata chunk (things like 'CustomData|RoiMetadata_v1' or
'ImageMetadataLV'), and values that are associated metadata chunk.
"""
- if self.is_legacy:
+ if self.is_legacy: # pragma: no cover
raise NotImplementedError(
"unstructured_metadata not available for legacy files"
)
- from ._clx_lite import json_from_clx_lite_variant
if unnest is not None:
warnings.warn(
"The unnest parameter is deprecated, and no longer has any effect.",
+ FutureWarning,
stacklevel=2,
)
- output: dict[str, Any] = {}
-
rdr = cast("LatestSDKReader", self._rdr)
- keys = set(rdr._meta_map)
+ keys = {
+ k.decode()[:-1]
+ for k in rdr.chunkmap
+ if not k.startswith((b"ImageDataSeq", b"CustomData", ND2_FILE_SIGNATURE))
+ }
+
if include:
_keys: set[str] = set()
for i in include:
@@ -253,20 +260,13 @@ def unstructured_metadata(
if exclude:
keys = {k for k in keys if k not in exclude}
+ output: dict[str, Any] = {}
for key in sorted(keys):
+ name = f"{key}!".encode()
try:
- meta: bytes = rdr._get_meta_chunk(key)
- if meta.startswith(b"<"):
- # probably xml
- decoded: Any = meta.decode("utf-8")
- else:
- decoded = json_from_clx_lite_variant(
- meta, strip_prefix=strip_prefix
- )
- except Exception:
- decoded = meta
-
- output[key] = decoded
+ output[key] = rdr._decode_chunk(name, strip_prefix=strip_prefix)
+ except Exception: # pragma: no cover
+ output[key] = rdr._load_chunk(name)
return output
@cached_property
@@ -408,7 +408,7 @@ def asarray(self, position: int | None = None) -> np.ndarray:
try:
pidx = list(self.sizes).index(AXIS.POSITION)
except ValueError as exc:
- if position > 0:
+ if position > 0: # pragma: no cover
raise IndexError(
f"Position {position} is out of range. "
f"Only 1 position available"
@@ -416,7 +416,7 @@ def asarray(self, position: int | None = None) -> np.ndarray:
seqs = range(self._frame_count)
else:
if position >= self.sizes[AXIS.POSITION]:
- raise IndexError(
+ raise IndexError( # pragma: no cover
f"Position {position} is out of range. "
f"Only {self.sizes[AXIS.POSITION]} positions available"
)
@@ -492,7 +492,7 @@ def _seq_index_from_coords(self, coords: Sequence) -> Sequence[int] | SupportsIn
def _dask_block(self, copy: bool, block_id: tuple[int]) -> np.ndarray:
if isinstance(block_id, np.ndarray):
- return
+ return None
with self._lock:
was_closed = self.closed
if self.closed:
@@ -502,7 +502,7 @@ def _dask_block(self, copy: bool, block_id: tuple[int]) -> np.ndarray:
idx = self._seq_index_from_coords(block_id[:ncoords])
if idx == self._NO_IDX:
- if any(block_id):
+ if any(block_id): # pragma: no cover
raise ValueError(
f"Cannot get chunk {block_id} for single frame image."
)
@@ -652,6 +652,7 @@ def _expand_coords(self, squeeze: bool = True) -> dict:
]
# fix for Z axis missing from experiment:
+ # TODO: this isn't hit by coverage... maybe it's not needed?
if AXIS.Z in self.sizes and AXIS.Z not in coords:
coords[AXIS.Z] = np.arange(self.sizes[AXIS.Z]) * dz
@@ -674,6 +675,7 @@ def _channel_names(self) -> list[str]:
return self._rdr.channel_names()
def __repr__(self) -> str:
+ """Return a string representation of the ND2File."""
try:
details = " (closed)" if self.closed else f" {self.dtype}: {self.sizes!r}"
extra = f": {Path(self.path).name!r}{details}"
@@ -694,7 +696,7 @@ def recorded_data(self) -> dict[str, np.ndarray | Sequence]:
Legacy ND2 files are not supported.
"""
- if self.is_legacy:
+ if self.is_legacy: # pragma: no cover
warnings.warn(
"`recorded_data` is not supported for legacy ND2 files",
UserWarning,
@@ -829,20 +831,3 @@ def imread(
return nd2.to_dask()
else:
return nd2.asarray()
-
-
-@no_type_check
-def _fix_names(xy_exp, points: list[Position]) -> None:
- """Attempt to fix missing XYPosLoop position names."""
- if not isinstance(xy_exp, dict) or xy_exp.get("Type", "") != 2:
- raise ValueError("Invalid XY experiment")
- _points = xy_exp["LoopPars"]["Points"]
- if len(_points) == 1 and "" in _points:
- _points = _points[""]
- if not isinstance(_points, list):
- _points = [_points]
- _names = {(p["PosX"], p["PosY"], p["PosZ"]): p["PosName"] for p in _points}
-
- for p in points:
- if p.name is None:
- p.name = _names.get(tuple(p.stagePositionUm), p.name)
diff --git a/src/nd2/structures.py b/src/nd2/structures.py
index ab9378e..5322ad7 100644
--- a/src/nd2/structures.py
+++ b/src/nd2/structures.py
@@ -3,7 +3,7 @@
import builtins
from dataclasses import dataclass, field
from enum import IntEnum
-from typing import NamedTuple, Union, cast
+from typing import NamedTuple, Union
from typing_extensions import Literal, TypeAlias, TypedDict
@@ -93,19 +93,6 @@ class _Loop:
parameters: LoopParams
type: LoopTypeString
- @classmethod
- def create(cls, obj: dict) -> ExpLoop:
- type_ = obj.pop("type")
- if type_ in ("TimeLoop", LoopType.TimeLoop):
- return TimeLoop(**obj)
- elif type_ in ("NETimeLoop", LoopType.NETimeLoop):
- return NETimeLoop(**obj)
- elif type_ in ("XYPosLoop", LoopType.XYPosLoop):
- return XYPosLoop(**obj)
- elif type_ in ("ZStackLoop", LoopType.ZStackLoop):
- return ZStackLoop(**obj)
- return cast("ExpLoop", globals()[obj["type"]](**obj))
-
@dataclass
class SpectLoop:
diff --git a/tests/conftest.py b/tests/conftest.py
index b2fa107..16212a3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -20,7 +20,7 @@
SINGLE = DATA / "dims_t3c2y32x32.nd2"
-@pytest.fixture
+@pytest.fixture()
def single_nd2():
return SINGLE
@@ -46,7 +46,7 @@ def old_nd2(request):
@pytest.fixture(autouse=True)
-def no_files_left_open():
+def _assert_no_files_left_open():
files_before = {p for p in psutil.Process().open_files() if p.path.endswith("nd2")}
yield
files_after = {p for p in psutil.Process().open_files() if p.path.endswith("nd2")}
diff --git a/tests/test_aicsimage.py b/tests/test_aicsimage.py
index 46c83be..5873862 100644
--- a/tests/test_aicsimage.py
+++ b/tests/test_aicsimage.py
@@ -16,14 +16,16 @@
@pytest.mark.parametrize(
- "filename, "
- "set_scene, "
- "expected_scenes, "
- "expected_shape, "
- "expected_dtype, "
- "expected_dims_order, "
- "expected_channel_names, "
- "expected_physical_pixel_sizes",
+ (
+ "filename",
+ "set_scene",
+ "expected_scenes",
+ "expected_shape",
+ "expected_dtype",
+ "expected_dims_order",
+ "expected_channel_names",
+ "expected_physical_pixel_sizes",
+ ),
[
pytest.param(
"ND2_aryeh_but3_cont200-1.nd2",
diff --git a/tests/test_deprecations.py b/tests/test_deprecations.py
new file mode 100644
index 0000000..0faf367
--- /dev/null
+++ b/tests/test_deprecations.py
@@ -0,0 +1,14 @@
+import nd2
+import pytest
+
+
+def test_read_using_sdk(single_nd2):
+ with pytest.warns(FutureWarning, match="read_using_sdk"):
+ f = nd2.ND2File(single_nd2, read_using_sdk=True)
+ f.close()
+
+
+def test_unnest_param(single_nd2):
+ with nd2.ND2File(single_nd2) as f:
+ with pytest.warns(FutureWarning, match="unnest"):
+ f.unstructured_metadata(unnest=True)
diff --git a/tests/test_metadata.py b/tests/test_metadata.py
index ce2bd7d..b4f3da8 100644
--- a/tests/test_metadata.py
+++ b/tests/test_metadata.py
@@ -2,7 +2,11 @@
import sys
from pathlib import Path
+import dask.array as da
import pytest
+import xarray as xr
+from nd2 import ND2File, structures
+from nd2._pysdk._chunk_decode import ND2_FILE_SIGNATURE
sys.path.append(str(Path(__file__).parent.parent / "scripts"))
from nd2_describe import get_nd2_stats # noqa: E402
@@ -10,6 +14,8 @@
with open("tests/samples_metadata.json") as f:
EXPECTED = json.load(f)
+DATA = Path(__file__).parent / "data"
+
@pytest.mark.parametrize("path", EXPECTED, ids=lambda x: f'{x}_{EXPECTED[x]["ver"]}')
def test_metadata_integrity(path: str):
@@ -33,3 +39,110 @@ def _clear_names(*exps):
if item["type"] == "XYPosLoop":
for point in item["parameters"]["points"]:
point.pop("name", None)
+
+
+def test_decode_all_chunks(new_nd2):
+ with ND2File(new_nd2) as f:
+ for key in f._rdr.chunkmap:
+ if not key.startswith((b"ImageDataSeq", b"CustomData", ND2_FILE_SIGNATURE)):
+ f._rdr._decode_chunk(key)
+
+
+def test_metadata_extraction(new_nd2: Path):
+ assert ND2File.is_supported_file(new_nd2)
+ with ND2File(new_nd2) as nd:
+ assert repr(nd)
+ assert nd.path == str(new_nd2)
+ assert not nd.closed
+
+ assert isinstance(nd._rdr._seq_count(), int)
+ assert isinstance(nd.attributes, structures.Attributes)
+
+ # TODO: deal with typing when metadata is completely missing
+ assert isinstance(nd.metadata, structures.Metadata)
+ assert isinstance(nd.frame_metadata(0), structures.FrameMetadata)
+ assert isinstance(nd.experiment, list)
+ assert isinstance(nd.text_info, dict)
+ assert isinstance(nd.sizes, dict)
+ assert isinstance(nd.custom_data, dict)
+ assert isinstance(nd.shape, tuple)
+ assert isinstance(nd.size, int)
+ assert isinstance(nd.closed, bool)
+ assert isinstance(nd.ndim, int)
+ _bd = nd.binary_data
+ assert isinstance(nd.is_rgb, bool)
+ assert isinstance(nd.nbytes, int)
+
+ assert isinstance(nd.unstructured_metadata(), dict)
+ assert isinstance(nd.recorded_data, dict)
+
+ assert nd.closed
+
+
+def test_metadata_extraction_legacy(old_nd2):
+ assert ND2File.is_supported_file(old_nd2)
+ with ND2File(old_nd2) as nd:
+ assert repr(nd)
+ assert nd.path == str(old_nd2)
+ assert not nd.closed
+
+ assert isinstance(nd.attributes, structures.Attributes)
+
+ # # TODO: deal with typing when metadata is completely missing
+ # assert isinstance(nd.metadata, structures.Metadata)
+ assert isinstance(nd.experiment, list)
+ assert isinstance(nd.text_info, dict)
+ xarr = nd.to_xarray()
+ assert isinstance(xarr, xr.DataArray)
+ assert isinstance(xarr.data, da.Array)
+
+ assert nd.closed
+
+
+def test_recorded_data() -> None:
+ # this method is smoke-tested for every file above...
+ # but specific values are asserted here:
+ with ND2File(DATA / "cluster.nd2") as f:
+ rd = f.recorded_data
+ headers = list(rd)
+ row_0 = [rd[h][0] for h in headers]
+ assert headers == [
+ "Time [s]",
+ "Z-Series",
+ "Camera 1 Temperature [°C]",
+ "Laser Power; 1.channel [%]",
+ "High Voltage; 1.channel",
+ "Laser Power; 2.channel [%]",
+ "High Voltage; 2.channel",
+ "Laser Power; 3.channel [%]",
+ "High Voltage; 3.channel",
+ "Laser Power; 4.channel [%]",
+ "High Voltage; 4.channel",
+ "Camera 1 Exposure Time [ms]",
+ "High Voltage; TD",
+ "PFS Offset",
+ "PFS Status",
+ "X Coord [µm]",
+ "Y Coord [µm]",
+ "Ti ZDrive [µm]",
+ ]
+ assert row_0 == [
+ 0.44508349828422067,
+ -2.0,
+ -5.0,
+ 0.0,
+ 0,
+ 0.5,
+ 37,
+ 10.758400000000002,
+ 137,
+ 9.0,
+ 75,
+ 8.1,
+ 0,
+ -1,
+ 7,
+ -26056.951209195162,
+ -4155.462732842248,
+ 3916.7250000000004,
+ ]
diff --git a/tests/test_parse.py b/tests/test_parse.py
index 4ddf7fa..ca31a39 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -14,8 +14,14 @@ def readlim_output():
def test_parse_raw_metadata(new_nd2: Path):
expected = readlim_output()
- with ND2Reader(new_nd2) as f:
- meta = f._raw_meta()
+ with ND2Reader(new_nd2) as rdr:
+ rdr._cached_global_metadata() # force metadata to be read
+ meta = {
+ "Attributes": rdr._raw_attributes,
+ "Experiment": rdr._raw_experiment,
+ "Metadata": rdr._raw_image_metadata,
+ "TextInfo": rdr._raw_text_info,
+ }
lim_meta = expected[new_nd2.name]["raw_metadata"]
_assert_lim_close_enough(meta, lim_meta)
diff --git a/tests/test_reader.py b/tests/test_reader.py
index 4083d04..5f3ab5c 100644
--- a/tests/test_reader.py
+++ b/tests/test_reader.py
@@ -8,66 +8,20 @@
import numpy as np
import pytest
import xarray as xr
-from nd2 import ND2File, imread, structures
+from nd2 import ND2File, imread
from nd2._util import AXIS
from resource_backed_dask_array import ResourceBackedDaskArray
DATA = Path(__file__).parent / "data"
-def test_metadata_extraction(new_nd2: Path):
- assert ND2File.is_supported_file(new_nd2)
- with ND2File(new_nd2) as nd:
- assert nd.path == str(new_nd2)
- assert not nd.closed
-
- assert isinstance(nd._rdr._seq_count(), int)
- assert isinstance(nd.attributes, structures.Attributes)
-
- # TODO: deal with typing when metadata is completely missing
- assert isinstance(nd.metadata, structures.Metadata)
- assert isinstance(nd.frame_metadata(0), structures.FrameMetadata)
- assert isinstance(nd.experiment, list)
- assert isinstance(nd.text_info, dict)
- assert isinstance(nd.sizes, dict)
- assert isinstance(nd.custom_data, dict)
- assert isinstance(nd.shape, tuple)
- assert isinstance(nd.size, int)
- assert isinstance(nd.closed, bool)
- assert isinstance(nd.ndim, int)
-
- assert isinstance(nd.unstructured_metadata(), dict)
- assert isinstance(nd.recorded_data, dict)
-
- assert nd.closed
-
-
-def test_metadata_extraction_legacy(old_nd2):
- assert ND2File.is_supported_file(old_nd2)
- with ND2File(old_nd2) as nd:
- assert nd.path == str(old_nd2)
- assert not nd.closed
-
- assert isinstance(nd.attributes, structures.Attributes)
-
- # # TODO: deal with typing when metadata is completely missing
- # assert isinstance(nd.metadata, structures.Metadata)
- assert isinstance(nd.experiment, list)
- assert isinstance(nd.text_info, dict)
- xarr = nd.to_xarray()
- assert isinstance(xarr, xr.DataArray)
- assert isinstance(xarr.data, da.Array)
-
- assert nd.closed
-
-
def test_read_safety(new_nd2: Path):
with ND2File(new_nd2) as nd:
for i in range(nd._frame_count):
nd._rdr._read_image(i)
-def test_position(new_nd2):
+def test_position(new_nd2: Path):
"""use position to extract a single stage position with asarray."""
if new_nd2.stat().st_size > 250_000_000:
pytest.skip("skipping read on big files")
@@ -156,7 +110,7 @@ def test_imread():
assert d.shape == (4, 5, 520, 696)
-@pytest.fixture
+@pytest.fixture()
def bfshapes():
with open(DATA / "bf_shapes.json") as f:
return json.load(f)
@@ -164,16 +118,14 @@ def bfshapes():
def test_bioformats_parity(new_nd2: Path, bfshapes: dict):
"""Testing that match bioformats shapes (or better when bioformats misses it)."""
- if new_nd2.name in {
+ if new_nd2.name.startswith("JOBS_") or new_nd2.name in {
"dims_rgb_t3p2c2z3x64y64.nd2", # bioformats seems to miss the RGB
"dims_rgb_c2x64y64.nd2", # bioformats seems to miss the RGB
"dims_t3y32x32.nd2", # bioformats misses T
"jonas_3.nd2", # bioformats misses Z
"cluster.nd2", # bioformats misses both Z and T
}:
- pytest.xfail()
- if new_nd2.name.startswith("JOBS_"):
- pytest.xfail() # bioformats misses XY position info in JOBS files
+ return # bioformats misses XY position info in JOBS files
try:
bf_info = {k: v for k, v in bfshapes[new_nd2.name]["shape"].items() if v > 1}
except KeyError:
@@ -245,7 +197,7 @@ def test_pickle_dask_wrapper(single_nd2):
)
-@pytest.mark.parametrize("fname, sizes", OLD_SDK_MISSES_COORDS)
+@pytest.mark.parametrize(("fname", "sizes"), OLD_SDK_MISSES_COORDS)
def test_sizes(fname, sizes):
with ND2File(DATA / fname) as f:
assert f.sizes == sizes
@@ -303,55 +255,6 @@ def test_extra_width_bytes():
np.testing.assert_array_equal(im[0, 0, :4, :4], expected)
-def test_recorded_data() -> None:
- # this method is smoke-tested for every file above...
- # but specific values are asserted here:
- with ND2File(DATA / "cluster.nd2") as f:
- rd = f.recorded_data
- headers = list(rd)
- row_0 = [rd[h][0] for h in headers]
- assert headers == [
- "Time [s]",
- "Z-Series",
- "Camera 1 Temperature [°C]",
- "Laser Power; 1.channel [%]",
- "High Voltage; 1.channel",
- "Laser Power; 2.channel [%]",
- "High Voltage; 2.channel",
- "Laser Power; 3.channel [%]",
- "High Voltage; 3.channel",
- "Laser Power; 4.channel [%]",
- "High Voltage; 4.channel",
- "Camera 1 Exposure Time [ms]",
- "High Voltage; TD",
- "PFS Offset",
- "PFS Status",
- "X Coord [µm]",
- "Y Coord [µm]",
- "Ti ZDrive [µm]",
- ]
- assert row_0 == [
- 0.44508349828422067,
- -2.0,
- -5.0,
- 0.0,
- 0,
- 0.5,
- 37,
- 10.758400000000002,
- 137,
- 9.0,
- 75,
- 8.1,
- 0,
- -1,
- 7,
- -26056.951209195162,
- -4155.462732842248,
- 3916.7250000000004,
- ]
-
-
def test_gc_triggers_cleanup(single_nd2):
# this test takes advantage of the `no_files_left_open``
# fixture in conftest to ensure that the file is closed
diff --git a/tests/test_rescue.py b/tests/test_rescue.py
index a0dab54..e007140 100644
--- a/tests/test_rescue.py
+++ b/tests/test_rescue.py
@@ -4,7 +4,7 @@
from nd2._pysdk._chunk_decode import get_chunkmap
-@pytest.fixture
+@pytest.fixture()
def broken_nd2(tmp_path, single_nd2):
with open(single_nd2, "rb") as f:
data = f.read()
@@ -28,12 +28,12 @@ def test_rescue(broken_nd2, single_nd2, capsys):
# we can't do too much magic about guessing shape and dtype since some files
# may not have that information intact
- with pytest.raises(ValueError, match="appears to be corrupt. Expected "):
- with open(broken_nd2, "rb") as f2:
+ with open(broken_nd2, "rb") as f2:
+ with pytest.raises(ValueError, match="appears to be corrupt. Expected "):
cm2 = get_chunkmap(f2)
- with pytest.raises(ValueError, match="Also looked in the surrounding 1000 bytes"):
- with open(broken_nd2, "rb") as f2:
+ with open(broken_nd2, "rb") as f2:
+ with pytest.raises(ValueError, match="Also looked in the surrounding 1000"):
# where 1000 is less than N above in broken_nd2
cm2 = get_chunkmap(f2, error_radius=1000)