Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix compatibility issue when collecting query statistics #29678

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
scan->end_driver_process(this);
}

_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
});

if (ScanOperator* scan = source_scan_operator()) {
Expand Down Expand Up @@ -260,7 +260,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
_adjust_memory_usage(runtime_state, query_mem_tracker.get(), next_op, nullptr);
Expand Down Expand Up @@ -348,7 +348,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
_adjust_memory_usage(runtime_state, query_mem_tracker.get(), next_op, nullptr);
Expand Down Expand Up @@ -778,11 +778,12 @@ void PipelineDriver::_update_driver_acct(size_t total_chunks_moved, size_t total
driver_acct().update_last_time_spent(time_spent);
}

void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) {
void PipelineDriver::_update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved,
size_t time_spent) {
_update_driver_acct(total_chunks_moved, total_rows_moved, time_spent);

// Update statistics of scan operator
_update_scan_statistics();
_update_scan_statistics(state);

// Update cpu cost of this query
int64_t runtime_ns = driver_acct().get_last_time_spent();
Expand All @@ -792,15 +793,17 @@ void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_
query_ctx()->incr_cpu_cost(accounted_cpu_cost);
}

void PipelineDriver::_update_scan_statistics() {
void PipelineDriver::_update_scan_statistics(RuntimeState* state) {
if (ScanOperator* scan = source_scan_operator()) {
int64_t scan_rows = scan->get_last_scan_rows_num();
int64_t scan_bytes = scan->get_last_scan_bytes();
int64_t table_id = scan->get_scan_table_id();
if (scan_rows > 0 || scan_bytes > 0) {
query_ctx()->incr_cur_scan_rows_num(scan_rows);
query_ctx()->incr_cur_scan_bytes(scan_bytes);
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
if (state->enable_collect_table_level_scan_stats()) {
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ class PipelineDriver {

// Update metrics when the driver yields.
void _update_driver_acct(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics();
void _update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics(RuntimeState* state);
void _update_overhead_timer();

RuntimeState* _runtime_state = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/stream_pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ StatusOr<DriverState> StreamPipelineDriver::process(RuntimeState* runtime_state,
StatusOr<DriverState> resturn_state = Status::OK();
DeferOp defer([&]() {
if (return_status.ok()) {
_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
}
});

Expand Down Expand Up @@ -186,7 +186,7 @@ StatusOr<DriverState> StreamPipelineDriver::_handle_finish_operators(RuntimeStat
StatusOr<DriverState> resturn_state = Status::OK();
DeferOp defer([&]() {
if (return_status.ok()) {
_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
}
});

Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ class RuntimeState {

bool use_page_cache();

// Reports whether table-level scan statistics should be collected for this query.
//
// Returns true only when the `enable_collect_table_level_scan_stats` query option
// was explicitly set on the received TQueryOptions (its `__isset` flag is on) AND
// its value is true. When the option was not set at all, this returns false
// regardless of whatever value the field happens to hold.
bool enable_collect_table_level_scan_stats() const {
    // Option absent from the query options: treat collection as disabled.
    if (!_query_options.__isset.enable_collect_table_level_scan_stats) {
        return false;
    }
    return _query_options.enable_collect_table_level_scan_stats;
}

private:
// Set per-query state.
void _init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String CONSISTENT_HASH_VIRTUAL_NUMBER = "consistent_hash_virtual_number";

public static final String ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS = "enable_collect_table_level_scan_stats";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -1225,6 +1227,11 @@ public void setEnableParallelMerge(boolean enableParallelMerge) {
@VarAttr(name = ENABLE_COUNT_STAR_OPTIMIZATION, flag = VariableMgr.INVISIBLE)
private boolean enableCountStarOptimization = true;

// This variable was introduced to solve a compatibility issue.
// For more details, see: https://github.com/StarRocks/starrocks/pull/29678
@VarAttr(name = ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS)
private boolean enableCollectTableLevelScanStats = true;

private int exprChildrenLimit = -1;

public int getExprChildrenLimit() {
Expand Down Expand Up @@ -2489,6 +2496,7 @@ public TQueryOptions toThrift() {
tResult.setConnector_io_tasks_slow_io_latency_ms(connectorIoTasksSlowIoLatency);
tResult.setConnector_scan_use_query_mem_ratio(connectorScanUseQueryMemRatio);
tResult.setScan_use_query_mem_ratio(scanUseQueryMemRatio);
tResult.setEnable_collect_table_level_scan_stats(enableCollectTableLevelScanStats);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ struct TQueryOptions {
97: optional i64 load_profile_collect_second;

101: optional i64 runtime_profile_report_interval = 30;

102: optional bool enable_collect_table_level_scan_stats;
}


Expand Down