
[data] Fix quadratic slowdown when locally shuffling tensor extension types #36102

Merged

6 commits merged into ray-project:master on Jun 9, 2023

Conversation

@ericl (Contributor) commented on Jun 6, 2023:

Why are these changes needed?

Benchmark script:

import time
import ray

# 50,000 rows, each a tensor of 13,000 elements, stored as a tensor
# extension column.
ds = ray.data.range_tensor(50000, shape=(13000,))

start = time.time()
# A small batch size plus a local shuffle buffer exercises the path
# that previously exhibited the quadratic slowdown.
for batch in ds.iter_batches(batch_size=10, local_shuffle_buffer_size=1000):
    pass
print((ds.size_bytes() / 1e9) / (time.time() - start), "GB/s")

Closes #36099

Signed-off-by: Eric Liang <ekhliang@gmail.com>
@@ -34,7 +34,7 @@ def take_table(
     if any(_is_column_extension_type(col) for col in table.columns):
         new_cols = []
         for col in table.columns:
-            if _is_column_extension_type(col):
+            if _is_column_extension_type(col) and col.num_chunks > 1:
ericl (Contributor, Author):

This provides most of the speedup (0.03 GB/s -> 0.12 GB/s on the attached benchmark).
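(Not from the PR, but a toy sketch of why the guard matters, with an illustrative helper name: flattening chunks copies the whole column, so the cost should only be paid when there is actually more than one chunk.)

import pyarrow as pa

def maybe_combine(col: pa.ChunkedArray) -> pa.ChunkedArray:
    if col.num_chunks > 1:
        # pa.concat_arrays materializes one contiguous array (a copy).
        return pa.chunked_array([pa.concat_arrays(col.chunks)])
    return col  # single chunk: nothing to combine, no copy

single = pa.chunked_array([pa.array(range(4))])
assert maybe_combine(single) is single  # common path passes through untouched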

Reviewer (Contributor):

I'm wondering how general this optimization will be. In which cases will num_chunks be > 1?

Reviewer (Contributor):

Another question: IIUC, here we concatenate all the chunks for the entire column, which isn't necessary, because the indices may fall into only some of the chunks. We could potentially optimize this by taking rows from just the affected chunks. Is my understanding correct?
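(For reference, a hedged sketch of that idea with a hypothetical helper, not anything the PR implements: map each global index to its chunk via cumulative chunk offsets and take only from the chunks that are actually hit.)

import numpy as np
import pyarrow as pa

def take_from_hit_chunks(col: pa.ChunkedArray, indices: np.ndarray) -> pa.ChunkedArray:
    # Cumulative row offsets of chunk boundaries, e.g. [0, 100, 250, ...].
    offsets = np.cumsum([0] + [len(c) for c in col.chunks])
    # Which chunk each global index lands in.
    chunk_ids = np.searchsorted(offsets, indices, side="right") - 1
    pieces = []
    for cid in np.unique(chunk_ids):
        # Translate to chunk-local positions and take within that chunk only.
        local = indices[chunk_ids == cid] - offsets[cid]
        pieces.append(col.chunk(int(cid)).take(pa.array(local)))
    return pa.chunked_array(pieces)

Note the output groups rows chunk by chunk, so a caller that needs the original index order would have to restore it afterwards.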

ericl (Contributor, Author):

I think it's actually always 1 since we only ever take from a fully concatenated table, which coalesces the chunks already (batcher.py:326):

            # Build the new shuffle buffer.
            self._shuffle_buffer = self._builder.build()
            if (
                isinstance(
                    BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
                )
                and self._shuffle_buffer.num_columns > 0
                and self._shuffle_buffer.column(0).num_chunks
                >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
            ):
                self._shuffle_buffer = transform_pyarrow.combine_chunks(
                    self._shuffle_buffer
                )

This is a bit brittle though, not sure we can avoid regressions without a microbenchmark.

> Another question: IIUC, here we concatenate all the chunks for the entire column, which isn't necessary, because the indices may fall into only some of the chunks. We could potentially optimize this by taking rows from just the affected chunks. Is my understanding correct?

I believe so. Do you think that approach would be preferable?

Reviewer (Contributor):

I'm still very confused about the logic of ShufflingBatcher.next_batch, especially the following part.
Why put the shuffle_buffer back into the builder when the shuffle_buffer was built from the builder in the first place? This seems to copy data back and forth again and again, which could hurt performance badly with a large shuffle_buffer_min_size and a small batch_size.

if self._batch_head > 0:
    # Compact the materialized shuffle buffer.
    # TODO(Clark): If alternating between adding blocks and fetching
    # shuffled batches, this aggressive compaction could be inefficient.
    self._shuffle_buffer = BlockAccessor.for_block(
        self._shuffle_buffer
    ).take(self._shuffle_indices[self._batch_head :])
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)

Reviewer (Contributor):

The process in my head is very simple (see the sketch below):

  1. add keeps appending data to the builder.
  2. When the builder's size reaches shuffle_buffer_min_size, it builds the shuffle buffer and shuffles the row order (this copies shuffle_buffer_min_size worth of data).
  3. next_batch just takes one batch from the shuffle_buffer (this copies only batch_size worth of data).

The current logic seems too complex by comparison; there are a lot of copies (take, add_block, combine_chunks).
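(A minimal sketch of that three-step flow, assuming pyarrow tables and a hypothetical function name; this is not the actual ShufflingBatcher code. The buffer is copied once when built and shuffled, and every batch afterwards is a zero-copy slice.)

import numpy as np
import pyarrow as pa

def shuffling_batches(blocks, shuffle_buffer_min_size, batch_size, seed=None):
    """Yield shuffled batches; leftover rows at end-of-stream are elided."""
    rng = np.random.default_rng(seed)
    pending, pending_rows = [], 0
    for block in blocks:
        # Step 1: keep adding data to the builder.
        pending.append(block)
        pending_rows += block.num_rows
        if pending_rows < shuffle_buffer_min_size:
            continue
        # Step 2: build the buffer and shuffle it (one full-buffer copy).
        table = pa.concat_tables(pending)
        shuffled = table.take(pa.array(rng.permutation(table.num_rows)))
        pending, pending_rows = [], 0
        # Step 3: each batch is a zero-copy slice of the shuffled buffer.
        for start in range(0, shuffled.num_rows, batch_size):
            yield shuffled.slice(start, batch_size)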

ericl (Contributor, Author):

Yeah, I think I agree. I'll try to rewrite this class to simplify it.

ericl (Contributor, Author):

I ended up just removing shuffle_indices from the current implementation, keeping the compaction logic, since we still need that for performance in any case. Switching from take() with shuffle indices to slice() with an up-front random shuffle further improves performance from 0.2 GB/s to 0.3 GB/s.

Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl changed the title from "[WIP] Fix quadratic slowdown when locally shuffling tensor extension types" to "[data] Fix quadratic slowdown when locally shuffling tensor extension types" on Jun 6, 2023.
@ericl (Contributor, Author) left a comment:

Ready to review. Is there also a place where I can add this microbenchmark so that it's tracked?

@ericl added the tests-ok and @author-action-required labels and removed the tests-ok label on Jun 6, 2023.
ericl added 2 commits June 7, 2023 14:56
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl removed the @author-action-required label on Jun 7, 2023.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl added the tests-ok label on Jun 8, 2023.
@raulchen (Contributor) left a comment:

Another issue I found is can_add. I guess it's supposed to implement back-pressure, but it's never used outside the Batcher class, and when the compaction ratio is large, the check in add will fail. I think we should just remove the check.
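(For context, a rough sketch of what can_add presumably intends, with illustrative names rather than the actual Batcher code: callers would consult can_add() before add(), and if nothing ever does, the check inside add() can trip instead.)

class BufferGate:
    """Illustrative back-pressure gate, not Ray's Batcher."""

    def __init__(self, buffer_min_size: int):
        self._buffered_rows = 0
        self._buffer_min_size = buffer_min_size

    def can_add(self) -> bool:
        # Intended contract: callers check this before calling add().
        return self._buffered_rows < self._buffer_min_size

    def add(self, num_rows: int) -> None:
        # If no caller ever consults can_add(), this check can fail at runtime.
        assert self.can_add(), "buffer full; caller ignored back-pressure"
        self._buffered_rows += num_rows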

    self._shuffle_buffer
).random_shuffle(self._shuffle_seed)
if self._shuffle_seed is not None:
    self._shuffle_seed += 1
if (
Reviewer (Contributor):

This if doesn't seem useful either. I'd guess self._builder.build() should already return a table with only one chunk? I tried adding a print inside this if, and it never printed.

ericl (Contributor, Author):

I'm not sure; it seems @jjyao added this to resolve some performance regressions. I'm guessing it triggers in other scenarios?

Reviewer (Contributor):

Found the original issue. I had thought self._builder.build() would combine the chunks, but that's not true.
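(This is easy to reproduce with plain pyarrow, assuming the builder concatenates blocks under the hood: table concatenation preserves the per-block chunks rather than combining them.)

import pyarrow as pa

t1 = pa.table({"x": [1, 2]})
t2 = pa.table({"x": [3, 4]})
combined = pa.concat_tables([t1, t2])
# Concatenation keeps one chunk per input table; nothing is combined.
print(combined.column("x").num_chunks)  # -> 2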

Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl ericl removed the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Jun 8, 2023
@ericl merged commit c7651c4 into ray-project:master on Jun 9, 2023.
arvind-chandra pushed a commit to lmco/ray that referenced this pull request on Aug 31, 2023:
… types (ray-project#36102)

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>

Successfully merging this pull request may close these issues.

[data] local_shuffle_buffer_size causes quadratic slowdown when using Tensor extension types (#36099)