Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,40 @@
#include "vec/sink/vdata_stream_sender.h"

namespace doris {

namespace vectorized {
BroadcastPBlockHolder::~BroadcastPBlockHolder() {
// lock the parent queue, if the queue could lock success, then return the block
// to the queue, to reuse the block
std::shared_ptr<BroadcastPBlockHolderQueue> tmp_queue = _parent_creator.lock();
if (tmp_queue != nullptr) {
tmp_queue->push(BroadcastPBlockHolder::create_shared(std::move(_pblock)));
}
// If the queue already deconstruted, then release pblock automatically since it
// is a unique ptr.
}

void BroadcastPBlockHolder::unref() noexcept {
DCHECK_GT(_ref_count._value, 0);
auto old_value = _ref_count._value.fetch_sub(1);
if (_dep && old_value == 1) {
_dep->return_available_block();
void BroadcastPBlockHolderQueue::push(std::shared_ptr<BroadcastPBlockHolder> holder) {
std::unique_lock l(_holders_lock);
holder->set_parent_creator(shared_from_this());
_holders.push(holder);
if (_broadcast_dependency) {
_broadcast_dependency->set_ready();
}
}

std::shared_ptr<BroadcastPBlockHolder> BroadcastPBlockHolderQueue::pop() {
std::unique_lock l(_holders_lock);
if (_holders.empty()) {
return {};
}
std::shared_ptr<BroadcastPBlockHolder> res = _holders.top();
_holders.pop();
if (_holders.empty() && _broadcast_dependency != nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DCHECK(_broadcast_dependency != nullptr);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C ould not add this decheck ,because pipeline also use this code.

_broadcast_dependency->block();
}
return res;
}
} // namespace vectorized

namespace pipeline {
Expand Down Expand Up @@ -184,12 +208,10 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
template <typename Parent>
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) {
if (_is_finishing) {
request.block_holder->unref();
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
request.block_holder->unref();
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
Expand Down Expand Up @@ -243,7 +265,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
}
auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr);
auto send_callback = request.channel->get_send_callback(id, request.eos);

_instance_to_rpc_ctx[id]._send_callback = send_callback;
_instance_to_rpc_ctx[id].is_cancelled = false;
Expand Down Expand Up @@ -307,8 +329,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
auto statistic = brpc_request->mutable_query_statistics();
_statistics->to_pb(statistic);
}
auto send_callback =
request.channel->get_send_callback(id, request.eos, request.block_holder);
auto send_callback = request.channel->get_send_callback(id, request.eos);

ExchangeRpcContext rpc_ctx;
rpc_ctx._send_callback = send_callback;
Expand Down
60 changes: 41 additions & 19 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <string>

#include "common/global_types.h"
Expand All @@ -45,7 +46,6 @@ class TUniqueId;
using InstanceLoId = int64_t;

namespace pipeline {
class BroadcastDependency;
class ExchangeSinkQueueDependency;
class Dependency;
} // namespace pipeline
Expand All @@ -71,25 +71,52 @@ struct AtomicWrapper {
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
// will be shared between different channel, so we have to use a ref count to mark if this
// PBlock is available for next serialization.
class BroadcastPBlockHolderQueue;
class BroadcastPBlockHolder {
ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder);

public:
BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); }
BroadcastPBlockHolder(std::unique_ptr<PBlock>&& pblock) { _pblock = std::move(pblock); }
~BroadcastPBlockHolder();

PBlock* get_block() { return _pblock.get(); }

private:
friend class BroadcastPBlockHolderQueue;
std::unique_ptr<PBlock> _pblock;
std::weak_ptr<BroadcastPBlockHolderQueue> _parent_creator;
void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderQueue> parent_creator) {
_parent_creator = parent_creator;
}
};

// Use a stack inside to ensure that the PBlock is in cpu cache
class BroadcastPBlockHolderQueue : public std::enable_shared_from_this<BroadcastPBlockHolderQueue> {
ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderQueue);

public:
BroadcastPBlockHolder() : _ref_count(0), _dep(nullptr) {}
BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), _dep(dep) {}
~BroadcastPBlockHolder() noexcept = default;
BroadcastPBlockHolderQueue() = default;

void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
void unref() noexcept;
void ref() noexcept { ref(1); }
BroadcastPBlockHolderQueue(std::shared_ptr<pipeline::Dependency>& broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
}

bool available() { return _ref_count._value == 0; }
void push(std::shared_ptr<BroadcastPBlockHolder> holder);

PBlock* get_block() { return &pblock; }
bool empty() {
std::unique_lock l(_holders_lock);
return _holders.empty();
}

std::shared_ptr<BroadcastPBlockHolder> pop();

private:
AtomicWrapper<int32_t> _ref_count;
PBlock pblock;
pipeline::BroadcastDependency* _dep = nullptr;
std::stack<std::shared_ptr<BroadcastPBlockHolder>> _holders;
std::shared_ptr<pipeline::Dependency> _broadcast_dependency;
std::mutex _holders_lock;
};

} // namespace vectorized

