[data] Fix quadratic slowdown when locally shuffling tensor extension types #36102
@@ -1,5 +1,4 @@
import random
from typing import List, Optional
from typing import Optional

from ray.data._internal.arrow_block import ArrowBlockAccessor
from ray.data._internal.arrow_ops import transform_pyarrow
@@ -14,6 +13,12 @@
# https://github.com/apache/arrow/issues/35126 is resolved.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = 2

# Delay compaction until the shuffle buffer has reached this ratio over the min
# shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
# frequent compactions. Setting this to higher values increases memory usage but
# reduces compaction frequency.
SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5


class BatcherInterface:
    def add(self, block: Block):
@@ -24,10 +29,6 @@ def add(self, block: Block):
        """
        raise NotImplementedError()

    def can_add(self, block: Block) -> bool:
        """Whether the block can be added to the buffer."""
        raise NotImplementedError()

    def done_adding(self) -> bool:
        """Indicate to the batcher that no more blocks will be added to the buffer."""
        raise NotImplementedError()
@@ -83,14 +84,9 @@ def add(self, block: Block):
            block: Block to add to the block buffer.
        """
        if BlockAccessor.for_block(block).num_rows() > 0:
            assert self.can_add(block)
            self._buffer.append(block)
            self._buffer_size += BlockAccessor.for_block(block).num_rows()

    def can_add(self, block: Block) -> bool:
        """Whether the block can be added to the buffer."""
        return not self._done_adding

    def done_adding(self) -> bool:
        """Indicate to the batcher that no more blocks will be added to the batcher."""
        self._done_adding = True
@@ -175,28 +171,18 @@ class ShufflingBatcher(BatcherInterface):
    #
    # This shuffling batcher lazily builds a shuffle buffer from added blocks, and once
    # a batch is requested via .next_batch(), it concatenates the blocks into a concrete
    # shuffle buffer, generates random shuffle indices, and starts returning shuffled
    # batches.
    # shuffle buffer and randomly shuffles the entire buffer.
    #
    # Adding of more blocks can be intermixed with retrieving batches, but it should be
    # noted that we can end up performing two expensive operations on each retrieval:
    #  1. Build added blocks into a concrete shuffle buffer.
    #  2. Generate random shuffle indices.
    # Note that (1) and (2) only happen when new blocks are added, upon the next
    # retrieval. I.e., if no new blocks have been added since the last batch retrieval,
    # and there are still batches in the existing concrete shuffle buffer to be yielded,
    # then each batch retrieval will only involve slicing the batch out of the concrete
    # shuffle buffer.
    #  2. Shuffling the entire buffer.
    # To amortize the overhead of this process, we only shuffle the blocks after a
    # delay designated by SHUFFLE_BUFFER_COMPACTION_RATIO.
    #
    # Similarly, adding blocks is very cheap. Each added block will be appended to a
    # list, with concatenation of the underlying data delayed until the next batch
    # retrieval.
    #
    # Since (1) runs of block additions are cheap, and (2) runs of batch retrievals are
    # cheap, callers of ShufflingBatcher are encouraged to add as many blocks as
    # possible (up to the shuffle buffer capacity), followed by retrieving as many
    # batches as possible (down to the shuffle buffer minimum size), in such contiguous
    # runs.
    # compaction.

    def __init__(
        self,
@@ -220,6 +206,7 @@ def __init__(
        if batch_size is None:
            raise ValueError("Must specify a batch_size if using a local shuffle.")
        self._batch_size = batch_size
        self._shuffle_seed = shuffle_seed
        if shuffle_buffer_min_size < batch_size:
            # Round it up internally to `batch_size` since our algorithm requires it.
            # This is harmless since it only offers extra randomization.
@@ -231,13 +218,9 @@ def __init__(
        self._buffer_min_size = shuffle_buffer_min_size
        self._builder = DelegatingBlockBuilder()
        self._shuffle_buffer: Block = None
        self._shuffle_indices: List[int] = None
        self._batch_head = 0
        self._done_adding = False

        if shuffle_seed is not None:
            random.seed(shuffle_seed)

    def add(self, block: Block):
        """Add a block to the shuffle buffer.
@@ -247,18 +230,8 @@ def add(self, block: Block):
            block: Block to add to the shuffle buffer.
        """
        if BlockAccessor.for_block(block).num_rows() > 0:
            assert self.can_add(block)
            self._builder.add_block(block)

    def can_add(self, block: Block) -> bool:
        """Whether the block can be added to the shuffle buffer.

        This does not take the to-be-added block size into account when checking the
        buffer size vs. buffer capacity, since we need to support large outlier blocks
        and have to guard against min buffer size liveness issues.
        """
        return self._buffer_size() <= self._buffer_capacity and not self._done_adding

    def done_adding(self) -> bool:
        """Indicate to the batcher that no more blocks will be added to the batcher.
@@ -275,46 +248,63 @@ def has_batch(self) -> bool:
        buffer_size = self._buffer_size()

        if not self._done_adding:
            # If still adding blocks, ensure that removing a batch wouldn't cause the
            # shuffle buffer to dip beneath its configured minimum size.
            return buffer_size - self._batch_size >= self._buffer_min_size
            # Delay pulling of batches until the buffer is large enough in order to
            # amortize compaction overhead.
            return self._materialized_buffer_size() >= self._buffer_min_size or (
                buffer_size - self._batch_size
                >= self._buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
            )
        else:
            return buffer_size >= self._batch_size
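To make the gating in `has_batch` concrete, here is a standalone sketch of the arithmetic (not Ray code; the numbers are hypothetical). While blocks are still being added, a batch is only served once the already-shuffled (materialized) rows cover the min buffer size, or enough total rows have accumulated to amortize the next compaction:

```python
SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5

def has_batch(materialized_rows: int, pending_rows: int,
              buffer_min_size: int = 1000, batch_size: int = 100) -> bool:
    # Mirrors the "still adding" branch above: serve from the materialized
    # buffer if it is large enough, otherwise wait until pending + materialized
    # rows exceed the compaction threshold.
    total = materialized_rows + pending_rows
    return materialized_rows >= buffer_min_size or (
        total - batch_size >= buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
    )

# Plenty of already-shuffled rows: no compaction needed, serve a batch.
print(has_batch(materialized_rows=1200, pending_rows=0))   # True
# Shuffled rows exhausted; 1400 - 100 < 1500, so keep buffering.
print(has_batch(materialized_rows=0, pending_rows=1400))   # False
# 1600 - 100 >= 1500: enough rows to amortize the compaction.
print(has_batch(materialized_rows=0, pending_rows=1600))   # True
```

With ratio 1.0 a compaction would trigger as soon as the buffer dips below the min size; 1.5 trades ~50% extra buffered memory for fewer compactions.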
    def _buffer_size(self) -> int:
        """Return shuffle buffer size."""
        buffer_size = self._builder.num_rows()
        if self._shuffle_buffer is not None:
            # Include the size of the concrete (materialized) shuffle buffer, adjusting
            # for the batch head position, which also serves as a counter of the number
            # of already-yielded rows from the current concrete shuffle buffer.
            buffer_size += (
                BlockAccessor.for_block(self._shuffle_buffer).num_rows()
                - self._batch_head
            )
            buffer_size += self._materialized_buffer_size()
        return buffer_size

    def _materialized_buffer_size(self) -> int:
        """Return materialized (compacted portion of) shuffle buffer size."""
        if self._shuffle_buffer is None:
            return 0
        # The size of the concrete (materialized) shuffle buffer, adjusting
        # for the batch head position, which also serves as a counter of the number
        # of already-yielded rows from the current concrete shuffle buffer.
        return max(
            0,
            BlockAccessor.for_block(self._shuffle_buffer).num_rows() - self._batch_head,
        )
    def next_batch(self) -> Block:
        """Get the next shuffled batch from the shuffle buffer.

        Returns:
            A batch represented as a Block.
        """
        assert self.has_batch() or (self._done_adding and self.has_any())
        # Add rows in the builder to the shuffle buffer.
        if self._builder.num_rows() > 0:
        # Add rows in the builder to the shuffle buffer. Note that we delay compaction
        # as much as possible to amortize the concatenation overhead. Compaction is
        # only necessary when the materialized buffer size falls below the min size.
        if self._builder.num_rows() > 0 and (
            self._done_adding
            or self._materialized_buffer_size() <= self._buffer_min_size
        ):
            if self._shuffle_buffer is not None:
                if self._batch_head > 0:
                    # Compact the materialized shuffle buffer.
                    # TODO(Clark): If alternating between adding blocks and fetching
                    # shuffled batches, this aggressive compaction could be inefficient.
                    self._shuffle_buffer = BlockAccessor.for_block(
                        self._shuffle_buffer
                    ).take(self._shuffle_indices[self._batch_head :])
                    block = BlockAccessor.for_block(self._shuffle_buffer)
                    self._shuffle_buffer = block.slice(
                        self._batch_head, block.num_rows()
                    )
                # Add the unyielded rows from the existing shuffle buffer.
                self._builder.add_block(self._shuffle_buffer)
            # Build the new shuffle buffer.
            self._shuffle_buffer = self._builder.build()
            self._shuffle_buffer = BlockAccessor.for_block(
                self._shuffle_buffer
            ).random_shuffle(self._shuffle_seed)
            if self._shuffle_seed is not None:
                self._shuffle_seed += 1
            if (
                isinstance(
                    BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor

Review thread on the `isinstance(..., ArrowBlockAccessor)` check:

This if doesn't seem useful either. I guess …

I'm not sure; it seems @jjyao added this to resolve some performance regressions. I'm guessing it happens in other scenarios?

Found the original issue. I thought …
@@ -328,24 +318,15 @@ def next_batch(self) -> Block:
                )
                # Reset the builder.
                self._builder = DelegatingBlockBuilder()
                # Invalidate the shuffle indices.
                self._shuffle_indices = None
                self._batch_head = 0

        assert self._shuffle_buffer is not None
        buffer_size = BlockAccessor.for_block(self._shuffle_buffer).num_rows()
        # Truncate the batch to the buffer size, if necessary.
        batch_size = min(self._batch_size, buffer_size)

        if self._shuffle_indices is None:
            # Need to generate new shuffle indices.
            self._shuffle_indices = list(range(buffer_size))
            random.shuffle(self._shuffle_indices)

        # Get the shuffle indices for this batch.
        batch_indices = self._shuffle_indices[
            self._batch_head : self._batch_head + batch_size
        ]
        slice_start = self._batch_head
        self._batch_head += batch_size
        # Yield the shuffled batch.
        return BlockAccessor.for_block(self._shuffle_buffer).take(batch_indices)
        return BlockAccessor.for_block(self._shuffle_buffer).slice(
            slice_start, self._batch_head
        )
This provides most of the speedup (0.03 GBps -> 0.12 GBps on the attached benchmark).
I'm wondering how general this optimization will be. What are the cases where num_chunks will be > 1?
Another question: IIUC, here we will concatenate all the chunks for the entire column, which is not necessary, because the `indices` may only fall into some of the chunks. We could potentially optimize this by only taking rows from the relevant chunks. Is my understanding correct?

I think it's actually always 1, since we only ever take from a fully concatenated table, which coalesces the chunks already (batcher.py:326). This is a bit brittle though; not sure we can avoid regressions without a microbenchmark.

I believe so. Do you think that approach would be preferable?
I'm still very confused about the logic of `ShufflingBatcher.next_batch`, especially the following part. Why put the shuffle_buffer back into the builder, when the shuffle_buffer is built from the builder? This seems to copy data back and forth again and again, which could hurt perf badly when we have a large `shuffle_buffer_min_size` and a small `batch_size`.

The process in my head is very simple:

1. `add` keeps adding data to the builder.
2. Once the builder reaches `shuffle_buffer_min_size`, it builds the shuffle buffer and shuffles the order (this copies shuffle_buffer_min_size data).
3. `next_batch` just takes a batch from the shuffle_buffer (this only copies batch_size data).

Meanwhile, the current logic seems too complex: there are a lot of copies (take, add_block, combine_chunks).
Yeah, I think I agree. I'll try to rewrite this class to simplify it.
I ended up just removing `shuffle_indices` from the current implementation, keeping the compaction logic, since we still need that for performance in any case. Switching from take() with shuffle indices to slice() with an up-front random shuffle further improves performance from `0.2 GBps` to `0.3 GBps`.