diff --git a/be/src/exec/pipeline/pipeline_builder.cpp b/be/src/exec/pipeline/pipeline_builder.cpp index 8c1c612af8ebe..fcea9c512a6b5 100644 --- a/be/src/exec/pipeline/pipeline_builder.cpp +++ b/be/src/exec/pipeline/pipeline_builder.cpp @@ -244,7 +244,7 @@ void PipelineBuilderContext::interpolate_spill_process(size_t plan_node_id, const SpillProcessChannelFactoryPtr& spill_channel_factory, size_t dop) { OpFactories spill_process_operators; - auto spill_process_factory = std::make_shared(next_operator_id(), "spill-process", + auto spill_process_factory = std::make_shared(next_operator_id(), "spill_process", plan_node_id, spill_channel_factory); spill_process_factory->set_degree_of_parallelism(dop); spill_process_operators.emplace_back(std::move(spill_process_factory)); diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index bb0fbda38ac8e..6248c9891556f 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -448,8 +448,6 @@ RuntimeProfile* GlobalDriverExecutor::_build_merged_instance_profile(QueryContex continue; } - _remove_non_core_metrics(query_ctx, driver_profiles); - auto* merged_driver_profile = RuntimeProfile::merge_isomorphic_profiles(query_ctx->object_pool(), driver_profiles); @@ -474,29 +472,4 @@ RuntimeProfile* GlobalDriverExecutor::_build_merged_instance_profile(QueryContex return new_instance_profile; } - -void GlobalDriverExecutor::_remove_non_core_metrics(QueryContext* query_ctx, - std::vector& driver_profiles) { - if (query_ctx->profile_level() > TPipelineProfileLevel::CORE_METRICS) { - return; - } - - for (auto* driver_profile : driver_profiles) { - driver_profile->remove_counters(std::set{"DriverTotalTime", "ActiveTime", "PendingTime"}); - - std::vector operator_profiles; - driver_profile->get_children(&operator_profiles); - - for (auto* operator_profile : operator_profiles) { - RuntimeProfile* common_metrics = operator_profile->get_child("CommonMetrics"); - DCHECK(common_metrics != nullptr); - common_metrics->remove_counters(std::set{"OperatorTotalTime"}); - - RuntimeProfile* unique_metrics = operator_profile->get_child("UniqueMetrics"); - DCHECK(unique_metrics != nullptr); - unique_metrics->remove_counters(std::set{"ScanTime", "WaitTime"}); - } - } -} - } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/pipeline_driver_executor.h b/be/src/exec/pipeline/pipeline_driver_executor.h index 294c84dea919d..20b1a649933f9 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.h +++ b/be/src/exec/pipeline/pipeline_driver_executor.h @@ -93,7 +93,6 @@ class GlobalDriverExecutor final : public FactoryMethod _get_next_driver(std::queue& local_driver_queue); void _finalize_driver(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state); RuntimeProfile* _build_merged_instance_profile(QueryContext* query_ctx, FragmentContext* fragment_ctx); - void _remove_non_core_metrics(QueryContext* query_ctx, std::vector& driver_profiles); void _finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state); diff --git a/be/src/exec/spill/spiller.cpp b/be/src/exec/spill/spiller.cpp index 333cbb25b79cb..7ffd272125fe3 100644 --- a/be/src/exec/spill/spiller.cpp +++ b/be/src/exec/spill/spiller.cpp @@ -41,32 +41,35 @@ namespace starrocks::spill { SpillProcessMetrics::SpillProcessMetrics(RuntimeProfile* profile, std::atomic_int64_t* total_spill_bytes_) { - _spiller_metrics = std::make_shared("SpillerMetrics"); + DCHECK(profile != nullptr); total_spill_bytes = total_spill_bytes_; - profile->add_child(_spiller_metrics.get(), true, nullptr); - - append_data_timer = ADD_TIMER(_spiller_metrics.get(), "AppendDataTime"); - spill_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsSpilled", TUnit::UNIT); - flush_timer = ADD_TIMER(_spiller_metrics.get(), "FlushTime"); - write_io_timer = ADD_TIMER(_spiller_metrics.get(), "WriteIOTime"); - restore_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsRestored", TUnit::UNIT); - restore_from_buffer_timer = ADD_TIMER(_spiller_metrics.get(), "RestoreTime"); - read_io_timer = ADD_TIMER(_spiller_metrics.get(), "ReadIOTime"); - flush_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesFlushToDisk", TUnit::BYTES); - restore_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesRestoreFromDisk", TUnit::BYTES); - serialize_timer = ADD_TIMER(_spiller_metrics.get(), "SerializeTime"); - deserialize_timer = ADD_TIMER(_spiller_metrics.get(), "DeserializeTime"); - mem_table_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter( - "MemTablePeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); - input_stream_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter( - "InputStreamPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); - - shuffle_timer = ADD_TIMER(_spiller_metrics.get(), "ShuffleTime"); - split_partition_timer = ADD_TIMER(_spiller_metrics.get(), "SplitPartitionTime"); - restore_from_mem_table_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsRestoreFromMemTable", TUnit::UNIT); - restore_from_mem_table_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesRestoreFromMemTable", TUnit::UNIT); - partition_writer_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter( - "PartitionWriterPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); + + std::string parent = "SpillStatistics"; + ADD_TIMER(profile, parent); + + append_data_timer = ADD_CHILD_TIMER(profile, "AppendDataTime", parent); + spill_rows = ADD_CHILD_COUNTER(profile, "RowsSpilled", TUnit::UNIT, parent); + flush_timer = ADD_CHILD_TIMER(profile, "FlushTime", parent); + write_io_timer = ADD_CHILD_TIMER(profile, "WriteIOTime", parent); + restore_rows = ADD_CHILD_COUNTER(profile, "RowsRestored", TUnit::UNIT, parent); + restore_from_buffer_timer = ADD_CHILD_TIMER(profile, "RestoreTime", parent); + read_io_timer = ADD_CHILD_TIMER(profile, "ReadIOTime", parent); + flush_bytes = ADD_CHILD_COUNTER(profile, "BytesFlushToDisk", TUnit::BYTES, parent); + restore_bytes = ADD_CHILD_COUNTER(profile, "BytesRestoreFromDisk", TUnit::BYTES, parent); + serialize_timer = ADD_CHILD_TIMER(profile, "SerializeTime", parent); + deserialize_timer = ADD_CHILD_TIMER(profile, "DeserializeTime", parent); + mem_table_peak_memory_usage = profile->AddHighWaterMarkCounter( + "MemTablePeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent); + input_stream_peak_memory_usage = profile->AddHighWaterMarkCounter( + "InputStreamPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent); + + shuffle_timer = ADD_CHILD_TIMER(profile, "ShuffleTime", parent); + split_partition_timer = ADD_CHILD_TIMER(profile, "SplitPartitionTime", parent); + restore_from_mem_table_rows = ADD_CHILD_COUNTER(profile, "RowsRestoreFromMemTable", TUnit::UNIT, parent); + restore_from_mem_table_bytes = ADD_CHILD_COUNTER(profile, "BytesRestoreFromMemTable", TUnit::UNIT, parent); + partition_writer_peak_memory_usage = + profile->AddHighWaterMarkCounter("PartitionWriterPeakMemoryBytes", TUnit::BYTES, + RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent); } Status Spiller::prepare(RuntimeState* state) { @@ -142,4 +145,4 @@ Status Spiller::_acquire_input_stream(RuntimeState* state) { return Status::OK(); } -} // namespace starrocks::spill \ No newline at end of file +} // namespace starrocks::spill diff --git a/be/src/exec/spill/spiller.h b/be/src/exec/spill/spiller.h index 6593c6a358892..d8e0a014fa61b 100644 --- a/be/src/exec/spill/spiller.h +++ b/be/src/exec/spill/spiller.h @@ -43,10 +43,6 @@ namespace starrocks::spill { // some metrics for spill struct SpillProcessMetrics { -private: - // For profile - std::shared_ptr _spiller_metrics; - public: SpillProcessMetrics() = default; SpillProcessMetrics(RuntimeProfile* profile, std::atomic_int64_t* total_spill_bytes); @@ -226,4 +222,4 @@ class Spiller : public std::enable_shared_from_this { std::atomic_bool _is_cancel = false; }; -} // namespace starrocks::spill \ No newline at end of file +} // namespace starrocks::spill diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 4029e785d9a85..791681171a3bb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -2568,19 +2568,10 @@ public TQueryOptions toThrift() { tResult.setRuntime_filter_send_timeout_ms(global_runtime_filter_rpc_timeout); tResult.setRuntime_filter_scan_wait_time_ms(runtimeFilterScanWaitTime); tResult.setPipeline_dop(pipelineDop); - switch (pipelineProfileLevel) { - case 0: - tResult.setPipeline_profile_level(TPipelineProfileLevel.CORE_METRICS); - break; - case 1: - tResult.setPipeline_profile_level(TPipelineProfileLevel.ALL_METRICS); - break; - case 2: - tResult.setPipeline_profile_level(TPipelineProfileLevel.DETAIL); - break; - default: - tResult.setPipeline_profile_level(TPipelineProfileLevel.CORE_METRICS); - break; + if (pipelineProfileLevel == 2) { + tResult.setPipeline_profile_level(TPipelineProfileLevel.DETAIL); + } else { + tResult.setPipeline_profile_level(TPipelineProfileLevel.MERGE); } tResult.setEnable_tablet_internal_parallel(enableTabletInternalParallel); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java index a561824eae39a..e531df9930c2a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java @@ -49,7 +49,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -302,6 +301,7 @@ public RuntimeProfile buildMergedQueryProfile() { int profileLevel = sessionVariable.getPipelineProfileLevel(); if (profileLevel >= TPipelineProfileLevel.DETAIL.getValue()) { + // We don't guarantee the detail level profile can work well with visualization feature. return queryProfile; } @@ -387,7 +387,6 @@ public RuntimeProfile buildMergedQueryProfile() { mergedInstanceProfile.getChildList().forEach(pair -> { RuntimeProfile pipelineProfile = pair.first; - foldUnnecessaryLimitOperators(pipelineProfile); setOperatorStatus(pipelineProfile); newFragmentProfile.addChild(pipelineProfile); }); @@ -507,36 +506,6 @@ private List getUnfinishedInstanceIds() { .collect(Collectors.toList()); } - /** - * Remove unnecessary LimitOperator, which has same input rows and output rows - * to keep the profile concise - */ - private void foldUnnecessaryLimitOperators(RuntimeProfile pipelineProfile) { - SessionVariable sessionVariable = connectContext.getSessionVariable(); - if (!sessionVariable.isProfileLimitFold()) { - return; - } - - List foldNames = Lists.newArrayList(); - for (Pair child : pipelineProfile.getChildList()) { - RuntimeProfile operatorProfile = child.first; - if (operatorProfile.getName().contains("LIMIT")) { - RuntimeProfile commonMetrics = operatorProfile.getChild("CommonMetrics"); - Preconditions.checkNotNull(commonMetrics); - Counter pullRowNum = commonMetrics.getCounter("PullRowNum"); - Counter pushRowNum = commonMetrics.getCounter("PushRowNum"); - if (pullRowNum == null || pushRowNum == null) { - continue; - } - if (Objects.equals(pullRowNum.getValue(), pushRowNum.getValue())) { - foldNames.add(operatorProfile.getName()); - } - } - } - - foldNames.forEach(pipelineProfile::removeChild); - } - private void setOperatorStatus(RuntimeProfile pipelineProfile) { for (Pair child : pipelineProfile.getChildList()) { RuntimeProfile operatorProfile = child.first; diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ExplainAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/ExplainAnalyzer.java index 16dd8ff4d382e..986fcdd7408e9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ExplainAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ExplainAnalyzer.java @@ -824,7 +824,8 @@ private static Counter searchMetric(NodeInfo nodeInfo, boolean searchSubordinate private static Counter searchMetricFromUpperLevel(NodeInfo nodeInfo, String... nameLevels) { for (int i = 0; i < nodeInfo.operatorProfiles.size(); i++) { RuntimeProfile operatorProfile = nodeInfo.operatorProfiles.get(i); - if (i < nodeInfo.operatorProfiles.size() - 1 && operatorProfile.getName().contains("_SINK (")) { + if (i < nodeInfo.operatorProfiles.size() - 1 && (operatorProfile.getName().contains("_SINK (") + || operatorProfile.getName().contains("_BUILD ("))) { continue; } RuntimeProfile cur = getLastLevel(operatorProfile, nameLevels); diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift index a3fc9b5cc6e76..a93bb73891637 100644 --- a/gensrc/thrift/InternalService.thrift +++ b/gensrc/thrift/InternalService.thrift @@ -95,9 +95,8 @@ struct TLoadErrorHubInfo { } enum TPipelineProfileLevel { - CORE_METRICS, - ALL_METRICS, - DETAIL + MERGE = 1; + DETAIL = 2; } enum TSpillMode {