Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyfive/dataobjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def _get_attributes_from_attr_info(self, attrs, attr_info):
adict = dict()
for record in btree.iter_records():
data = heap.get_data(record['heapid'])
# in the unlikely event of loss of data
# just move on
if data is None:
continue
name, value = self._parse_attribute_msg(data,0)
adict[name] = value
return adict
Expand Down
140 changes: 119 additions & 21 deletions pyfive/misc_low_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from .core import InvalidHDF5File
from .core import UNDEFINED_ADDRESS
from .core import Reference
from .p5t import P5Type, P5CompoundType, P5EnumType, P5StringType, P5OpaqueType, P5ReferenceType, P5IntegerType, P5FloatType
from math import prod
import numpy as np

# uncomment this and use as shown in the FractalHeap if I/O diagnostic is needed
#from .utilities import Interceptor


class SuperBlock(object):
"""
Expand Down Expand Up @@ -168,11 +170,61 @@ def objects(self):

class FractalHeap(object):
"""
HDF5 Fractal Heap.
HDF5 Fractal Heap

The fractal heap implements the doubling table structure with indirect and direct blocks.
Indirect blocks in the heap do not actually contain data for objects in the heap,
their “size” is abstract - they represent the indexing structure for locating the direct blocks
in the doubling table. Direct blocks contain the actual data for objects stored in the heap.
They could be scattered all over the file unless the metadata is stored at the front by
careful use of the HDF5 file creation properties.

The fractal heap ID can refer to a “tiny”, “huge”, or “managed” object.
If it's tiny, the ID contains the actual data and the heap itself does not need to be read from.
If it's huge, the ID contains the address on disk of the data or a b-tree key that can be used to find this address.
If it's managed, then it contains the offset and length within the virtual fractal heap address space
(i.e. inside a direct block, possibly indexed by one or more indirect blocks).

Which direct and indirect blocks contains the data, and the offset within the direct
block can be calculated by using the various parameters and algorithms described
at the start of the fractal heap section. It is an array of blocks of increasing size
within a linear address space.

Documentation lifted from the HDF5 file format documentation:

The number of rows of blocks, nrows, in an indirect block is calculated by the following expression:

nrows = (log2(iblock_size) - log2(<Starting Block Size>)) + 1

where block_size is the size of the block that the indirect block represents in the doubling table.
For example, to represent a block with block_size equals to 1024, and Starting Block Size equals to 256,
three rows are needed.

The maximum number of rows of direct blocks, max_dblock_rows, in any indirect block of a fractal heap
is given by the following expression:

max_dblock_rows = (log2(<Maximum Direct Block Size>) - log2(<Starting Block Size>)) + 2

Using the computed values for nrows and max_dblock_rows, along with the Width of the doubling table,
the number of direct and indirect block entries (K and N in the indirect block description, below) in
an indirect block can be computed:

K = MIN(nrows, max_dblock_rows) * Table Width

If nrows is less than or equal to max_dblock_rows, N is 0. Otherwise, N is simply computed:

N = K - (max_dblock_rows * Table Width)

The size of indirect blocks on disk is determined by the number of rows in the indirect block (computed above).
The size of direct blocks on disk is exactly the size of the block in the doubling table.

