Zstd Codec on the GPU #2863
base: main
Conversation
Force-pushed from d6608e7 to d548adc
Thanks for opening this PR! At the moment we do not have any codecs implemented in the …
@dstansby My understanding was that …
```python
# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)

outputs = self._zstd_codec.decode(filtered_inputs) if len(filtered_inputs) > 0 else []
```
Question related to #2904 (which is looking into memory usage): would it be possible for nvcomp-python to provide an `out` argument to `decode`? If I'm reading the C++ docs correctly, that does seem to decompress into an output buffer. Eventually it would be nice to do that all the way into Zarr's `out` buffer.
It's not currently supported, but I've seen similar requests from elsewhere. Hopefully someday.
```python
chunks_and_specs = list(chunks_and_specs)

# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)
```
This should just be reinterpreting some bytes / pointers, right? There's no possibility of doing any kind of (blocking) I/O? If so, then I'd recommend making this a regular sync method.
Same comment on L161 with the `return await self._convert_from_nvcomp_arrays`.
Also in `encode`.
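If the conversion helpers really do no blocking I/O, a plain sync function suffices. A minimal CPU-side sketch of the filter-and-track-indices shape implied by `filtered_inputs, none_indices` (the helper name and body are illustrative; the actual nvcomp array wrapping is omitted):

```python
def convert_to_arrays(chunks):
    """Split chunks into non-None payloads plus the positions of the Nones.

    A plain (non-async) function: it only reshuffles references, so there
    is no I/O to await and nothing gained by making it a coroutine.
    """
    filtered, none_indices = [], []
    for i, chunk in enumerate(chunks):
        if chunk is None:
            none_indices.append(i)
        else:
            # real code would wrap this as an nvcomp array here
            filtered.append(chunk)
    return filtered, none_indices


filtered, none_idx = convert_to_arrays([b"a", None, b"b", None])
```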
```python
# Convert to nvcomp arrays
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)

outputs = self._zstd_codec.decode(filtered_inputs) if len(filtered_inputs) > 0 else []
```
This is the line where we should be careful about what happens where. Do you know whether `nvcomp.Codec.decode` is blocking, or does it just asynchronously schedule the decode on the GPU? If it's non-blocking, then for simplicity I'd recommend using `asyncio.to_thread` around a (regular, sync Python) function that schedules the decode and then blocks on `event.wait()` to do the stream synchronization.
```python
async def decode_wrapper(codec, filtered_inputs):
    result = codec.decode(filtered_inputs)
    # wait for the decode to complete. cupy.cuda.Event?
    return result
```
I have some longer-term thoughts around how zarr-python handles concurrency for different types of workloads, but for now I think it's probably best to follow the other codecs, which use `to_thread`.
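A runnable illustration of that `to_thread` pattern, using only the stdlib: `zlib.decompress` stands in for the nvcomp decode, and `time.sleep` stands in for the blocking event/stream synchronization. The names here are hypothetical, not the PR's actual code.

```python
import asyncio
import time
import zlib


def _blocking_decode(payloads):
    # Stands in for: schedule the decode on the GPU, then block on
    # event.wait() / stream synchronization until results are ready.
    results = [zlib.decompress(p) for p in payloads]
    time.sleep(0.01)  # placeholder for the blocking synchronization step
    return results


async def decode_wrapper(payloads):
    # Run the blocking part on a worker thread so the event loop stays free.
    return await asyncio.to_thread(_blocking_decode, payloads)


chunks = [zlib.compress(b"x"), zlib.compress(b"y")]
decoded = asyncio.run(decode_wrapper(chunks))
```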
More specifically, I'm thinking something like this on the class:
```python
async def _synchronize_stream(self) -> None:
    # this is the blocking operation. Offload it to a worker thread
    # so that it doesn't block the main thread
    await asyncio.to_thread(self.stream.synchronize)
```
and then in `decode`:
```python
...
decoded = self._convert_from_nvcomp_arrays(outputs, chunks_and_specs)
# Uphold zarr-python's guarantee that the decode is finished before returning
await self._synchronize_stream()
return decoded
```
That way, the main thread will schedule everything to happen on the GPU via `self._zstd_codec.decode()`, but the actual stream synchronization will happen on another thread, so that it doesn't block the event loop.
That requires putting a `stream` (and maybe `Device`?) object on the `ZstdCodec` class so that we can make sure we synchronize the right stream (the same one passed to `nvcomp.Codec`).
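Pulling those pieces together, a stdlib-only sketch of keeping the stream on the codec and synchronizing it off the event loop. `FakeStream` and the byte-reversing "decode" are placeholders, not the real CuPy/nvcomp objects; only the structure (stream attribute, `_synchronize_stream` via `to_thread`, synchronize before returning) mirrors the suggestion.

```python
import asyncio
import time


class FakeStream:
    """Stand-in for a CUDA stream; synchronize() blocks like the real one."""

    def synchronize(self) -> None:
        time.sleep(0.01)  # placeholder for waiting on queued GPU work


class ZstdGpuCodecSketch:
    def __init__(self) -> None:
        # Keep the stream on the codec so we always synchronize the same
        # stream that would be handed to nvcomp.Codec.
        self.stream = FakeStream()

    async def _synchronize_stream(self) -> None:
        # The blocking call runs on a worker thread, off the event loop.
        await asyncio.to_thread(self.stream.synchronize)

    async def decode(self, chunks):
        decoded = [bytes(reversed(c)) for c in chunks]  # placeholder "decode"
        # Uphold the guarantee that decoding is finished before returning.
        await self._synchronize_stream()
        return decoded


out = asyncio.run(ZstdGpuCodecSketch().decode([b"abc"]))
```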
This PR adds a Zstd codec that runs on the GPU using the nvCOMP 4.2 Python APIs.
TODO:
- [ ] docs/user-guide/*.rst
- [ ] changes/