From da20985506a351ff6b13201f261bde5df12e99d3 Mon Sep 17 00:00:00 2001 From: RussTreadon-NOAA Date: Fri, 28 Apr 2023 00:39:32 +0000 Subject: [PATCH] add get_bkg_ens_dict to analysis.py, remove get_bkg_dict from atmens_analysis.py (#1518) --- ush/python/pygfs/task/analysis.py | 64 ++++++++++++++++++++++++ ush/python/pygfs/task/atmens_analysis.py | 63 +---------------------- 2 files changed, 65 insertions(+), 62 deletions(-) diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index 7c24c9cbdb..ab228aebdf 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -10,6 +10,7 @@ from pygw.template import Template, TemplateConstants from pygw.logger import logit from pygw.task import Task +from pygw.timetools import to_fv3time, to_YMD logger = getLogger(__name__.split('.')[-1]) @@ -199,3 +200,66 @@ def link_jediexe(self: Task) -> None: os.symlink(exe_src, exe_dest) return + + @logit(logger) + def get_bkg_ens_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: + """Compile a dictionary of model background files to copy + + This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer) + that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. + + Parameters + ---------- + task_config: Dict + a dictionary containing all of the configuration needed for the task + + Returns + ---------- + bkg_dict: Dict + a dictionary containing the list of model background files to copy for FileHandler + """ + # NOTE for now this is FV3 restart files and just assumed to be fh006 + # loop over ensemble members + rstlist = [] + dirlist = [] + bkglist = [] + for imem in range(1, self.task_config.NMEM_ENKF + 1): + memchar = f"mem{imem:03d}" + + # accumulate directory list for member restart files + dirlist.append(os.path.join(self.task_config.DATA, 'bkg', memchar)) + + # get FV3 restart files, this will be a lot simpler when using history files + template = self.task_config.COM_ATMOS_RESTART_TMPL + tmpl_dict = { + 'ROTDIR': self.task_config.ROTDIR, + 'RUN': self.runtime_config.RUN, + 'YMD': to_YMD(self.task_config.previous_cycle), + 'HH': self.task_config.previous_cycle.strftime('%H') + } + + # get FV3 restart files, this will be a lot simpler when using history files + tmpl_dict['MEMDIR'] = memchar + rst_dir = Template.substitute_structure(template, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_dict.get) + rstlist.append(rst_dir) + + run_dir = os.path.join(self.task_config.DATA, 'bkg', memchar) + + # atmens DA needs coupler + basename = f'{to_fv3time(self.task_config.current_cycle)}.coupler.res' + bkglist.append([os.path.join(rst_dir, basename), os.path.join(self.task_config.DATA, 'bkg', memchar, basename)]) + + # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data + for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: + template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' + for itile in range(1, self.task_config.ntiles + 1): + basename = template.format(tilenum=itile) + bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) + + bkg_dict = { + 'mkdir': rstlist, + 'mkdir': dirlist, + 'copy': bkglist, + } + + return bkg_dict diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index 4e2e2ea436..3cbeb7c916 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -91,7 +91,7 @@ def initialize(self: Analysis) -> None: FileHandler(jedi_fix_list).sync() # stage backgrounds - FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync() + FileHandler(self.get_bkg_ens_dict(AttrDict(self.task_config))).sync() # generate ensemble da YAML file logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}") @@ -206,67 +206,6 @@ def finalize(self: Analysis) -> None: def clean(self): super().clean() - @logit(logger) - def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method constructs a dictionary of FV3 restart files (coupler, core, tracer) - that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. - - Parameters - ---------- - task_config: Dict - a dictionary containing all of the configuration needed for the task - - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - # NOTE for now this is FV3 restart files and just assumed to be fh006 - # loop over ensemble members - dirlist = [] - bkglist = [] - for imem in range(1, task_config.NMEM_ENKF + 1): - memchar = f"mem{imem:03d}" - - # accumulate directory list for member restart files - dirlist.append(os.path.join(task_config.DATA, 'bkg', memchar)) - - # get FV3 restart files, this will be a lot simpler when using history files - template = task_config.COM_ATMOS_RESTART_TMPL - tmpl_dict = { - 'ROTDIR': task_config.ROTDIR, - 'RUN': self.runtime_config.RUN, - 'YMD': to_YMD(task_config.previous_cycle), - 'HH': task_config.previous_cycle.strftime('%H') - } - - # get FV3 restart files, this will be a lot simpler when using history files - tmpl_dict['MEMDIR'] = memchar - rst_dir = Template.substitute_structure(template, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_dict.get) - os.makedirs(rst_dir, mode=0o775, exist_ok=True) - - run_dir = os.path.join(task_config.DATA, 'bkg', memchar) - - # atmens DA needs coupler - basename = f'{to_fv3time(task_config.current_cycle)}.coupler.res' - bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config.DATA, 'bkg', memchar, basename)]) - - # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data - for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: - template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' - for itile in range(1, task_config.ntiles + 1): - basename = template.format(tilenum=itile) - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - bkg_dict = { - 'mkdir': dirlist, - 'copy': bkglist, - } - - return bkg_dict - @logit(logger) def jedi2fv3inc(self: Analysis) -> None: """Generate UFS model readable analysis increment