namespace pipeline {
Expand All @@ -104,7 +131,7 @@ struct TransmitInfo {
template <typename Parent>
struct BroadcastTransmitInfo {
vectorized::PipChannel<Parent>* channel = nullptr;
vectorized::BroadcastPBlockHolder* block_holder = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};

Expand All @@ -115,10 +142,9 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
public:
ExchangeSendCallback() = default;

void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
void init(InstanceLoId id, bool eos) {
_id = id;
_eos = eos;
_data = data;
}

~ExchangeSendCallback() override = default;
Expand All @@ -135,9 +161,6 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {

void call() noexcept override {
try {
if (_data) {
_data->unref();
}
if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
std::string err = fmt::format(
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
Expand All @@ -164,7 +187,6 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
InstanceLoId _id;
bool _eos;
vectorized::BroadcastPBlockHolder* _data = nullptr;
};

struct ExchangeRpcContext {
Expand Down
40 changes: 15 additions & 25 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = BroadcastDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_broadcast_dependency->set_available_block(config::num_broadcast_buffer);
_broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
_broadcast_pb_blocks.emplace_back(
vectorized::BroadcastPBlockHolder(_broadcast_dependency.get()));
_broadcast_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true, state->get_query_ctx());
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
_exchange_sink_dependency->add_child(_broadcast_dependency);

Expand Down Expand Up @@ -338,7 +338,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
}
} else {
vectorized::BroadcastPBlockHolder* block_holder = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand All @@ -355,22 +355,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
local_state._broadcast_dependency->take_available_block();
block_holder->ref(local_state.channels.size());
for (auto channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(
block_holder, source_state == SourceState::FINISHED);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
} else {
block_holder->unref();
}
}
cur_block.clear_column_data();
Expand Down Expand Up @@ -454,21 +449,16 @@ void ExchangeSinkLocalState::register_channels(
}

Status ExchangeSinkLocalState::get_next_available_buffer(
vectorized::BroadcastPBlockHolder** holder) {
std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder) {
std::shared_ptr<vectorized::BroadcastPBlockHolder>& holder) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better keep the pattern. If the parameter is the output parameter, then use pointer not reference.

// This condition means we need use broadcast buffer, so we should make sure
// there are available buffer before running pipeline
for (size_t broadcast_pb_block_idx = 0; broadcast_pb_block_idx < _broadcast_pb_blocks.size();
broadcast_pb_block_idx++) {
if (_broadcast_pb_blocks[broadcast_pb_block_idx].available()) {
*holder = &_broadcast_pb_blocks[broadcast_pb_block_idx];
return Status::OK();
}
if (_broadcast_pb_blocks->empty()) {
return Status::InternalError("No broadcast buffer left! Dependency: {}",
_broadcast_dependency->debug_string());
} else {
*holder = _broadcast_pb_blocks->pop();
return Status::OK();
}
return Status::InternalError("No broadcast buffer left! Available blocks: " +
std::to_string(_broadcast_dependency->available_blocks()) +
" and number of buffer is " +
std::to_string(_broadcast_pb_blocks.size()) +
" Dependency: " + _broadcast_dependency->debug_string());
}

template <typename Channels, typename HashValueType>
Expand Down
51 changes: 3 additions & 48 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,51 +72,6 @@ class ExchangeSinkQueueDependency final : public Dependency {
~ExchangeSinkQueueDependency() override = default;
};

class BroadcastDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
BroadcastDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "BroadcastDependency", true, query_ctx),
_available_block(0) {}
~BroadcastDependency() override = default;

std::string debug_string(int indentation_level = 0) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _available_block = {}",
std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(), _ready, _available_block.load());
return fmt::to_string(debug_string_buffer);
}

void set_available_block(int available_block) { _available_block = available_block; }

void return_available_block() {
if (_available_block.fetch_add(1) == 0) {
std::lock_guard<std::mutex> lock(_lock);
if (_available_block == 0) {
return;
}
Dependency::set_ready();
}
}

void take_available_block() {
if (_available_block.fetch_sub(1) == 1) {
std::lock_guard<std::mutex> lock(_lock);
if (_available_block == 0) {
Dependency::block();
}
}
}

int available_blocks() const { return _available_block; }

private:
std::atomic<int> _available_block;
std::mutex _lock;
};

/**
* We use this to control the execution for local exchange.
* +---------------+ +---------------+ +---------------+
Expand Down Expand Up @@ -165,7 +120,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependenc
Dependency* finishdependency() override { return _finish_dependency.get(); }
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
Status get_next_available_buffer(std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder);

RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
Expand Down Expand Up @@ -231,12 +186,12 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependenc

// Sender instance id, unique within a fragment.
int _sender_id;
std::vector<vectorized::BroadcastPBlockHolder> _broadcast_pb_blocks;
std::shared_ptr<vectorized::BroadcastPBlockHolderQueue> _broadcast_pb_blocks;

vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;

std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
std::shared_ptr<Dependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(_block_holder.get(),
true);
status = channel->send_broadcast_block(_block_holder, true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_file_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ResultFileSinkLocalState final
std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
bool _only_local_exchange = false;
vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
RuntimeProfile::Counter* _local_send_timer = nullptr;
RuntimeProfile::Counter* _brpc_send_timer = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct BasicSharedState {
};

class Dependency : public std::enable_shared_from_this<Dependency> {
ENABLE_FACTORY_CREATOR(Dependency);

public:
Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: _id(id),
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this));
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
}
Expand Down
Loading