
Add initialize_zarr #8460

Draft: wants to merge 29 commits into main
Conversation

@dcherian (Contributor) commented Nov 16, 2023:

The intended pattern is:

    after_init = initialize_zarr(store, ds, region_dims=("x",))
    for i in range(ds.sizes["x"]):
        after_init.isel(x=[i]).to_zarr(store, region={"x": slice(i, i + 1)})
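As a sketch of why this matters: once the store is initialized, each region write is independent, so the loop can be distributed. (A minimal illustration; the executor choice is mine, not part of the PR.)

    from concurrent.futures import ThreadPoolExecutor

    def write_region(i):
        # Safe to run concurrently: initialize_zarr already wrote the
        # metadata and indexes, so each task touches only its own chunks.
        after_init.isel(x=[i]).to_zarr(store, region={"x": slice(i, i + 1)})

    with ThreadPoolExecutor() as pool:
        list(pool.map(write_region, range(ds.sizes["x"])))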

cc @slevang

@github-actions bot added the topic-backends, topic-zarr (Related to zarr storage library), and io labels on Nov 16, 2023
# Right now we do two writes for eager variables
template = zeros_like(ds)
template.to_zarr(store, mode=mode, **kwargs, compute=False)

@dcherian (Contributor, Author):

☝🏾 This is important. What do we mean by initialization?

  1. All non-dask arrays are written to disk, and metadata for all dask arrays is written to disk.
  2. Alternatively, all metadata for all variables is written to disk. Reading the initialized store then gets you fill values.


Collaborator:

I was originally thinking that it would be the former. It's at least important to write indexes, not just their metadata...

Contributor:

If you don't write the dimension coordinates, the dataset may be unreadable by xarray. The fill values could decode to invalid results. This happens especially with time coordinates.
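A small illustration of the time-coordinate hazard (the encoding and fill value here are hypothetical; details depend on the store):

    import numpy as np
    import pandas as pd

    # Suppose "time" is encoded as int64 "days since 2000-01-01". If only its
    # metadata was initialized, a reader sees the fill value instead of the
    # real offsets, and decoding collapses every entry onto the epoch:
    fill = np.zeros(3, dtype="int64")  # uninitialized chunks read back as fill
    decoded = pd.Timestamp("2000-01-01") + pd.to_timedelta(fill, unit="D")
    print(decoded)  # all three timestamps decode to 2000-01-01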

@slevang (Contributor) left a comment:

Nice! full_like().to_zarr(compute=False) being so much faster feels like a hack, but I'll take it.

I do think having a version of this built into Dataset.to_zarr() as a kwarg (metadata_only, template, etc.) would be nice for the most concise user code. Going that route would potentially require resolving the issue, which you address here, of needing to drop variables that don't contain the region, or at least more documentation.
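For illustration, the kwarg version being floated might look roughly like this (metadata_only here is hypothetical, not an existing to_zarr parameter):

    # Hypothetical API, for discussion only:
    template = ds.to_zarr(store, mode="w", metadata_only=True)
    # ...then region writes proceed as in the PR description.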

vars_to_drop = [
    name
    for name, var in ds._variables.items()
    if set(var.dims).issubset(dims_to_drop)
]
return ds.drop_vars(vars_to_drop)
Contributor:

This is basically happening already on write in any case of region="auto"; see here. Interestingly, if you try to write to a region along a single dimension of a multidimensional array, it will fail if you skip this step with region={"x": "auto"} but succeed with region="auto", because the latter now expands region to {"x": slice(...), "y": slice(...)}, so it doesn't trip this error.

This gets back to the discussion of "should we actually be writing any indices in region mode". For the vast majority of use cases I think not, and we could simplify a lot of these logic branches by just dropping the indices early on. But because there is still the available option to use mode="a", region=..., doing so would break for anyone trying to simultaneously write a region and grow the coordinate. Which also gets to @rabernat's proposal of reworking these modes entirely...
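For concreteness, the asymmetry described above (a sketch assuming a variable over dims ("x", "y") already initialized in store, with the drop step skipped):

    subset = ds.isel(x=slice(0, 10))

    # Errors: "y" is present in the dataset but not covered by the region.
    subset.to_zarr(store, region={"x": "auto"})

    # Succeeds: region is expanded to {"x": slice(...), "y": slice(...)}.
    subset.to_zarr(store, region="auto")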

)

# TODO: what should we do here.
# compute=False only skips dask variables.
Contributor:

I think for this to fairly be called an "initialize" or "template" method, we should not be writing any data_vars, regardless of dask vs numpy, but we should write all the indices and metadata.

dcherian and others added 2 commits November 16, 2023 15:06
Co-authored-by: Maximilian Roos <5635139+max-sixty@users.noreply.github.com>
Co-authored-by: Maximilian Roos <5635139+max-sixty@users.noreply.github.com>
@dcherian (Contributor, Author):
> Going that route would potentially require resolving the issue, which you address here, of needing to drop variables that don't contain the region, or at least more documentation.

Yes, I was trying to make an easy recommended pattern for region writes. To do that, it seemed like to_zarr wasn't the right place.

@jhamman (Member) commented Nov 17, 2023:

I just pushed a little experiment that could be a really nice path forward here: c0cf4ee.

Demo:

import xarray as xr
# `fs` here is an fsspec S3 filesystem, e.g. fs = fsspec.filesystem("s3")

ds = xr.tutorial.open_dataset('rasm').chunk('5mb')

# baseline
mapper_a = fs.get_mapper("s3://foo/init-zarr-test-a")
%time xr.zeros_like(ds).to_zarr(mapper_a, mode='w', compute=False)
# CPU times: user 296 ms, sys: 7.16 ms, total: 303 ms
# Wall time: 2.75 s

# my most recent commit
mapper_b = fs.get_mapper("s3://foo/init-zarr-test-b")
%time store = init_zarr_v2(ds, store=mapper_b)
# CPU times: user 41.4 ms, sys: 8.19 ms, total: 49.6 ms
# Wall time: 144 ms

# it works!
a = xr.open_zarr(mapper_a)
b = xr.open_zarr(mapper_b)
xr.testing.assert_equal(a, b)

The key realization I made is that if we aren't going to write the arrays, we can just use the Zarr API to create the metadata objects and push them to the store.
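A minimal sketch of that idea (illustrative, not the commit's actual code; assumes the zarr-python v2 API and xarray's _ARRAY_DIMENSIONS convention):

    import zarr

    group = zarr.open_group(store, mode="w")
    for name, var in ds.variables.items():
        # Creating the array writes only the .zarray/.zattrs metadata
        # objects; no chunk data is touched.
        arr = group.create_dataset(
            name,
            shape=var.shape,
            chunks=True,  # or explicit chunk sizes from var.encoding
            dtype=var.dtype,
        )
        arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dims)
    zarr.consolidate_metadata(store)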

Some caveats about what I just pushed:

  • Missing a bunch of feature flags, tests, etc.
  • Currently makes an assumption that the target store is clean and ready to be initialized
  • May also be missing some specialized encoding around strings / object dtypes. This is all in the Zarr backend so could likely be abstracted away to share a common code path.

@max-sixty (Collaborator):
Nice!!


(One small difference between that and the PR — we possibly should write any var that doesn't have a region dim, not only the indexes)

* main: (58 commits)
  Adapt map_blocks to use new Coordinates API (pydata#8560)
  add xeofs to ecosystem.rst (pydata#8561)
  Offer a fixture for unifying DataArray & Dataset tests (pydata#8533)
  Generalize cumulative reduction (scan) to non-dask types (pydata#8019)
  Filter null values before plotting (pydata#8535)
  Update concat.py (pydata#8538)
  Add getitem to array protocol (pydata#8406)
  Added option to specify weights in xr.corr() and xr.cov() (pydata#8527)
  Filter out doctest warning (pydata#8539)
  Bump actions/setup-python from 4 to 5 (pydata#8540)
  Point users to where in their code they should make mods for Dataset.dims (pydata#8534)
  Add Cumulative aggregation (pydata#8512)
  dev whats-new
  Whats-new for 2023.12.0 (pydata#8532)
  explicitly skip using `__array_namespace__` for `numpy.ndarray` (pydata#8526)
  Add `eval` method to Dataset (pydata#7163)
  Deprecate ds.dims returning dict (pydata#8500)
  test and fix empty xindexes repr (pydata#8521)
  Remove PR labeler bot (pydata#8525)
  Hypothesis strategy for generating Variable objects (pydata#8404)
  ...
@dcherian marked this pull request as draft January 4, 2024 04:48
* upstream/main:
  Faster encoding functions. (pydata#8565)
  ENH: vendor SerializableLock from dask and use as default backend lock, adapt tests (pydata#8571)
  Silence a bunch of CachingFileManager warnings (pydata#8584)
  Bump actions/download-artifact from 3 to 4 (pydata#8556)
  Minimize duplication in `map_blocks` task graph (pydata#8412)
  [pre-commit.ci] pre-commit autoupdate (pydata#8578)
  ignore a `DeprecationWarning` emitted by `seaborn` (pydata#8576)
  Fix mypy type ignore (pydata#8564)
  Support for the new compression arguments. (pydata#7551)
  FIX: reverse index output of bottleneck move_argmax/move_argmin functions (pydata#8552)
@dcherian marked this pull request as ready for review January 4, 2024 22:39

This function initializes a Zarr store with all indexed coordinate variables, and
metadata for every variable in the dataset.
If ``region_dims`` is specified, it will also
Collaborator:

Not a necessary change, but one plausible way to frame this is

  • default region_dims=[...]
  • ...and then there's no need for the if — we default to writing the minimum and returning the maximum.

@dcherian (Contributor, Author):

Interesting suggestion!

@dcherian (Contributor, Author):

There's an edge case around scalars: we would write them if the user wants region writes, but not if they don't. So we need to distinguish between region_dims=(), region_dims=(...,), and region_dims=tuple(ds.dims). If those last two are identical, then we can't handle the scalar case.
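The three cases, for concreteness (hypothetical calls against the draft API, where ... is Python's Ellipsis standing for "all dims"):

    initialize_zarr(store, ds, region_dims=())              # no region writes planned
    initialize_zarr(store, ds, region_dims=(...,))          # proposed default: all dims
    initialize_zarr(store, ds, region_dims=tuple(ds.dims))  # explicit: all dims

    # Scalars should be pre-written in the region-write cases but not in the
    # first, so region_dims=() must stay distinguishable from the other two.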

Collaborator:

OK, interesting point.

OOI (out of interest), is this function helpful if we're not doing region writes?

Collaborator:

Yeah, on reflection — is there a case where we really don't want to write scalars? (genuine question, very possibly there is...)

metadata for every variable in the dataset.
If ``region_dims`` is specified, it will also
1. Write variables that don't have any of ``region_dims``, and
2. Return a dataset with the remaining variables, which contain one or more of ``region_dims``.
@max-sixty (Collaborator), Jan 4, 2024:

Suggested change:

    - 2. Return a dataset with the remaining variables, which contain one or more of ``region_dims``.
    + 2. Return a dataset with the remaining variables and indexes, which contain one or more of ``region_dims``.

...is that right?

Should we drop the non-region indexes?

My mental model of this (but it's from a few months ago):

  • For indexes that are part of the region, we want to keep them now that we have region="auto". When we use region="auto" in each process, it correctly won't rewrite those indexes
  • For other indexes, we want to drop them, because otherwise each process will incorrectly attempt to write them, and this can lead to conflicts

@dcherian (Contributor, Author), Jan 4, 2024:

> When we use region="auto" in each process, it correctly won't rewrite those indexes

Surprising to me, but you are correct! #8589

Currently we keep any indexes needed for any variable whose dimensions overlap with region_dims. So for the test case, there is a variable with dims (x, y): if region_dims == ("x",), then the index y is also returned. This works fine with region="auto" but requires adding "y": slice(None) when explicitly specifying region (sketched below). I think this is OK: if you're specifying region explicitly, specify the whole thing, as region="auto" does.

> For other indexes, we want to drop them, because otherwise each process will incorrectly attempt to write them, and this can lead to conflicts

We do end up dropping those indexes. The test case has a variable with dimensions ("z",) but the regions only contain "x" and/or "y". The z index ends up getting dropped.
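To make the explicit-region point above concrete (a sketch assuming a variable with dims ("x", "y") and region_dims=("x",)):

    # Explicit region: the overlapping "y" dimension must be covered too,
    # even though it is not a region dim.
    after_init.isel(x=slice(0, 10)).to_zarr(
        store, region={"x": slice(0, 10), "y": slice(None)}
    )

    # Equivalent, letting xarray infer all the slices:
    after_init.isel(x=slice(0, 10)).to_zarr(store, region="auto")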

@max-sixty (Collaborator) left a comment:

This is a big one!! I hadn't realized it was such a lift.

Thanks a lot.

I left a few comments.

zarr.open_group(
    store,
    mode=mode,
    storage_options=kwargs.get("storage_options", None),
Collaborator:

Obviously don't change it, but FYI: the None is already the default for .get in Python here.

@slevang (Contributor) commented Jan 18, 2024:

Sorry @dcherian I haven't had a chance to look at this in detail yet, but overall it seems really nice.

One random thought on the API: is there a way to configure options such that ds.to_zarr(..., compute=False) could just call this method, since it does effectively the same thing but much faster? This way users get an immediate performance improvement without having to change existing code, plus you can still use the standalone xr.initialize_zarr to get back a template dataset for region writes.
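A rough sketch of that wiring (hypothetical; not how Dataset.to_zarr is currently implemented):

    # Hypothetical: inside Dataset.to_zarr
    def to_zarr(self, store=None, *, compute=True, **kwargs):
        if not compute:
            # Fast path: write indexes and metadata only; skip all chunk writes.
            return initialize_zarr(store, self, **kwargs)
        ...  # existing full-write path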

@dcherian marked this pull request as draft March 25, 2024 17:57
@petewa commented Apr 2, 2024:

Hi @dcherian, great work! This has solved one of the performance bottlenecks for us.

We are finding Xarray fairly slow at writing small Zarr stores with many variables/DataArrays to S3.

It seems slow in:

  • Creating the metadata (due to latency?), which your PR solves. I tested this.
  • Writing many chunks to S3 (due to latency?)
    • It's significantly faster to write to disk/memory and then do a parallel upload to S3 (see the sketch below).

Are there any plans to progress this PR? It would be good to have this merged.
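For reference, a sketch of the write-locally-then-upload workaround mentioned above (paths and bucket are illustrative):

    import fsspec

    # Write the Zarr store to fast local disk first...
    ds.to_zarr("/tmp/out.zarr", mode="w")

    # ...then bulk-upload the objects; s3fs batches the transfers concurrently.
    fs = fsspec.filesystem("s3")
    fs.put("/tmp/out.zarr", "s3://bucket/out.zarr", recursive=True)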

@slevang (Contributor) left a comment:

Yeah this looks great, what else needs to be done?


all_variables = set(ds._variables)
# TODO: how do we work with the new index API?
index_vars = {dim for dim in ds.dims if dim in all_variables}
Contributor:

No expert, but maybe:

    index_vars = set(ds.indexes.keys())

encode_coordinates=False,
)

to_drop = (eager_write_vars | chunked_vars_without_region) - index_vars
Contributor:

If #8904 went in we wouldn't need to drop index_vars here.

@rabernat (Contributor) commented Apr 2, 2024:

I share the perspective that it would be nicer to implement this within the existing to_zarr API rather than add a new top-level function to Xarray. @dcherian do you see a path to doing that?

@dcherian (Contributor, Author) commented Apr 2, 2024:

> do you see a path to doing that?

Yes, I too agree now, but it'll take some work (= time). I'm thinking we add an initialize method to ZarrStore and use that in store (?).

@rabernat (Contributor) commented Apr 2, 2024:

> I'm thinking we add an initialize method to ZarrStore and use that in store (?).

This makes sense to me. There's no reason for this not to be the default for all Zarr dataset creation. It will massively speed things up in most cases.

Thinking bigger, it should be possible to upstream some of this to Zarr itself. But that can come later.

@max-sixty (Collaborator):

To the extent that it's helpful and not annoying to add "this would be really useful": this would be really useful! It seems fairly close IIUC (though easy for me to say...)

One reflection from encouraging more Zarr usage with colleagues is that it is a) really spectacular for reading data, b) still much more complicated to write data for the average dev relative to something like parquet. I think this would close a lot of the gap...

@rabernat (Contributor):

I'm fine with getting this out into the wild, but I am hesitant to commit to it as permanent API. Could we say it is an experimental feature, don't depend on it in production, etc?

@max-sixty (Collaborator):

> I'm fine with getting this out into the wild, but I am hesitant to commit to it as permanent API. Could we say it is an experimental feature, don't depend on it in production, etc?

Definitely +1 from me

Labels: io, topic-backends, topic-zarr (Related to zarr storage library)

Development: successfully merging this pull request may close "Add metadata_only param to .to_zarr?"

6 participants