-
Notifications
You must be signed in to change notification settings - Fork 25
/
_hdf_dataset.py
93 lines (78 loc) · 2.37 KB
/
_hdf_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, List, Sequence
from urllib.parse import urlunparse
from event_model import (
ComposeStreamResource,
ComposeStreamResourceBundle,
StreamDatum,
StreamResource,
)
@dataclass
class HDFDataset:
    """Description of one dataset that ``HDFFile`` exposes as a stream resource.

    ``HDFFile`` reads ``data_key``, ``dataset``, ``swmr`` and ``multiplier``
    from each instance. ``shape`` and ``dtype_numpy`` are not read by any code
    visible in this file.
    """

    # Passed through as the ``data_key`` of the composed stream resource.
    data_key: str
    # Forwarded as the "dataset" resource parameter; presumably the path of
    # the dataset inside the HDF5 file -- TODO confirm.
    dataset: str
    # Defaults to an empty tuple; presumably the shape of the data for this
    # key -- TODO confirm against the consumers of this class.
    shape: Sequence[int] = field(default_factory=tuple)
    # Defaults to an empty string; presumably a NumPy dtype string -- TODO
    # confirm.
    dtype_numpy: str = ""
    # Forwarded as the "multiplier" resource parameter; its meaning is not
    # established by the code in this file.
    multiplier: int = 1
    # Forwarded as the "swmr" resource parameter; presumably toggles HDF5
    # single-writer/multiple-reader mode -- TODO confirm.
    swmr: bool = False
# Not referenced by any other code visible in this file; presumably kept for
# external importers -- NOTE(review): confirm before removing.
SLICE_NAME = "AD_HDF5_SWMR_SLICE"
class HDFFile:
    """
    Composes event-model stream documents for the datasets of one HDF5 file.

    One stream resource is composed per dataset at construction time; stream
    datum documents are then produced incrementally by ``stream_data``.

    :param full_file_name: Path to the file to be written; it is made absolute
        before being embedded in the resource URI
    :param datasets: Datasets to write into the file; when empty, no stream
        resources are composed
    :param hostname: Network location placed in the ``file`` resource URI
    """

    def __init__(
        self,
        full_file_name: Path,
        datasets: List[HDFDataset],
        hostname: str = "localhost",
    ) -> None:
        self._hostname = hostname
        self._last_emitted = 0

        # Nothing to describe: leave the bundle list empty and stop here.
        if len(datasets) == 0:
            self._bundles = []
            return

        compose_resource = ComposeStreamResource()
        absolute_path = str(full_file_name.absolute())
        # Components are (scheme, netloc, path, params, query, fragment).
        resource_uri = urlunparse(
            ("file", self._hostname, absolute_path, "", "", None)
        )

        # Every dataset shares the same file URI and differs only in its data
        # key and parameters.
        composed: List[ComposeStreamResourceBundle] = []
        for dataset in datasets:
            composed.append(
                compose_resource(
                    mimetype="application/x-hdf5",
                    uri=resource_uri,
                    data_key=dataset.data_key,
                    parameters={
                        "dataset": dataset.dataset,
                        "swmr": dataset.swmr,
                        "multiplier": dataset.multiplier,
                    },
                    uid=None,
                    validate=True,
                )
            )
        self._bundles = composed

    def stream_resources(self) -> Iterator[StreamResource]:
        """Yield the stream resource document of every composed bundle."""
        yield from (entry.stream_resource_doc for entry in self._bundles)

    def stream_data(self, indices_written: int) -> Iterator[StreamDatum]:
        """
        Yield one stream datum per bundle for the indices not yet emitted.

        Nothing is yielded when ``indices_written`` does not exceed the count
        already emitted. Because this is a generator, the emitted counter only
        advances once iteration starts.

        :param indices_written: Total number of indices written so far
        """
        has_new_indices = indices_written > self._last_emitted
        if not has_new_indices:
            return

        # Indices are relative to the resource
        first_new = self._last_emitted
        self._last_emitted = indices_written
        index_range = {
            "start": first_new,
            "stop": indices_written,
        }
        for entry in self._bundles:
            yield entry.compose_stream_datum(index_range)

    def close(self) -> None:
        """Call ``close()`` on every composed bundle."""
        for entry in self._bundles:
            entry.close()