
Zstd Codec on the GPU #2863

Draft · wants to merge 3 commits into main
Conversation

akshaysubr
Contributor

This PR adds a Zstd codec that runs on the GPU using the nvCOMP 4.2 python APIs.

TODO:

  • Make fully async
  • Performance benchmarking
  • CPU-GPU roundtrip testing
  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.rst
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)
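
For reference, a roundtrip through the nvCOMP Python API looks roughly like the sketch below (a minimal sketch based on the nvidia-nvcomp-cu12 package; the nvcomp.Codec and nvcomp.as_array spellings should be checked against the nvCOMP 4.2 docs rather than read as this PR's exact code):

import cupy as cp
from nvidia import nvcomp

# a device buffer to compress (already resident on the GPU)
data = cp.arange(1 << 20, dtype=cp.uint8)

# assumption: the algorithm is selected by name
codec = nvcomp.Codec(algorithm="Zstd")

nv_arr = nvcomp.as_array(data)           # wrap the CuPy buffer as an nvcomp array
compressed = codec.encode(nv_arr)        # compress on the GPU
decompressed = codec.decode(compressed)  # decompress on the GPU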

@github-actions bot added the "needs release notes" label (automatically applied to PRs which haven't added release notes) on Feb 25, 2025
@dstansby
Contributor

Thanks for opening this PR! At the moment we do not have any codecs implemented in the zarr-python package; instead they live in numcodecs. So although it looks like some zarr-python-specific changes are needed to support the new GPU codec, the actual codec should be implemented in numcodecs and then imported into zarr-python.

@akshaysubr
Contributor Author

@dstansby My understanding was that numcodecs is where Python bindings to native codec implementations live, and that with v3 the Codec class itself lives in zarr-python. The GPU codecs and Python bindings are implemented in nvCOMP and imported through the nvidia-nvcomp-cu12 package, so I'm not sure which part of this would need to go in numcodecs. What did you have in mind?

# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)

outputs = self._zstd_codec.decode(filtered_inputs) if len(filtered_inputs) > 0 else []
Contributor

Question related to #2904 (which is looking into memory usage). Would it be possible for nvcomp-python to provide an out argument to decode? If I'm reading the C++ docs correctly, that does seem to decompress into an output buffer. Eventually it would be nice to do that all the way into Zarr's out buffer.
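
Purely to illustrate the ask (the out= keyword is hypothetical and does not exist in nvcomp-python today; decoded_sizes is a stand-in for the known decompressed chunk sizes):

import cupy

# hypothetical: decode directly into preallocated device buffers
out_buffers = [cupy.empty(nbytes, dtype=cupy.uint8) for nbytes in decoded_sizes]
self._zstd_codec.decode(filtered_inputs, out=out_buffers)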

Contributor

It's not currently supported, but I've seen similar requests from elsewhere. Hopefully someday.

chunks_and_specs = list(chunks_and_specs)

# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)
Contributor

This should just be reinterpreting some bytes / pointers, right? There's no possibility of doing any kind of (blocking) I/O? If so, then I'd recommend making this a regular sync method.
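
For example, a sync version might look like the sketch below (the chunks_and_specs shape and the nvcomp.as_array call are assumptions for illustration, not the PR's actual code):

def _convert_to_nvcomp_arrays(self, chunks_and_specs):
    # pure bytes/pointer reinterpretation, no I/O, so a plain sync method suffices
    filtered_inputs, none_indices = [], []
    for i, (chunk, _spec) in enumerate(chunks_and_specs):
        if chunk is None:
            none_indices.append(i)
        else:
            filtered_inputs.append(nvcomp.as_array(chunk))
    return filtered_inputs, none_indices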

Contributor

Same comment on L161 with the return await self._convert_from_nvcomp_arrays.

Contributor

Also in encode.

# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)

outputs = self._zstd_codec.decode(filtered_inputs) if len(filtered_inputs) > 0 else []
Contributor

This is the line where we should be careful about what happens where.

Do you know whether nvcomp.Codec.decode is blocking, or does it just asynchronously schedule the decode on the GPU? If it's non-blocking, then for simplicity I'd recommend using asyncio.to_thread around a regular sync Python function that schedules the decode and then blocks on the stream synchronization (e.g. an event.wait()).

def decode_wrapper(codec, filtered_inputs):
    result = codec.decode(filtered_inputs)
    # wait for the decode to complete, e.g. a cupy.cuda.Event or stream synchronize
    return result

outputs = await asyncio.to_thread(decode_wrapper, self._zstd_codec, filtered_inputs)

I have some longer-term thoughts around how zarr-python handles concurrency for different types of workloads, but for now I think it's probably best to follow the other codecs, which use to_thread.

@TomAugspurger
Contributor · Jun 9, 2025

More specifically, I'm thinking something like this on the class:

    async def _synchronize_stream(self) -> None:
        # this is the blocking operation. Offload it to a worker thread to not block the main thread
        await asyncio.to_thread(self.stream.synchronize)

and then in decode:

        ...
        decoded = self._convert_from_nvcomp_arrays(outputs, chunks_and_specs)
        # Uphold zarr-python's guarantee that the decode is finished before returning
        await self._synchronize_stream()
        return decoded

That way, the main thread will schedule everything to happen on the GPU via self._zstd_codec.decode(), but the actual stream synchronization will happen on another thread, so that it doesn't block the event loop.

That requires putting a stream (and maybe Device?) object on the ZstdCodec class so that we can make sure we synchronize the right stream (the same one passed to nvcomp.Codec).
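
A rough sketch of that shape, assuming a CuPy stream held on the codec (how the stream is handed to nvcomp.Codec is left as a placeholder, since the exact keyword isn't pinned down here):

import asyncio
import cupy as cp

class ZstdCodec:
    def __init__(self) -> None:
        # dedicated stream so we know exactly which GPU work to synchronize
        self.stream = cp.cuda.Stream(non_blocking=True)
        # placeholder: the same stream would also be given to nvcomp.Codec here
        self._zstd_codec = ...  # e.g. nvcomp.Codec(algorithm="Zstd", ...)

    async def _synchronize_stream(self) -> None:
        # stream.synchronize() blocks, so offload it to a worker thread
        await asyncio.to_thread(self.stream.synchronize)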
