[data] Fix quadratic slowdown when locally shuffling tensor extension types #36102

Merged · 6 commits · Jun 9, 2023
2 changes: 1 addition & 1 deletion python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -34,7 +34,7 @@ def take_table(
    if any(_is_column_extension_type(col) for col in table.columns):
        new_cols = []
        for col in table.columns:
-            if _is_column_extension_type(col):
+            if _is_column_extension_type(col) and col.num_chunks > 1:

Contributor Author:
This provides most of the speedup (0.03 GB/s -> 0.12 GB/s on the attached benchmark).

Contributor:
I'm wondering how general this optimization will be. What are the cases where num_chunks will be > 1?

Contributor:
Another question: IIUC, here we concatenate all the chunks for the entire column, which isn't necessary, because the indices may fall into only some of the chunks. We could potentially optimize this by taking rows from only those chunks. Is my understanding correct?

Contributor Author:
I think it's actually always 1, since we only ever take from a fully concatenated table, which already coalesces the chunks (batcher.py:326):

            # Build the new shuffle buffer.
            self._shuffle_buffer = self._builder.build()
            if (
                isinstance(
                    BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
                )
                and self._shuffle_buffer.num_columns > 0
                and self._shuffle_buffer.column(0).num_chunks
                >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
            ):
                self._shuffle_buffer = transform_pyarrow.combine_chunks(
                    self._shuffle_buffer
                )

This is a bit brittle, though; I'm not sure we can avoid regressions without a microbenchmark.

> Another question: IIUC, here we concatenate all the chunks for the entire column, which isn't necessary, because the indices may fall into only some of the chunks. We could potentially optimize this by taking rows from only those chunks. Is my understanding correct?

I believe so. Do you think that approach would be preferable?
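
For reference, a throughput microbenchmark along the lines mentioned above might look roughly like this (a hypothetical sketch, not the benchmark attached to this PR; the dataset shape, sizes, and the "data" column name are illustrative assumptions):

    import time

    import numpy as np
    import ray

    # Local-shuffle throughput over a tensor-extension-typed dataset.
    # from_numpy() stores the ndarray as a single tensor-typed column.
    ds = ray.data.from_numpy(np.zeros((10_000, 128, 128), dtype=np.uint8))

    start = time.perf_counter()
    total_bytes = 0
    for batch in ds.iter_batches(
        batch_size=32, local_shuffle_buffer_size=1000, batch_format="numpy"
    ):
        total_bytes += batch["data"].nbytes
    elapsed = time.perf_counter() - start
    print(f"throughput: {total_bytes / elapsed / 1e9:.2f} GB/s")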

Contributor:
I'm still quite confused by the logic of ShufflingBatcher.next_batch, especially the following part.
Why put the shuffle_buffer back into the builder when the shuffle_buffer was built from the builder? This seems to copy data back and forth over and over, which could hurt performance badly when we have a large shuffle_buffer_min_size and a small batch_size.

if self._batch_head > 0:
    # Compact the materialized shuffle buffer.
    # TODO(Clark): If alternating between adding blocks and fetching
    # shuffled batches, this aggressive compaction could be inefficient.
    self._shuffle_buffer = BlockAccessor.for_block(
        self._shuffle_buffer
    ).take(self._shuffle_indices[self._batch_head :])
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)

Contributor:
The process in my head is very simple:

  1. add() keeps adding data to the builder.
  2. When the builder size reaches shuffle_buffer_min_size, build the shuffle buffer and shuffle its order (this copies shuffle_buffer_min_size worth of data).
  3. next_batch() just takes a batch from the shuffle buffer (this copies only batch_size worth of data).

The current logic seems too complex by comparison: there are a lot of copies (take, add_block, combine_chunks). See the sketch below.
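
For what it's worth, a minimal sketch of that simpler scheme (illustrative only; plain Python lists stand in for Arrow blocks, and SimpleShufflingBatcher is a made-up name, not Ray's implementation):

    import random
    from typing import Any, List

    class SimpleShufflingBatcher:
        def __init__(self, min_size: int, batch_size: int) -> None:
            self.min_size = min_size
            self.batch_size = batch_size
            self.pending: List[Any] = []   # rows appended by add(), unshuffled
            self.shuffled: List[Any] = []  # shuffled rows, consumed from the front

        def add(self, rows: List[Any]) -> None:
            # Step 1: adding just appends; nothing is copied yet.
            self.pending.extend(rows)

        def next_batch(self) -> List[Any]:
            # Step 2: once enough rows are pending, fold in any leftovers and
            # shuffle the whole buffer once (one O(buffer) copy, amortized
            # over many batches).
            if len(self.shuffled) < self.batch_size and len(self.pending) >= self.min_size:
                self.pending.extend(self.shuffled)
                random.shuffle(self.pending)
                self.shuffled, self.pending = self.pending, []
            # Step 3: each batch is a cheap O(batch_size) slice off the front.
            batch = self.shuffled[: self.batch_size]
            del self.shuffled[: self.batch_size]
            return batch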

Contributor Author:
Yeah, I think I agree. I'll try to rewrite this class to simplify it.

Contributor Author:
I ended up just removing shuffle_indices from the current implementation, keeping the compaction logic since we still need it for performance in any case. Switching from take() with shuffle indices to slice() with an up-front random shuffle further improves throughput from 0.2 GB/s to 0.3 GB/s.
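
In pyarrow terms, the change is roughly the following (a schematic illustration, not the PR's code):

    import pyarrow as pa

    table = pa.table({"x": list(range(8))})

    # Before: gather each batch by its shuffled indices. This is a random
    # access per batch, and for chunked extension arrays it forced a
    # concatenation on every call.
    indices = [6, 1, 4, 3]
    batch = table.take(indices)

    # After: shuffle the whole buffer once up front, then cut contiguous,
    # zero-copy slices off of it.
    slice_start, batch_size = 0, 4
    batch = table.slice(slice_start, batch_size)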

                # .take() will concatenate internally, which currently breaks for
                # extension arrays.
                col = _concatenate_extension_column(col)
123 changes: 52 additions & 71 deletions python/ray/data/_internal/batcher.py
@@ -1,5 +1,4 @@
-import random
-from typing import List, Optional
+from typing import Optional

from ray.data._internal.arrow_block import ArrowBlockAccessor
from ray.data._internal.arrow_ops import transform_pyarrow
@@ -14,6 +13,12 @@
# https://github.com/apache/arrow/issues/35126 is resolved.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = 2

+# Delay compaction until the shuffle buffer has reached this ratio over the min
+# shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
+# frequent compactions. Setting this to higher values increases memory usage but
+# reduces compaction frequency.
+SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5


class BatcherInterface:
def add(self, block: Block):
@@ -24,10 +29,6 @@ def add(self, block: Block):
"""
raise NotImplementedError()

-    def can_add(self, block: Block) -> bool:
-        """Whether the block can be added to the buffer."""
-        raise NotImplementedError()

def done_adding(self) -> bool:
"""Indicate to the batcher that no more blocks will be added to the buffer."""
raise NotImplementedError()
@@ -83,14 +84,9 @@ def add(self, block: Block):
block: Block to add to the block buffer.
"""
if BlockAccessor.for_block(block).num_rows() > 0:
-            assert self.can_add(block)
self._buffer.append(block)
self._buffer_size += BlockAccessor.for_block(block).num_rows()

-    def can_add(self, block: Block) -> bool:
-        """Whether the block can be added to the buffer."""
-        return not self._done_adding

def done_adding(self) -> bool:
"""Indicate to the batcher that no more blocks will be added to the batcher."""
self._done_adding = True
@@ -175,28 +171,18 @@ class ShufflingBatcher(BatcherInterface):
#
# This shuffling batcher lazily builds a shuffle buffer from added blocks, and once
# a batch is requested via .next_batch(), it concatenates the blocks into a concrete
-    # shuffle buffer, generates random shuffle indices, and starts returning shuffled
-    # batches.
+    # shuffle buffer and randomly shuffles the entire buffer.
#
# Adding of more blocks can be intermixed with retrieving batches, but it should be
# noted that we can end up performing two expensive operations on each retrieval:
# 1. Build added blocks into a concrete shuffle buffer.
-    # 2. Generate random shuffle indices.
-    # Note that (1) and (2) only happen when new blocks are added, upon the next
-    # retrieval. I.e., if no new blocks have been added since the last batch retrieval,
-    # and there are still batches in the existing concrete shuffle buffer to be yielded,
-    # then each batch retrieval will only involve slicing the batch out of the concrete
-    # shuffle buffer.
+    # 2. Shuffling the entire buffer.
+    # To amortize the overhead of this process, we only shuffle the blocks after a
+    # delay designated by SHUFFLE_BUFFER_COMPACTION_RATIO.
#
# Similarly, adding blocks is very cheap. Each added block will be appended to a
-    # list, with concatenation of the underlying data delayed until the next batch
-    # retrieval.
-    #
-    # Since (1) runs of block additions are cheap, and (2) runs of batch retrievals are
-    # cheap, callers of ShufflingBatcher are encouraged to add as many blocks as
-    # possible (up to the shuffle buffer capacity), followed by retrieving as many
-    # batches as possible (down to the shuffle buffer minimum size), in such contiguous
-    # runs.
+    # list, with concatenation of the underlying data delayed until the next
+    # compaction.

def __init__(
self,
@@ -220,6 +206,7 @@ def __init__(
if batch_size is None:
raise ValueError("Must specify a batch_size if using a local shuffle.")
self._batch_size = batch_size
+        self._shuffle_seed = shuffle_seed
if shuffle_buffer_min_size < batch_size:
# Round it up internally to `batch_size` since our algorithm requires it.
# This is harmless since it only offers extra randomization.
@@ -231,13 +218,9 @@
self._buffer_min_size = shuffle_buffer_min_size
self._builder = DelegatingBlockBuilder()
self._shuffle_buffer: Block = None
-        self._shuffle_indices: List[int] = None
self._batch_head = 0
self._done_adding = False

-        if shuffle_seed is not None:
-            random.seed(shuffle_seed)

def add(self, block: Block):
"""Add a block to the shuffle buffer.

@@ -247,18 +230,8 @@
block: Block to add to the shuffle buffer.
"""
if BlockAccessor.for_block(block).num_rows() > 0:
-            assert self.can_add(block)
self._builder.add_block(block)

-    def can_add(self, block: Block) -> bool:
-        """Whether the block can be added to the shuffle buffer.
-
-        This does not take the to-be-added block size into account when checking the
-        buffer size vs. buffer capacity, since we need to support large outlier blocks
-        and have to guard against min buffer size liveness issues.
-        """
-        return self._buffer_size() <= self._buffer_capacity and not self._done_adding

def done_adding(self) -> bool:
"""Indicate to the batcher that no more blocks will be added to the batcher.

Expand All @@ -275,46 +248,63 @@ def has_batch(self) -> bool:
buffer_size = self._buffer_size()

if not self._done_adding:
-            # If still adding blocks, ensure that removing a batch wouldn't cause the
-            # shuffle buffer to dip beneath its configured minimum size.
-            return buffer_size - self._batch_size >= self._buffer_min_size
+            # Delay pulling of batches until the buffer is large enough in order to
+            # amortize compaction overhead.
+            return self._materialized_buffer_size() >= self._buffer_min_size or (
+                buffer_size - self._batch_size
+                >= self._buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
+            )
else:
return buffer_size >= self._batch_size

def _buffer_size(self) -> int:
"""Return shuffle buffer size."""
buffer_size = self._builder.num_rows()
if self._shuffle_buffer is not None:
-            # Include the size of the concrete (materialized) shuffle buffer, adjusting
-            # for the batch head position, which also serves as a counter of the number
-            # of already-yielded rows from the current concrete shuffle buffer.
-            buffer_size += (
-                BlockAccessor.for_block(self._shuffle_buffer).num_rows()
-                - self._batch_head
-            )
+            buffer_size += self._materialized_buffer_size()
return buffer_size

+    def _materialized_buffer_size(self) -> int:
+        """Return materialized (compacted portion of) shuffle buffer size."""
+        if self._shuffle_buffer is None:
+            return 0
+        # The size of the concrete (materialized) shuffle buffer, adjusting
+        # for the batch head position, which also serves as a counter of the number
+        # of already-yielded rows from the current concrete shuffle buffer.
+        return max(
+            0,
+            BlockAccessor.for_block(self._shuffle_buffer).num_rows() - self._batch_head,
+        )

def next_batch(self) -> Block:
"""Get the next shuffled batch from the shuffle buffer.

Returns:
A batch represented as a Block.
"""
assert self.has_batch() or (self._done_adding and self.has_any())
-        # Add rows in the builder to the shuffle buffer.
-        if self._builder.num_rows() > 0:
+        # Add rows in the builder to the shuffle buffer. Note that we delay compaction
+        # as much as possible to amortize the concatenation overhead. Compaction is
+        # only necessary when the materialized buffer size falls below the min size.
+        if self._builder.num_rows() > 0 and (
+            self._done_adding
+            or self._materialized_buffer_size() <= self._buffer_min_size
+        ):
if self._shuffle_buffer is not None:
if self._batch_head > 0:
# Compact the materialized shuffle buffer.
-                    # TODO(Clark): If alternating between adding blocks and fetching
-                    # shuffled batches, this aggressive compaction could be inefficient.
-                    self._shuffle_buffer = BlockAccessor.for_block(
-                        self._shuffle_buffer
-                    ).take(self._shuffle_indices[self._batch_head :])
+                    block = BlockAccessor.for_block(self._shuffle_buffer)
+                    self._shuffle_buffer = block.slice(
+                        self._batch_head, block.num_rows()
+                    )
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)
# Build the new shuffle buffer.
self._shuffle_buffer = self._builder.build()
+            self._shuffle_buffer = BlockAccessor.for_block(
+                self._shuffle_buffer
+            ).random_shuffle(self._shuffle_seed)
+            if self._shuffle_seed is not None:
+                self._shuffle_seed += 1
if (

Contributor:
This if doesn't seem useful either. I'd guess self._builder.build() should already return a table with only one chunk? I tried adding a print inside this if, and it never printed.

Contributor Author:
I'm not sure; it seems @jjyao added this to resolve some performance regressions. I'm guessing it happens in other scenarios?

Contributor:
Found the original issue. I thought self._builder.build() would combine the chunks, but that's not true.
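
A quick pyarrow illustration of this, using pa.concat_tables as a stand-in for what the builder does:

    import pyarrow as pa

    # Building a table out of several blocks keeps one chunk per block;
    # concatenation alone does not coalesce them.
    blocks = [pa.table({"a": [i, i + 1]}) for i in range(0, 8, 2)]
    built = pa.concat_tables(blocks)
    print(built.column(0).num_chunks)  # 4

    # combine_chunks() is what actually coalesces each column into one chunk.
    print(built.combine_chunks().column(0).num_chunks)  # 1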

isinstance(
BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
@@ -328,24 +318,15 @@ def next_batch(self) -> Block:
)
# Reset the builder.
self._builder = DelegatingBlockBuilder()
-            # Invalidate the shuffle indices.
-            self._shuffle_indices = None
self._batch_head = 0

assert self._shuffle_buffer is not None
buffer_size = BlockAccessor.for_block(self._shuffle_buffer).num_rows()
# Truncate the batch to the buffer size, if necessary.
batch_size = min(self._batch_size, buffer_size)

-        if self._shuffle_indices is None:
-            # Need to generate new shuffle indices.
-            self._shuffle_indices = list(range(buffer_size))
-            random.shuffle(self._shuffle_indices)
-
-        # Get the shuffle indices for this batch.
-        batch_indices = self._shuffle_indices[
-            self._batch_head : self._batch_head + batch_size
-        ]
+        slice_start = self._batch_head
self._batch_head += batch_size
# Yield the shuffled batch.
-        return BlockAccessor.for_block(self._shuffle_buffer).take(batch_indices)
+        return BlockAccessor.for_block(self._shuffle_buffer).slice(
+            slice_start, self._batch_head
+        )
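
For completeness, a rough usage sketch of the rewritten batcher (hypothetical: this is an internal API, and the call pattern here is inferred from the diff above, so the exact signature may differ):

    import pyarrow as pa

    from ray.data._internal.batcher import ShufflingBatcher

    batcher = ShufflingBatcher(batch_size=2, shuffle_buffer_min_size=4, shuffle_seed=42)
    for i in range(0, 8, 2):
        # Blocks are added lazily; concatenation is deferred until needed.
        batcher.add(pa.table({"x": [i, i + 1]}))
    batcher.done_adding()

    # Batches come back as contiguous slices of the shuffled buffer.
    while batcher.has_any():
        print(batcher.next_batch().to_pydict())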