Skip to content

Commit

Permalink
[Enhancement] Refine profile to support visualization refactor(2) (St…
Browse files Browse the repository at this point in the history
…arRocks#30381)

* [Enhancement] Refine profile to support visualization refactor(2)

Signed-off-by: liuyehcf <1559500551@qq.com>

* update enum value

Signed-off-by: liuyehcf <1559500551@qq.com>

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
  • Loading branch information
liuyehcf authored Sep 7, 2023
1 parent e056a01 commit 653ae3e
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 109 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void PipelineBuilderContext::interpolate_spill_process(size_t plan_node_id,
const SpillProcessChannelFactoryPtr& spill_channel_factory,
size_t dop) {
OpFactories spill_process_operators;
auto spill_process_factory = std::make_shared<SpillProcessOperatorFactory>(next_operator_id(), "spill-process",
auto spill_process_factory = std::make_shared<SpillProcessOperatorFactory>(next_operator_id(), "spill_process",
plan_node_id, spill_channel_factory);
spill_process_factory->set_degree_of_parallelism(dop);
spill_process_operators.emplace_back(std::move(spill_process_factory));
Expand Down
27 changes: 0 additions & 27 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ RuntimeProfile* GlobalDriverExecutor::_build_merged_instance_profile(QueryContex
continue;
}

_remove_non_core_metrics(query_ctx, driver_profiles);

auto* merged_driver_profile =
RuntimeProfile::merge_isomorphic_profiles(query_ctx->object_pool(), driver_profiles);

Expand All @@ -474,29 +472,4 @@ RuntimeProfile* GlobalDriverExecutor::_build_merged_instance_profile(QueryContex

return new_instance_profile;
}

void GlobalDriverExecutor::_remove_non_core_metrics(QueryContext* query_ctx,
std::vector<RuntimeProfile*>& driver_profiles) {
if (query_ctx->profile_level() > TPipelineProfileLevel::CORE_METRICS) {
return;
}

for (auto* driver_profile : driver_profiles) {
driver_profile->remove_counters(std::set<std::string>{"DriverTotalTime", "ActiveTime", "PendingTime"});

std::vector<RuntimeProfile*> operator_profiles;
driver_profile->get_children(&operator_profiles);

for (auto* operator_profile : operator_profiles) {
RuntimeProfile* common_metrics = operator_profile->get_child("CommonMetrics");
DCHECK(common_metrics != nullptr);
common_metrics->remove_counters(std::set<std::string>{"OperatorTotalTime"});

RuntimeProfile* unique_metrics = operator_profile->get_child("UniqueMetrics");
DCHECK(unique_metrics != nullptr);
unique_metrics->remove_counters(std::set<std::string>{"ScanTime", "WaitTime"});
}
}
}

} // namespace starrocks::pipeline
1 change: 0 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
StatusOr<DriverRawPtr> _get_next_driver(std::queue<DriverRawPtr>& local_driver_queue);
void _finalize_driver(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state);
RuntimeProfile* _build_merged_instance_profile(QueryContext* query_ctx, FragmentContext* fragment_ctx);
void _remove_non_core_metrics(QueryContext* query_ctx, std::vector<RuntimeProfile*>& driver_profiles);

void _finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state);

Expand Down
55 changes: 29 additions & 26 deletions be/src/exec/spill/spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,35 @@
namespace starrocks::spill {

SpillProcessMetrics::SpillProcessMetrics(RuntimeProfile* profile, std::atomic_int64_t* total_spill_bytes_) {
_spiller_metrics = std::make_shared<RuntimeProfile>("SpillerMetrics");
DCHECK(profile != nullptr);
total_spill_bytes = total_spill_bytes_;
profile->add_child(_spiller_metrics.get(), true, nullptr);

append_data_timer = ADD_TIMER(_spiller_metrics.get(), "AppendDataTime");
spill_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsSpilled", TUnit::UNIT);
flush_timer = ADD_TIMER(_spiller_metrics.get(), "FlushTime");
write_io_timer = ADD_TIMER(_spiller_metrics.get(), "WriteIOTime");
restore_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsRestored", TUnit::UNIT);
restore_from_buffer_timer = ADD_TIMER(_spiller_metrics.get(), "RestoreTime");
read_io_timer = ADD_TIMER(_spiller_metrics.get(), "ReadIOTime");
flush_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesFlushToDisk", TUnit::BYTES);
restore_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesRestoreFromDisk", TUnit::BYTES);
serialize_timer = ADD_TIMER(_spiller_metrics.get(), "SerializeTime");
deserialize_timer = ADD_TIMER(_spiller_metrics.get(), "DeserializeTime");
mem_table_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter(
"MemTablePeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES));
input_stream_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter(
"InputStreamPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES));

shuffle_timer = ADD_TIMER(_spiller_metrics.get(), "ShuffleTime");
split_partition_timer = ADD_TIMER(_spiller_metrics.get(), "SplitPartitionTime");
restore_from_mem_table_rows = ADD_COUNTER(_spiller_metrics.get(), "RowsRestoreFromMemTable", TUnit::UNIT);
restore_from_mem_table_bytes = ADD_COUNTER(_spiller_metrics.get(), "BytesRestoreFromMemTable", TUnit::UNIT);
partition_writer_peak_memory_usage = _spiller_metrics->AddHighWaterMarkCounter(
"PartitionWriterPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES));

std::string parent = "SpillStatistics";
ADD_TIMER(profile, parent);

append_data_timer = ADD_CHILD_TIMER(profile, "AppendDataTime", parent);
spill_rows = ADD_CHILD_COUNTER(profile, "RowsSpilled", TUnit::UNIT, parent);
flush_timer = ADD_CHILD_TIMER(profile, "FlushTime", parent);
write_io_timer = ADD_CHILD_TIMER(profile, "WriteIOTime", parent);
restore_rows = ADD_CHILD_COUNTER(profile, "RowsRestored", TUnit::UNIT, parent);
restore_from_buffer_timer = ADD_CHILD_TIMER(profile, "RestoreTime", parent);
read_io_timer = ADD_CHILD_TIMER(profile, "ReadIOTime", parent);
flush_bytes = ADD_CHILD_COUNTER(profile, "BytesFlushToDisk", TUnit::BYTES, parent);
restore_bytes = ADD_CHILD_COUNTER(profile, "BytesRestoreFromDisk", TUnit::BYTES, parent);
serialize_timer = ADD_CHILD_TIMER(profile, "SerializeTime", parent);
deserialize_timer = ADD_CHILD_TIMER(profile, "DeserializeTime", parent);
mem_table_peak_memory_usage = profile->AddHighWaterMarkCounter(
"MemTablePeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent);
input_stream_peak_memory_usage = profile->AddHighWaterMarkCounter(
"InputStreamPeakMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent);

shuffle_timer = ADD_CHILD_TIMER(profile, "ShuffleTime", parent);
split_partition_timer = ADD_CHILD_TIMER(profile, "SplitPartitionTime", parent);
restore_from_mem_table_rows = ADD_CHILD_COUNTER(profile, "RowsRestoreFromMemTable", TUnit::UNIT, parent);
restore_from_mem_table_bytes = ADD_CHILD_COUNTER(profile, "BytesRestoreFromMemTable", TUnit::UNIT, parent);
partition_writer_peak_memory_usage =
profile->AddHighWaterMarkCounter("PartitionWriterPeakMemoryBytes", TUnit::BYTES,
RuntimeProfile::Counter::create_strategy(TUnit::BYTES), parent);
}

Status Spiller::prepare(RuntimeState* state) {
Expand Down Expand Up @@ -142,4 +145,4 @@ Status Spiller::_acquire_input_stream(RuntimeState* state) {

return Status::OK();
}
} // namespace starrocks::spill
} // namespace starrocks::spill
6 changes: 1 addition & 5 deletions be/src/exec/spill/spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ namespace starrocks::spill {

// some metrics for spill
struct SpillProcessMetrics {
private:
// For profile
std::shared_ptr<RuntimeProfile> _spiller_metrics;

public:
SpillProcessMetrics() = default;
SpillProcessMetrics(RuntimeProfile* profile, std::atomic_int64_t* total_spill_bytes);
Expand Down Expand Up @@ -226,4 +222,4 @@ class Spiller : public std::enable_shared_from_this<Spiller> {
std::atomic_bool _is_cancel = false;
};

} // namespace starrocks::spill
} // namespace starrocks::spill
17 changes: 4 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2568,19 +2568,10 @@ public TQueryOptions toThrift() {
tResult.setRuntime_filter_send_timeout_ms(global_runtime_filter_rpc_timeout);
tResult.setRuntime_filter_scan_wait_time_ms(runtimeFilterScanWaitTime);
tResult.setPipeline_dop(pipelineDop);
switch (pipelineProfileLevel) {
case 0:
tResult.setPipeline_profile_level(TPipelineProfileLevel.CORE_METRICS);
break;
case 1:
tResult.setPipeline_profile_level(TPipelineProfileLevel.ALL_METRICS);
break;
case 2:
tResult.setPipeline_profile_level(TPipelineProfileLevel.DETAIL);
break;
default:
tResult.setPipeline_profile_level(TPipelineProfileLevel.CORE_METRICS);
break;
if (pipelineProfileLevel == 2) {
tResult.setPipeline_profile_level(TPipelineProfileLevel.DETAIL);
} else {
tResult.setPipeline_profile_level(TPipelineProfileLevel.MERGE);
}

tResult.setEnable_tablet_internal_parallel(enableTabletInternalParallel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -302,6 +301,7 @@ public RuntimeProfile buildMergedQueryProfile() {

int profileLevel = sessionVariable.getPipelineProfileLevel();
if (profileLevel >= TPipelineProfileLevel.DETAIL.getValue()) {
// We don't guarantee the detail level profile can work well with visualization feature.
return queryProfile;
}

Expand Down Expand Up @@ -387,7 +387,6 @@ public RuntimeProfile buildMergedQueryProfile() {

mergedInstanceProfile.getChildList().forEach(pair -> {
RuntimeProfile pipelineProfile = pair.first;
foldUnnecessaryLimitOperators(pipelineProfile);
setOperatorStatus(pipelineProfile);
newFragmentProfile.addChild(pipelineProfile);
});
Expand Down Expand Up @@ -507,36 +506,6 @@ private List<String> getUnfinishedInstanceIds() {
.collect(Collectors.toList());
}

/**
* Remove unnecessary LimitOperator, which has same input rows and output rows
* to keep the profile concise
*/
private void foldUnnecessaryLimitOperators(RuntimeProfile pipelineProfile) {
SessionVariable sessionVariable = connectContext.getSessionVariable();
if (!sessionVariable.isProfileLimitFold()) {
return;
}

List<String> foldNames = Lists.newArrayList();
for (Pair<RuntimeProfile, Boolean> child : pipelineProfile.getChildList()) {
RuntimeProfile operatorProfile = child.first;
if (operatorProfile.getName().contains("LIMIT")) {
RuntimeProfile commonMetrics = operatorProfile.getChild("CommonMetrics");
Preconditions.checkNotNull(commonMetrics);
Counter pullRowNum = commonMetrics.getCounter("PullRowNum");
Counter pushRowNum = commonMetrics.getCounter("PushRowNum");
if (pullRowNum == null || pushRowNum == null) {
continue;
}
if (Objects.equals(pullRowNum.getValue(), pushRowNum.getValue())) {
foldNames.add(operatorProfile.getName());
}
}
}

foldNames.forEach(pipelineProfile::removeChild);
}

private void setOperatorStatus(RuntimeProfile pipelineProfile) {
for (Pair<RuntimeProfile, Boolean> child : pipelineProfile.getChildList()) {
RuntimeProfile operatorProfile = child.first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,8 @@ private static Counter searchMetric(NodeInfo nodeInfo, boolean searchSubordinate
private static Counter searchMetricFromUpperLevel(NodeInfo nodeInfo, String... nameLevels) {
for (int i = 0; i < nodeInfo.operatorProfiles.size(); i++) {
RuntimeProfile operatorProfile = nodeInfo.operatorProfiles.get(i);
if (i < nodeInfo.operatorProfiles.size() - 1 && operatorProfile.getName().contains("_SINK (")) {
if (i < nodeInfo.operatorProfiles.size() - 1 && (operatorProfile.getName().contains("_SINK (")
|| operatorProfile.getName().contains("_BUILD ("))) {
continue;
}
RuntimeProfile cur = getLastLevel(operatorProfile, nameLevels);
Expand Down
5 changes: 2 additions & 3 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ struct TLoadErrorHubInfo {
}

enum TPipelineProfileLevel {
CORE_METRICS,
ALL_METRICS,
DETAIL
MERGE = 1;
DETAIL = 2;
}

enum TSpillMode {
Expand Down

0 comments on commit 653ae3e

Please sign in to comment.