diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 37f1aff..9c394a3 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -25,11 +25,16 @@ jobs:
         python-version: ${{ matrix.python-version }}
     - name: Install dependencies
       run: |
+        sudo apt install -y libopenmpi-dev libhdf5-mpi-dev
         python -m pip install --upgrade pip
         python -m pip install pylint pytest pytest-cov
         python -m pip install jupyter
         if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+        CC="mpicc" HDF5_MPI="ON" pip install --upgrade --force-reinstall --no-binary=h5py h5py
         python setup.py install
+      env:
+        CC: mpicc
+        HDF5_MPI: ON
     - name: Lint with pylint
       run: |
         # stop the build if there are Python syntax errors or undefined names
diff --git a/requirements.txt b/requirements.txt
index 5e9952d..0d5e71d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,8 +3,9 @@ numpy>=1.21.0;python_version>="3.8"
 numpy<1.22.*;python_version<"3.8"
 astropy;python_version>="3.8"
 astropy<5.*;python_version<"3.8"
-h5py
+h5py>=2.9
 pandas;python_version>="3.8"
 pandas<1.4.*;python_version<"3.8"
 pyarrow
 tables
+mpi4py
diff --git a/setup.py b/setup.py
index ab9ebf7..2592729 100644
--- a/setup.py
+++ b/setup.py
@@ -24,9 +24,10 @@
     "numpy<1.22.*;python_version<'3.8'",
     "astropy;python_version>='3.8'",
     "astropy<5.*;python_version<'3.8'",
-    "h5py",
+    "h5py>=2.9",
     "pandas;python_version>='3.8'",
     "pandas<1.4.*;python_version<'3.8'",
     "pyarrow",
-    "tables"]
+    "tables",
+    "mpi4py"]
 )
diff --git a/tables_io/ioUtils.py b/tables_io/ioUtils.py
index fb4c9df..248fec3 100644
--- a/tables_io/ioUtils.py
+++ b/tables_io/ioUtils.py
@@ -123,13 +123,15 @@ def initializeHdf5WriteSingle(filepath, groupname=None, **kwds):
     return group, outf


-def initializeHdf5Write(filepath, **kwds):
+def initializeHdf5Write(filepath, comm=None, **kwds):
     """ Prepares an hdf5 file for output

     Parameters
     ----------
     filepath : `str`
         The output file name
+    comm : `communicator`
+        MPI communicator for parallel writing

     Returns
     -------
@@ -161,7 +163,12 @@
     outdir = os.path.dirname(os.path.abspath(filepath))
     if not os.path.exists(outdir):  #pragma: no cover
         os.makedirs(outdir, exist_ok=True)
-    outf = h5py.File(filepath, "w")
+    if comm is None:
+        outf = h5py.File(filepath, "w")
+    else:
+        if not h5py.get_config().mpi:
+            raise TypeError("h5py module not prepared for parallel writing.")  #pragma: no cover
+        outf = h5py.File(filepath, "w", driver='mpio', comm=comm)
     groups = {}
     for k, v in kwds.items():
         group = outf.create_group(k)
diff --git a/tests/test_fileIO.py b/tests/test_fileIO.py
index 002f857..9f2e634 100644
--- a/tests/test_fileIO.py
+++ b/tests/test_fileIO.py
@@ -57,7 +57,23 @@ def test_write_output_file():
     groups, outf = io.initializeHdf5Write(test_outfile, data = dict(photoz_mode=((npdf,), 'f4'), photoz_pdf=((npdf, nbins), 'f4')))
     io.writeDictToHdf5Chunk(groups, data_dict, 0, npdf, zmode='photoz_mode', pz_pdf='photoz_pdf')
     io.finalizeHdf5Write(outf, 'md', zgrid=zgrid)
+    os.unlink(test_outfile)
+def test_write_output_parallel_file():
+    """ Testing parallel write """
+    from mpi4py import MPI
+    comm = MPI.COMM_WORLD
+    npdf = 40
+    nbins = 21
+    pz_pdf = np.random.uniform(size=(npdf, nbins))
+    zgrid = np.linspace(0, 4, nbins)
+    zmode = zgrid[np.argmax(pz_pdf, axis=1)]
+
+    data_dict = {'data': dict(zmode=zmode, pz_pdf=pz_pdf)}
+
+    groups, outf = io.initializeHdf5Write(test_outfile, data = dict(photoz_mode=((npdf,), 'f4'), photoz_pdf=((npdf, nbins), 'f4')), comm=comm)
+    io.writeDictToHdf5Chunk(groups, data_dict, 0, npdf, zmode='photoz_mode', pz_pdf='photoz_pdf')
+    io.finalizeHdf5Write(outf, 'md', zgrid=zgrid)
     os.unlink(test_outfile)


 def test_write_output_file_single():
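
For reference, a minimal sketch of how the new comm argument could be driven from user code with each rank writing its own row slice, rather than every rank writing the full arrays as the single-process test does. This assumes the patched tables_io above and an MPI-enabled h5py build (h5py.get_config().mpi is True); the output filename, the even row split across ranks, and the ioUtils import path are illustrative assumptions, not part of this PR.

# Sketch only: assumes the patched tables_io above plus a parallel h5py
# build. Filename and the even row split are hypothetical, not PR API.
import numpy as np
from mpi4py import MPI
from tables_io import ioUtils as io

comm = MPI.COMM_WORLD
npdf, nbins = 40, 21

# Collective step: every rank takes part in file and dataset creation.
groups, outf = io.initializeHdf5Write(
    "parallel_pdfs.hdf5", comm=comm,
    data=dict(photoz_mode=((npdf,), 'f4'), photoz_pdf=((npdf, nbins), 'f4')))

# Hypothetical even split of the npdf rows across ranks; the last rank
# absorbs any remainder.
chunk = npdf // comm.size
start = comm.rank * chunk
end = npdf if comm.rank == comm.size - 1 else start + chunk

pz_pdf = np.random.uniform(size=(end - start, nbins)).astype('f4')
zgrid = np.linspace(0, 4, nbins)
zmode = zgrid[np.argmax(pz_pdf, axis=1)].astype('f4')

# Each rank writes only its own [start:end) slice of the shared datasets.
io.writeDictToHdf5Chunk(groups, {'data': dict(zmode=zmode, pz_pdf=pz_pdf)},
                        start, end, zmode='photoz_mode', pz_pdf='photoz_pdf')
io.finalizeHdf5Write(outf, 'md', zgrid=zgrid)

Run under MPI, e.g. mpiexec -n 4 python write_parallel.py. With a serial h5py build the comm path raises the TypeError added above, which is what the apt and forced h5py source rebuild in the workflow change are there to avoid.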