[data] Fix quadratic slowdown when locally shuffling tensor extension types #36102

Merged · 6 commits · Jun 9, 2023
Changes from 2 commits
2 changes: 1 addition & 1 deletion python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -34,7 +34,7 @@ def take_table(
if any(_is_column_extension_type(col) for col in table.columns):
new_cols = []
for col in table.columns:
if _is_column_extension_type(col):
if _is_column_extension_type(col) and col.num_chunks > 1:
Contributor Author:

This provides most of the speedup (0.03 GB/s -> 0.12 GB/s on the attached benchmark).

Contributor:

I'm wondering how general this optimization will be. In what cases will num_chunks be > 1?

Contributor:

Another question: IIUC, here we concatenate all the chunks of the entire column, which isn't necessary, because the indices may only fall into some of the chunks. We could potentially optimize this by taking rows only from the chunks they fall into. Is my understanding correct?
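
For illustration only (not this PR's code), a rough sketch of that per-chunk take using plain pyarrow arrays. Note that it regroups rows by chunk, so the output order differs from a direct take(indices); that is acceptable for a shuffle but not a drop-in replacement in general:

import bisect

import pyarrow as pa


def take_from_chunks(col: pa.ChunkedArray, indices: list) -> pa.ChunkedArray:
    # Prefix sums of chunk lengths, so each global index can be located with bisect.
    offsets = [0]
    for chunk in col.chunks:
        offsets.append(offsets[-1] + len(chunk))

    # Group the requested global indices by the chunk they fall into.
    per_chunk = {}  # chunk index -> local row indices
    for i in indices:
        chunk_idx = bisect.bisect_right(offsets, i) - 1
        per_chunk.setdefault(chunk_idx, []).append(i - offsets[chunk_idx])

    # Only the chunks that are actually referenced get touched; untouched chunks
    # are never concatenated or copied.
    taken = [
        col.chunk(ci).take(pa.array(local)) for ci, local in sorted(per_chunk.items())
    ]
    return pa.chunked_array(taken)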

Contributor Author:

I think it's actually always 1, since we only ever take from a fully concatenated table, which already coalesces the chunks (batcher.py:326):

            # Build the new shuffle buffer.
            self._shuffle_buffer = self._builder.build()
            if (
                isinstance(
                    BlockAccessor.for_block(self._shuffle_buffer), ArrowBlockAccessor
                )
                and self._shuffle_buffer.num_columns > 0
                and self._shuffle_buffer.column(0).num_chunks
                >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
            ):
                self._shuffle_buffer = transform_pyarrow.combine_chunks(
                    self._shuffle_buffer
                )

This is a bit brittle, though; I'm not sure we can avoid regressions without a microbenchmark.
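
As a rough sketch of the kind of microbenchmark that could guard against such a regression (plain float columns rather than tensor extension types, so not the PR's attached benchmark):

import time

import numpy as np
import pyarrow as pa

# A column split into many small chunks, shuffled via take() with and without
# coalescing the chunks first.
chunks = [pa.array(np.random.rand(1_000)) for _ in range(1_000)]
table = pa.table({"x": pa.chunked_array(chunks)})
indices = np.random.permutation(len(table))

start = time.perf_counter()
table.take(indices)
print("take on chunked column:", time.perf_counter() - start)

combined = table.combine_chunks()
start = time.perf_counter()
combined.take(indices)
print("take on combined column:", time.perf_counter() - start)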

> Another question: IIUC, here we concatenate all the chunks of the entire column, which isn't necessary, because the indices may only fall into some of the chunks. We could potentially optimize this by taking rows only from the chunks they fall into. Is my understanding correct?

I believe so. Do you think that approach would be preferable?

Contributor:

I'm still very confused about the logic of ShufflingBatcher.next_batch, especially the following part. Why put the shuffle_buffer back into the builder, when the shuffle_buffer was built from the builder in the first place? This seems to copy data back and forth repeatedly, which could hurt performance badly when shuffle_buffer_min_size is large and batch_size is small.

if self._batch_head > 0:
    # Compact the materialized shuffle buffer.
    # TODO(Clark): If alternating between adding blocks and fetching
    # shuffled batches, this aggressive compaction could be inefficient.
    self._shuffle_buffer = BlockAccessor.for_block(
        self._shuffle_buffer
    ).take(self._shuffle_indices[self._batch_head :])
# Add the unyielded rows from the existing shuffle buffer.
self._builder.add_block(self._shuffle_buffer)

Contributor:

The process in my head is very simple:

  1. add() keeps adding data to the builder.
  2. When the builder size reaches shuffle_buffer_min_size, build the shuffle buffer and shuffle its order (this copies shuffle_buffer_min_size worth of data).
  3. next_batch() just takes a batch from the shuffle buffer (this copies only batch_size worth of data).

The current logic, by contrast, seems too complex: there are a lot of copies (take, add_block, combine_chunks).
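
A minimal sketch of that three-step flow, using plain Python lists as stand-ins for blocks (the class is hypothetical, not the actual ShufflingBatcher, which operates on Arrow blocks):

import random


class SimpleShufflingBatcher:
    def __init__(self, batch_size, shuffle_buffer_min_size):
        self._batch_size = batch_size
        self._min_size = shuffle_buffer_min_size
        self._pending = []  # rows accumulated by add(), not yet shuffled
        self._buffer = []   # materialized, already-shuffled rows
        self._cursor = 0    # next unread row in the shuffled buffer

    def add(self, block):
        # 1. add() keeps appending data to the pending builder.
        self._pending.extend(block)

    def next_batch(self):
        # 2. Once enough pending data has accumulated, shuffle it once and append
        #    it to the unread leftovers (one copy of roughly shuffle_buffer_min_size
        #    rows).
        if len(self._pending) >= self._min_size:
            random.shuffle(self._pending)
            self._buffer = self._buffer[self._cursor:] + self._pending
            self._cursor = 0
            self._pending = []
        # 3. next_batch() slices off one batch, copying only batch_size rows.
        batch = self._buffer[self._cursor:self._cursor + self._batch_size]
        self._cursor += self._batch_size
        return batch

The real implementation has to do the equivalent of step 2 on Arrow tables, which is where the take/add_block/combine_chunks copies come from.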

Contributor Author:

Yeah, I think I agree. I'll try to rewrite this class to simplify it.

Contributor Author:

I ended up just removing shuffle_indices from the current implementation, keeping the compaction logic since we still need it for performance in any case. Switching from take() with shuffle indices to slice() with an up-front random shuffle further improves performance from 0.2 GB/s to 0.3 GB/s.
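
For illustration (plain pyarrow rather than the Ray internals), the difference between the two access patterns:

import numpy as np
import pyarrow as pa

table = pa.table({"x": np.arange(1_000_000)})
batch_size = 1_000
indices = np.random.permutation(len(table))

# Old pattern: every batch is a random gather over the whole buffer.
batch_via_take = table.take(indices[:batch_size])

# New pattern: pay for one up-front shuffle of the whole buffer, after which each
# batch is a zero-copy contiguous slice.
shuffled = table.take(indices)
batch_via_slice = shuffled.slice(0, batch_size)

The up-front shuffle is paid once per buffer; each subsequent batch then avoids a per-batch gather.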

# .take() will concatenate internally, which currently breaks for
# extension arrays.
col = _concatenate_extension_column(col)
47 changes: 32 additions & 15 deletions python/ray/data/_internal/batcher.py
@@ -14,6 +14,12 @@
# https://github.com/apache/arrow/issues/35126 is resolved.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = 2

# Delay compaction until the shuffle buffer has reached this ratio over the min
# shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
# frequent compactions. Setting this to higher values increases memory usage but
# reduces compaction frequency.
SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5


class BatcherInterface:
def add(self, block: Block):
@@ -275,39 +281,50 @@ def has_batch(self) -> bool:
buffer_size = self._buffer_size()

if not self._done_adding:
# If still adding blocks, ensure that removing a batch wouldn't cause the
# shuffle buffer to dip beneath its configured minimum size.
return buffer_size - self._batch_size >= self._buffer_min_size
# Delay pulling of batches until the buffer is large enough in order to
# amortize compaction overhead.
return (
buffer_size - self._batch_size
>= self._buffer_min_size * SHUFFLE_BUFFER_COMPACTION_RATIO
)
else:
return buffer_size >= self._batch_size

def _buffer_size(self) -> int:
"""Return shuffle buffer size."""
buffer_size = self._builder.num_rows()
if self._shuffle_buffer is not None:
# Include the size of the concrete (materialized) shuffle buffer, adjusting
# for the batch head position, which also serves as a counter of the number
# of already-yielded rows from the current concrete shuffle buffer.
buffer_size += (
BlockAccessor.for_block(self._shuffle_buffer).num_rows()
- self._batch_head
)
buffer_size += self._materialized_buffer_size()
return buffer_size

def _materialized_buffer_size(self) -> int:
"""Return materialized (compacted portion of) shuffle buffer size."""
if self._shuffle_buffer is None:
return 0
# The size of the concrete (materialized) shuffle buffer, adjusting
# for the batch head position, which also serves as a counter of the number
# of already-yielded rows from the current concrete shuffle buffer.
return max(
0,
BlockAccessor.for_block(self._shuffle_buffer).num_rows() - self._batch_head,
)

def next_batch(self) -> Block:
"""Get the next shuffled batch from the shuffle buffer.

Returns:
A batch represented as a Block.
"""
assert self.has_batch() or (self._done_adding and self.has_any())
# Add rows in the builder to the shuffle buffer.
if self._builder.num_rows() > 0:
# Add rows in the builder to the shuffle buffer. Note that we delay compaction
# as much as possible to amortize the concatenation overhead. Compaction is
# only necessary when the materialized buffer size falls below the min size.
if self._builder.num_rows() > 0 and (
self._done_adding
or self._materialized_buffer_size() <= self._buffer_min_size
):
if self._shuffle_buffer is not None:
if self._batch_head > 0:
# Compact the materialized shuffle buffer.
# TODO(Clark): If alternating between adding blocks and fetching
# shuffled batches, this aggressive compaction could be inefficient.
self._shuffle_buffer = BlockAccessor.for_block(
self._shuffle_buffer
).take(self._shuffle_indices[self._batch_head :])
131 changes: 91 additions & 40 deletions python/ray/data/tests/test_batcher.py
@@ -27,20 +27,39 @@ def test_shuffling_batcher():
shuffle_buffer_min_size=buffer_size,
)

def add_and_check(num_rows, expect_has_batch=False, no_nexting_yet=True):
def add_and_check(
num_rows,
materialized_buffer_size,
pending_buffer_size,
expect_has_batch=False,
no_nexting_yet=True,
):
block = gen_block(num_rows)
assert batcher.can_add(block)
batcher.add(block)
assert not expect_has_batch or batcher.has_batch()
if expect_has_batch:
assert batcher.has_batch()
else:
assert not batcher.has_batch()

if no_nexting_yet:
# Check that no shuffle buffer has been materialized yet.
assert batcher._shuffle_buffer is None
assert batcher._shuffle_indices is None
assert batcher._batch_head == 0

assert batcher._builder.num_rows() == pending_buffer_size
assert batcher._materialized_buffer_size() == materialized_buffer_size
if batcher._shuffle_indices:
assert (
max(0, len(batcher._shuffle_indices) - batcher._batch_head)
== materialized_buffer_size
)

def next_and_check(
current_cursor,
materialized_buffer_size,
pending_buffer_size,
should_batch_be_full=True,
should_have_batch_after=True,
new_data_added=False,
@@ -52,27 +71,18 @@ def next_and_check(
if new_data_added:
# If new data was added, the builder should be non-empty.
assert batcher._builder.num_rows() > 0
# Store the old shuffle indices for comparison in post.
old_shuffle_indices = batcher._shuffle_indices

batch = batcher.next_batch()

if should_batch_be_full:
assert len(batch) == batch_size

# Check that shuffle buffer has been materialized and its state is as expected.
assert batcher._shuffle_buffer is not None
# Builder should always be empty after consuming a batch since the shuffle
# buffer should always be materialized.
assert batcher._builder.num_rows() == 0
assert len(
batcher._shuffle_indices
) == batcher._buffer_size() + current_cursor + len(batch)
if new_data_added:
# If new data was added, confirm that the old shuffle indices were
# invalidated.
assert batcher._shuffle_indices != old_shuffle_indices
assert batcher._batch_head == current_cursor + len(batch)
assert batcher._builder.num_rows() == pending_buffer_size
assert batcher._materialized_buffer_size() == materialized_buffer_size
if batcher._shuffle_indices:
assert (
max(0, len(batcher._shuffle_indices) - batcher._batch_head)
== materialized_buffer_size
)

if should_have_batch_after:
assert batcher.has_batch()
Expand All @@ -81,48 +91,89 @@ def next_and_check(

# Add less than a batch.
# Buffer not full and no batch slack.
add_and_check(3)
add_and_check(3, materialized_buffer_size=0, pending_buffer_size=3)

# Add to more than a batch (total=10).
# Buffer not full and no batch slack.
add_and_check(7)
add_and_check(7, materialized_buffer_size=0, pending_buffer_size=10)

# Fill up to buffer (total=20).
# Buffer is full but no batch slack.
add_and_check(10)
add_and_check(10, materialized_buffer_size=0, pending_buffer_size=20)

# Fill past buffer but not to full batch (total=22)
# Buffer is over-full but no batch slack.
add_and_check(2)

# Fill past buffer to full batch (total=25).
add_and_check(3, expect_has_batch=True)
# Fill past buffer and over 1.5 * min buffer size. A batch is now available, but
# compaction still doesn't happen until a next().
add_and_check(
15, materialized_buffer_size=0, pending_buffer_size=35, expect_has_batch=True
)

# Consume only available batch.
next_and_check(0, should_have_batch_after=False, new_data_added=True)
next_and_check(
0,
materialized_buffer_size=30,
pending_buffer_size=0,
should_have_batch_after=False,
new_data_added=True,
)

# Add 4 batches-worth to the already-full buffer.
add_and_check(20, no_nexting_yet=False)
add_and_check(
20,
materialized_buffer_size=30,
pending_buffer_size=20,
expect_has_batch=True,
no_nexting_yet=False,
)

# Consume 4 batches from the buffer.
next_and_check(0, new_data_added=True)
next_and_check(batch_size)
next_and_check(2 * batch_size)
next_and_check(3 * batch_size, should_have_batch_after=False)
next_and_check(
0, materialized_buffer_size=25, pending_buffer_size=20, new_data_added=True
)
next_and_check(batch_size, materialized_buffer_size=20, pending_buffer_size=20)
# Triggers materialization of pending batches to avoid falling below min buf size.
next_and_check(2 * batch_size, materialized_buffer_size=35, pending_buffer_size=0)
next_and_check(
3 * batch_size,
materialized_buffer_size=30,
pending_buffer_size=0,
should_have_batch_after=False,
)

# Add a full batch + a partial batch to the buffer.
add_and_check(8, no_nexting_yet=False)
next_and_check(0, should_have_batch_after=False, new_data_added=True)
add_and_check(
8,
materialized_buffer_size=30,
pending_buffer_size=8,
expect_has_batch=True,
no_nexting_yet=False,
)
next_and_check(
0,
materialized_buffer_size=25,
pending_buffer_size=8,
should_have_batch_after=False,
new_data_added=True,
)

# Indicate to the batcher that we're done adding blocks.
batcher.done_adding()

# Consume 4 full batches and one partial batch.
next_and_check(batch_size)
next_and_check(2 * batch_size)
next_and_check(3 * batch_size)
# Consume 6 full batches and one partial batch, fully draining the buffer.
next_and_check(batch_size, materialized_buffer_size=28, pending_buffer_size=0)
next_and_check(2 * batch_size, materialized_buffer_size=23, pending_buffer_size=0)
next_and_check(3 * batch_size, materialized_buffer_size=18, pending_buffer_size=0)
next_and_check(4 * batch_size, materialized_buffer_size=13, pending_buffer_size=0)
next_and_check(5 * batch_size, materialized_buffer_size=8, pending_buffer_size=0)
next_and_check(
6 * batch_size,
materialized_buffer_size=3,
pending_buffer_size=0,
should_have_batch_after=False,
)
next_and_check(
4 * batch_size,
7 * batch_size,
materialized_buffer_size=0,
pending_buffer_size=0,
should_batch_be_full=False,
should_have_batch_after=False,
)