-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] Fix quadratic slowdown when locally shuffling tensor extension types #36102
Conversation
@@ -34,7 +34,7 @@ def take_table( | |||
if any(_is_column_extension_type(col) for col in table.columns): | |||
new_cols = [] | |||
for col in table.columns: | |||
if _is_column_extension_type(col): | |||
if _is_column_extension_type(col) and col.num_chunks > 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This provides most of the speedup (0.03GBps -> 0.12GBps on the attached benchmark).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering how general this optimization will be. What are the cases that num_chunks will be >1 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another question. IIUC, here we will concatenate all the chunks for the entire column, which is not necessary. Because the indices
may only fall into some of the chunks. We can potentially optimize this by only taking the rows from partial chunks. Is my understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's actually always 1 since we only ever take from a fully concatenated table, which coalesces the chunks already (batcher.py:326):
# Build the new shuffle buffer.
self._shuffle_buffer = self._builder.build()
if (
isinstance(
BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
)
and self._shuffle_buffer.num_columns > 0
and self._shuffle_buffer.column(0).num_chunks
>= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
):
self._shuffle_buffer = transform_pyarrow.combine_chunks(
self._shuffle_buffer
)
This is a bit brittle though, not sure we can avoid regressions without a microbenchmark.
another question. IIUC, here we will concatenate all the chunks for the entire column, which is not necessary. Because the indices may only fall into some of the chunks. We can potentially optimize this by only taking the rows from partial chunks. Is my understanding correct?
I believe so. Do you think that approach would be preferable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still very confused about the logic of ShufflingBatcher.next_batch
. Especially the following part.
Why put the shuffle_buffer back to the builder, while shuffle_buffer is built from the builder? This seems to copy data back and forth again and again. This could hurt perf badly when we have large shuffle_buffer_min_size
and small batch_size
.
if self._batch_head > 0:
# Compact the materialized shuffle buffer.
# TODO(Clark): If alternating between adding blocks and fetching
# shuffled batches, this aggressive compaction could be inefficient.
self._shuffle_buffer = BlockAccessor.for_block(
self._shuffle_buffer
).take(self._shuffle_indices[self._batch_head :])
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The process in my head is very simple:
add
keeps adding data to the builder.- when builder size reaches
shuffle_buffer_min_size
, it builds the shuffle buffer and shuffles the order (this will copy shuffle_buffer_min_size data). next_batch
just takes a batch from the shuffle_buffer (this only copies batch_size data).
while the current logic seems too complex. there are a lot of copies (take, add_block, combine_chunks).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think I agree. I'll try to rewrite this class to simplify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up just removing shuffle_indices
from the current implementation, keeping the compaction logic as we still need that for performance in any case. Switching from take() with shuffle indices to slice() with up-front random shuffle further improves performance from 0.2 GBps
to 0.3 GBps
.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ready to review. Is there also a place I can add this microbenchmark to be tracked?
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another issue I found was can_add
. I guess it's supposed to implement back-pressure. But it's never used outside the Batcher class. When the compact ratio is large, the check in add
will fail. I think we should just remove the check.
self._shuffle_buffer | ||
).random_shuffle(self._shuffle_seed) | ||
if self._shuffle_seed is not None: | ||
self._shuffle_seed += 1 | ||
if ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if doesn't seem useful either. I guess self._builder.build()
should already return a table with only one chunk? I tried adding a print in this if, it never printed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, it seems @jjyao added this to resolve some performance regressions. I'm guessing it happens in other scenarios?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
found the original issue. I thought self._builder.build()
would combine the chunks. but it's not true.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
… types (ray-project#36102) Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Why are these changes needed?
Benchmark script:
Closes #36099