From cd07b2df7955daddbcdf5bb82a2768be1d43aa48 Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Mon, 18 Dec 2023 16:04:59 +0800 Subject: [PATCH] [Refactor] reduce sending audit stats in exchange (#37083) Signed-off-by: Zhuhe Fang --- .../exchange/exchange_sink_operator.cpp | 8 --- be/src/exec/pipeline/exchange/sink_buffer.cpp | 52 +++++++------------ be/src/exec/pipeline/exchange/sink_buffer.h | 7 +-- be/src/exec/pipeline/query_context.cpp | 2 +- be/src/runtime/runtime_state.cpp | 4 -- be/src/runtime/runtime_state.h | 1 - 6 files changed, 24 insertions(+), 50 deletions(-) diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index a3a66cc83342d5..3dda60a4415fbf 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -258,9 +258,6 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const if (_current_request_bytes > config::max_transmit_batched_bytes || eos) { _chunk_request->set_eos(eos); _chunk_request->set_use_pass_through(_use_pass_through); - if (auto delta_statistic = state->intermediate_query_statistic()) { - delta_statistic->to_pb(_chunk_request->mutable_query_statistics()); - } butil::IOBuf attachment; int64_t attachment_physical_bytes = _parent->construct_brpc_attachment(_chunk_request, attachment); TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(_chunk_request), attachment, @@ -285,11 +282,6 @@ Status ExchangeSinkOperator::Channel::send_chunk_request(RuntimeState* state, PT chunk_request->set_be_number(_parent->_be_number); chunk_request->set_eos(false); chunk_request->set_use_pass_through(_use_pass_through); - - if (auto delta_statistic = state->intermediate_query_statistic()) { - delta_statistic->to_pb(chunk_request->mutable_query_statistics()); - } - TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(chunk_request), attachment, attachment_physical_bytes, _brpc_dest_addr}; RETURN_IF_ERROR(_parent->_buffer->add_request(info)); diff --git a/be/src/exec/pipeline/exchange/sink_buffer.cpp b/be/src/exec/pipeline/exchange/sink_buffer.cpp index d156e5d6acad86..77f0fbbe9831e5 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.cpp +++ b/be/src/exec/pipeline/exchange/sink_buffer.cpp @@ -30,7 +30,9 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vectorruntime_state()->instance_mem_tracker()), _brpc_timeout_ms(fragment_ctx->runtime_state()->query_options().query_timeout * 1000), _is_dest_merge(is_dest_merge), - _rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()) { + _rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()), + _sent_audit_stats_frequency_upper_limit( + std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->num_drivers() * 4))) { for (const auto& dest : destinations) { const auto& instance_id = dest.fragment_instance_id; // instance_id.lo == -1 indicates that the destination is pseudo for bucket shuffle join. @@ -48,7 +50,6 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector(); _mutexes[instance_id.lo] = std::make_unique(); _dest_addrs[instance_id.lo] = dest.brpc_server; @@ -88,6 +89,19 @@ Status SinkBuffer::add_request(TransmitChunkInfo& request) { _request_enqueued++; } { + // set stats every _sent_audit_stats_frequency, so FE can get approximate stats even missing eos chunks. + // _sent_audit_stats_frequency grows exponentially to reduce the costs of collecting stats but + // let the first (limited) chunks' stats approach truth。 + auto request_sequence = _request_sequence++; + if (!request.params->eos() && (request_sequence & (_sent_audit_stats_frequency - 1)) == 0) { + if (_sent_audit_stats_frequency < _sent_audit_stats_frequency_upper_limit) { + _sent_audit_stats_frequency = _sent_audit_stats_frequency << 1; + } + if (auto part_stats = _fragment_ctx->runtime_state()->query_ctx()->intermediate_query_statistic()) { + part_stats->to_pb(request.params->mutable_query_statistics()); + } + } + auto& instance_id = request.fragment_instance_id; RETURN_IF_ERROR(_try_to_send_rpc(instance_id, [&]() { _buffers[instance_id.lo].push(request); })); } @@ -230,31 +244,6 @@ void SinkBuffer::_process_send_window(const TUniqueId& instance_id, const int64_ } } -void SinkBuffer::_try_to_merge_query_statistics(TransmitChunkInfo& request) { - if (!request.params->has_query_statistics()) { - return; - } - auto& query_statistics = request.params->query_statistics(); - bool need_merge = false; - if (query_statistics.scan_rows() > 0 || query_statistics.scan_bytes() > 0 || query_statistics.cpu_cost_ns() > 0) { - need_merge = true; - } - if (!need_merge && query_statistics.stats_items_size() > 0) { - for (int i = 0; i < query_statistics.stats_items_size(); i++) { - const auto& stats_item = query_statistics.stats_items(i); - if (stats_item.scan_rows() > 0 || stats_item.scan_bytes()) { - need_merge = true; - break; - } - } - } - if (need_merge) { - auto& instance_id = request.fragment_instance_id; - _eos_query_stats[instance_id.lo]->merge_pb(query_statistics); - request.params->clear_query_statistics(); - } -} - Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::function& pre_works) { std::lock_guard l(*_mutexes[instance_id.lo]); pre_works(); @@ -319,8 +308,6 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun // eos is the last packet to send to finish the input stream of the corresponding of // ExchangeSourceOperator and eos is sent exactly-once. if (_num_sinkers[instance_id.lo] > 1) { - // to reduce uncessary rpc requests, we merge all query statistics in eos requests into one and send it through the last eos request - _try_to_merge_query_statistics(request); if (request.params->chunks_size() == 0) { continue; } else { @@ -335,10 +322,9 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun return Status::OK(); } // this is the last eos query, set query stats - _eos_query_stats[instance_id.lo]->merge_pb(request.params->query_statistics()); - request.params->clear_query_statistics(); - _eos_query_stats[instance_id.lo]->to_pb(request.params->mutable_query_statistics()); - _eos_query_stats[instance_id.lo]->clear(); + if (auto final_stats = _fragment_ctx->runtime_state()->query_ctx()->intermediate_query_statistic()) { + final_stats->to_pb(request.params->mutable_query_statistics()); + } } } diff --git a/be/src/exec/pipeline/exchange/sink_buffer.h b/be/src/exec/pipeline/exchange/sink_buffer.h index 6baae3bf09ecc2..a8745304a6c974 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.h +++ b/be/src/exec/pipeline/exchange/sink_buffer.h @@ -124,8 +124,6 @@ class SinkBuffer { // And we just pick the maximum accumulated_network_time among all destination int64_t _network_time(); - void _try_to_merge_query_statistics(TransmitChunkInfo& request); - FragmentContext* _fragment_ctx; MemTracker* const _mem_tracker; const int32_t _brpc_timeout_ms; @@ -157,7 +155,6 @@ class SinkBuffer { phmap::flat_hash_map _num_finished_rpcs; phmap::flat_hash_map _num_in_flight_rpcs; phmap::flat_hash_map _network_times; - phmap::flat_hash_map> _eos_query_stats; phmap::flat_hash_map> _mutexes; phmap::flat_hash_map _dest_addrs; @@ -192,6 +189,10 @@ class SinkBuffer { int64_t _first_send_time = -1; int64_t _last_receive_time = -1; int64_t _rpc_http_min_size = 0; + + std::atomic _request_sequence = 0; + int64_t _sent_audit_stats_frequency = 1; + int64_t _sent_audit_stats_frequency_upper_limit = 64; }; } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp index 80fa7393585601..3e541a00401735 100644 --- a/be/src/exec/pipeline/query_context.cpp +++ b/be/src/exec/pipeline/query_context.cpp @@ -165,7 +165,7 @@ std::shared_ptr QueryContext::intermediate_query_statistic() { auto query_statistic = std::make_shared(); // Not transmit delta if it's the final sink if (_is_final_sink) { - return query_statistic; + return nullptr; } query_statistic->add_cpu_costs(_delta_cpu_cost_ns.exchange(0)); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 597b6c70333f06..d63d07a7769f24 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -500,10 +500,6 @@ Status RuntimeState::_build_global_dict(const GlobalDictLists& global_dict_list, return Status::OK(); } -std::shared_ptr RuntimeState::intermediate_query_statistic() { - return _query_ctx->intermediate_query_statistic(); -} - std::shared_ptr RuntimeState::query_recv() { return _query_ctx->maintained_query_recv(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 8280d27fa1ae5d..5c3e874ee368db 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -412,7 +412,6 @@ class RuntimeState { void set_enable_pipeline_engine(bool enable_pipeline_engine) { _enable_pipeline_engine = enable_pipeline_engine; } bool enable_pipeline_engine() const { return _enable_pipeline_engine; } - std::shared_ptr intermediate_query_statistic(); std::shared_ptr query_recv(); [[nodiscard]] Status reset_epoch();