
[BugFix] aggregate query statistic from all executor nodes (#11801)
(cherry picked from commit e18e679)
murphyatwork authored and wanpengfei-git committed Oct 17, 2022
1 parent 291da0d commit d7c859d
Showing 13 changed files with 171 additions and 78 deletions.
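
In outline, the change makes each non-result-sink fragment drain the counters it has accumulated since the last send and attach that delta to every transmit-chunk request, while the fragment owning the result sink merges its own totals with everything received from the other executor nodes. Below is a minimal sketch of that drain-and-merge pattern; Delta, NodeStats and Receiver are simplified stand-in types for illustration, not the classes touched by this diff.

#include <atomic>
#include <cstdint>
#include <map>

// Plain counters as they travel between nodes (the real code serializes them as PQueryStatistics).
struct Delta {
    int64_t scan_rows = 0;
    int64_t scan_bytes = 0;
    int64_t cpu_ns = 0;
};

// Per-fragment accumulator on an executor node.
struct NodeStats {
    std::atomic<int64_t> delta_scan_rows{0};
    std::atomic<int64_t> delta_scan_bytes{0};
    std::atomic<int64_t> delta_cpu_ns{0};

    // Drain whatever has accumulated since the last send; an exchange sink
    // would attach the result to its next transmit-chunk request.
    Delta take_delta() {
        return {delta_scan_rows.exchange(0), delta_scan_bytes.exchange(0), delta_cpu_ns.exchange(0)};
    }
};

// Receiver kept by the downstream fragment: one slot per sender id.
struct Receiver {
    std::map<int, Delta> per_sender;

    void insert(int sender_id, const Delta& d) {
        Delta& slot = per_sender[sender_id];
        slot.scan_rows += d.scan_rows;
        slot.scan_bytes += d.scan_bytes;
        slot.cpu_ns += d.cpu_ns;
    }

    // Fold every sender's contribution into one merged report.
    Delta aggregate() const {
        Delta total;
        for (const auto& entry : per_sender) {
            const Delta& d = entry.second;
            total.scan_rows += d.scan_rows;
            total.scan_bytes += d.scan_bytes;
            total.cpu_ns += d.cpu_ns;
        }
        return total;
    }
};

Draining with exchange(0) keeps an update from being lost between a separate read and reset, which is why the QueryContext changes in this diff use exchange(0) on the _delta_* counters.
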
2 changes: 1 addition & 1 deletion be/src/exec/exchange_node.cpp
@@ -95,7 +95,7 @@ Status ExchangeNode::open(RuntimeState* state) {

Status ExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->merge(_sub_plan_query_statistics_recvr.get());
_sub_plan_query_statistics_recvr->aggregate(statistics);
return Status::OK();
}

be/src/exec/pipeline/exchange/exchange_merge_sort_source_operator.cpp
@@ -13,9 +13,10 @@
namespace starrocks::pipeline {
Status ExchangeMergeSortSourceOperator::prepare(RuntimeState* state) {
SourceOperator::prepare(state);
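// The stream receiver created below also collects the query statistics forwarded by
// upstream exchange sinks.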
auto query_statistic_recv = state->query_recv();
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _row_desc, state->fragment_instance_id(), _plan_node_id, _num_sender,
config::exchg_node_buffer_size_bytes, _unique_metrics, true, nullptr, true,
config::exchg_node_buffer_size_bytes, _unique_metrics, true, query_statistic_recv, true,
// ExchangeMergeSort will never perform pipeline level shuffle
DataStreamRecvr::INVALID_DOP_FOR_NON_PIPELINE_LEVEL_SHUFFLE, true);
return _stream_recvr->create_merger_for_pipeline(state, _sort_exec_exprs, &_is_asc_order, &_nulls_first);
40 changes: 23 additions & 17 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
@@ -50,18 +50,19 @@ class ExchangeSinkOperator::Channel {
Status init(RuntimeState* state);

// Send one chunk to the remote side; the chunk may be batched in this channel.
Status send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos);
Status send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos);

// Send one chunk to the remote side; the chunk may be batched in this channel.
// When the chunk is really sent rather than batched, *is_real_sent will
// be set to true.
Status send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos, bool* is_real_sent);
Status send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos,
bool* is_real_sent);

// Channel will send the input request directly without batching it.
// This function is only used for broadcast, because the request can be reused
// by all the channels.
Status send_chunk_request(PTransmitChunkParamsPtr chunk_request, const butil::IOBuf& attachment,
int64_t attachment_physical_bytes);
Status send_chunk_request(RuntimeState* state, PTransmitChunkParamsPtr chunk_request,
const butil::IOBuf& attachment, int64_t attachment_physical_bytes);

// Used when doing shuffle.
// This function will copy selective rows in chunks to batch.
@@ -165,7 +166,7 @@ Status ExchangeSinkOperator::Channel::add_rows_selective(vectorized::Chunk* chun
}

if (_chunks[driver_sequence]->num_rows() + size > state->chunk_size()) {
RETURN_IF_ERROR(send_one_chunk(_chunks[driver_sequence].get(), driver_sequence, false));
RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
// we only clear column data, because we need to reuse column schema
_chunks[driver_sequence]->set_num_rows(0);
}
@@ -174,14 +175,14 @@ Status ExchangeSinkOperator::Channel::add_rows_selective(vectorized::Chunk* chun
return Status::OK();
}

Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence,
bool eos) {
Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk,
int32_t driver_sequence, bool eos) {
bool is_real_sent = false;
return send_one_chunk(chunk, driver_sequence, eos, &is_real_sent);
return send_one_chunk(state, chunk, driver_sequence, eos, &is_real_sent);
}

Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos,
bool* is_real_sent) {
Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk,
int32_t driver_sequence, bool eos, bool* is_real_sent) {
*is_real_sent = false;
if (_chunk_request == nullptr) {
_chunk_request = std::make_shared<PTransmitChunkParams>();
@@ -218,6 +219,8 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* ch
if (_current_request_bytes > config::max_transmit_batched_bytes || eos) {
_chunk_request->set_eos(eos);
_chunk_request->set_use_pass_through(_use_pass_through);
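// Attach the statistics accumulated since the last send, so the receiving fragment can
// aggregate them per sender.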
auto delta_statistic = state->intermediate_query_statistic();
delta_statistic->to_pb(_chunk_request->mutable_query_statistics());
butil::IOBuf attachment;
int64_t attachment_physical_bytes = _parent->construct_brpc_attachment(_chunk_request, attachment);
TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(_chunk_request), attachment,
@@ -231,7 +234,7 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* ch
return Status::OK();
}

Status ExchangeSinkOperator::Channel::send_chunk_request(PTransmitChunkParamsPtr chunk_request,
Status ExchangeSinkOperator::Channel::send_chunk_request(RuntimeState* state, PTransmitChunkParamsPtr chunk_request,
const butil::IOBuf& attachment,
int64_t attachment_physical_bytes) {
chunk_request->set_node_id(_dest_node_id);
@@ -240,6 +243,9 @@ Status ExchangeSinkOperator::Channel::send_chunk_request(PTransmitChunkParamsPtr
chunk_request->set_eos(false);
chunk_request->set_use_pass_through(_use_pass_through);

auto delta_statistic = state->intermediate_query_statistic();
delta_statistic->to_pb(chunk_request->mutable_query_statistics());

TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(chunk_request), attachment,
attachment_physical_bytes};
_parent->_buffer->add_request(info);
@@ -256,10 +262,10 @@ Status ExchangeSinkOperator::Channel::_close_internal(RuntimeState* state, Fragm
if (!fragment_ctx->is_canceled()) {
for (auto driver_sequence = 0; driver_sequence < _chunks.size(); ++driver_sequence) {
if (_chunks[driver_sequence] != nullptr) {
RETURN_IF_ERROR(send_one_chunk(_chunks[driver_sequence].get(), driver_sequence, false));
RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
}
}
RETURN_IF_ERROR(send_one_chunk(nullptr, ExchangeSinkOperator::DEFAULT_DRIVER_SEQUENCE, true));
RETURN_IF_ERROR(send_one_chunk(state, nullptr, ExchangeSinkOperator::DEFAULT_DRIVER_SEQUENCE, true));
}

return Status::OK();
@@ -413,7 +419,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
int has_not_pass_through = false;
for (auto idx : _channel_indices) {
if (_channels[idx]->use_pass_through()) {
RETURN_IF_ERROR(_channels[idx]->send_one_chunk(send_chunk, DEFAULT_DRIVER_SEQUENCE, false));
RETURN_IF_ERROR(_channels[idx]->send_one_chunk(state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false));
} else {
has_not_pass_through = true;
}
@@ -437,7 +443,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
if (!_channels[idx]->use_pass_through()) {
PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
RETURN_IF_ERROR(
_channels[idx]->send_chunk_request(copy, attachment, attachment_physical_bytes));
_channels[idx]->send_chunk_request(state, copy, attachment, attachment_physical_bytes));
}
}
_current_request_bytes = 0;
@@ -450,7 +456,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
// 1. Get request of that channel
auto& channel = _channels[_curr_random_channel_idx];
bool real_sent = false;
RETURN_IF_ERROR(channel->send_one_chunk(send_chunk, DEFAULT_DRIVER_SEQUENCE, false, &real_sent));
RETURN_IF_ERROR(channel->send_one_chunk(state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false, &real_sent));
if (real_sent) {
_curr_random_channel_idx = (_curr_random_channel_idx + 1) % _channels.size();
}
@@ -534,7 +540,7 @@ Status ExchangeSinkOperator::set_finishing(RuntimeState* state) {
int64_t attachment_physical_bytes = construct_brpc_attachment(_chunk_request, attachment);
for (const auto& channel : _channels) {
PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
channel->send_chunk_request(copy, attachment, attachment_physical_bytes);
channel->send_chunk_request(state, copy, attachment, attachment_physical_bytes);
}
_current_request_bytes = 0;
_chunk_request.reset();
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/exchange/exchange_source_operator.cpp
@@ -44,9 +44,11 @@ std::shared_ptr<DataStreamRecvr> ExchangeSourceOperatorFactory::create_stream_re
if (_stream_recvr != nullptr) {
return _stream_recvr;
}
auto query_statistic_recv = state->query_recv();
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _row_desc, state->fragment_instance_id(), _plan_node_id, _num_sender,
config::exchg_node_buffer_size_bytes, profile, false, nullptr, true, _degree_of_parallelism, false);
config::exchg_node_buffer_size_bytes, profile, false, query_statistic_recv, true, _degree_of_parallelism,
false);
return _stream_recvr;
}

3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -306,6 +306,9 @@ Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const TExec
// Set up sink if required
std::unique_ptr<DataSink> sink;
if (fragment.__isset.output_sink) {
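// Only the fragment that owns the result sink reports the final merged statistics;
// mark it so the other fragments keep forwarding deltas upstream instead.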
if (fragment.output_sink.type == TDataSinkType::RESULT_SINK) {
_query_ctx->set_result_sink(true);
}
RowDescriptor row_desc;
RETURN_IF_ERROR(DataSink::create_data_sink(runtime_state, fragment.output_sink, fragment.output_exprs, params,
row_desc, &sink));
34 changes: 33 additions & 1 deletion be/src/exec/pipeline/query_context.cpp
@@ -8,14 +8,18 @@
#include "runtime/current_thread.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_cache.h"
#include "util/thread.h"

namespace starrocks::pipeline {
QueryContext::QueryContext()
: _fragment_mgr(new FragmentContextManager()),
_total_fragments(0),
_num_fragments(0),
_num_active_fragments(0) {}
_num_active_fragments(0) {
_sub_plan_query_statistics_recvr = std::make_shared<QueryStatisticsRecvr>();
}

QueryContext::~QueryContext() {
// When destruct FragmentContextManager, we use query-level MemTracker. since when PipelineDriver executor
@@ -94,6 +98,34 @@ Status QueryContext::init_query(workgroup::WorkGroup* wg) {
return st;
}

std::shared_ptr<QueryStatisticsRecvr> QueryContext::maintained_query_recv() {
return _sub_plan_query_statistics_recvr;
}

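// Returns the statistics accumulated since the last call (the delta), merged with anything
// already received from downstream fragments, so an exchange sink can forward it upstream.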
std::shared_ptr<QueryStatistics> QueryContext::intermediate_query_statistic() {
auto query_statistic = std::make_shared<QueryStatistics>();
// Do not transmit the delta if this fragment contains the result sink node
if (_is_result_sink) {
return query_statistic;
}
query_statistic->add_scan_stats(_delta_scan_rows_num.exchange(0), _delta_scan_bytes.exchange(0));
query_statistic->add_cpu_costs(_delta_cpu_cost_ns.exchange(0));
query_statistic->add_mem_costs(mem_cost_bytes());
_sub_plan_query_statistics_recvr->aggregate(query_statistic.get());
return query_statistic;
}

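// Builds the query-level report for the result sink: this fragment's own totals merged with
// everything received from the other executor nodes.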
std::shared_ptr<QueryStatistics> QueryContext::final_query_statistic() {
DCHECK(_is_result_sink) << "must be the result sink";
auto res = std::make_shared<QueryStatistics>();
res->add_scan_stats(_total_scan_rows_num, _total_scan_bytes);
res->add_cpu_costs(_total_cpu_cost_ns);
res->add_mem_costs(mem_cost_bytes());

_sub_plan_query_statistics_recvr->aggregate(res.get());
return res;
}

QueryContextManager::QueryContextManager(size_t log2_num_slots)
: _num_slots(1 << log2_num_slots),
_slot_mask(_num_slots - 1),
41 changes: 32 additions & 9 deletions be/src/exec/pipeline/query_context.h
@@ -11,6 +11,7 @@
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/InternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/hash_util.hpp"
#include "util/time.h"
@@ -96,13 +97,22 @@ class QueryContext {
Status init_query(workgroup::WorkGroup* wg);

// Some statistics about the query, including cpu, scan_rows and scan_bytes
void incr_cpu_cost(int64_t cost) { _cur_cpu_cost_ns += cost; }
int64_t cpu_cost() const { return _cur_cpu_cost_ns; }
int64_t mem_cost_bytes() const { return _mem_tracker->peak_consumption(); }
void incr_cur_scan_rows_num(int64_t rows_num) { _cur_scan_rows_num += rows_num; }
int64_t cur_scan_rows_num() const { return _cur_scan_rows_num; }
void incr_cur_scan_bytes(int64_t scan_bytes) { _cur_scan_bytes += scan_bytes; }
int64_t get_scan_bytes() const { return _cur_scan_bytes; }
void incr_cpu_cost(int64_t cost) {
_total_cpu_cost_ns += cost;
_delta_cpu_cost_ns += cost;
}
void incr_cur_scan_rows_num(int64_t rows_num) {
_total_scan_rows_num += rows_num;
_delta_scan_rows_num += rows_num;
}
void incr_cur_scan_bytes(int64_t scan_bytes) {
_total_scan_bytes += scan_bytes;
_delta_scan_bytes += scan_bytes;
}
int64_t cpu_cost() const { return _total_cpu_cost_ns; }
int64_t cur_scan_rows_num() const { return _total_scan_rows_num; }
int64_t get_scan_bytes() const { return _total_scan_bytes; }

// Query start time, used to check how long the query has been running,
// and to ensure that a query is not killed by the big-query checking mechanism before its minimum run time has elapsed
Expand All @@ -112,6 +122,14 @@ class QueryContext {
void set_scan_limit(int64_t scan_limit) { _scan_limit = scan_limit; }
int64_t get_scan_limit() const { return _scan_limit; }

// Delta statistics accumulated since the last retrieval
std::shared_ptr<QueryStatistics> intermediate_query_statistic();
// Merged statistics from all executor nodes
std::shared_ptr<QueryStatistics> final_query_statistic();
std::shared_ptr<QueryStatisticsRecvr> maintained_query_recv();
bool is_result_sink() const { return _is_result_sink; }
void set_result_sink(bool value) { _is_result_sink = value; }

public:
static constexpr int DEFAULT_EXPIRE_SECONDS = 300;

@@ -135,9 +153,14 @@

std::once_flag _init_query_once;
int64_t _query_begin_time = 0;
std::atomic<int64_t> _cur_cpu_cost_ns = 0;
std::atomic<int64_t> _cur_scan_rows_num = 0;
std::atomic<int64_t> _cur_scan_bytes = 0;
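// Totals cover the whole query lifetime; the _delta_* counters are drained each time
// intermediate_query_statistic() is called.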
std::atomic<int64_t> _total_cpu_cost_ns = 0;
std::atomic<int64_t> _total_scan_rows_num = 0;
std::atomic<int64_t> _total_scan_bytes = 0;
std::atomic<int64_t> _delta_cpu_cost_ns = 0;
std::atomic<int64_t> _delta_scan_rows_num = 0;
std::atomic<int64_t> _delta_scan_bytes = 0;
bool _is_result_sink = false;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr; // For receive

int64_t _scan_limit = 0;
};
5 changes: 1 addition & 4 deletions be/src/exec/pipeline/result_sink_operator.cpp
@@ -47,11 +47,8 @@ void ResultSinkOperator::close(RuntimeState* state) {
// the visibility of _num_written_rows is guaranteed by _num_result_sinkers.fetch_sub().
_sender->update_num_written_rows(_num_written_rows.load(std::memory_order_relaxed));

auto query_statistic = std::make_shared<QueryStatistics>();
QueryContext* query_ctx = state->query_ctx();
query_statistic->add_scan_stats(query_ctx->cur_scan_rows_num(), query_ctx->get_scan_bytes());
query_statistic->add_cpu_costs(query_ctx->cpu_cost());
query_statistic->add_mem_costs(query_ctx->mem_cost_bytes());
auto query_statistic = query_ctx->final_query_statistic();
query_statistic->set_returned_rows(_num_written_rows);
_sender->set_query_statistics(query_statistic);

2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_mgr.cpp
@@ -63,7 +63,7 @@ std::shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
DCHECK(pass_through_chunk_buffer != nullptr);
std::shared_ptr<DataStreamRecvr> recvr(
new DataStreamRecvr(this, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
buffer_size, profile, std::move(sub_plan_query_statistics_recvr), is_pipeline,
buffer_size, profile, sub_plan_query_statistics_recvr, is_pipeline,
degree_of_parallelism, keep_order, pass_through_chunk_buffer));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
48 changes: 45 additions & 3 deletions be/src/runtime/query_statistics.cpp
@@ -33,15 +33,50 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
*statistics->mutable_stats_items() = {_stats_items.begin(), _stats_items.end()};
}

void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
recvr->merge(this);
void QueryStatistics::clear() {
scan_rows = 0;
scan_bytes = 0;
cpu_ns = 0;
returned_rows = 0;
_stats_items.clear();
}

void QueryStatistics::add_stats_item(QueryStatisticsItemPB& stats_item) {
this->_stats_items.emplace_back(stats_item);
this->scan_rows += stats_item.scan_rows();
this->scan_bytes += stats_item.scan_bytes();
}

void QueryStatistics::add_scan_stats(int64_t scan_rows, int64_t scan_bytes) {
this->scan_rows += scan_rows;
this->scan_bytes += scan_bytes;
}

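// Drains the per-sender counters in `other` into this object: each value is read and then
// subtracted back, so repeated merges only pick up what arrived since the previous merge;
// mem_cost_bytes tracks a peak, so it is combined with max() rather than summed.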
void QueryStatistics::merge(int sender_id, QueryStatistics& other) {
// Make the exchange action atomic
int64_t rows = other.scan_rows.load();
scan_rows += rows;
other.scan_rows -= rows;

int64_t bytes = other.scan_bytes.load();
scan_bytes += bytes;
other.scan_bytes -= bytes;

int64_t cpu = other.cpu_ns.load();
cpu_ns += cpu;
other.cpu_ns -= cpu;

int64_t mem_bytes = other.mem_cost_bytes.load();
mem_cost_bytes = std::max<int64_t>(mem_cost_bytes, mem_bytes);

_stats_items.insert(_stats_items.end(), other._stats_items.begin(), other._stats_items.end());
}

void QueryStatistics::merge_pb(const PQueryStatistics& statistics) {
scan_rows += statistics.scan_rows();
scan_bytes += statistics.scan_bytes();
cpu_ns += statistics.cpu_cost_ns();
mem_cost_bytes += statistics.mem_cost_bytes();
mem_cost_bytes = std::max<int64_t>(mem_cost_bytes, statistics.mem_cost_bytes());
_stats_items.insert(_stats_items.end(), statistics.stats_items().begin(), statistics.stats_items().end());
}

@@ -58,6 +93,13 @@ void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender
query_statistics->merge_pb(statistics);
}

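// Folds the statistics received from every sender into *statistics.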
void QueryStatisticsRecvr::aggregate(QueryStatistics* statistics) {
std::lock_guard<SpinLock> l(_lock);
for (auto& pair : _query_statistics) {
statistics->merge(pair.first, *pair.second);
}
}

QueryStatisticsRecvr::~QueryStatisticsRecvr() {
// It is unnecessary to lock here, because the destructor will be
// called after DataStreamRecvr's close in ExchangeNode.

