Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0d766fe
Changing particlefile format to parquet
erikvansebille Jul 16, 2023
33845b1
Replacing zarr with pyarrow dependency
erikvansebille Jul 16, 2023
5b43478
Adding parquet files to .gitignore
erikvansebille Jul 16, 2023
3448091
Further implementing parquet in baseparticlefile
erikvansebille Jul 17, 2023
12a7375
Updating test_particle_file to parquet format
erikvansebille Jul 17, 2023
b88fe76
Updating unit and example tests
erikvansebille Jul 17, 2023
14c35c4
Updating particleset.from_particlefile to work with parquet
erikvansebille Jul 17, 2023
541fe58
Merge branch 'master' into parquet-writing
erikvansebille Jul 17, 2023
d29bce0
Cleaning up to_write='once' references
erikvansebille Jul 17, 2023
83aefa3
Removing pfile.close() statements
erikvansebille Jul 17, 2023
0783681
Adding support for np.datetime64 to Parquet
erikvansebille Jul 18, 2023
246352d
Removing __del__ from particlefileaos and -soa
erikvansebille Jul 18, 2023
5057987
Adding support for metadata in pyarrow.parquet.Table.schema.metadata
erikvansebille Jul 18, 2023
8b97773
removing variables_attribute_dict
erikvansebille Jul 18, 2023
1168429
Removing particlefile._set_calendar
erikvansebille Jul 18, 2023
de52511
Fixing rounding errors in np.timedelta64 for negative times
erikvansebille Jul 18, 2023
376cded
Adding implementation for cf_calendars not supported in Parquet
erikvansebille Jul 19, 2023
3a18c03
Casting fields directly to float32 on reading, to fix numpy RuntimeWa…
erikvansebille Jul 19, 2023
1c39751
Fixing nan dtypes in particlefile zarr array extending
erikvansebille Jul 19, 2023
501d5cd
Removing debug-code line
erikvansebille Jul 19, 2023
5b50e92
Merge branch 'fix_casting_warnings' into parquet-writing
erikvansebille Jul 19, 2023
883e442
Update baseparticlefile.py
erikvansebille Jul 19, 2023
bde07a3
Using trajectory and time as multiindex (and thus not using obs)
erikvansebille Jul 19, 2023
23483c5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ out-*
*.pyc
*.dSYM/*
**/*.zarr/*
**/*.parq
**/*.parquet
**/*.pqt

