[data] Fix quadratic slowdown when locally shuffling tensor extension types#36102
Conversation
| new_cols = [] | ||
| for col in table.columns: | ||
| if _is_column_extension_type(col): | ||
| if _is_column_extension_type(col) and col.num_chunks > 1: |
There was a problem hiding this comment.
This provides most of the speedup (0.03GBps -> 0.12GBps on the attached benchmark).
There was a problem hiding this comment.
I'm wondering how general this optimization will be. What are the cases that num_chunks will be >1 ?
There was a problem hiding this comment.
another question. IIUC, here we will concatenate all the chunks for the entire column, which is not necessary. Because the indices may only fall into some of the chunks. We can potentially optimize this by only taking the rows from partial chunks. Is my understanding correct?
There was a problem hiding this comment.
I think it's actually always 1 since we only ever take from a fully concatenated table, which coalesces the chunks already (batcher.py:326):
# Build the new shuffle buffer.
self._shuffle_buffer = self._builder.build()
if (
isinstance(
BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
)
and self._shuffle_buffer.num_columns > 0
and self._shuffle_buffer.column(0).num_chunks
>= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
):
self._shuffle_buffer = transform_pyarrow.combine_chunks(
self._shuffle_buffer
)
This is a bit brittle though, not sure we can avoid regressions without a microbenchmark.
another question. IIUC, here we will concatenate all the chunks for the entire column, which is not necessary. Because the indices may only fall into some of the chunks. We can potentially optimize this by only taking the rows from partial chunks. Is my understanding correct?
I believe so. Do you think that approach would be preferable?
There was a problem hiding this comment.
I'm still very confused about the logic of ShufflingBatcher.next_batch. Especially the following part.
Why put the shuffle_buffer back to the builder, while shuffle_buffer is built from the builder? This seems to copy data back and forth again and again. This could hurt perf badly when we have large shuffle_buffer_min_size and small batch_size.
if self._batch_head > 0:
# Compact the materialized shuffle buffer.
# TODO(Clark): If alternating between adding blocks and fetching
# shuffled batches, this aggressive compaction could be inefficient.
self._shuffle_buffer = BlockAccessor.for_block(
self._shuffle_buffer
).take(self._shuffle_indices[self._batch_head :])
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)There was a problem hiding this comment.
The process in my head is very simple:
addkeeps adding data to the builder.- when builder size reaches
shuffle_buffer_min_size, it builds the shuffle buffer and shuffles the order (this will copy shuffle_buffer_min_size data). next_batchjust takes a batch from the shuffle_buffer (this only copies batch_size data).
while the current logic seems too complex. there are a lot of copies (take, add_block, combine_chunks).
There was a problem hiding this comment.
Yeah I think I agree. I'll try to rewrite this class to simplify
There was a problem hiding this comment.
I ended up just removing shuffle_indices from the current implementation, keeping the compaction logic as we still need that for performance in any case. Switching from take() with shuffle indices to slice() with up-front random shuffle further improves performance from 0.2 GBps to 0.3 GBps.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
ericl
left a comment
There was a problem hiding this comment.
Ready to review. Is there also a place I can add this microbenchmark to be tracked?
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
raulchen
left a comment
There was a problem hiding this comment.
another issue I found was can_add. I guess it's supposed to implement back-pressure. But it's never used outside the Batcher class. When the compact ratio is large, the check in add will fail. I think we should just remove the check.
| ).random_shuffle(self._shuffle_seed) | ||
| if self._shuffle_seed is not None: | ||
| self._shuffle_seed += 1 | ||
| if ( |
There was a problem hiding this comment.
This if doesn't seem useful either. I guess self._builder.build() should already return a table with only one chunk? I tried adding a print in this if, it never printed.
There was a problem hiding this comment.
I'm not sure, it seems @jjyao added this to resolve some performance regressions. I'm guessing it happens in other scenarios?
There was a problem hiding this comment.
found the original issue. I thought self._builder.build() would combine the chunks. but it's not true.
Signed-off-by: Eric Liang <ekhliang@gmail.com>
… types (ray-project#36102) Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Why are these changes needed?
Benchmark script:
Closes #36099