Skip to content

Commit

Permalink
[Refactor] reduce sending audit stats in exchange (#37083)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhuhe Fang <fzhedu@gmail.com>
  • Loading branch information
fzhedu authored Dec 18, 2023
1 parent f2be968 commit cd07b2d
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 50 deletions.
8 changes: 0 additions & 8 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,6 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const
if (_current_request_bytes > config::max_transmit_batched_bytes || eos) {
_chunk_request->set_eos(eos);
_chunk_request->set_use_pass_through(_use_pass_through);
if (auto delta_statistic = state->intermediate_query_statistic()) {
delta_statistic->to_pb(_chunk_request->mutable_query_statistics());
}
butil::IOBuf attachment;
int64_t attachment_physical_bytes = _parent->construct_brpc_attachment(_chunk_request, attachment);
TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(_chunk_request), attachment,
Expand All @@ -285,11 +282,6 @@ Status ExchangeSinkOperator::Channel::send_chunk_request(RuntimeState* state, PT
chunk_request->set_be_number(_parent->_be_number);
chunk_request->set_eos(false);
chunk_request->set_use_pass_through(_use_pass_through);

if (auto delta_statistic = state->intermediate_query_statistic()) {
delta_statistic->to_pb(chunk_request->mutable_query_statistics());
}

TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(chunk_request), attachment,
attachment_physical_bytes, _brpc_dest_addr};
RETURN_IF_ERROR(_parent->_buffer->add_request(info));
Expand Down
52 changes: 19 additions & 33 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
_mem_tracker(fragment_ctx->runtime_state()->instance_mem_tracker()),
_brpc_timeout_ms(fragment_ctx->runtime_state()->query_options().query_timeout * 1000),
_is_dest_merge(is_dest_merge),
_rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()) {
_rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()),
_sent_audit_stats_frequency_upper_limit(
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->num_drivers() * 4))) {
for (const auto& dest : destinations) {
const auto& instance_id = dest.fragment_instance_id;
// instance_id.lo == -1 indicates that the destination is pseudo for bucket shuffle join.
Expand All @@ -48,7 +50,6 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
_num_finished_rpcs[instance_id.lo] = 0;
_num_in_flight_rpcs[instance_id.lo] = 0;
_network_times[instance_id.lo] = TimeTrace{};
_eos_query_stats[instance_id.lo] = std::make_shared<QueryStatistics>();
_mutexes[instance_id.lo] = std::make_unique<Mutex>();
_dest_addrs[instance_id.lo] = dest.brpc_server;

Expand Down Expand Up @@ -88,6 +89,19 @@ Status SinkBuffer::add_request(TransmitChunkInfo& request) {
_request_enqueued++;
}
{
// set stats every _sent_audit_stats_frequency, so FE can get approximate stats even missing eos chunks.
// _sent_audit_stats_frequency grows exponentially to reduce the costs of collecting stats but
// let the first (limited) chunks' stats approach truth。
auto request_sequence = _request_sequence++;
if (!request.params->eos() && (request_sequence & (_sent_audit_stats_frequency - 1)) == 0) {
if (_sent_audit_stats_frequency < _sent_audit_stats_frequency_upper_limit) {
_sent_audit_stats_frequency = _sent_audit_stats_frequency << 1;
}
if (auto part_stats = _fragment_ctx->runtime_state()->query_ctx()->intermediate_query_statistic()) {
part_stats->to_pb(request.params->mutable_query_statistics());
}
}

auto& instance_id = request.fragment_instance_id;
RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { _buffers[instance_id.lo].push(request); }));
}
Expand Down Expand Up @@ -230,31 +244,6 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_
}
}

