Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 114 additions & 37 deletions mpas_analysis/ocean/conservation.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def run_task(self):
for plot_type in self.plotTypes:
for varname in self.variableList[plot_type]:
all_plots_variable_list.append(varname)
self._compute_time_series_with_ncrcat(all_plots_variable_list)
self._compute_time_series_with_xarray(all_plots_variable_list)
for plot_type in self.plotTypes:
self._make_plot(plot_type)

Expand Down Expand Up @@ -604,28 +604,27 @@ def _get_variable(self, ds, varname, mks=False):

return variable

def _compute_time_series_with_ncrcat(self, variable_list):

def _check_output_safe_to_append(self, variable_list):
"""
Uses ncrcat to extact time series from conservationCheckOutput files
Check if the output file is safe to append to by verifying the presence
of necessary variables and determining if new input files are needed.

Raises
------
OSError
If ``ncrcat`` is not in the system path.
Parameters
----------
variable_list : list of str
List of variables to include in the time series.

Returns
-------
append : bool
True if the output file can be safely appended to, False otherwise.
inputFiles : list of str
Updated list of input files to process.
"""

if shutil.which('ncrcat') is None:
raise OSError('ncrcat not found. Make sure the latest nco '
'package is installed: \n'
'conda install nco\n'
'Note: this presumes use of the conda-forge '
'channel.')

inputFiles = self.inputFiles
append = False
inputFiles = self.inputFiles

if os.path.exists(self.outputFile):
# make sure all the necessary variables are also present
with xr.open_dataset(self.outputFile) as ds:
if ds.sizes['Time'] == 0:
updateSubset = False
Expand All @@ -637,11 +636,7 @@ def _compute_time_series_with_ncrcat(self, variable_list):
break

if updateSubset:
# add only input files with times that aren't already in
# the output file

append = True

fileNames = sorted(self.inputFiles)
inYears, inMonths = get_files_year_month(
fileNames, self.historyStreams,
Expand All @@ -652,33 +647,51 @@ def _compute_time_series_with_ncrcat(self, variable_list):
totalMonths = 12 * inYears + inMonths

dates = decode_strings(ds.xtime)

lastDate = dates[-1]

lastYear = int(lastDate[0:4])
lastMonth = int(lastDate[5:7])
lastTotalMonths = 12 * lastYear + lastMonth

inputFiles = []
for index, inputFile in enumerate(fileNames):
if totalMonths[index] > lastTotalMonths:
inputFiles.append(inputFile)
inputFiles = [
inputFile for index, inputFile in enumerate(fileNames)
if totalMonths[index] > lastTotalMonths
]

if len(inputFiles) == 0:
# nothing to do
return
return append, inputFiles
else:
# there is an output file but it has the wrong variables
# so we need ot delete it.
self.logger.warning('Warning: deleting file {self.outputFile}'
' because it is empty or some variables'
' were missing')
self.logger.warning(
f'Warning: deleting file {self.outputFile} because it '
'is empty or some variables were missing')
os.remove(self.outputFile)

variableList = variable_list + ['xtime']
return append, inputFiles

def _compute_time_series_with_ncrcat(self, variable_list):
"""
Uses ncrcat to extract time series from conservationCheckOutput files.

Raises
------
OSError
If ``ncrcat`` is not in the system path.
"""
if shutil.which('ncrcat') is None:
raise OSError('ncrcat not found. Make sure the latest nco '
'package is installed: \n'
'conda install nco\n'
'Note: this presumes use of the conda-forge '
'channel.')

variable_list = variable_list + ['xtime']
append, inputFiles = self._check_output_safe_to_append(variable_list)

if len(inputFiles) == 0:
# nothing to do
return

args = ['ncrcat', '-4', '--no_tmp_fl',
'-v', ','.join(variableList)]
'-v', ','.join(variable_list)]

if append:
args.append('--record_append')
Expand Down Expand Up @@ -710,3 +723,67 @@ def _compute_time_series_with_ncrcat(self, variable_list):
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
' '.join(args))

def _compute_time_series_with_xarray(self, variable_list):
"""
Uses xarray to extract time series from conservationCheckOutput files,
handling redundant `xtime` entries and sorting by `xtime`.

Parameters
----------
variable_list : list of str
List of variables to include in the time series.
"""
# Authors
# -------
# Xylar Asay-Davis

inputFiles = self.inputFiles
variable_list = variable_list + ['xtime']

append, inputFiles = self._check_output_safe_to_append(variable_list)

# Open all input files as a single dataset
self.logger.info(
f'Opening input files with xarray: {inputFiles[0]} ... '
f'{inputFiles[-1]}')
ds = xr.open_mfdataset(
inputFiles,
combine='nested',
concat_dim='Time',
data_vars='minimal',
coords='minimal',
compat='override'
)

# Select only the requested variables
ds = ds[variable_list]

# Handle redundant `xtime` entries by keeping the last occurrence
self.logger.info('Removing redundant xtime entries...')
_, unique_indices = np.unique(ds['xtime'].values, return_index=True)
unique_indices = sorted(unique_indices) # Ensure ascending order
ds = ds.isel(Time=unique_indices)

if append:
# Load the existing dataset and combine it with the new dataset
self.logger.info(
f'Appending to existing dataset in {self.outputFile}...')
with xr.open_dataset(self.outputFile) as existing_ds:
ds = xr.concat([existing_ds, ds], dim='Time')
# Remove redundant `xtime` entries again after concatenation
_, unique_indices = np.unique(
ds['xtime'].values, return_index=True)
unique_indices = sorted(unique_indices)
ds = ds.isel(Time=unique_indices)

# Sort by `xtime` to ensure the time series is in ascending order
self.logger.info('Sorting by xtime...')
ds = ds.sortby('xtime')

# Save the resulting dataset to the output file
self.logger.info(
f'Saving concatenated dataset to {self.outputFile}...')
ds.to_netcdf(self.outputFile, format='NETCDF4')

self.logger.info('Time series successfully created with xarray.')
Loading