.idea/*
.env
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/example_decaying_moving_eddy.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def decaying_moving_example(fieldset, outfile, mode='scipy', method=AdvectionRK4

@pytest.mark.parametrize('mode', ['scipy', 'jit'])
def test_rotation_example(mode, tmpdir):
outfile = tmpdir.join('DecayingMovingParticle.zarr')
outfile = tmpdir.join('DecayingMovingParticle.parquet')
fieldset = decaying_moving_eddy_fieldset()
pset = decaying_moving_example(fieldset, outfile, mode=mode)
vals = true_values(pset[0].time, start_lon, start_lat) # Calculate values for the particle.
Expand All @@ -75,7 +75,7 @@ def test_rotation_example(mode, tmpdir):

def main():
fset_filename = 'decaying_moving_eddy'
outfile = 'DecayingMovingParticle.zarr'
outfile = 'DecayingMovingParticle.parquet'
fieldset = decaying_moving_eddy_fieldset()
fieldset.write(fset_filename)

Expand Down
3 changes: 1 addition & 2 deletions docs/examples/example_globcurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,13 @@ def DeleteParticle(particle, fieldset, time):
@pytest.mark.parametrize('dt', [-300, 300])
@pytest.mark.parametrize('pid_offset', [0, 20])
def test_globcurrent_pset_fromfile(mode, dt, pid_offset, tmpdir):
filename = tmpdir.join("pset_fromparticlefile.zarr")
filename = tmpdir.join("pset_fromparticlefile.parquet")
fieldset = set_globcurrent_fieldset()

ptype[mode].setLastID(pid_offset)
pset = ParticleSet(fieldset, pclass=ptype[mode], lon=25, lat=-35)
pfile = pset.ParticleFile(filename, outputdt=delta(hours=6))
pset.execute(AdvectionRK4, runtime=delta(days=1), dt=dt, output_file=pfile)
pfile.close()

restarttime = np.nanmax if dt > 0 else np.nanmin
pset_new = ParticleSet.from_particlefile(fieldset, pclass=ptype[mode], filename=filename, restarttime=restarttime)
Expand Down
8 changes: 4 additions & 4 deletions docs/examples/example_mitgcm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta as delta

import numpy as np
import pandas as pd
import xarray as xr

from parcels import (
Expand Down Expand Up @@ -48,21 +49,20 @@ def periodicBC(particle, fieldset, time):
size=10,
)
pfile = ParticleFile(
"MIT_particles_" + str(mode) + ".zarr", pset, outputdt=delta(days=1), chunks=(len(pset), 1)
"MIT_particles_" + str(mode) + ".parquet", pset, outputdt=delta(days=1)
)
kernels = AdvectionRK4 + pset.Kernel(periodicBC)
pset.execute(
kernels, runtime=delta(days=5), dt=delta(minutes=30), output_file=pfile
)
pfile.close()


def test_mitgcm_output_compare():
run_mitgcm_zonally_reentrant("scipy")
run_mitgcm_zonally_reentrant("jit")

ds_jit = xr.open_zarr("MIT_particles_jit.zarr")
ds_scipy = xr.open_zarr("MIT_particles_scipy.zarr")
ds_jit = xr.Dataset.from_dataframe(pd.read_parquet("MIT_particles_jit.parquet"))
ds_scipy = xr.Dataset.from_dataframe(pd.read_parquet("MIT_particles_scipy.parquet"))

np.testing.assert_allclose(ds_jit.lat.data, ds_scipy.lat.data)
np.testing.assert_allclose(ds_jit.lon.data, ds_scipy.lon.data)
4 changes: 2 additions & 2 deletions docs/examples/example_stommel.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def AgeP(particle, fieldset, time):


def stommel_example(npart=1, mode='jit', verbose=False, method=AdvectionRK4, grid_type='A',
outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True, pset_mode='soa'):
outfile="StommelParticle.parquet", repeatdt=None, maxage=None, write_fields=True, pset_mode='soa'):
timer.fieldset = timer.Timer('FieldSet', parent=timer.stommel)
fieldset = stommel_fieldset(grid_type=grid_type)
if write_fields:
Expand Down Expand Up @@ -164,7 +164,7 @@ def main(args=None):
help='Print particle information before and after execution')
p.add_argument('-m', '--method', choices=('RK4', 'EE', 'RK45'), default='RK4',
help='Numerical method used for advection')
p.add_argument('-o', '--outfile', default='StommelParticle.zarr',
p.add_argument('-o', '--outfile', default='StommelParticle.parquet',
help='Name of output file')
p.add_argument('-r', '--repeatdt', default=None, type=int,
help='repeatdt of the ParticleSet')
Expand Down
3 changes: 2 additions & 1 deletion environment_py3_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- matplotlib-base>=2.0.2
- netcdf4>=1.1.9
- numpy>=1.9.1
- pandas
- platformdirs
- psutil
- py>=1.4.27
Expand All @@ -26,7 +27,7 @@ dependencies:
- nbval
- scikit-learn
- pykdtree
- zarr>=2.11.0
- pyarrow

# Formatting
- black
Expand Down
3 changes: 2 additions & 1 deletion environment_py3_osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- matplotlib-base>=2.0.2
- netcdf4>=1.1.9
- numpy>=1.9.1
- pandas
- platformdirs
- psutil
- py>=1.4.27
Expand All @@ -26,7 +27,7 @@ dependencies:
- nbval
- scikit-learn
- pykdtree
- zarr>=2.11.0
- pyarrow

# Formatting
- black
Expand Down
3 changes: 2 additions & 1 deletion environment_py3_win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies:
- matplotlib-base>=2.0.2
- netcdf4>=1.1.9
- numpy>=1.9.1
- pandas
- platformdirs
- psutil
- py>=1.4.27
Expand All @@ -24,7 +25,7 @@ dependencies:
- ipykernel
- nbval
- pykdtree
- zarr>=2.11.0
- pyarrow

# Formatting
- black
Expand Down
2 changes: 1 addition & 1 deletion parcels/collection/collectionaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ def set_variable_write_status(self, var, write_status):
var :
Name of the variable (string)
write_status :
Write status of the variable (True, False or 'once')
Write status of the variable (True or False)

"""
var_changed = False
Expand Down
8 changes: 1 addition & 7 deletions parcels/collection/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,12 +897,6 @@ def __getattr__(self, name):
else:
return False

def has_write_once_variables(self):
for var in self.ptype.variables:
if var.to_write == 'once':
return True
return False

@abstractmethod
def getvardata(self, var, indices=None):
pass
Expand All @@ -924,6 +918,6 @@ def set_variable_write_status(self, var, write_status):
var : str
Name of the variable
write_status : bool, str
Write status of the variable (True, False or 'once')
Write status of the variable (True or False)
"""
pass
3 changes: 1 addition & 2 deletions parcels/collection/collectionsoa.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
self._data['depth'][:] = depth
self._data['time'][:] = time
self._data['id'][:] = pid
self._data['once_written'][:] = 0

# special case for exceptions which can only be handled from scipy
self._data['exception'] = np.empty(self.ncount, dtype=object)
Expand Down Expand Up @@ -853,7 +852,7 @@ def set_variable_write_status(self, var, write_status):
var :
Name of the variable (string)
status :
Write status of the variable (True, False or 'once')
Write status of the variable (True or False)
write_status :
"""
var_changed = False
Expand Down
1 change: 1 addition & 0 deletions parcels/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ def computeTimeChunk(self, data, tindex):
interp_method=self.interp_method,
data_full_zdim=self.data_full_zdim,
chunksize=self.chunksize,
cast_data_dtype=self.cast_data_dtype,
rechunk_callback_fields=rechunk_callback_fields,
chunkdims_name_map=self.netcdf_chunkdims_name_map,
netcdf_decodewarning=self.netcdf_decodewarning)
Expand Down
5 changes: 3 additions & 2 deletions parcels/fieldfilebuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, filename, dimensions, indices, timestamp=None,
self.indices = indices
self.dataset = None
self.timestamp = timestamp
self.cast_data_dtype = kwargs.pop('cast_data_dtype', np.float32)
self.ti = None
self.interp_method = interp_method
self.data_full_zdim = data_full_zdim
Expand Down Expand Up @@ -207,7 +208,7 @@ def data_access(self):
data = self.dataset[self.name]
ti = range(data.shape[0]) if self.ti is None else self.ti
data = self._apply_indices(data, ti)
return np.array(data)
return np.array(data, dtype=self.cast_data_dtype)

@property
def time(self):
Expand Down Expand Up @@ -740,7 +741,7 @@ def data_access(self):
self.rechunk_callback_fields()
self.chunking_finalized = True

return data
return data.astype(self.cast_data_dtype)


class DeferredDaskFileBuffer(DaskFileBuffer):
Expand Down
9 changes: 3 additions & 6 deletions parcels/particle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ class Variable:
initial :
Initial value of the variable. Note that this can also be a Field object,
which will then be sampled at the location of the particle
to_write : bool, 'once', optional
Boolean or 'once'. Controls whether Variable is written to NetCDF file.
If to_write = 'once', the variable will be written as a time-independent 1D array
to_write : bool, optional
Boolean. Controls whether Variable is written to NetCDF file.
"""

def __init__(self, name, dtype=np.float32, initial=0, to_write=True):
Expand Down Expand Up @@ -203,8 +202,7 @@ class ScipyParticle(_Particle):
lat = Variable('lat', dtype=np.float32)
depth = Variable('depth', dtype=np.float32)
time = Variable('time', dtype=np.float64)
id = Variable('id', dtype=np.int64, to_write='once')
once_written = Variable('once_written', dtype=np.int32, initial=0, to_write=False) # np.bool not implemented in JIT
id = Variable('id', dtype=np.int64, to_write=False)
dt = Variable('dt', dtype=np.float64, to_write=False)
state = Variable('state', dtype=np.int32, initial=StateCode.Evaluate, to_write=False)
next_dt = Variable('_next_dt', dtype=np.float64, initial=np.nan, to_write=False)
Expand All @@ -218,7 +216,6 @@ def __init__(self, lon, lat, pid, fieldset=None, ngrids=None, depth=0., time=0.,
type(self).time.initial = time
type(self).id.initial = pid
_Particle.lastID = max(_Particle.lastID, pid)
type(self).once_written.initial = 0
type(self).dt.initial = None
type(self).next_dt.initial = np.nan

Expand Down
1 change: 0 additions & 1 deletion parcels/particlefile/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Module controlling the writing of ParticleSets to NetCDF file."""

from .baseparticlefile import BaseParticleFile # noqa: F401
from .baseparticlefile import _set_calendar # noqa: F401
from .particlefileaos import ParticleFileAOS # noqa: F401
from .particlefilesoa import ParticleFileSOA # noqa: F401

Expand Down
Loading