
Add WorkflowTemplate run-fv3gfs for segmented long runs #576

Merged (65 commits) on Aug 26, 2020

Conversation

oliverwm1
Contributor

@oliverwm1 oliverwm1 commented Aug 19, 2020

Long fv3gfs runs generate large netCDF files that are difficult to post-process, and runs cannot be restarted partway through if they fail. This PR introduces an argo WorkflowTemplate, run-fv3gfs, that can do segmented fv3gfs runs along with automated post-processing and appending of zarr outputs.

Limitations of current implementation:

  • Time chunk size must evenly divide the run length if doing more than one segment
  • The entrypoint argo workflow must define the workdir volume and restart-data volume
  • The fv3config object cannot be changed by the user between segments (e.g. for the nudge-to-obs case, this means all analysis files for the entire simulation will need to be specified as assets for each segment; if these files are on a mounted volume, the config could point directly to them instead of downloading from GCS)
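The chunk-divisibility constraint above can be validated before a run starts. A minimal sketch (the function name and signature are hypothetical, not part of this PR):

```python
def check_time_chunks(run_length: int, time_chunk_size: int, segment_count: int) -> None:
    # Hypothetical pre-flight check for the constraint above: with more than
    # one segment, the run length must contain a whole number of time chunks
    # so that appended zarr chunks line up on segment boundaries.
    if segment_count > 1 and run_length % time_chunk_size != 0:
        raise ValueError(
            f"time chunk size {time_chunk_size} does not evenly divide "
            f"run length {run_length}"
        )


check_time_chunks(run_length=96, time_chunk_size=8, segment_count=4)  # ok
```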

Added public API:

  • run-fv3gfs argo WorkflowTemplate
  • append_run.py script in post_process_run image

Significant internal changes:

  • post_process.py can now output to a local path
  • nudging WorkflowTemplate now uses run-fv3gfs. This has the added bonus that nudging run outputs will now be rechunked.
  • Some small changes to the e2e test to accommodate the above change

Requirement changes:

  • fsspec, gcsfs and pip are now installed in the post_process_run image

  • Tests added

Resolves [VCMML-446]

TODO:

  • Change append_run implementation so input is not modified (do work in tempdir)
  • Clean workdir as segmented run proceeds, so artifacts from previous segments don't get copied into later segments
  • Think about how this will work for nudge-to-obs (config object needs to change from segment to segment to get correct files, or prepare config script needs to make sure the analysis files for all segments are specified in the first config)
  • Handle time encoding for variables
  • Make nudging workflow use this, so that it gets exercised by e2e test
  • Ensure later segments don't run if a segment fails (probably need to switch to using recursion to handle this, see argo#3664)
  • Add docs
  • Figure out how to pass other mounted volumes to this (e.g. restarts or analysis files for nudging runs)
  • (optional) Move artifacts into more useful bins (restarts, config, logging?)
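On the "use recursion so later segments don't run after a failure" TODO item, a heavily hedged sketch of the recursive pattern referenced in argo#3664 (all template and parameter names here are hypothetical; exact argo expression syntax may differ by version):

```yaml
# Hypothetical sketch: a steps template that runs one segment and then calls
# itself. Steps only proceed when the previous step succeeds, so a failed
# segment stops the chain; the segment step is assumed to emit an output
# parameter counting the segments still remaining.
- name: run-all-segments
  steps:
    - - name: run-segment
        template: run-segment
    - - name: next-segment
        template: run-all-segments
        when: "{{steps.run-segment.outputs.parameters.segments-remaining}} != 0"
```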

@@ -34,7 +34,7 @@ def get_chunks(user_chunks: ChunkSpec) -> ChunkSpec:


 def upload_dir(d, dest):
-    subprocess.check_call(["gsutil", "-m", "rsync", "-r", d, dest])
+    subprocess.check_call(["gsutil", "-m", "rsync", "-r", "-e", d, dest])
Contributor Author

Don't follow symlinks.

Member

@spencerkclark spencerkclark left a comment

This looks very clean and the time logic all looks great. I mainly just had a few questions and renaming suggestions.

Resolved review threads on: workflows/argo/README.md (2), workflows/post_process_run/append_run.py (5), workflows/argo/run-fv3gfs.yaml (2), workflows/post_process_run/test_append_run.py (1)
Contributor

@nbren12 nbren12 left a comment

Thanks! This is a pretty sophisticated cloud workflow, and a good demonstration that we are on the bleeding edge of running models in the cloud. It's a big feature so I have a few requested changes.

The argo mostly looks good to me, but could benefit from clearer naming and documentation IMO. I made some suggestions below.

Likewise, the appending implementation looks good to me overall, but I am concerned that the tests are too tightly scoped to the implementation details. Conversely, there isn't good coverage of some of the code in append_run. I think part of this issue is that the shift_store and set_time_units_like interfaces are pretty low-level. I think an append_zarr(dest_path, src_path) would be more convenient and testable.
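For context on what a shift_store-style step involves: zarr stores each chunk under a dotted index key, so appending a segment along a dimension amounts to renaming the new segment's chunk keys before copying them into the target store. A hypothetical, dependency-free sketch of that renaming (not the PR's actual code):

```python
def shift_chunk_key(key: str, axis: int, n_shift: int) -> str:
    # A zarr chunk key like "0.0.0" names the chunk's index along each
    # dimension. If the target store already has n_shift chunks along
    # `axis`, the appended segment's chunks slot in after them.
    indices = key.split(".")
    indices[axis] = str(int(indices[axis]) + n_shift)
    return ".".join(indices)


# e.g. the first time chunk of a new segment, appended after 3 existing chunks:
shift_chunk_key("0.0.0", axis=0, n_shift=3)  # → "3.0.0"
```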

volumes:
- name: restart-data
  persistentVolumeClaim:
    claimName: '{{workflow.parameters.restart-pvc}}'
Contributor

Why did you remove this option? We had it so that we could change the name of the restart-pvc. I guess this hasn't changed yet, but it could.

Contributor Author

I don't think the e2e workflow needs this to be parametrized. Agreed it should be for the nudging workflow, I'll change it there.

Resolved review threads on: workflows/argo/README.md (3), workflows/argo/nudging/nudging.yaml (1), workflows/post_process_run/append_run.py (4)


@pytest.mark.parametrize("with_coords", [True, False])
def test_appending_shifted_zarr_gives_expected_ds(tmpdir, with_coords):
Contributor

Similarly, this test seems to test the integration of shift_store, _copy_tree, and some other lower-level interfaces. I suppose the append_run function is doing something similar, but that implementation could change, so it's not clear if this test is actually testing something relevant.

If this combination of units is important, then can you create a single public API, test it, and then use it inside of append_run?

Contributor Author

Yeah, the reason for the complexity of this test (and the lack of a single public API for this functionality) is that I wanted to have only one upload_dir call in append_run, instead of uploading each zarr (and the other items) serially. I was also trying to avoid interaction with GCS in the test.

I guess if there are enough files in each zarr, then doing the uploads serially won't be much of a penalty in terms of taking advantage of multi-threaded upload. I will need to make some changes to ensure all non-zarr items are uploaded with one gsutil call.

Contributor Author

There is now a public append_zarr_along_time function that is directly tested.

Contributor

Ok. It does seem like shift_store is a good entrypoint for the tests, since upload_dir requires remote interaction.

Although you could mock upload_dir to work with local files (e.g. use copy_tree instead of upload_dir). That would be a quick route to 100% coverage, and local output may be a useful feature in its own right, e.g. for post-processing on HPC.

Contributor Author

Actually, gsutil can copy from local to local, so we can use it in tests without remote interaction!

Contributor Author

Although maybe there is still remote interaction for auth? Not sure, but either way, it works fine in the test I wrote.

@oliverwm1
Contributor Author

This is ready for re-review. The most substantive change I made was introducing a new append_zarr_along_time function. This will be useful if we want to use this zarr-appending capability in contexts outside of run-fv3gfs. (Although we should probably add some more safety checks if/when we use it elsewhere.)
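Besides moving chunks into place, an append-along-time operation also has to grow the array shape recorded in each array's .zarr metadata so readers see the combined array. A hypothetical sketch of that metadata step (the function name is illustrative, not the PR's API; zarr v2 records this in a `.zarray` JSON document):

```python
def extend_shape_along_axis(zarray_meta: dict, axis: int, n_appended: int) -> dict:
    # Hypothetical sketch: after the appended segment's chunks are copied in,
    # the "shape" entry of the .zarray metadata must grow along the append
    # axis; chunk sizes are unchanged.
    meta = dict(zarray_meta)
    shape = list(meta["shape"])
    shape[axis] += n_appended
    meta["shape"] = shape
    return meta


# e.g. appending 8 more timesteps to an (8, 6, 6) array chunked (4, 6, 6):
extend_shape_along_axis({"shape": [8, 6, 6], "chunks": [4, 6, 6]}, axis=0, n_appended=8)
```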

Member

@spencerkclark spencerkclark left a comment

Thanks @oliverwm1, this looks good to me now!

Comment on lines 62 to 74
|Parameter| Description|
|-------- |-------------|
| fv3config | String representation of an fv3config object |
| runfile | String representation of an fv3gfs runfile |
| output-url | GCS url for outputs |
| fv3gfs-image | Docker image used to run model. Must have fv3gfs-wrapper and fv3config installed. |
| post-process-image | Docker image used to post-process and upload outputs |
| chunks | (optional) String describing desired chunking of diagnostics |
| cpu | (optional) Requested cpu for run-model step |
| memory | (optional) Requested memory for run-model step |
| segment-count | (optional) Number of segments to run |
| working-volume-name | (optional) Name of volume for temporary work. Volume claim must be made prior to start of run-fv3gfs workflow. |
| external-volume-name | (optional) Name of volume with external data required for model run. E.g. for restart data in a nudged run. |
Member

nit: could pass this through a Markdown table formatter so it looks nice in plain text too.

Contributor Author

Oh nice, thanks!

Contributor

@nbren12 nbren12 left a comment

Thanks for the changes. I like the new public API, and its tests are good enough to cover the cases of _shift_array, so at least we can feel free to delete that test if it becomes obsolete. TBH I still do think we can delete it now despite its initial usefulness for development, but that's up to you.

The docs look great.

I have a couple suggestions below about names. The only "required change" is to make the dimension name an argument of "append_zarr_along_time".

source_store = zarr.open(source_path, mode="r+")
target_store = zarr.open_consolidated(fsspec.get_mapper(target_path))
_set_time_units_like(source_store, target_store)
_shift_store(source_store, "time", _get_dim_size(target_store, "time"))
Contributor

I realize it does have to be a time-like coordinate, but can you make the dimension name an argument in case it is named something other than "time"? This would also allow you to remove the hard-coded name "time" in the tests.
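The _set_time_units_like call in the snippet above has to reconcile CF-style time encodings between the two stores before values can be compared or appended. A hypothetical stdlib-only sketch of that conversion, for units of the form "<unit> since <ISO timestamp>" (names here are illustrative, not the PR's helpers):

```python
from datetime import datetime

# Seconds per CF time unit; a hypothetical helper table, not the PR's API.
_SECONDS_PER_UNIT = {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}


def convert_time_value(value: float, src_units: str, dest_units: str) -> float:
    """Re-express a CF-style time value in the destination store's units,
    roughly what _set_time_units_like must do for each time entry."""
    def parse(units: str):
        unit, _, origin = units.partition(" since ")
        return _SECONDS_PER_UNIT[unit], datetime.fromisoformat(origin.strip())

    src_scale, src_origin = parse(src_units)
    dest_scale, dest_origin = parse(dest_units)
    seconds = value * src_scale + (src_origin - dest_origin).total_seconds()
    return seconds / dest_scale


convert_time_value(1.0, "days since 2020-01-02", "hours since 2020-01-01")  # → 48.0
```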

destination_file = os.path.join(destination, file_)
logger.info(f"Appending {rundir_file} to {destination_file}")
append_zarr_along_time(rundir_file, destination_file, fs)
shutil.rmtree(rundir_file) # remove local copy so not uploaded twice
Contributor

At first glance, this line looks very dangerous, but I realize now that it is happening on a temporary copy. Can you rename rundir to tmp_rundir (or something) to clarify that this is clean-up of temporary files?

Contributor Author

Good idea.

| fv3gfs-image | Docker image used to run model. Must have fv3gfs-wrapper and fv3config installed. |
| post-process-image | Docker image used to post-process and upload outputs |
| chunks | (optional) String describing desired chunking of diagnostics |
| cpu | (optional) Requested cpu for run-model step |
Contributor

@nbren12 nbren12 Aug 26, 2020

For the optional values, it would be nice to know the defaults, but I admit these could quickly become out of date.

Contributor Author

Yeah I went back and forth on this. We need an argo parser to auto-gen the docs!

@oliverwm1 oliverwm1 merged commit 3938bc9 into master Aug 26, 2020
@oliverwm1 oliverwm1 deleted the feature/segmented-fv3gfs-runs branch August 26, 2020 19:38