-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[refactor](broadcastbuffer) using a queue to remove ref and unref codes #28698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,16 +45,40 @@ | |
| #include "vec/sink/vdata_stream_sender.h" | ||
|
|
||
| namespace doris { | ||
|
|
||
| namespace vectorized { | ||
| BroadcastPBlockHolder::~BroadcastPBlockHolder() { | ||
| // lock the parent queue, if the queue could lock success, then return the block | ||
| // to the queue, to reuse the block | ||
| std::shared_ptr<BroadcastPBlockHolderQueue> tmp_queue = _parent_creator.lock(); | ||
| if (tmp_queue != nullptr) { | ||
| tmp_queue->push(BroadcastPBlockHolder::create_shared(std::move(_pblock))); | ||
| } | ||
| // If the queue already deconstruted, then release pblock automatically since it | ||
| // is a unique ptr. | ||
| } | ||
|
|
||
| void BroadcastPBlockHolder::unref() noexcept { | ||
| DCHECK_GT(_ref_count._value, 0); | ||
| auto old_value = _ref_count._value.fetch_sub(1); | ||
| if (_dep && old_value == 1) { | ||
| _dep->return_available_block(); | ||
| void BroadcastPBlockHolderQueue::push(std::shared_ptr<BroadcastPBlockHolder> holder) { | ||
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| std::unique_lock l(_holders_lock); | ||
| holder->set_parent_creator(shared_from_this()); | ||
| _holders.push(holder); | ||
| if (_broadcast_dependency) { | ||
| _broadcast_dependency->set_ready(); | ||
| } | ||
| } | ||
|
|
||
| std::shared_ptr<BroadcastPBlockHolder> BroadcastPBlockHolderQueue::pop() { | ||
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
yiguolei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| std::unique_lock l(_holders_lock); | ||
| if (_holders.empty()) { | ||
| return {}; | ||
| } | ||
| std::shared_ptr<BroadcastPBlockHolder> res = _holders.top(); | ||
| _holders.pop(); | ||
| if (_holders.empty() && _broadcast_dependency != nullptr) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DCHECK(_broadcast_dependency != nullptr);
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. C ould not add this decheck ,because pipeline also use this code. |
||
| _broadcast_dependency->block(); | ||
| } | ||
| return res; | ||
| } | ||
| } // namespace vectorized | ||
|
|
||
| namespace pipeline { | ||
|
|
@@ -184,12 +208,10 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) { | |
| template <typename Parent> | ||
| Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) { | ||
| if (_is_finishing) { | ||
| request.block_holder->unref(); | ||
| return Status::OK(); | ||
| } | ||
| TUniqueId ins_id = request.channel->_fragment_instance_id; | ||
| if (_is_receiver_eof(ins_id.lo)) { | ||
| request.block_holder->unref(); | ||
| return Status::EndOfFile("receiver eof"); | ||
| } | ||
| bool send_now = false; | ||
|
|
@@ -243,7 +265,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { | |
| if (!request.exec_status.ok()) { | ||
| request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); | ||
| } | ||
| auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr); | ||
| auto send_callback = request.channel->get_send_callback(id, request.eos); | ||
|
|
||
| _instance_to_rpc_ctx[id]._send_callback = send_callback; | ||
| _instance_to_rpc_ctx[id].is_cancelled = false; | ||
|
|
@@ -307,8 +329,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { | |
| auto statistic = brpc_request->mutable_query_statistics(); | ||
| _statistics->to_pb(statistic); | ||
| } | ||
| auto send_callback = | ||
| request.channel->get_send_callback(id, request.eos, request.block_holder); | ||
| auto send_callback = request.channel->get_send_callback(id, request.eos); | ||
|
|
||
| ExchangeRpcContext rpc_ctx; | ||
| rpc_ctx._send_callback = send_callback; | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -181,13 +181,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf | |||||
| _exchange_sink_dependency->add_child(_queue_dependency); | ||||||
| if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && | ||||||
| !only_local_exchange) { | ||||||
| _broadcast_dependency = BroadcastDependency::create_shared( | ||||||
| _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); | ||||||
| _broadcast_dependency->set_available_block(config::num_broadcast_buffer); | ||||||
| _broadcast_pb_blocks.reserve(config::num_broadcast_buffer); | ||||||
| for (size_t i = 0; i < config::num_broadcast_buffer; i++) { | ||||||
| _broadcast_pb_blocks.emplace_back( | ||||||
| vectorized::BroadcastPBlockHolder(_broadcast_dependency.get())); | ||||||
| _broadcast_dependency = | ||||||
| Dependency::create_shared(_parent->operator_id(), _parent->node_id(), | ||||||
| "BroadcastDependency", true, state->get_query_ctx()); | ||||||
| _broadcast_pb_blocks = | ||||||
| vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency); | ||||||
| for (int i = 0; i < config::num_broadcast_buffer; ++i) { | ||||||
| _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared()); | ||||||
| } | ||||||
| _exchange_sink_dependency->add_child(_broadcast_dependency); | ||||||
|
|
||||||
|
|
@@ -338,7 +338,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block | |||||
| } | ||||||
| } | ||||||
| } else { | ||||||
| vectorized::BroadcastPBlockHolder* block_holder = nullptr; | ||||||
| std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr; | ||||||
| RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder)); | ||||||
| { | ||||||
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); | ||||||
|
|
@@ -355,22 +355,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block | |||||
| } else { | ||||||
| block_holder->get_block()->Clear(); | ||||||
| } | ||||||
| local_state._broadcast_dependency->take_available_block(); | ||||||
| block_holder->ref(local_state.channels.size()); | ||||||
| for (auto channel : local_state.channels) { | ||||||
| if (!channel->is_receiver_eof()) { | ||||||
| Status status; | ||||||
| if (channel->is_local()) { | ||||||
| block_holder->unref(); | ||||||
| status = channel->send_local_block(&cur_block); | ||||||
| } else { | ||||||
| SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); | ||||||
| status = channel->send_broadcast_block( | ||||||
| block_holder, source_state == SourceState::FINISHED); | ||||||
| } | ||||||
| HANDLE_CHANNEL_STATUS(state, channel, status); | ||||||
| } else { | ||||||
| block_holder->unref(); | ||||||
| } | ||||||
| } | ||||||
| cur_block.clear_column_data(); | ||||||
|
|
@@ -454,21 +449,16 @@ void ExchangeSinkLocalState::register_channels( | |||||
| } | ||||||
|
|
||||||
| Status ExchangeSinkLocalState::get_next_available_buffer( | ||||||
| vectorized::BroadcastPBlockHolder** holder) { | ||||||
| std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'd better keep the pattern. If the parameter is the output parameter, then use pointer not reference. |
||||||
| // This condition means we need use broadcast buffer, so we should make sure | ||||||
| // there are available buffer before running pipeline | ||||||
| for (size_t broadcast_pb_block_idx = 0; broadcast_pb_block_idx < _broadcast_pb_blocks.size(); | ||||||
| broadcast_pb_block_idx++) { | ||||||
| if (_broadcast_pb_blocks[broadcast_pb_block_idx].available()) { | ||||||
| *holder = &_broadcast_pb_blocks[broadcast_pb_block_idx]; | ||||||
| return Status::OK(); | ||||||
| } | ||||||
| if (_broadcast_pb_blocks->empty()) { | ||||||
| return Status::InternalError("No broadcast buffer left! Dependency: {}", | ||||||
| _broadcast_dependency->debug_string()); | ||||||
| } else { | ||||||
| *holder = _broadcast_pb_blocks->pop(); | ||||||
| return Status::OK(); | ||||||
| } | ||||||
| return Status::InternalError("No broadcast buffer left! Available blocks: " + | ||||||
| std::to_string(_broadcast_dependency->available_blocks()) + | ||||||
| " and number of buffer is " + | ||||||
| std::to_string(_broadcast_pb_blocks.size()) + | ||||||
| " Dependency: " + _broadcast_dependency->debug_string()); | ||||||
| } | ||||||
|
|
||||||
| template <typename Channels, typename HashValueType> | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.