6 changes: 3 additions & 3 deletions .github/workflows/run-test-push.yml
@@ -29,12 +29,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: pip install -e .
- run: conda list
12 changes: 6 additions & 6 deletions .github/workflows/run-tests.yml
@@ -34,12 +34,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: conda list
- run: pip install -e .
@@ -66,12 +66,12 @@ jobs:
use-mamba: true
- run: conda --version
- run: python -V
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- run: conda list
- run: mamba install -c conda-forge git
6 changes: 3 additions & 3 deletions .github/workflows/test_s3_minio.yml
@@ -56,12 +56,12 @@ jobs:
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
use-mamba: true
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- name: Install PyActiveStorage
run: |
6 changes: 3 additions & 3 deletions .github/workflows/test_s3_remote_reductionist.yml
@@ -51,12 +51,12 @@ jobs:
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
use-mamba: true
- name: Install development version of bnlawrence/Pyfive:issue60
- name: Install development version of NCAS-CMS/Pyfive:h5netcdf
run: |
cd ..
git clone https://github.com/bnlawrence/pyfive.git
git clone https://github.com/NCAS-CMS/pyfive.git
cd pyfive
git checkout issue60
git checkout h5netcdf
pip install -e .
- name: Install PyActiveStorage
run: |
29 changes: 16 additions & 13 deletions activestorage/active.py
@@ -5,6 +5,7 @@
import urllib
import pyfive
import time
from pyfive.h5d import StoreInfo

import s3fs

@@ -307,8 +308,8 @@ def _get_selection(self, *args):
name = self.ds.name
dtype = np.dtype(self.ds.dtype)
# hopefully fix pyfive to get a dtype directly
array = pyfive.ZarrArrayStub(self.ds.shape, self.ds.chunks)
ds = self.ds._dataobjects
array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
ds = self.ds.id

self.metric_data['args'] = args
self.metric_data['dataset shape'] = self.ds.shape
@@ -318,7 +319,7 @@
else:
compressor, filters = decode_filters(ds.filter_pipeline , dtype.itemsize, name)

indexer = pyfive.OrthogonalIndexer(*args, array)
indexer = pyfive.indexing.OrthogonalIndexer(*args, array)
out_shape = indexer.shape
#stripped_indexer = [(a, b, c) for a,b,c in indexer]
drop_axes = indexer.drop_axes and keepdims
@@ -334,7 +335,7 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
out = []
counts = []
else:
out = np.empty(out_shape, dtype=out_dtype, order=ds.order)
out = np.empty(out_shape, dtype=out_dtype, order=ds._order)
counts = None # should never get touched with no method!

# Create a shared session object.
@@ -364,10 +365,10 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f

if ds.chunks is not None:
t1 = time.time()
ds._get_chunk_addresses()
# ds._get_chunk_addresses()
t2 = time.time() - t1
self.metric_data['indexing time (s)'] = t2
self.metric_data['chunk number'] = len(ds._zchunk_index)
# self.metric_data['chunk number'] = len(ds._zchunk_index)
chunk_count = 0
t1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
@@ -464,15 +465,17 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
#FIXME: Do, we, it's not actually used?

"""

offset, size, filter_mask = ds.get_chunk_details(chunk_coords)

# retrieve coordinates from chunk index
storeinfo = ds.get_chunk_info_from_chunk_coord(chunk_coords)
offset, size = storeinfo.byte_offset, storeinfo.size
self.data_read += size

if self.storage_type == 's3' and self._version == 1:

tmp, count = reduce_opens3_chunk(ds.fh, offset, size, compressor, filters,
tmp, count = reduce_opens3_chunk(ds._fh, offset, size, compressor, filters,
self.missing, ds.dtype,
chunks, ds.order,
chunks, ds._order,
chunk_selection, method=self.method
)

@@ -499,7 +502,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
size, compressor, filters,
self.missing, np.dtype(ds.dtype),
chunks,
ds.order,
ds._order,
chunk_selection,
operation=self._method)
else:
@@ -518,7 +521,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
size, compressor, filters,
self.missing, np.dtype(ds.dtype),
chunks,
ds.order,
ds._order,
chunk_selection,
operation=self._method)
elif self.storage_type=='ActivePosix' and self.version==2:
@@ -531,7 +534,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
# although we will version changes.
tmp, count = reduce_chunk(self.filename, offset, size, compressor, filters,
self.missing, ds.dtype,
chunks, ds.order,
chunks, ds._order,
chunk_selection, method=self.method)

if self.method is not None:
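Taken together, the active.py changes above move PyActiveStorage off pyfive's private `_dataobjects` attribute and onto the public `Dataset.id` object plus the `StoreInfo` records provided by the NCAS-CMS/pyfive `h5netcdf` branch. The sketch below is a minimal, non-authoritative illustration of that chunk-lookup pattern as it appears in this diff; `example.nc`, the variable name `data`, and the selection are placeholders, and the pyfive calls shown (`Dataset.id`, `pyfive.indexing.*`, `get_chunk_info_from_chunk_coord`, `StoreInfo`) are assumed to match the branch installed by the workflows above rather than a released pyfive API.

```python
# Minimal sketch of the chunk-lookup pattern adopted in this PR (assumed API
# from the NCAS-CMS/pyfive h5netcdf branch; file and variable names are placeholders).
import pyfive

f = pyfive.File('example.nc')        # placeholder file
var = f['data']                      # placeholder variable

ds = var.id                          # low-level dataset object (replaces var._dataobjects)
array = pyfive.indexing.ZarrArrayStub(var.shape, var.chunks or var.shape)

# Iterate the chunks touched by an example selection over the first axis.
indexer = pyfive.indexing.OrthogonalIndexer((slice(None),), array)
for chunk_coords, chunk_selection, out_selection in indexer:
    # StoreInfo replaces the old (offset, size, filter_mask) tuple returned by
    # ds.get_chunk_details(); byte_offset and size locate the raw chunk bytes.
    storeinfo = ds.get_chunk_info_from_chunk_coord(chunk_coords)
    offset, size = storeinfo.byte_offset, storeinfo.size
    print(chunk_coords, offset, size)
```

Keeping the byte-range bookkeeping inside pyfive means Active only has to pass each (offset, size) pair to the relevant reducer or to Reductionist, which is why `_process_chunk` above now unpacks a `StoreInfo` instead of a bare tuple.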
2 changes: 1 addition & 1 deletion activestorage/hdf2numcodec.py
@@ -28,7 +28,7 @@ def decode_filters(filter_pipeline, itemsize, name):
for filter in filter_pipeline:

filter_id=filter['filter_id']
properties = filter['client_data_values']
properties = filter['client_data']


# We support the following
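The hdf2numcodec change is a pure key rename in the filter-pipeline metadata exposed by the `h5netcdf` branch: per-filter parameters now sit under `client_data` rather than `client_data_values`. A small hedged sketch of reading those entries (the helper name and print are illustrative, not the module's actual code):

```python
# Illustrative only: each filter_pipeline entry is assumed to be a dict-like
# record carrying the HDF5 filter id and its client data, as in the diff above.
def summarise_filters(filter_pipeline):
    for filt in filter_pipeline or []:
        filter_id = filt['filter_id']       # HDF5 filter id, e.g. 1 = deflate, 2 = shuffle
        properties = filt['client_data']    # renamed from 'client_data_values'
        print(filter_id, properties)
```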
11 changes: 6 additions & 5 deletions tests/test_reductionist_json.py
@@ -18,9 +18,9 @@ def __init__(self, f, v):
self.f = pyfive.File(f)
ds = self.f[v]
self.dtype = np.dtype(ds.dtype)
self.array = pyfive.ZarrArrayStub(ds.shape, ds.chunks or ds.shape)
self.array = pyfive.indexing.ZarrArrayStub(ds.shape, ds.chunks or ds.shape)
self.missing = get_missing_attributes(ds)
ds = ds._dataobjects
ds = ds.id
self.ds = ds
def __getitem__(self, args):
if self.ds.filter_pipeline is None:
@@ -30,12 +30,13 @@ def __getitem__(self, args):
if self.ds.chunks is not None:
self.ds._get_chunk_addresses()

indexer = pyfive.OrthogonalIndexer(args, self.array)
indexer = pyfive.indexing.OrthogonalIndexer(args, self.array)
for chunk_coords, chunk_selection, out_selection in indexer:
offset, size, filter_mask = self.ds.get_chunk_details(chunk_coords)
storeinfo = self.ds.get_chunk_info_from_chunk_coord(chunk_coords)
offset, size = storeinfo.byte_offset, storeinfo.size
jd = reductionist.build_request_data('a','b','c',
offset, size, compressor, filters, self.missing, self.dtype,
self.array._chunks,self.ds.order,chunk_selection)
self.array._chunks,self.ds._order,chunk_selection)
js = json.dumps(jd)
return None
