The `read_from_npy` function in `particlefilesoa.py` generates a 64-bit array with dimensions `(number_of_trajectories, number_of_unique_time_records)`, so being able to hold this array in memory is a requirement for exporting the npy files dumped by Parcels to NetCDF. For long runs or frequent output, this restriction is much stricter than the memory limits imposed by the number of active particles or the field size. Furthermore, if there is insufficient memory to store this array, this is only flagged as an out-of-memory error during the export to NetCDF (i.e. after the simulation has completed), which can result in a lot of wasted time.
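To make the scaling concrete, here is a small back-of-envelope estimate (the trajectory and output counts below are made-up example values, not taken from any particular run):

```python
# Hypothetical example values, just to illustrate the scaling
number_of_trajectories = 1_000_000        # particles released over the run
number_of_unique_time_records = 5_000     # distinct output times written

# The export builds one float64 (trajectories x time records) array per variable,
# at 8 bytes per entry
bytes_per_variable = 8 * number_of_trajectories * number_of_unique_time_records
print(f"{bytes_per_variable / 1e9:.0f} GB per exported variable")  # -> 40 GB
```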
I have two suggestions:
- It is not possible to predict the size of this array before particle tracking has completed, but I think the minimum possible size is `(number_of_trajectories, run_time/output_dt)`, and all of these variables are available prior to execution. So whilst this would not guarantee that there is sufficient memory, throwing a warning if `8*number_of_trajectories*run_time/output_dt > available_memory` would at least catch runs that are guaranteed to fail at the export step (assuming that background memory use does not vary much). A sketch of such a check is included after the code below.
- Evaluate whether `8*(self.maxid_written+1)*len(self.time_written) >= psutil.virtual_memory()[1]` in the early stages of the `export` function and, if true, read the numpy files and write to the NetCDF file in chunks, rather than trying to store all data from a particular variable in memory at once. I've written up a naive modification of the relevant functions in `particlefilesoa.py` below (which splits particle IDs into chunks with limits in `id_range`, and only loads particle IDs satisfying `id_range[0] <= id < id_range[1]` at once). This is not a great implementation and I have only carried out basic tests, but I would be happy to work on this further if you think this method is reasonable and useful.
```python
def read_from_npy(self, file_list, time_steps, var, id_range):
    """
    Read NPY-files for one variable using a loop over all files.

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.

    :param file_list: List that contains all file names in the output directory
    :param time_steps: Number of time steps that were written in out directory
    :param var: name of the variable to read
    :param id_range: [min, max) range of particle IDs to load in this chunk
    """
    valid_id = np.arange(id_range[0], id_range[1])
    n_ids = len(valid_id)
    data = np.nan * np.zeros((n_ids, time_steps))
    time_index = np.zeros(n_ids, dtype=np.int64)
    t_ind_used = np.zeros(time_steps, dtype=np.int64)

    # loop over all files
    for npyfile in file_list:
        try:
            data_dict = np.load(npyfile, allow_pickle=True).item()
        except NameError:
            raise RuntimeError('Cannot combine npy files into netcdf file because your ParticleFile is '
                               'still open on interpreter shutdown.\nYou can use '
                               '"parcels_convert_npydir_to_netcdf %s" to convert these to '
                               'a NetCDF file yourself.\nTo avoid this error, make sure you '
                               'close() your ParticleFile at the end of your script.' % self.tempwritedir)
        id_avail = np.array(data_dict["id"], dtype=np.int64)
        id_mask_full = np.in1d(id_avail, valid_id)   # which ids in this file are present in this chunk
        id_mask_chunk = np.in1d(valid_id, id_avail)  # which ids in this chunk are present in this file
        t_ind = time_index[id_mask_chunk] if 'once' not in file_list[0] else 0
        t_ind_used[t_ind] = 1
        data[id_mask_chunk, t_ind] = data_dict[var][id_mask_full]
        time_index[id_mask_chunk] = time_index[id_mask_chunk] + 1

    # remove rows and columns that are completely filled with nan values
    tmp = data[time_index > 0, :]
    return tmp[:, t_ind_used == 1]


def export(self):
    """
    Exports outputs in temporary NPY-files to NetCDF file

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
    """
    if MPI:
        # The export can only start when all threads are done.
        MPI.COMM_WORLD.Barrier()
        if MPI.COMM_WORLD.Get_rank() > 0:
            return  # export only on thread 0

    # Retrieve all temporary writing directories and sort them in numerical order
    temp_names = sorted(glob(os.path.join("%s" % self.tempwritedir_base, "*")),
                        key=lambda x: int(os.path.basename(x)))

    if len(temp_names) == 0:
        raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

    global_maxid_written = -1
    global_time_written = []
    global_file_list = []
    if len(self.var_names_once) > 0:
        global_file_list_once = []
    for tempwritedir in temp_names:
        if os.path.exists(tempwritedir):
            pset_info_local = np.load(os.path.join(tempwritedir, 'pset_info.npy'), allow_pickle=True).item()
            global_maxid_written = np.max([global_maxid_written, pset_info_local['maxid_written']])
            for npyfile in pset_info_local['file_list']:
                tmp_dict = np.load(npyfile, allow_pickle=True).item()
                global_time_written.append([t for t in tmp_dict['time']])
            global_file_list += pset_info_local['file_list']
            if len(self.var_names_once) > 0:
                global_file_list_once += pset_info_local['file_list_once']
    self.maxid_written = global_maxid_written
    self.time_written = np.unique(global_time_written)

    for var in self.var_names:
        # Find available memory to check if output file is too large
        avail_mem = psutil.virtual_memory()[1]
        req_mem = (self.maxid_written+1)*len(self.time_written)*8*1.2
        if req_mem > avail_mem:
            # Read id_per_chunk ids at a time to keep memory use down
            total_chunks = int(np.ceil(req_mem/avail_mem))
            id_per_chunk = int(np.ceil((self.maxid_written+1)/total_chunks))
        else:
            total_chunks = 1
            id_per_chunk = self.maxid_written+1
        for chunk in range(total_chunks):
            # Minimum and maximum id for this chunk
            id_range = [chunk*id_per_chunk,
                        np.min(((chunk+1)*id_per_chunk, self.maxid_written+1))]
            # Read chunk-sized data from NPY-files
            data = self.read_from_npy(global_file_list, len(self.time_written), var, id_range)
            if var == self.var_names[0]:
                self.open_netcdf_file((self.maxid_written+1, data.shape[1]))
            varout = 'z' if var == 'depth' else var
            getattr(self, varout)[id_range[0]:id_range[1], :] = data

    if len(self.var_names_once) > 0:
        for var in self.var_names_once:
            getattr(self, var)[:] = self.read_from_npy(global_file_list_once, 1, var, [0, self.maxid_written+1])

    self.close_netcdf_file()
```
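For the first suggestion, here is a minimal sketch of what the pre-run check could look like. The function name `warn_if_export_may_fail` and the way the run parameters are passed in are my own assumptions for illustration, not existing Parcels API:

```python
import warnings

import numpy as np
import psutil


def warn_if_export_may_fail(number_of_trajectories, run_time, output_dt):
    """Warn before execution if the NetCDF export is guaranteed to run out of memory.

    Sketch only: it assumes one float64 entry per trajectory per output time,
    which is the *minimum* size of the export array, so passing this check
    does not guarantee that the export will succeed.
    """
    min_time_records = int(np.ceil(run_time / output_dt))
    min_bytes_per_variable = 8 * number_of_trajectories * min_time_records
    available = psutil.virtual_memory().available  # same value as psutil.virtual_memory()[1]
    if min_bytes_per_variable > available:
        warnings.warn(
            "Exporting npy output to NetCDF will need at least "
            f"{min_bytes_per_variable / 1e9:.1f} GB per variable, but only "
            f"{available / 1e9:.1f} GB of memory is currently available; "
            "the export step is likely to fail."
        )
```

This could be called with the same `run_time` and `output_dt` that are later passed to the simulation, so the warning appears before any particle tracking starts.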