Test case for corner case file (buffer too small) #160
pyfive/misc_low_level.py:
@@ -12,10 +12,12 @@
 from .core import InvalidHDF5File
 from .core import UNDEFINED_ADDRESS
 from .core import Reference
 from .p5t import P5Type, P5CompoundType, P5EnumType, P5StringType, P5OpaqueType, P5ReferenceType, P5IntegerType, P5FloatType
 from math import prod
 import numpy as np

+# uncomment this and use as shown in the FractalHeap if I/O diagnostics are needed
+#from .utilities import Interceptor


 class SuperBlock(object):
     """
@@ -168,11 +170,61 @@ def objects(self):

 class FractalHeap(object):
     """
-    HDF5 Fractal Heap.
+    HDF5 Fractal Heap
+
+    The fractal heap implements the doubling table structure with indirect and direct blocks.
+    Indirect blocks in the heap do not actually contain data for objects in the heap;
+    their "size" is abstract - they represent the indexing structure for locating the
+    direct blocks in the doubling table. Direct blocks contain the actual data for objects
+    stored in the heap. They can be scattered all over the file unless the metadata is kept
+    at the front by careful use of the HDF5 file creation properties.
+
+    The fractal heap ID can refer to a "tiny", "huge", or "managed" object.
+    If it is tiny, the ID contains the actual data and the heap itself does not need to be read.
+    If it is huge, the ID contains the address on disk of the data, or a b-tree key that can
+    be used to find this address.
+    If it is managed, it contains the offset and length within the virtual fractal heap
+    address space (i.e. inside a direct block, possibly indexed by one or more indirect blocks).
+
+    Which direct and indirect blocks contain the data, and the offset within the direct
+    block, can be calculated using the various parameters and algorithms described at the
+    start of the fractal heap section. The heap is an array of blocks of increasing size
+    within a linear address space.
+
+    Documentation lifted from the HDF5 file format documentation:
+
+    The number of rows of blocks, nrows, in an indirect block is calculated by the
+    following expression:
+
+        nrows = (log2(iblock_size) - log2(<Starting Block Size>)) + 1
+
+    where iblock_size is the size of the block that the indirect block represents in the
+    doubling table. For example, to represent a block with iblock_size equal to 1024 and
+    Starting Block Size equal to 256, three rows are needed.
+
+    The maximum number of rows of direct blocks, max_dblock_rows, in any indirect block
+    of a fractal heap is given by the following expression:
+
+        max_dblock_rows = (log2(<Maximum Direct Block Size>) - log2(<Starting Block Size>)) + 2
+
+    Using the computed values for nrows and max_dblock_rows, along with the Width of the
+    doubling table, the number of direct and indirect block entries (K and N in the
+    indirect block description, below) in an indirect block can be computed:
+
+        K = MIN(nrows, max_dblock_rows) * Table Width
+
+    If nrows is less than or equal to max_dblock_rows, N is 0. Otherwise, N is simply
+    computed:
+
+        N = (nrows * Table Width) - K
+
+    The size of indirect blocks on disk is determined by the number of rows in the
+    indirect block (computed above). The size of direct blocks on disk is exactly the
+    size of the block in the doubling table.
     """

     def __init__(self, fh, offset):
         """
         Read the heap header and construct the linear block mapping
         """
+        #fh = Interceptor(fh)
         fh.seek(offset)
         header = _unpack_struct_from_file(FRACTAL_HEAP_HEADER, fh)
         assert header['signature'] == b'FRHP'
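The quoted rules translate almost directly into code. The sketch below is illustrative only, not pyfive's implementation (the function and argument names are assumptions); it merely restates the four formulas:

    from math import log2

    def doubling_table_entries(iblock_size, start_block_size,
                               max_dblock_size, table_width):
        # rows spanned by an indirect block representing iblock_size bytes
        nrows = int(log2(iblock_size) - log2(start_block_size)) + 1
        # maximum rows of direct blocks any indirect block can hold
        max_dblock_rows = int(log2(max_dblock_size) - log2(start_block_size)) + 2
        # K: direct block entries, N: indirect block entries
        K = min(nrows, max_dblock_rows) * table_width
        N = 0 if nrows <= max_dblock_rows else nrows * table_width - K
        return nrows, max_dblock_rows, K, N

    # doubling_table_entries(1024, 256, 512, 4) -> (3, 3, 12, 0)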
@@ -213,7 +265,7 @@ def __init__(self, fh, offset):
         start_block_size = header['starting_block_size']
         table_width = header['table_width']
         if not start_block_size:
-            assert NotImplementedError
+            raise NotImplementedError

         log2_maximum_dblock_size = int(log2(maximum_dblock_size))
         assert 2**log2_maximum_dblock_size == maximum_dblock_size
@@ -223,29 +275,62 @@
         log2_table_width = int(log2(table_width))
         assert 2**log2_table_width == table_width

+        # TODO: double check this calculation, the HDF5 docs say the
+        # number of rows, nrows, in an indirect block is calculated by the following expression
+        # nrows = (log2(iblock_size) - log2(<Starting Block Size>)) + 1
+        # the question is, how is this used?
         self._indirect_nrows_sub = log2_table_width + log2_start_block_size - 1

         self.header = header
         self.nobjects = header["managed_object_count"] + header["huge_object_count"] + header["tiny_object_count"]

         managed = []
+        # while iterating over direct and indirect blocks we keep track of the heap_offset,
+        # so that we can later map it back to an offset into our managed heap buffer
+        blocks = []
+        buffer_offset = 0
         root_address = header["root_block_address"]
         if root_address:
             nrows = header["indirect_current_rows_count"]
             if nrows:
-                for data in self._iter_indirect_block(fh, root_address, nrows):
+                # Address of root block points to an indirect block
+                for data, heap_offset, block_size in self._iter_indirect_block(fh, root_address, nrows):
                     managed.append(data)
+                    blocks.append((heap_offset, buffer_offset, block_size))
+                    buffer_offset += len(data)
             else:
-                data = self._read_direct_block(fh, root_address, start_block_size)
+                # Address of root block points to a direct block
+                data, heap_offset = self._read_direct_block(fh, root_address, start_block_size)
                 managed.append(data)
+                blocks.append((heap_offset, buffer_offset, start_block_size))
+                buffer_offset += len(data)

         self.managed = b"".join(managed)
+        self.blocks = blocks

     def _read_direct_block(self, fh, offset, block_size):
         """
         Read FHDB - direct block - from the heap and return data and heap offset
         """
         fh.seek(offset)
         data = fh.read(block_size)
         header = _unpack_struct_from(self.direct_block_header, data)
-        header["signature"] == b"FHDB"
-        return data
+        assert header["signature"] == b"FHDB"
+        return data, int.from_bytes(header["block_offset"],
+                                    byteorder="little", signed=False)

+    def _heapid_to_buffer_offset(self, heapid_offset):
+        """
+        Get the offset into the flat managed buffer from a heap ID offset
+        """
+        for heap_offset, buffer_offset, block_size in self.blocks:
+            if heap_offset <= heapid_offset < heap_offset + block_size:
+                relative = heapid_offset - heap_offset
+                return buffer_offset + relative
+
+        raise KeyError("HeapID offset not inside any heap block")

     def get_data(self, heapid):
         firstbyte = heapid[0]
@@ -262,7 +347,14 @@ def get_data(self, heapid):
                 data_offset += nbytes
                 nbytes = self._managed_object_length_size
                 size = _unpack_integer(nbytes, heapid, data_offset)
-                return self.managed[offset:offset+size]
+
+                # map heap_id offset to flat buffer offset
+                offset = self._heapid_to_buffer_offset(offset)
+                if offset < len(self.managed):
+                    return self.managed[offset:offset + size]
+
+                return None

             case 1:  # tiny
                 raise NotImplementedError
             case 2:  # huge
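For context, a managed heap ID is a version/type byte followed by the object's heap offset and length, each little-endian and sized per fields in the heap header. A hedged sketch of that decoding, from the format spec rather than pyfive's code (the function and its size arguments are illustrative):

    def decode_managed_heapid(heapid, offset_size, length_size):
        # bits 4-5 of the first byte carry the ID type; 0 means "managed"
        assert (heapid[0] >> 4) & 0x3 == 0
        offset = int.from_bytes(heapid[1:1 + offset_size], "little")
        size = int.from_bytes(heapid[1 + offset_size:1 + offset_size + length_size], "little")
        return offset, size  # a heap offset, not (yet) a flat buffer offset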
@@ -288,34 +380,38 @@ def _read_integral(self, fh, nbytes):
     def _iter_indirect_block(self, fh, offset, nrows):
         fh.seek(offset)
         header = _unpack_struct_from_file(self.indirect_block_header, fh)
-        header["signature"] == b"FHIB"
+        assert header["signature"] == b"FHIB"
         header["block_offset"] = int.from_bytes(header["block_offset"], byteorder="little", signed=False)
+        # todo: it isn't really clear how the number of direct blocks, ndirect, is deduced;
+        # at least, we need to derive the correct number by iterating below
        ndirect, nindirect = self._indirect_info(nrows)

         direct_blocks = list()
         for i in range(ndirect):
             address = struct.unpack('<Q', fh.read(8))[0]
             if address == UNDEFINED_ADDRESS:
-                break
+                # if there is no valid address, we move on to the next
+                continue
             block_size = self._calc_block_size(i)
             direct_blocks.append((address, block_size))

Collaborator: So, this was the first issue: there are cases where attributes are in higher rows even if lower rows are not set, so iteration over all possible addresses is needed!

         indirect_blocks = list()
         for i in range(ndirect, ndirect + nindirect):
             address = struct.unpack('<Q', fh.read(8))[0]
             if address == UNDEFINED_ADDRESS:
-                break
+                # same here, move on to the next address
+                continue
             block_size = self._calc_block_size(i)
             nrows = self._iblock_nrows_from_block_size(block_size)
-            indirect_blocks.append((address, nrows))
+            indirect_blocks.append((address, block_size, nrows))

Collaborator: Same here.

         for address, block_size in direct_blocks:
-            obj = self._read_direct_block(fh, address, block_size)
-            yield obj
+            obj, heap_offset = self._read_direct_block(fh, address, block_size)
+            yield obj, heap_offset, block_size

-        for address, nrows in indirect_blocks:
-            for obj in self._iter_indirect_block(fh, address, nrows):
-                yield obj
+        for address, block_size, nrows in indirect_blocks:
+            for obj, heap_offset, _block_size in self._iter_indirect_block(fh, address, nrows):
+                yield obj, heap_offset, _block_size

     def _calc_block_size(self, iblock):
         row = iblock//self.header["table_width"]
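For orientation, _calc_block_size follows the standard doubling-table rule: rows 0 and 1 both use the starting block size, and every later row doubles it. A minimal sketch of that rule (an assumption about the layout, not a copy of pyfive's method):

    def block_size_for_entry(entry, table_width, start_block_size):
        # table_width entries per row; rows 0 and 1 hold blocks of
        # start_block_size, row r >= 2 holds start_block_size * 2**(r - 1)
        row = entry // table_width
        return start_block_size * 2 ** max(row - 1, 0)

    # width 4, start 512: entries 0-3 -> 512, 4-7 -> 512, 8-11 -> 1024, ...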
@@ -330,7 +426,9 @@ def _indirect_info(self, nrows):
         table_width = self.header['table_width']
         nobjects = nrows * table_width
         ndirect_max = self._max_direct_nrows * table_width
-        if nrows <= ndirect_max:
+        # this info cannot tell us the precise number of blocks,
+        # just the maximum possible number we should parse
+        if nobjects <= ndirect_max:
             ndirect = nobjects
             nindirect = 0
         else:

Collaborator: Third "flaw": the check compared nrows against ndirect_max where the number of entries, nobjects, was meant.

Collaborator: Note: this was only a problem for many, many, many attributes and/or very, very large attributes.
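A worked example of why the comparison has to use nobjects rather than nrows (numbers chosen purely for illustration):

    # starting block size 512, maximum direct block size 64 KiB, table width 4:
    # max_direct_nrows = (log2(65536) - log2(512)) + 2 = 9
    nrows, table_width, max_direct_nrows = 11, 4, 9
    nobjects = nrows * table_width                # 44 entries in this indirect block
    ndirect_max = max_direct_nrows * table_width  # at most 36 direct block entries
    # old check: nrows (11) <= ndirect_max (36) -> all 44 wrongly treated as direct
    # new check: nobjects (44) > ndirect_max (36) -> 36 direct + 8 indirect entries
    ndirect, nindirect = min(nobjects, ndirect_max), max(nobjects - ndirect_max, 0)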
@@ -575,8 +673,8 @@ def _decode_array(arr, decoded):

     ('managed_freespace_size', 'Q'),  # 8 byte addressing
     ('freespace_manager_address', 'Q'),  # 8 byte addressing
-    ('managed_space_size', 'Q'),  # 8 byte addressing
-    ('managed_alloc_size', 'Q'),  # 8 byte addressing
+    ('managed_space_size', 'Q'),  # 8 byte addressing; the upper bound of the heap's linear address space
+    ('managed_alloc_size', 'Q'),  # 8 byte addressing; how much of that space is currently allocated to the heap
     ('next_directblock_iterator_address', 'Q'),  # 8 byte addressing

     ('managed_object_count', 'Q'),  # 8 byte addressing
pyfive/utilities.py (new file):
@@ -0,0 +1,33 @@
import inspect


class Interceptor:
    """
    Intercepts file I/O and logs what is going on.
    Used in debugging file reading issues and optimisation.
    """
    def __init__(self, fh, activated=True):
        self._fh = fh
        self.activated = activated

    def seek(self, offset, whence=0):
        if self.activated:
            caller = inspect.currentframe().f_back
            if caller is not None:
                func = caller.f_code.co_name
                fname = caller.f_code.co_filename
                lineno = caller.f_lineno
            else:
                func, fname, lineno = "<module>", "<unknown>", 0
            print(f"seek: {offset}, {whence} (called from {func})")
        return self._fh.seek(offset, whence)

    def read(self, size=-1):
        if self.activated:
            caller = inspect.currentframe().f_back
            if caller is not None:
                func = caller.f_code.co_name
                fname = caller.f_code.co_filename
                lineno = caller.f_lineno
            else:
                func, fname, lineno = "<module>", "<unknown>", 0
            pos = self._fh.tell()
            print(f"read: {size} bytes at {pos} (called from {func})")
        return self._fh.read(size)
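Usage is simply wrapping an open file handle, for example (the file name is a placeholder):

    from pyfive.utilities import Interceptor

    with open("some_file.hdf5", "rb") as raw:
        fh = Interceptor(raw)
        fh.seek(0)          # prints: seek: 0, 0 (called from <module>)
        magic = fh.read(8)  # prints: read: 8 bytes at 0 (called from <module>)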
New test file:
@@ -0,0 +1,84 @@
import numpy as np
import h5py
import pytest
from contextlib import nullcontext
import pyfive


@pytest.fixture(scope='module')
def name(tmp_path_factory):
    return tmp_path_factory.mktemp("temp") / "fractal_heap.hdf5"


@pytest.mark.parametrize("payload_size", [4033, 4032])
@pytest.mark.parametrize("n_attrs", [10, 11])
def test_huge_object(name, payload_size, n_attrs):

Collaborator: This is something I stumbled over while trying to generate a test. These figures work on my Linux box, but we might need to generalize better.

    # 4032/4033 is the huge object threshold
    # it kicks in if we have more than 10 attributes
    # todo: this needs more checking,
    # it might depend on heap sizes and other figures
    if payload_size == 4033 and n_attrs == 11:
        err = pytest.raises(NotImplementedError)
    else:
        err = nullcontext()

    with h5py.File(name, "w", track_order=True) as f:
        for i in range(n_attrs):
            f.attrs[f"small_{i}"] = np.random.randint(low=0, high=255, size=payload_size, dtype=np.uint8)

    with h5py.File(name, "r") as f:
        attrs = dict(f.attrs)

    with pyfive.File(name, "r") as f:
        with err:
            attrs2 = f.attrs
            print(attrs2.keys())

            for k, v in attrs.items():
                np.testing.assert_equal(v, attrs2[k])


@pytest.mark.parametrize("n_attrs", [115, 116])

Collaborator: If you debug iterating the fractal heap, you will see that 115 will not have another indirect block.

def test_fractal_heap(name, n_attrs):

    # attention: the assumptions below might heavily rely on the
    # file layout, heap sizes and other figures
    # todo: generalize this

    with h5py.File(name, "w", track_order=True) as f:

        # create enough attributes to trigger dense storage
        # and indirect blocks
        # using small payloads to control the block filling
        # 115 attributes with 4032 bytes payload each
        # will not create indirect blocks, 116 attributes will

        # 4032 bytes, small enough for managed space
        # from 4033 this will run into huge object space
        payload_size = 4032
        for i in range(n_attrs):
            f.attrs[f"attr_{i}"] = np.random.randint(low=0, high=255, size=payload_size, dtype=np.uint8)

    with h5py.File(name, "r") as f:
        attrs = dict(f.attrs)

    with pyfive.File(name, "r") as f:
        print("\n--- debug output for test -----------------------\n")
        # since we can't get any information on the heap object from pyfive,
        # fish the fractal heap address out of the attribute info message
        attr_info = f._dataobjects.find_msg_type(0x0015)
        offset = attr_info[0]['offset_to_message']
        data = pyfive.core._unpack_struct_from(pyfive.dataobjects.ATTR_INFO_MESSAGE, f._dataobjects.msg_data, offset)
        heap_address = data['fractal_heap_address']
        heap = pyfive.misc_low_level.FractalHeap(f._fh, heap_address)

        # unfortunately we can't get anything meaningful out of this to see
        # that we actually read from another indirect block; we would need to
        # iterate and keep a log of it, so here we just print the heap header
        # and our block mapping
        print("heap header:", heap.header)
        print("heap_blocks:", len(heap.blocks), heap.blocks)
        print(heap._indirect_nrows_sub)
        print(heap._max_direct_nrows)

        attrs2 = f.attrs

        for k, v in attrs.items():
            np.testing.assert_equal(v, attrs2[k])
Collaborator: So, this is the second issue: the original code extracts offset and size into the heap, not into our flat managed buffer. When we have attributes in higher rows, this would break. We somehow have to get the correct addresses; we do that by storing them on the heap object.
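A toy illustration of that mismatch (numbers made up): suppose row 0 of the doubling table holds four 512-byte direct blocks, but only the first and the last were ever allocated.

    # (heap_offset, buffer_offset, block_size) triples as stored in FractalHeap.blocks
    blocks = [(0, 0, 512), (1536, 512, 512)]  # the two middle blocks do not exist

    heapid_offset = 1600  # offset in the heap's linear address space
    # indexing the concatenated buffer at 1600 would read past its 1024 bytes;
    # the stored mapping resolves it to buffer offset 512 + (1600 - 1536) = 576
    for heap_off, buf_off, size in blocks:
        if heap_off <= heapid_offset < heap_off + size:
            print(buf_off + (heapid_offset - heap_off))  # -> 576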