Commit

[Chore](exchange) change LocalExchangeSharedState::mem_usage to a signed type to avoid queries being blocked by a negative mem_usage (apache#36682)

## Proposed changes

Change `LocalExchangeSharedState::mem_usage` from an unsigned to a signed type so that a query is not blocked forever when `mem_usage` goes negative.
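As context for the type change, here is a minimal standalone sketch (not Doris code; `queue_limit` and the byte counts are made up for illustration) of why an unsigned counter misbehaves here: if a concurrent dequeue subtracts before the matching add lands, a `std::atomic<size_t>` wraps around to a huge positive value instead of going negative, so any "usage above limit" check keeps firing and the dependency never unblocks, whereas an `int64_t` simply dips below zero and recovers.

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

int main() {
    constexpr int64_t queue_limit = 1024; // illustrative limit, not a Doris constant

    // Unsigned accounting: subtracting below zero wraps around.
    std::atomic<size_t> unsigned_usage{0};
    unsigned_usage += 100; // sink adds a block
    unsigned_usage -= 300; // concurrent source subtracts more than was added
    // Wraps to ~1.8e19, which is always > queue_limit -> looks like memory pressure forever.
    std::cout << "unsigned usage: " << unsigned_usage.load() << '\n';
    std::cout << "blocked? " << (unsigned_usage.load() > static_cast<size_t>(queue_limit)) << '\n';

    // Signed accounting: the same sequence just goes negative and the check stays sane.
    std::atomic<int64_t> signed_usage{0};
    signed_usage += 100;
    signed_usage -= 300;
    std::cout << "signed usage: " << signed_usage.load() << '\n';
    std::cout << "blocked? " << (signed_usage.load() > queue_limit) << '\n';
    return 0;
}
```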
BiteTheDDDDt authored Jun 24, 2024
1 parent 1c435c2 commit a95d58b
Showing 3 changed files with 75 additions and 80 deletions.
40 changes: 15 additions & 25 deletions be/src/pipeline/dependency.h
@@ -336,9 +336,8 @@ struct AggSharedState : public BasicSharedState {
     std::vector<size_t> make_nullable_keys;
 
     struct MemoryRecord {
-        MemoryRecord() : used_in_arena(0), used_in_state(0) {}
-        int64_t used_in_arena;
-        int64_t used_in_state;
+        int64_t used_in_arena {};
+        int64_t used_in_state {};
     };
     MemoryRecord mem_usage_record;
     bool agg_data_created_without_key = false;
@@ -362,11 +361,7 @@ struct AggSharedState : public BasicSharedState {
               _order_directions(order_directions),
               _null_directions(null_directions) {}
 
-        HeapLimitCursor(const HeapLimitCursor& other) noexcept
-                : _row_id(other._row_id),
-                  _limit_columns(other._limit_columns),
-                  _order_directions(other._order_directions),
-                  _null_directions(other._null_directions) {}
+        HeapLimitCursor(const HeapLimitCursor& other) = default;
 
         HeapLimitCursor(HeapLimitCursor&& other) noexcept
                 : _row_id(other._row_id),
@@ -567,11 +562,10 @@ struct MultiCastSharedState : public BasicSharedState {
 };
 
 struct BlockRowPos {
-    BlockRowPos() : block_num(0), row_num(0), pos(0) {}
-    int64_t block_num; //the pos at which block
-    int64_t row_num;   //the pos at which row
-    int64_t pos;       //pos = all blocks size + row_num
-    std::string debug_string() {
+    int64_t block_num {}; //the pos at which block
+    int64_t row_num {};   //the pos at which row
+    int64_t pos {};       //pos = all blocks size + row_num
+    std::string debug_string() const {
         std::string res = "\t block_num: ";
         res += std::to_string(block_num);
         res += "\t row_num: ";
@@ -823,14 +817,9 @@ struct DataDistribution {
     DataDistribution(ExchangeType type) : distribution_type(type) {}
     DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
             : distribution_type(type), partition_exprs(partition_exprs_) {}
-    DataDistribution(const DataDistribution& other)
-            : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {}
+    DataDistribution(const DataDistribution& other) = default;
     bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
-    DataDistribution& operator=(const DataDistribution& other) {
-        distribution_type = other.distribution_type;
-        partition_exprs = other.partition_exprs;
-        return *this;
-    }
+    DataDistribution& operator=(const DataDistribution& other) = default;
     ExchangeType distribution_type;
     std::vector<TExpr> partition_exprs;
 };
@@ -843,13 +832,14 @@ struct LocalExchangeSharedState : public BasicSharedState {
     LocalExchangeSharedState(int num_instances);
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
-    std::atomic<size_t> mem_usage = 0;
+    std::atomic<int64_t> mem_usage = 0;
+    // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
     void create_source_dependencies(int operator_id, int node_id) {
-        for (size_t i = 0; i < source_deps.size(); i++) {
-            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
-                                                          "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
-            source_deps[i]->set_shared_state(this);
+        for (auto& source_dep : source_deps) {
+            source_dep = std::make_shared<Dependency>(operator_id, node_id,
+                                                      "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+            source_dep->set_shared_state(this);
         }
     };
     void sub_running_sink_operators();
114 changes: 59 additions & 55 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -74,20 +74,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
         return Status::OK();
     };
 
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
-                    block, partitioned_block.first->data_block);
-            RETURN_IF_ERROR(get_data(block));
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -144,6 +138,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
                 local_state._shared_state->set_ready_to_read(it.second);
             } else {
+                local_state._shared_state->sub_mem_usage(
+                        it.second, new_block_wrapper->data_block.allocated_bytes(), false);
                 new_block_wrapper->unref(local_state._shared_state);
             }
         } else {
@@ -162,6 +158,9 @@
                             {new_block_wrapper, {row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(i % _num_sources);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
+                            false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -181,6 +180,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
                 local_state._shared_state->set_ready_to_read(map[i]);
             } else {
+                local_state._shared_state->sub_mem_usage(
+                        map[i], new_block_wrapper->data_block.allocated_bytes(), false);
                 new_block_wrapper->unref(local_state._shared_state);
             }
         } else {
@@ -200,9 +201,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
    auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -220,25 +224,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                        LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-                _free_blocks.enqueue(std::move(next_block));
-            }
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -264,14 +259,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
         return Status::OK();
     }
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[0].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[0].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[0].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -287,10 +279,14 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
     }
     new_block.swap(*in_block);
     DCHECK_LE(local_state._channel_id, _data_queue.size());
-    add_mem_usage(local_state, new_block.allocated_bytes());
+
+    size_t memory_usage = new_block.allocated_bytes();
+    add_mem_usage(local_state, memory_usage);
 
     if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(0);
+    } else {
+        sub_mem_usage(local_state, memory_usage);
     }
     if (eos) {
         _queue_deps[local_state._channel_id]->set_always_ready();
@@ -350,6 +346,19 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block
     return Status::OK();
 }
 
+void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState& local_state,
+                                            int64_t delta) {
+    const auto channel_id = local_state._channel_id;
+    local_state._shared_state->mem_trackers[channel_id]->release(delta);
+    if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
+        _sink_deps[channel_id]->set_ready();
+    }
+    // if queue empty , block this queue
+    if (_queues_mem_usege[channel_id] == 0) {
+        _queue_deps[channel_id]->block();
+    }
+}
+
 void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_state,
                                             int64_t delta) {
     const auto channel_id = local_state._channel_id;
@@ -412,14 +421,11 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                      LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -436,9 +442,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -494,9 +503,13 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                     vectorized::MutableBlock::create_unique(block->clone_empty());
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
-            local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
+
+            size_t memory_usage = new_block.allocated_bytes();
+            local_state._shared_state->add_mem_usage(i, memory_usage);
             if (data_queue[i].enqueue(std::move(new_block))) {
                 local_state._shared_state->set_ready_to_read(i);
+            } else {
+                local_state._shared_state->sub_mem_usage(i, memory_usage);
             }
         }
     }
@@ -519,25 +532,16 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                                LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-                _free_blocks.enqueue(std::move(next_block));
-            }
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
1 change: 1 addition & 0 deletions be/src/pipeline/local_exchange/local_exchanger.h
@@ -256,6 +256,7 @@ class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
     std::vector<Dependency*> local_state_dependency(int channel_id) override;
 
     void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
+    void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta);
     void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta);
     void close(LocalExchangeSourceLocalState& local_state) override {}
 
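Beyond the type change, the local_exchanger.cpp hunks apply one accounting protocol in every exchanger: add to mem_usage before enqueueing, roll the addition back if the enqueue is rejected, and subtract on the source side only after a successful dequeue. A rough standalone sketch of that protocol with toy types (`SharedState`, `BoundedQueue`, `Block`, and the capacities and byte counts are illustrative, not the Doris classes):

```cpp
#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>

// Toy stand-ins for the real exchanger pieces; names are illustrative only.
struct SharedState {
    std::atomic<int64_t> mem_usage{0}; // signed, so transient negatives stay harmless
    void add_mem_usage(int64_t bytes) { mem_usage += bytes; }
    void sub_mem_usage(int64_t bytes) { mem_usage -= bytes; }
};

struct Block {
    int64_t bytes = 0;
};

class BoundedQueue {
public:
    explicit BoundedQueue(size_t cap) : _cap(cap) {}
    bool enqueue(Block b) { // returns false when the queue rejects the block
        std::lock_guard<std::mutex> g(_m);
        if (_q.size() >= _cap) return false;
        _q.push_back(b);
        return true;
    }
    std::optional<Block> try_dequeue() {
        std::lock_guard<std::mutex> g(_m);
        if (_q.empty()) return std::nullopt;
        Block b = _q.front();
        _q.pop_front();
        return b;
    }

private:
    std::mutex _m;
    std::deque<Block> _q;
    size_t _cap;
};

// Sink side: account first, then enqueue, and undo the accounting on rejection.
void sink(SharedState& state, BoundedQueue& q, Block b) {
    const int64_t bytes = b.bytes;
    state.add_mem_usage(bytes);     // reserve before the block becomes visible to consumers
    if (!q.enqueue(b)) {
        state.sub_mem_usage(bytes); // enqueue rejected: roll the reservation back
    }
}

// Source side: subtract only after a successful dequeue.
bool source(SharedState& state, BoundedQueue& q, Block* out) {
    if (auto b = q.try_dequeue()) {
        *out = *b;
        state.sub_mem_usage(b->bytes);
        return true;
    }
    return false;
}

int main() {
    SharedState state;
    BoundedQueue q(2); // tiny capacity so a rejection is easy to trigger
    sink(state, q, {100});
    sink(state, q, {200});
    sink(state, q, {300}); // rejected: the reservation is rolled back, usage stays at 300
    Block b;
    while (source(state, q, &b)) {
    }
    return state.mem_usage.load() == 0 ? 0 : 1; // all accounted memory released
}
```

The roll-back on a rejected enqueue is what keeps the counter from drifting upward when a queue refuses a block after its memory has already been reserved.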
