
Memory limitation in netcdf export step #1091

@nvogtvincent

Description

The read_from_npy function in particlefilesoa.py generates a 64-bit array with dimensions (number_of_trajectories, number_of_unique_time_records), so being able to hold this array in memory is a requirement for exporting the npy files dumped by Parcels to NetCDF. For long runs with frequent output, this restriction is much stricter than the memory limit imposed by the number of active particles or the field size. Furthermore, if there is insufficient memory to store this array, the problem is only flagged as an out-of-memory error during the export to NetCDF (i.e. after the simulation has completed), which may result in a lot of wasted time.
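For a rough sense of scale (illustrative numbers only, not from a specific run): 1e6 trajectories written at 1e4 unique output times would already require 1e6 * 1e4 * 8 bytes ≈ 80 GB per exported variable, far more memory than is needed to track the particles themselves during the run.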

I have two suggestions:

  • It is not possible to predict the size of this array before particle tracking has completed, but I think the minimum possible size is (number_of_trajectories, run_time/output_dt), and all of these variables are available prior to execution. So whilst this would not guarantee that there is sufficient memory, throwing a warning if 8*number_of_trajectories*run_time/output_dt > available_memory would at least catch runs that are guaranteed to fail at the export step (assuming that background memory use does not vary by much). A sketch of such a check is included after the code below.
  • Evaluate whether 8*(self.maxid_written+1)*len(self.time_written) >= psutil.virtual_memory()[1] (i.e. the available memory) early in the export function and, if true, read the numpy files and write them to the NetCDF file in chunks, rather than trying to store all data for a particular variable in memory at once. I've written up a naive modification of the relevant functions in particlefilesoa.py below, which splits particle IDs into chunks with limits in id_range and loads only the particle IDs satisfying id_range[0] <= id < id_range[1] in any one pass. This is not a great implementation and I have only carried out basic tests, but I would be happy to work on this further if you think this method is reasonable and useful.
def read_from_npy(self, file_list, time_steps, var, id_range):
    """
    Read NPY-files for one variable using a loop over all files.

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.

    :param file_list: List that contains all file names in the output directory
    :param time_steps: Number of time steps that were written in the output directory
    :param var: name of the variable to read
    :param id_range: [min_id, max_id); only particle IDs in this half-open range are read
    """

    valid_id = np.arange(id_range[0], id_range[1])
    n_ids = len(valid_id)

    data = np.nan * np.zeros((n_ids, time_steps))
    time_index = np.zeros(n_ids, dtype=np.int64)
    t_ind_used = np.zeros(time_steps, dtype=np.int64)

    # loop over all files
    for npyfile in file_list:
        try:
            data_dict = np.load(npyfile, allow_pickle=True).item()
        except NameError:
            raise RuntimeError('Cannot combine npy files into netcdf file because your ParticleFile is '
                               'still open on interpreter shutdown.\nYou can use '
                               '"parcels_convert_npydir_to_netcdf %s" to convert these to '
                               'a NetCDF file yourself.\nTo avoid this error, make sure you '
                               'close() your ParticleFile at the end of your script.' % self.tempwritedir)

        id_avail = np.array(data_dict["id"], dtype=np.int64)
        id_mask_full = np.in1d(id_avail, valid_id)  # which ids in this npy file belong to this chunk
        id_mask_chunk = np.in1d(valid_id, id_avail)  # which ids in this chunk appear in this npy file
        t_ind = time_index[id_mask_chunk] if 'once' not in file_list[0] else 0
        t_ind_used[t_ind] = 1
        data[id_mask_chunk, t_ind] = data_dict[var][id_mask_full]
        time_index[id_mask_chunk] = time_index[id_mask_chunk] + 1

    # remove rows and columns that are completely filled with nan values
    tmp = data[time_index > 0, :]
    return tmp[:, t_ind_used == 1]


def export(self):
    """
    Exports outputs in temporary NPY-files to NetCDF file

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
    """

    if MPI:
        # The export can only start when all threads are done.
        MPI.COMM_WORLD.Barrier()
        if MPI.COMM_WORLD.Get_rank() > 0:
            return  # export only on thread 0

    # Retrieve all temporary writing directories and sort them in numerical order
    temp_names = sorted(glob(os.path.join("%s" % self.tempwritedir_base, "*")),
                        key=lambda x: int(os.path.basename(x)))

    if len(temp_names) == 0:
        raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

    global_maxid_written = -1
    global_time_written = []
    global_file_list = []
    if len(self.var_names_once) > 0:
        global_file_list_once = []
    for tempwritedir in temp_names:
        if os.path.exists(tempwritedir):
            pset_info_local = np.load(os.path.join(tempwritedir, 'pset_info.npy'), allow_pickle=True).item()
            global_maxid_written = np.max([global_maxid_written, pset_info_local['maxid_written']])
            for npyfile in pset_info_local['file_list']:
                tmp_dict = np.load(npyfile, allow_pickle=True).item()
                global_time_written.append([t for t in tmp_dict['time']])
            global_file_list += pset_info_local['file_list']
            if len(self.var_names_once) > 0:
                global_file_list_once += pset_info_local['file_list_once']
    self.maxid_written = global_maxid_written
    self.time_written = np.unique(global_time_written)

    for var in self.var_names:
        # Find available memory to check if the output array for this variable is too large
        avail_mem = psutil.virtual_memory()[1]  # index 1 is the 'available' field
        req_mem = (self.maxid_written+1)*len(self.time_written)*8*1.2  # 8 bytes per value plus a 20% safety margin

        if req_mem > avail_mem:
            # Read id_per_chunk ids at a time to keep memory use down
            total_chunks = int(np.ceil(req_mem/avail_mem))
            id_per_chunk = int(np.ceil((self.maxid_written+1)/total_chunks))
        else:
            total_chunks = 1
            id_per_chunk = self.maxid_written+1

        for chunk in range(total_chunks):
            # Minimum and maximum id for this chunk
            id_range = [chunk*id_per_chunk,
                        np.min(((chunk+1)*id_per_chunk, self.maxid_written+1))]

            # Read chunk-sized data from NPY-files
            # (note: this assumes every chunk observes the same set of written time steps,
            # since read_from_npy drops unused time columns per chunk)
            data = self.read_from_npy(global_file_list, len(self.time_written), var, id_range)
            if var == self.var_names[0] and chunk == 0:
                # Create the NetCDF file only once, sized from the first chunk
                self.open_netcdf_file((self.maxid_written+1, data.shape[1]))
            varout = 'z' if var == 'depth' else var
            getattr(self, varout)[id_range[0]:id_range[1], :] = data

    if len(self.var_names_once) > 0:
        for var in self.var_names_once:
            getattr(self, var)[:] = self.read_from_npy(global_file_list_once, 1, var, [0, self.maxid_written+1])

    self.close_netcdf_file()
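
For the first suggestion, here is a minimal sketch of what the pre-run check could look like, assuming psutil is available. The function name warn_if_export_may_exceed_memory and the n_trajectories/run_time/output_dt arguments (run time and output interval in seconds) are placeholders for illustration, not existing Parcels API:

import math
import warnings

import psutil


def warn_if_export_may_exceed_memory(n_trajectories, run_time, output_dt, bytes_per_value=8):
    """Warn before execution if the eventual export array cannot fit in memory.

    The true array size is only known after the run, but
    n_trajectories * ceil(run_time / output_dt) values per variable is a
    lower bound, so exceeding the currently available memory here means the
    export step is guaranteed to fail (assuming background memory use does
    not change much during the run).
    """
    min_time_records = math.ceil(run_time / output_dt)
    min_bytes = bytes_per_value * n_trajectories * min_time_records
    avail_bytes = psutil.virtual_memory().available
    if min_bytes > avail_bytes:
        warnings.warn("Exporting to NetCDF will need at least %.1f GB per variable, but only "
                      "%.1f GB of memory is currently available, so the export step is likely "
                      "to fail." % (min_bytes / 1e9, avail_bytes / 1e9))

Such a check could be run (with whatever names the ParticleSet/execute API actually uses for the run time and output interval) just before pset.execute(), so the warning is raised before any compute time is spent.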