void SinkBuffer::_try_to_merge_query_statistics(TransmitChunkInfo& request) {
if (!request.params->has_query_statistics()) {
return;
}
auto& query_statistics = request.params->query_statistics();
bool need_merge = false;
if (query_statistics.scan_rows() > 0 || query_statistics.scan_bytes() > 0 || query_statistics.cpu_cost_ns() > 0) {
need_merge = true;
}
if (!need_merge && query_statistics.stats_items_size() > 0) {
for (int i = 0; i < query_statistics.stats_items_size(); i++) {
const auto& stats_item = query_statistics.stats_items(i);
if (stats_item.scan_rows() > 0 || stats_item.scan_bytes()) {
need_merge = true;
break;
}
}
}
if (need_merge) {
auto& instance_id = request.fragment_instance_id;
_eos_query_stats[instance_id.lo]->merge_pb(query_statistics);
request.params->clear_query_statistics();
}
}

Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function<void()>& pre_works) {
std::lock_guard<Mutex> l(*_mutexes[instance_id.lo]);
pre_works();
Expand Down Expand Up @@ -319,8 +308,6 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
// eos is the last packet to send to finish the input stream of the corresponding of
// ExchangeSourceOperator and eos is sent exactly-once.
if (_num_sinkers[instance_id.lo] > 1) {
// to reduce uncessary rpc requests, we merge all query statistics in eos requests into one and send it through the last eos request
_try_to_merge_query_statistics(request);
if (request.params->chunks_size() == 0) {
continue;
} else {
Expand All @@ -335,10 +322,9 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun
return Status::OK();
}
// this is the last eos query, set query stats
_eos_query_stats[instance_id.lo]->merge_pb(request.params->query_statistics());
request.params->clear_query_statistics();
_eos_query_stats[instance_id.lo]->to_pb(request.params->mutable_query_statistics());
_eos_query_stats[instance_id.lo]->clear();
if (auto final_stats = _fragment_ctx->runtime_state()->query_ctx()->intermediate_query_statistic()) {
final_stats->to_pb(request.params->mutable_query_statistics());
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/pipeline/exchange/sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ class SinkBuffer {
// And we just pick the maximum accumulated_network_time among all destination
int64_t _network_time();

void _try_to_merge_query_statistics(TransmitChunkInfo& request);

FragmentContext* _fragment_ctx;
MemTracker* const _mem_tracker;
const int32_t _brpc_timeout_ms;
Expand Down Expand Up @@ -157,7 +155,6 @@ class SinkBuffer {
phmap::flat_hash_map<int64_t, int32_t> _num_finished_rpcs;
phmap::flat_hash_map<int64_t, int32_t> _num_in_flight_rpcs;
phmap::flat_hash_map<int64_t, TimeTrace> _network_times;
phmap::flat_hash_map<int64_t, std::shared_ptr<QueryStatistics>> _eos_query_stats;
phmap::flat_hash_map<int64_t, std::unique_ptr<Mutex>> _mutexes;
phmap::flat_hash_map<int64_t, TNetworkAddress> _dest_addrs;

Expand Down Expand Up @@ -192,6 +189,10 @@ class SinkBuffer {
int64_t _first_send_time = -1;
int64_t _last_receive_time = -1;
int64_t _rpc_http_min_size = 0;

std::atomic<int64_t> _request_sequence = 0;
int64_t _sent_audit_stats_frequency = 1;
int64_t _sent_audit_stats_frequency_upper_limit = 64;
};

} // namespace starrocks::pipeline
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ std::shared_ptr<QueryStatistics> QueryContext::intermediate_query_statistic() {
auto query_statistic = std::make_shared<QueryStatistics>();
// Not transmit delta if it's the final sink
if (_is_final_sink) {
return query_statistic;
return nullptr;
}

query_statistic->add_cpu_costs(_delta_cpu_cost_ns.exchange(0));
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,6 @@ Status RuntimeState::_build_global_dict(const GlobalDictLists& global_dict_list,
return Status::OK();
}

std::shared_ptr<QueryStatistics> RuntimeState::intermediate_query_statistic() {
return _query_ctx->intermediate_query_statistic();
}

std::shared_ptr<QueryStatisticsRecvr> RuntimeState::query_recv() {
return _query_ctx->maintained_query_recv();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ class RuntimeState {
void set_enable_pipeline_engine(bool enable_pipeline_engine) { _enable_pipeline_engine = enable_pipeline_engine; }
bool enable_pipeline_engine() const { return _enable_pipeline_engine; }

std::shared_ptr<QueryStatistics> intermediate_query_statistic();
std::shared_ptr<QueryStatisticsRecvr> query_recv();

[[nodiscard]] Status reset_epoch();
Expand Down

0 comments on commit cd07b2d

Please sign in to comment.