Incrementally-populated Zarr Arrays #300

Open

DahnJ opened this issue Jun 28, 2024 · 10 comments

@DahnJ commented Jun 28, 2024

I believe there is a common usage pattern of Zarr that should be documented and more widely known – populating Zarr arrays incrementally and on-demand, chunk-by-chunk.

We have built internal tooling around this that resembles the chunk manifests (#154, #287, virtualizarr#chunk-manifests) and VirtualiZarr (zarr-developers/VirtualiZarr#33, virtualizarr#manifestarray).

This issue explains the usage pattern and opens the question of how our tooling could be open-sourced.

Incrementally populating Zarr arrays

A core Zarr use case for us is to initialize an empty Zarr array with a pre-determined grid and then fill it with data as needed.

For a great overview of this use case, see Earthmover's recent webinar and the relevant code repository. Note that in the webinar, the entire grid is populated. In our case, parts of the grid are populated on-demand.

Code example

This code defines an empty Zarr array given an example schema. For a fuller example, see create_dataset_schema in Earthmover's repository.


from odc.geo.geobox import GeoBox
from odc.geo.xr import xr_zeros
import pandas as pd

def define_schema(varname, geobox, time, band):
    schema = (
        # Chunks -1 to have a Dask-backed array
        # But without creating a massive computational graph
        xr_zeros(geobox, chunks=-1)
        .expand_dims(
            time=time,
            band=band,
        )
        .to_dataset(name=varname)
    )

    schema['band'] = schema['band'].astype(str)
    encoding = {varname: {"chunks": (1, 1, 256, 256)}}

    schema.to_zarr(
        'schema.zarr',
        encoding=encoding,
        compute=False,  # Do not initialize any data: no chunks written
        zarr_version=3,
        mode='w'
    )

geobox = GeoBox.from_bbox(
   (-2_000_000, -5_000_000,
   2_250_000, -1_000_000),
   "epsg:3577", resolution=1000)


define_schema(
    "data", 
    geobox, 
    time=pd.date_range("2020-01-01", periods=12, freq='MS'), 
    band=["B1", "B2", "B3"]
)

This pattern is powerful, as it enables us to compute and ingest only what's needed, in a query-or-compute fashion. Without going too far beyond the scope of this issue, it lets us cascade the computation/ingestion of data chunk by chunk, as illustrated in the following diagram.

[diagram: chunk-by-chunk cascade of computation and ingestion]

Note: The details of this example are made up for the purposes of this issue
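As a rough illustration of the ingestion half of this pattern, here is a minimal sketch of a query-or-compute step. The region dict and the already_populated check are hypothetical placeholders – answering that check efficiently is what the rest of this issue is about:

import xarray as xr

def ingest_if_needed(data, region, already_populated):
    """Write a piece of the pre-defined grid only if it is not populated yet.

    Rough sketch: `data` is an xarray Dataset matching the schema over
    `region` (a dict of integer slices aligned to chunk boundaries) and
    `already_populated` is a hypothetical check, discussed further below.
    """
    if not already_populated(region):
        # A region write only touches the chunks covered by `region`;
        # index coordinates are dropped, as they are already in the store.
        data.drop_vars(["time", "band", "x", "y"], errors="ignore").to_zarr(
            'schema.zarr', region=region, zarr_version=3
        )
    return xr.open_zarr('schema.zarr', zarr_version=3).isel(region)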

Chunk-level MetaArray

When working with sparsely populated arrays, the natural question arises: which chunks are populated?

To this end, we employ an object we call the chunk-level meta-array (MetaArray for short). This is a second array where each pixel corresponds to a chunk in the original array. The value of the pixel is

  • 1 if the corresponding chunk is populated
  • 0 if the corresponding chunk is empty

[figure: a MetaArray with one pixel per chunk of the original array]

Note: Chunk initialization is not the only information one can keep in such a MetaArray. For example, the MetaArray could keep track of the min & max of each chunk, enabling predicate pushdown, or of aggregate statistics for providing approximate answers at the chunk level.

Toy code example

Note: This is only a very rough sketch of the resulting object to illustrate the point.

import numpy as np
import xarray as xr

def create_metaarray(schema):
    """Create a chunk-level meta-array from a schema.

    A very rough sketch. Not the actual implementation.
    """
    _, _, cy, cx = schema.data.encoding['chunks']

    # Spatial coordinates of the meta-array: the centre of each chunk
    xx = schema['x'].values
    xdiff = xx[cx] - xx[0]
    x_meta = xx[::cx] + xdiff / 2

    yy = schema['y'].values
    ydiff = yy[cy] - yy[0]
    y_meta = yy[::cy] + ydiff / 2

    # Initialize an all-zeros meta-array: no chunk is populated yet
    meta = xr.DataArray(
        data=np.zeros(
            (schema.sizes['time'], schema.sizes['band'], len(y_meta), len(x_meta)),
            dtype='uint8',
        ),
        dims=('time', 'band', 'y', 'x'),
        coords=dict(time=schema.time, band=schema.band, y=y_meta, x=x_meta),
    )

    meta.to_dataset(name='initialized').to_zarr('schema-meta.zarr', zarr_version=3, mode='w')


schema = xr.open_zarr('schema.zarr', zarr_version=3)
create_metaarray(schema)

# When writing to the Zarr array, also populate the MetaArray (a sketch of that step follows below).
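That last step can be a single index-space assignment. Here is a minimal sketch using zarr directly; the chunk-index arguments, the 'initialized' variable name from the sketch above, and the exact open arguments are all assumptions for the example:

import zarr

def mark_populated(time_idx, band_idx, ychunk_idx, xchunk_idx):
    """Flip a single MetaArray pixel to 1 once the matching chunk is written.

    Rough sketch only. Not the actual implementation.
    """
    meta = zarr.open_group('schema-meta.zarr', mode='r+', zarr_version=3)
    meta['initialized'][time_idx, band_idx, ychunk_idx, xchunk_idx] = 1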

Relationship to Chunk manifest & VirtualiZarr

Increasingly, the work in the Zarr world resembles the data structure we have built.

The similarities in these proposals and implementations make me think there is a path where the MetaArrays could become an open-source part of the Zarr ecosystem, potentially even enabling further use-cases such as chunk-level aggregate statistics outlined in ZEP 5.

The question is what the exact path forward would be.

Potential path forward

Here is my best guess at what the path forward could be.

There are essentially two layers to MetaArrays:

  1. Information on the chunk level in the index space, stored on disk. This could be taken over by the chunk manifest.
  2. Convenient in-memory representation & an API to query this information in the label space. This would remain the domain of MetaArrays.

[diagram: chunk-level information in the index space vs. the label-space MetaArray]

Note: Here I'm using the "index" and "label" space terminology from Xarray

1. Information on the chunk level

In the future, layer 1 could become part of Zarr. The chunk manifest would only need to represent whether a chunk is initialized in order to contain the full information needed to construct the MetaArray.

Currently, MetaArrays are stored as a separate Zarr array and updated using zarr. In the future:

  • They could be read from whichever format chunk manifest is stored in (of which Zarr itself is one option)
  • Their updating & consistency would then be completely taken care of by the storage transformer for the chunk manifest

2. API to query this information

To be easily queryable, the chunk-level information must be exposed back in the coordinates of the original dataset, together with an API for querying it.

Xarray is our choice for this API. For example, asking the question "do we have data for this geometry" simply becomes a rioxarray clip operation followed by an xarray reduction.
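For illustration, a hedged sketch of such a query. It assumes rioxarray is installed, that the MetaArray carries the 'initialized' variable from the toy example above, and uses a made-up example geometry – all assumptions for the example, not our actual implementation:

import rioxarray  # noqa: F401 – registers the .rio accessor
import xarray as xr
from shapely.geometry import box

# Example geometry in lon/lat (purely illustrative)
geometry = box(146.0, -36.0, 148.0, -34.0)

# Attach the CRS of the grid so the geometry can be reprojected onto it
meta = xr.open_zarr('schema-meta.zarr', zarr_version=3)['initialized']
meta = meta.rio.write_crs('epsg:3577')

# "Do we have data for this geometry?" – clip to the geometry, then reduce
clipped = meta.rio.clip([geometry], crs='epsg:4326', drop=True)
any_populated = bool(clipped.any())   # is at least one covering chunk populated?
all_populated = bool(clipped.all())   # are all covering chunks populated?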

A similar conclusion was reached by VirtualiZarr, where the use of xarray exposes, for example, a familiar API for combining multiple datasets.

MetaArrays could become another package building on top of Zarr, similar to VirtualiZarr. However, if the chunk manifest already contains the information needed to construct MetaArrays, and VirtualiZarr uses this information to construct Xarray(-like) datasets, a greater degree of integration could be possible – for example, in the form of common functionality for reading the chunk manifest into an xarray representation.

Note: We also have APIs that use Pydantic and GeoPandas for further processing of chunk information. For simplicity, I am focusing only on the Xarray representation here.

Questions

I would appreciate suggestions from the Zarr community on where to best focus the efforts.

Here is a set of concrete open questions and discussion points:

  • Does the chunk manifest storage transformer cover the data needed by MetaArrays? Could the chunk manifest also represent information about chunk initialization? Or should this information be another storage transformer that shares implementation with the chunk manifest?
  • What is the potential relationship to VirtualiZarr? Could they share functionality, or could a single package cover both uses? Perhaps there's a common functionality for going from the manifest into an xarray object?
  • Have I missed any potential difficulties here?
  • Is there a different path to the one that I outlined?

A secondary set of questions is around usefulness – would the community find MetaArrays useful, be it for storing chunk-initialization information or aggregate statistics? Are people perhaps already using variants of this functionality? Creating this functionality was necessary for us – and thus likely for others, too!

Thank you for any suggestions and answers!

@TomNicholas (Member)

@DahnJ this is super interesting!

When working with sparsely populated arrays, the natural question arises: which chunks are populated?

I actually recently had a separate use case where I built a really big Zarr store chunk-by-chunk using a big parallel dask operation, and at the end a couple of tasks had failed. The result had some uninitialized chunks, but finding out which ones are not initialized is trickier than it should be. It did not actually occur to me that this problem is closely related to the manifests I built in VirtualiZarr, but you're completely right that it is!

It also reminds me of this little tool that CarbonPlan made for quickly checking how much of a Zarr store had actually been initialized (@andersy005 wrote it I think). I was already going to suggest upstreaming that.

What is the potential relationship to VirtualiZarr? Could they share functionality, or could a single package cover both uses?

I would imagine a lot of this can be upstreamed from VirtualiZarr into zarr-python at least. It also reminds me of @d-v-b's issue zarr-developers/VirtualiZarr#71, which also asks whether all Zarr arrays should have an intermediate chunk manifest layer.

In the meantime we could mess around with adding methods to VirtualiZarr's ManifestArray e.g. .chunks_initialized(self) -> np.ndarray, which would return your numpy array of 1s and 0s, with the same shape as the chunk grid.

FYI, non-initialized chunks in VirtualiZarr are currently represented by empty-string entries in the numpy array that holds the chunk manifest's paths.
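Building on that representation, a rough sketch of what such a method could compute (the paths argument here merely stands in for the manifest's array of chunk paths; this is not VirtualiZarr's actual internals):

import numpy as np

def chunks_initialized(manifest_paths: np.ndarray) -> np.ndarray:
    """Return an array of 1s and 0s with the same shape as the chunk grid.

    Sketch only: an empty-string path marks an uninitialized chunk.
    """
    return (manifest_paths != '').astype(np.uint8)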

Perhaps there's a common functionality for going from the manifest into an xarray object?

Remember that you need more than just the Chunk Manifest itself to get to the xarray representation - you also need the array name, the dim names, and the dtype.

Finally, ZEP 5 (#205) plans to propose a way to store chunk-level statistics, one of the potential use-cases of MetaArrays.

@rabernat suggested something very similar on zarr-developers/VirtualiZarr#33 (comment)

@jbms (Contributor) commented Jun 28, 2024

I certainly agree that there are a lot of use cases for efficiently querying which chunks have data, and possibly storing additional per-chunk statistics. For arrays with a very large number of chunks, you might even want to do this hierarchically.

A related idea would be a way to store a binary mask indicating which individual elements have been written, for the case where writes can be unaligned to chunk boundaries – together with, for the common case where the mask value is the same throughout an entire chunk, some way to represent that efficiently, similar to this MetaArray.

One issue that can come up with zarr arrays is the fact that there can be ambiguity between a chunk that was never written, and a chunk that was written but happens to be equal to the fill value at all positions. In some implementations, chunks that are equal to the fill value are not stored.

I'm not entirely clear how you are proposing to use the chunk manifest format. As currently proposed the chunk manifest would allow you to do list operations very cheaply, and therefore you could easily figure out which chunks are present, and modulo the issue I mentioned about chunks that are logically written but not stored because they are equal to the fill value, this would tell you which chunks have been written. The issue with the chunk manifest format is that, being a single json file either stored separately or embedded in the array metadata file, it is mostly a read-only format. If writes are supported at all, you are likely to run into contention if trying to concurrently write from multiple machines, unless you have some very specialized JSON storage system designed to support concurrent writes.

The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.

@TomNicholas (Member)

I'm not entirely clear how you are proposing to use the chunk manifest format

it is mostly a read-only format

The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.

My initial understanding of this issue (@DahnJ please correct me if I'm wrong) was that it's more concerned with giving users access to useful in-memory representations of the state of the chunk grid. It's less about actually changing the chunk manifest format on-disk. Perhaps it would have made more sense to raise it on the zarr-python repo instead?

But then this

Their updating & consistency would then be completely taken care of by the storage transformer for the chunk manifest

makes it sound much more like the whole arraylake database-of-chunks idea...

@DahnJ (Author) commented Jun 29, 2024

Thank you for the quick feedback @TomNicholas and @jbms. It's great to hear that this seems useful and that you've even already needed querying information about initialized chunks!

Chunk manifest

I think that the crux here is what the ambitions for the chunk manifest are.

  • Read only format: If it's a "mostly read-only format" that gets created with the Zarr array and does not support updates, then I agree that it's not a suitable solution for an incrementally populated dataset.
  • Storage transformer supporting updates: If it's implemented as a storage transformer that supports updating the chunk manifest upon writing into the Zarr array, then I think it's sufficient.
  • Concurrent writes: The question of write contention is an important one. For our purposes here, I would find it sufficient to allow no concurrent writes. Should the users need concurrent writes (we do), they can rely on solutions such as Arraylake's database-of-chunks (which we are users of). There's also a whole interesting discussion around alternative storage formats for the manifest – e.g. can we use a secondary Zarr array for this? That's along the lines of what we're currently doing. However, short of that, the no-concurrent-writes chunk manifest and MetaArrays still seem like a useful construct to pursue.

I would therefore say that as long as chunk manifest supports changes made to the zarr array, it's a good candidate to use for MetaArrays. In such a case, the issue becomes only about providing a useful in-memory representation, exactly as @TomNicholas notes.

Information necessary for MetaArrays

Remember that you need more than just the Chunk Manifest itself to get to the xarray representation - you also need the array name, the dim names, and the dtype.

Yes, you're right. For the MetaArrays, we also require the coordinate arrays. That way the user can query populated chunks in the label space – answering questions such as "do we have 2022 data for Ecuador".

chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.

Right, storing such information would be an extension of the manifest's planned capabilities. We can thus focus on the primary goal here – chunk initialization information, which the chunk manifest, as planned, does represent.

VirtualiZarr

Thanks for the additional details @TomNicholas. The fact that uninitialized chunks are already represented is great, and hacking together a chunks_initialized method thus sounds like a good way to continue exploring a potential open-source variant of the in-memory representation of our MetaArrays! I will try to get a bit more familiar with VirtualiZarr myself.

Other answers

binary mask indicating which individual elements have been written, in case writes can be unaligned to chunk boundaries

This is a good point. We restrict our writes to entire chunks precisely for that reason. That allows us to then only store information at the chunk level.

One issue that can come up with zarr arrays is the fact that there can be ambiguity between a chunk that was never written, and a chunk that was written but happens to be equal to the fill value at all positions. In some implementations, chunks that are equal to the fill value are not stored.

Thanks for noting that – the behaviour around write_empty_chunks is definitely something to be aware of and have clarity on when dealing with chunk-initialization information.
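For reference, a small sketch of the flag in question in zarr-python (behaviour and defaults may differ between versions, so this is illustrative only):

import zarr

# With write_empty_chunks=True, a chunk whose values all equal the fill value
# is still materialised in the store, so a listing can distinguish "written
# but empty" from "never written"; with False, such chunks are not stored.
arr = zarr.open_array('example.zarr', mode='w', shape=(4, 4), chunks=(2, 2),
                      fill_value=0, write_empty_chunks=True)
arr[:2, :2] = 0   # written, yet equal to the fill value everywhere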

@d-v-b (Contributor) commented Jun 29, 2024

This is a very cool idea and it does address one of the challenges people inevitably encounter when they start scaling zarr arrays. But I also agree with two points @jbms made:

The issue with the chunk manifest format is that, being a single json file either stored separately or embedded in the array metadata file, it is mostly a read-only format. If writes are supported at all, you are likely to run into contention if trying to concurrently write from multiple machines, unless you have some very specialized JSON storage system designed to support concurrent writes.

To elaborate, the basic premise of a conventionally chunked (i.e, unsharded) array is that concurrent chunk write operations are independent. A chunk manifest / index adds a synchronization step, which makes concurrent write operations dependent. This introduces new complexity (e.g., write contention) and new failure modes (e.g., writing a chunk succeeds but updating the manifest fails). New complexity is fine (and necessary in many cases) but it's important to map it out.

The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.

I would rephrase this as "why not use a database / tabular format here?" Of course, if we stick with vanilla zarr there are only two options, JSON or an array, and the array option looks more attractive than JSON. But if you want a rich, dynamic representation of a chunk mutation, then you probably want to store a record with multiple data types for each chunk (e.g., a creation timestamp, a modification timestamp, a size, some summary stats of the values in the chunk, etc.), and in that case a Zarr array is going to be a very poor fit IMO. So I would be curious to see an analysis of how adding a non-zarr ingredient to the mix could solve this problem, weighed against the complexity that adds.

@d-v-b (Contributor) commented Jun 29, 2024

I will also throw out a random thought that occurred to me: zarr doesn't have a good way of structurally expressing a dependency relationship between arrays. But this could be solved if arrays could contain other arrays. If we support nested arrays, then it would be very natural to store a meta array as a sub-array of the array it describes. I'm not suggesting we do this, just pointing out that it solves a particular problem.

@DahnJ (Author) commented Jul 2, 2024

Thanks @d-v-b for providing helpful additional detail. You're right – this complexity absolutely needs to be mapped out. For the purposes of this issue, as long as the chunk manifest allows for updates, I think it's a sufficient solution to power the MetaArrays (e.g. as a feature in VirtualiZarr).

I will wait for the community to converge on a concrete proposal for the chunk manifest – and potentially join in the conversation. Hopefully this issue serves as additional motivation for chunk manifests to go beyond just a read-only JSON file.

Chunk Manifest update

Discussing the details of the chunk manifest is not in the scope of this issue. However, to help the manifest discussion with the additional context from this issue, here's a summary of what's been said so far and some of my thoughts.

Failure mode: Inconsistent chunks & manifest

As noted, updating both chunks and the manifest brings in a potential failure where chunks are successfully written, but the manifest update fails. This could cause an inconsistent state where the manifest contains the wrong data (e.g. wrong chunk length).

Possible solution: One method would be to provide a layer of indirection and store chunks under a different key than just their indexes (e.g. use a hash value, or append a version number); those chunks are then only ever truly "written" once they've been recorded in the manifest. There might also be a feature to "vacuum" orphaned chunks left behind by failed writes. This is quite related to ideas that have been explored previously.

Write contention

Multiple processes could attempt to write into the chunk manifest at once.

Possible solution: The simplest option here is an exclusive lock. More optimistic concurrency control may be possible – a potential direction is to have the manifest be an append-only event log with an optional compaction step. I haven't thought this through properly though.
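For completeness, a minimal sketch of the exclusive-lock option for a store on a local or shared filesystem, using the filelock package; the lock path and the apply_update callable are assumptions, and object stores would need a different mechanism:

from filelock import FileLock

def update_manifest_locked(lock_path, apply_update):
    """Apply a manifest update while holding an exclusive lock.

    Sketch of the simplest option only; `apply_update` stands for whatever
    routine actually rewrites the manifest.
    """
    with FileLock(lock_path):
        apply_update()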

Data format requirements

Some of the larger datasets out there have 10s of millions of chunks (example). Our datasets are frequently on the scale of billions of chunks (global datasets with multiple bands and spanning a large time range). However, typically only a small fraction of those are populated, also on the order of 10s of millions.

The manifest must store at least three pieces of information per chunk (path, offset and length). Due to regularities in the data and with the right encoding and compression, this might compress quite well – but it's still a significant amount of data.
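As a rough back-of-the-envelope estimate (illustrative numbers only): with ~10 million populated chunks and, say, ~50 bytes of path plus two 8-byte integers per entry, an uncompressed manifest is on the order of 10^7 × ~66 B ≈ 650 MB. By comparison, a one-byte-per-chunk MetaArray covering a billion-chunk grid is ~1 GB uncompressed, but compresses extremely well because the vast majority of values are zero.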

The big advantage of the current storage of MetaArrays using zarr is that it naturally allows for fetching only those regions of the MetaArray one is interested in. This is a good way to keep the amount of data transferred to a minimum. It would be good, if not necessary, for manifests to provide a similar subsetting feature.

The data format should also support transactional operations (e.g. updating the key for a chunk). Alternatively, an append-only log might be considered (similar to event sourcing).

There's also some potential in local caching or keeping a local synchronized version of the chunk manifest. Potentially related Pangeo discussion: Conflict-free Replicated Zarr

@DahnJ (Author) commented Jul 3, 2024

Special case of incremental arrays: Fetching from Zarr

I want to record a special case of populating arrays incrementally that is potentially relevant here. This was brought up by @dstansby in Zulip

In our general case, we're populating the arrays with data from non-Zarr sources, potentially produced through a complex processing pipeline. The MetaArrays themselves do not know how the data is populated, nor should they.

However, the user might need to simply fetch regions of an existing Zarr array incrementally. In that case, the MetaArrays could potentially provide a convenient API not only to discover unpopulated chunks, but also to aid in fetching them. This would provide functionality similar to what @dstansby described:

for chunk in chunks:
    if exists_on_hard_drive(chunk):
        continue
    download(chunk)

@jbms (Contributor) commented Jul 3, 2024

Perhaps you can clarify what you are trying to accomplish exactly, independent of any particular solution?

In #300 (comment) you mention lazily-writing an array as individual chunks are requested. This seems to be something that is easily supported by the existing zarr v2 and v3 formats without chunk manifests: you can simply query the underlying storage to determine if the key corresponding to the chunk is present. If it is, you can read it. Otherwise, you can compute and store it.

If you want to construct an in-memory "MetaArray" indicating which chunks are present, this can be done by performing a list operation on the underlying kvstore in order to obtain the list of all chunks. List operations are very fast when using a chunk manifest since no additional I/O is required after loading the manifest, but I expect in most cases other underlying kvstores, like the local filesystem or even S3, would still provide adequate performance, as long as you can construct the in-memory MetaArray once and then do a bunch of queries.
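As a hedged illustration of that approach, here is a rough sketch that builds the 1/0 chunk grid from a store listing. It assumes zarr v2 '.'-separated chunk keys for concreteness; other chunk_key_encodings would need different parsing:

import numpy as np
import zarr

def metaarray_from_listing(store_path, var='data'):
    """Build an in-memory 1/0 chunk grid by listing the stored chunk keys.

    Rough sketch only; assumes a store whose keys look like 'data/0.0.0.0'.
    """
    arr = zarr.open_array(store_path, path=var, mode='r')
    grid_shape = tuple(-(-s // c) for s, c in zip(arr.shape, arr.chunks))  # ceil division
    meta = np.zeros(grid_shape, dtype=np.uint8)

    prefix = arr.path + '/' if arr.path else ''
    for key in arr.store.keys():                 # the (potentially slow) list operation
        if not key.startswith(prefix):
            continue
        name = key[len(prefix):]
        if name.startswith('.'):                 # skip .zarray / .zattrs metadata
            continue
        try:
            idx = tuple(int(part) for part in name.split('.'))
        except ValueError:
            continue
        if len(idx) == len(grid_shape):
            meta[idx] = 1                        # this chunk exists in the store
    return meta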

If you want to construct a MetaArray corresponding to just a subregion of the full array, that can also be done by performing one or more "range-restricted" or "prefix-restricted" list operations on the underlying kvstore. However, if the query does not happen to align well with the "chunk_key_encoding" of the array (and the current chunk key encoding options in zarr v3 are generally problematic for this type of use), it could require a large number of queries. If you want to make this work better, you could propose some improved chunk_key_encodings for zarr v3. Note that this type of querying is used in tensorstore for https://google.github.io/tensorstore/python/api/tensorstore.TensorStore.storage_statistics.html#

In #300 (comment) you mention wanting write support when using a chunk manifest, but it is not clear what form of writing you mean. Under the current chunk manifest proposal, zarr implementations would treat the manifest as read only, because the chunk manifest does not indicate where new keys are written. For existing keys, in principle you could attempt to overwrite the same location in place, but that only works if the size of the chunk stays the same (effectively meaning no compression), and that is not necessarily the desired behavior anyway. You could also have some form of "writing by reference", where you "write" to the chunk manifest by linking in additional keys referring to existing data elsewhere. But that might more naturally be handled entirely separately from the zarr implementation, since it would require specialized APIs.

It isn't really clear to me what benefits you hope to gain from using a chunk manifest.

@DahnJ (Author) commented Jul 3, 2024

Thanks @jbms, these are great questions and points. This is precisely why I wrote the issue in the first place 🙂

It isn't really clear to me what benefits you hope to gain from using a chunk manifest.

The primary motivation has been that this could entirely remove the need for code that updates the MetaArrays. Since the manifest and the MetaArrays store similar information, it would remove the need to keep two parallel implementations.

That's of course only true if the manifest supports updates. If it does not, it cannot be used to power MetaArrays.

You could also have some form of "writing by reference", where you "write" to the chunk manifest by linking in additional keys referring to existing data elsewhere

I agree, this could be a solution to a lot of the consistency/concurrency issues mentioned. I elaborated on this a bit in #300 (comment). This is again going in the direction of #154.

Constructing MetaArrays from object store list operations

You're asking a great question: why even store the chunk initialization information, when it's right there in the object store in the form of stored chunk objects?

This was actually our original method. We would initialize the xr.Dataset MetaArray on-the-fly from the output of list_prefix. However, at some point, with a growing number of populated chunks, the list_prefix method in Zarr became prohibitively slow. We saw two options at that point:

  1. Allow for range-restricted list operations and keep initializing MetaArrays on-the-fly.
  2. Start storing the MetaArrays. This requires updating the data structure, but provides fast queries – querying millions of chunks requires only fetching millions of compressed bool values.

We've explored these options for our storage (which is an on-prem object store, indexed by Arraylake) and, at that time, chose option 2 as the scalable solution.

However, this choice was made in the context of that particular storage and may not apply in general. This issue is about opening up the functionality to the wider Zarr community, and thus it's worth exploring option 1 again.

I think an important difficulty with listing the object store is that you can only list by prefix, not match a generic pattern. This means that one cannot easily slice in each dimension – if I want to slice along the first dimension only, e.g. any chunks with indexes of the form (x, 2, 2), I have to list all chunks across the other dimensions as well.
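To make that concrete: with the zarr v3 default chunk key encoding, a 3-D chunk grid is stored under keys such as c/0/0/0, c/0/0/1, …, c/5/7/9. Fixing the first index, say (2, y, x), maps to the cheap prefix c/2/, but the pattern (x, 2, 2) shares no common prefix, so the entire listing has to be fetched and filtered client-side. (Key layout shown for illustration only.)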

It would be very interesting to see what the performance is of listing on the order of millions of chunks in e.g. S3 and, if necessary, whether improved chunk_key_encodings could provide satisfactory performance.

It's also worth considering how well this works with sharding, virtual zarr and other potential (planned) features.


In #300 (comment) you mention lazily-writing an array as individual chunks are requested. This seems to be something that is easily supported by the existing zarr v2 and v3 formats without chunk manifests

If this is well-supported by Zarr already, then please ignore that comment. I only recorded it as a special case of incremental arrays that I saw in the Zulip discussion. It's not relevant to our use cases.