"""

def __init__(self, fh, offset):

"""
Read the heap header and construct the linear block mapping
"""
#fh = Interceptor(fh)
fh.seek(offset)
header = _unpack_struct_from_file(FRACTAL_HEAP_HEADER, fh)
assert header['signature'] == b'FRHP'
Expand Down Expand Up @@ -213,7 +265,7 @@ def __init__(self, fh, offset):
start_block_size = header['starting_block_size']
table_width = header['table_width']
if not start_block_size:
assert NotImplementedError
raise NotImplementedError

log2_maximum_dblock_size = int(log2(maximum_dblock_size))
assert 2**log2_maximum_dblock_size == maximum_dblock_size
Expand All @@ -223,29 +275,62 @@ def __init__(self, fh, offset):

log2_table_width = int(log2(table_width))
assert 2**log2_table_width == table_width

# TODO: double check this calculation, the HDF5 docs say the
# number of nblocks, nrows, in an indirect block is calculated by the following expression
# nrows = (log2(iblock_size) - log2(<Starting Block Size>)) + 1
# the question is, how is this used?

self._indirect_nrows_sub = log2_table_width + log2_start_block_size - 1

self.header = header
self.nobjects = header["managed_object_count"] + header["huge_object_count"] + header["tiny_object_count"]

managed = []
# while iterating over direct and indirect blocks we keep track of the heap_offset
# thus, we are able to map this later back to an offset into our managed heap buffer
blocks = []
buffer_offset = 0
root_address = header["root_block_address"]
if root_address:
nrows = header["indirect_current_rows_count"]
if nrows:
for data in self._iter_indirect_block(fh, root_address, nrows):
# Address of root block points to an indirect block
for data, heap_offset, block_size in self._iter_indirect_block(fh, root_address, nrows):
managed.append(data)
blocks.append((heap_offset, buffer_offset, block_size))
buffer_offset += len(data)
else:
data = self._read_direct_block(fh, root_address, start_block_size)
# Address of root block points to a direct block
data, heap_offset = self._read_direct_block(fh, root_address, start_block_size)
managed.append(data)
blocks.append((heap_offset, buffer_offset, start_block_size))
buffer_offset += len(data)

self.managed = b"".join(managed)
self.blocks = blocks

def _read_direct_block(self, fh, offset, block_size):
    """
    Read an FHDB (fractal heap direct block) from the file.

    Parameters
    ----------
    fh : file-like
        Open file handle; will be seeked to ``offset``.
    offset : int
        Absolute file offset of the direct block.
    block_size : int
        On-disk size of the direct block (exactly its size in the
        doubling table).

    Returns
    -------
    tuple
        ``(data, heap_offset)`` where ``data`` is the raw block bytes and
        ``heap_offset`` is this block's offset within the heap's linear
        (virtual) address space.
    """
    fh.seek(offset)
    data = fh.read(block_size)
    header = _unpack_struct_from(self.direct_block_header, data)
    # a bare `header["signature"] == b"FHDB"` comparison would silently
    # discard the check -- assert so corruption is caught early
    assert header["signature"] == b"FHDB"
    # block_offset is a variable-width little-endian field giving this
    # block's position in the heap's linear address space
    return data, int.from_bytes(header["block_offset"],
                                byteorder="little", signed=False)

def _heapid_to_buffer_offset(self, heapid_offset):
"""
Get offset into flat managed buffer from heapid offset
"""
for heap_offset, buffer_offset, block_size in self.blocks:
if heap_offset <= heapid_offset < heap_offset + block_size:
relative = heapid_offset - heap_offset
return buffer_offset + relative

raise KeyError("HeapID offset not inside any heap block")

def get_data(self, heapid):
firstbyte = heapid[0]
Expand All @@ -262,7 +347,14 @@ def get_data(self, heapid):
data_offset += nbytes
nbytes = self._managed_object_length_size
size = _unpack_integer(nbytes, heapid, data_offset)
return self.managed[offset:offset+size]

# map heap_id offset to flat buffer offset
offset = self._heapid_to_buffer_offset(offset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is the second issue: the original code extracts offset and size into the heap, not into our flat managed buffer. When we have attributes in higher rows, this would break. We somehow have to get the correct addresses, which we do by storing them on the heap object.

if offset < len(self.managed):
return self.managed[offset:offset + size]

return None

case 1: # tiny
raise NotImplementedError
case 2: # huge
Expand All @@ -288,34 +380,38 @@ def _read_integral(self, fh, nbytes):
def _iter_indirect_block(self, fh, offset, nrows):
    """
    Iterate an FHIB (fractal heap indirect block), yielding
    ``(data, heap_offset, block_size)`` for every reachable direct block,
    recursing into any nested indirect blocks.
    """
    fh.seek(offset)
    header = _unpack_struct_from_file(self.indirect_block_header, fh)
    assert header["signature"] == b"FHIB"
    header["block_offset"] = int.from_bytes(
        header["block_offset"], byteorder="little", signed=False)
    # todo: it isn't really clear how the number of direct entries is
    # deduced; _indirect_info only gives the maximum possible counts,
    # the real number is derived by iterating the entries below
    ndirect, nindirect = self._indirect_info(nrows)

    direct_blocks = []
    for i in range(ndirect):
        address = struct.unpack('<Q', fh.read(8))[0]
        # blocks in higher rows may be populated even when lower-row
        # addresses are undefined, so skip (do not break) and keep scanning
        if address == UNDEFINED_ADDRESS:
            continue
        block_size = self._calc_block_size(i)
        direct_blocks.append((address, block_size))

    indirect_blocks = []
    for i in range(ndirect, ndirect + nindirect):
        address = struct.unpack('<Q', fh.read(8))[0]
        # same here: move on to the next address rather than stopping
        if address == UNDEFINED_ADDRESS:
            continue
        block_size = self._calc_block_size(i)
        nrows = self._iblock_nrows_from_block_size(block_size)
        indirect_blocks.append((address, nrows))

    for address, block_size in direct_blocks:
        data, heap_offset = self._read_direct_block(fh, address, block_size)
        yield data, heap_offset, block_size

    for address, nrows in indirect_blocks:
        # nested indirect blocks yield the sizes of their own direct blocks
        yield from self._iter_indirect_block(fh, address, nrows)

def _calc_block_size(self, iblock):
row = iblock//self.header["table_width"]
Expand All @@ -330,7 +426,9 @@ def _indirect_info(self, nrows):
table_width = self.header['table_width']
nobjects = nrows * table_width
ndirect_max = self._max_direct_nrows * table_width
if nrows <= ndirect_max:
# this info cannot tell the precise amount of blocks
# it can just tell us the maximum possible amount we should parse
if nobjects <= ndirect_max:
Copy link
Collaborator

@kmuehlbauer kmuehlbauer Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third "flaw":
This prevented decoding, so we need to take table_width into account.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this was only a problem for many, many, many attributes and/or very, very large attributes

ndirect = nobjects
nindirect = 0
else:
Expand Down Expand Up @@ -575,8 +673,8 @@ def _decode_array(arr, decoded):

('managed_freespace_size', 'Q'), # 8 byte addressing
('freespace_manager_address', 'Q'), # 8 byte addressing
('managed_space_size', 'Q'), # 8 byte addressing
('managed_alloc_size', 'Q'), # 8 byte addressing
('managed_space_size', 'Q'), # 8 byte addressing; this is the upper bound in the heaps linear address space
('managed_alloc_size', 'Q'), # 8 byte addressing; this is how much of that that is currently allocated to the heap.
('next_directblock_iterator_address', 'Q'), # 8 byte addressing

('managed_object_count', 'Q'), # 8 byte addressing
Expand Down
33 changes: 33 additions & 0 deletions pyfive/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import inspect

class Interceptor:
    """
    Wrap a file handle, intercepting file I/O and logging what is going on.

    Used for debugging file-reading issues and for optimisation work.
    ``seek``/``read``/``tell`` are delegated unchanged to the wrapped
    handle, so the interceptor is a drop-in replacement.
    """

    def __init__(self, fh, activated=True):
        # fh: the real file handle; activated: set False to silence logging
        self._fh = fh
        self.activated = activated

    @staticmethod
    def _caller():
        """Return (function, filename, lineno) of the frame that invoked I/O."""
        # two frames up: _caller -> seek/read -> the real caller
        frame = inspect.currentframe()
        caller = frame.f_back.f_back if frame is not None and frame.f_back is not None else None
        if caller is None:
            return "<module>", "<unknown>", 0
        return caller.f_code.co_name, caller.f_code.co_filename, caller.f_lineno

    def seek(self, offset, whence=0):
        """Log and delegate a seek on the wrapped handle."""
        if self.activated:
            func, fname, lineno = self._caller()
            print(f"seek: {offset}, {whence} (called from {func} at {fname}:{lineno})")
        return self._fh.seek(offset, whence)

    def read(self, size=-1):
        """Log (with current position) and delegate a read on the wrapped handle."""
        if self.activated:
            func, fname, lineno = self._caller()
            pos = self._fh.tell()
            print(f"read: {size} bytes at {pos} (called from {func} at {fname}:{lineno})")
        return self._fh.read(size)

    def tell(self):
        # passthrough so wrapped consumers that query the position still work
        return self._fh.tell()
Binary file not shown.
11 changes: 11 additions & 0 deletions tests/test_buffer_issue.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,14 @@ def test_buffer_issue():
print("File with issue da193a_25_6hr_t_pt_cordex__198807-198807.nc")
print("Variable m01s30i111")
_load_nc_file('m01s30i111')


def test_buffer_issue_ukesm():
    """Test with yet another corner case file."""
    fp = "tests/data/noy_AERmonZ_UKESM1-0-LL_piControl_r1i1p1f2_gnz_200001-200012.nc"
    with pyfive.File(fp) as pfile:
        print(pfile["noy"])
        # reading the attributes exercises the fractal heap decode path
        attrs = pfile["noy"].attrs
        print(len(attrs))
        print(attrs.keys())

84 changes: 84 additions & 0 deletions tests/test_fractal_heap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import numpy as np
import h5py
import pytest
from contextlib import nullcontext
import pyfive

@pytest.fixture(scope='module')
def name(tmp_path_factory):
return tmp_path_factory.mktemp("temp") / "fractal_heap.hdf5"


@pytest.mark.parametrize("payload_size", [4033, 4032])
@pytest.mark.parametrize("n_attrs", [10, 11])
def test_huge_object(name, payload_size, n_attrs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something a stumbled over while trying to generate a test. So for my linux these figures work, but we might need to generalize better.

# 4032/4033 is the huge object treshold
# it kicks in, if we have more than 10 attributes
# todo, this needs more check,
# might depend on heap sizes and other figures
if payload_size == 4033 and n_attrs == 11:
err = pytest.raises(NotImplementedError)
else:
err = nullcontext()

with h5py.File(name, "w", track_order=True) as f:
for i in range(n_attrs):
f.attrs[f"small_{i}"] = np.random.randint(low=0, high=255, size=payload_size, dtype=np.uint8)

with h5py.File(name, "r") as f:
attrs = dict(f.attrs)

with pyfive.File(name, "r") as f:
with err:
attrs2 = f.attrs
print(attrs2.keys())

for k, v in attrs.items():
np.testing.assert_equal(v, attrs2[k])

@pytest.mark.parametrize("n_attrs", [115, 116])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you debug iterating fractal heap, you will see, that 115 will not have another FHIB whereas 116 there is another FHIB with one nested FHDB. Seems there is no way of checking other than iterating again.

def test_fractal_heap(name, n_attrs):

# att: the assumptions below might heavily rely on the
# file layout, heaps sizes and other figures
# todo: generalize this

with h5py.File(name, "w", track_order=True) as f:

# create enough attributes to trigger dense storage
# and indirect blocks
# using small payloads to control the block filling
# 115 attributes with 4032 bytes payload each
# will not create indirect blocks, 116 attributes will

# 4032 bytes, small enough for managed space
# from 4033 this will run into huge object space
payload_size = 4032
for i in range(n_attrs):
f.attrs[f"attr_{i}"] = np.random.randint(low=0, high=255, size=payload_size, dtype=np.uint8)

with h5py.File(name, "r") as f:
attrs = dict(f.attrs)

with pyfive.File(name, "r") as f:
print("\n--- debug output for test -----------------------\n")
# since we can't get any information on the heap object from pyfive
attr_info = f._dataobjects.find_msg_type(0x0015)
offset = attr_info[0]['offset_to_message']
data = pyfive.core._unpack_struct_from(pyfive.dataobjects.ATTR_INFO_MESSAGE, f._dataobjects.msg_data, offset)
heap_address = data['fractal_heap_address']
heap = pyfive.misc_low_level.FractalHeap(f._fh, heap_address)

# nfortunately we can't get anything meaningful out of this
# to see that we actually read from another indirect block
# we would need to iterate and keep log of it
# so here we just see the heap header and our block mapping
print("heap header:", heap.header)
print("heap_blocks:", len(heap.blocks), heap.blocks)
print(heap._indirect_nrows_sub)
print(heap._max_direct_nrows)

attrs2 = f.attrs

for k, v in attrs.items():
np.testing.assert_equal(v, attrs2[k])
Loading