Skip to content

Commit

Permalink
[pipelineX](local shuffle) remove unused code in local shuffle to improve performance apache#29292
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Dec 29, 2023
1 parent 7c5fda1 commit 07bd65a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 125 deletions.
141 changes: 29 additions & 112 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,38 +283,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
_passthrough_data_queue[channel_id].enqueue(std::move(new_block));
_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(channel_id);

return Status::OK();
}

// Tries to pop one block from the passthrough queue of this source's channel.
//
// Returns true when a block was dequeued and swapped into `block`; returns
// false when the queue was empty. On an empty queue the failed-get counter is
// bumped, and `source_state` is set to FINISHED only if
// `_running_sink_operators` was 0 when sampled at the top of the call.
// `state` is not used by this function.
bool AdaptivePassthroughExchanger::_passthrough_get_block(
        RuntimeState* state, vectorized::Block* block, SourceState& source_state,
        LocalExchangeSourceLocalState& local_state) {
    // Sample the sink counter before touching the queue so the check-then-dequeue
    // order is kept.
    const bool sinks_finished = (_running_sink_operators == 0);

    vectorized::Block dequeued;
    if (!_passthrough_data_queue[local_state._channel_id].try_dequeue(dequeued)) {
        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
        if (sinks_finished) {
            source_state = SourceState::FINISHED;
        }
        return false;
    }

    // After the swap, `dequeued` holds whatever the caller's block contained
    // before; that object is what gets pushed onto `_free_blocks`.
    block->swap(dequeued);
    _free_blocks.enqueue(std::move(dequeued));
    // Memory accounting uses the size of the block that was just handed out.
    local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
    return true;
}

Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
Expand All @@ -339,7 +313,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _shuffle_data_queue;
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
Expand All @@ -357,79 +331,21 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
local_state._partition_rows_histogram[channel_ids[i]]--;
}
}

vectorized::Block data_block;
std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
if (_free_blocks.try_enqueue(data_block)) {
new_block_wrapper = ShuffleBlockWrapper::create_shared(std::move(data_block));
} else {
new_block_wrapper = ShuffleBlockWrapper::create_shared(block->clone_empty());
}

new_block_wrapper->data_block.swap(*block);
if (new_block_wrapper->data_block.empty()) {
return Status::OK();
}
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
new_block_wrapper->ref(_num_partitions);

for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
const size_t start = local_state._partition_rows_histogram[i];
const size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
i, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(i);
} else {
new_block_wrapper->unref(local_state._shared_state);
std::unique_ptr<vectorized::MutableBlock> mutable_block =
vectorized::MutableBlock::create_unique(block->clone_empty());
mutable_block->add_rows(block, start, size);
auto new_block = mutable_block->to_block();
local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
data_queue[i].enqueue(std::move(new_block));
}
local_state._shared_state->set_ready_to_read(i);
}

return Status::OK();
}
// Builds an output block for this source's channel out of queued shuffle
// partitions.
//
// Each queue entry is a pair: `.first` is a shared block wrapper, `.second` is a
// tuple whose element 0 points at a row-index vector, element 1 is the start
// position inside that vector and element 2 is the number of indexes to take.
//
// Returns true when at least one entry was dequeued; in that case rows are
// appended entry by entry until the accumulated row count reaches
// `state->batch_size()` or the queue has nothing more to give, and the result is
// assigned to `*block`. Returns false when the queue was empty on the first
// attempt; the failed-get counter is bumped, and `source_state` is set to
// FINISHED only if `_running_sink_operators` was 0 when sampled at the top of
// the call.
bool AdaptivePassthroughExchanger::_shuffle_get_block(RuntimeState* state, vectorized::Block* block,
                                                      SourceState& source_state,
                                                      LocalExchangeSourceLocalState& local_state) {
    // Sample the sink counter before touching the queue so the check-then-dequeue
    // order is kept.
    const bool sinks_finished = (_running_sink_operators == 0);

    PartitionedBlock partition;
    std::unique_ptr<vectorized::MutableBlock> merged = nullptr;
    auto& channel_queue = _shuffle_data_queue[local_state._channel_id];

    if (!channel_queue.try_dequeue(partition)) {
        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
        if (sinks_finished) {
            source_state = SourceState::FINISHED;
        }
        return false;
    }

    {
        // The timer covers creation of the accumulator and all row copying.
        SCOPED_TIMER(local_state._copy_data_timer);
        merged = vectorized::MutableBlock::create_unique(
                partition.first->data_block.clone_empty());
        do {
            auto wrapper = partition.first;
            // Address of the first row index belonging to this entry.
            const auto* rows_begin =
                    &((*std::get<0>(partition.second))[std::get<1>(partition.second)]);
            local_state._shared_state->sub_mem_usage(
                    local_state._channel_id, wrapper->data_block.allocated_bytes(), false);
            merged->add_rows(&wrapper->data_block, rows_begin,
                             rows_begin + std::get<2>(partition.second));
            // Drop this consumer's reference only after the rows have been copied.
            wrapper->unref(local_state._shared_state);
        } while (merged->rows() < state->batch_size() && channel_queue.try_dequeue(partition));
        *block = merged->to_block();
    }
    return true;
}

Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state,
Expand All @@ -447,23 +363,24 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block
Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
auto is_shuffle_success = false, is_passthrough_success = false;
SourceState shuffle_state = SourceState::MORE_DATA, passthrough_state = SourceState::MORE_DATA;

is_shuffle_success = _shuffle_get_block(state, block, shuffle_state, local_state);

if (is_shuffle_success) {
return Status::OK();
}

is_passthrough_success = _passthrough_get_block(state, block, passthrough_state, local_state);

if (is_passthrough_success) {
return Status::OK();
}

if (shuffle_state == SourceState::FINISHED && passthrough_state == SourceState::FINISHED) {
source_state = SourceState::FINISHED;
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
_free_blocks.enqueue(std::move(next_block));
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
_free_blocks.enqueue(std::move(next_block));
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand Down
15 changes: 2 additions & 13 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,10 @@ class BroadcastExchanger final : public Exchanger {
// a copy of ShuffleExchanger and PassthroughExchanger.
class AdaptivePassthroughExchanger : public Exchanger {
public:
using PartitionedBlock =
std::pair<std::shared_ptr<ShuffleBlockWrapper>,
std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>;
ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions)
: Exchanger(running_sink_operators, num_partitions) {
_passthrough_data_queue.resize(num_partitions);
_shuffle_data_queue.resize(num_partitions);
_data_queue.resize(num_partitions);
}
Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) override;
Expand All @@ -204,17 +200,10 @@ class AdaptivePassthroughExchanger : public Exchanger {
SourceState source_state, LocalExchangeSinkLocalState& local_state);
Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
LocalExchangeSinkLocalState& local_state);

bool _passthrough_get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state);
bool _shuffle_get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state, LocalExchangeSourceLocalState& local_state);
Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state);
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _passthrough_data_queue;
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _shuffle_data_queue;
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;

std::atomic_bool _is_pass_through = false;
std::atomic_int32_t _total_block = 0;
Expand Down

0 comments on commit 07bd65a

Please sign in to comment.