Skip to content

Commit

Permalink
[BugFix] fix compatibility issue when collecting query statistics (#2…
Browse files Browse the repository at this point in the history
…9678)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
  • Loading branch information
silverbullet233 authored Aug 22, 2023
1 parent ea21474 commit be2f71b
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 11 deletions.
17 changes: 10 additions & 7 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
scan->end_driver_process(this);
}

_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
});

if (ScanOperator* scan = source_scan_operator()) {
Expand Down Expand Up @@ -260,7 +260,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
_adjust_memory_usage(runtime_state, query_mem_tracker.get(), next_op, nullptr);
Expand Down Expand Up @@ -348,7 +348,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
_adjust_memory_usage(runtime_state, query_mem_tracker.get(), next_op, nullptr);
Expand Down Expand Up @@ -778,11 +778,12 @@ void PipelineDriver::_update_driver_acct(size_t total_chunks_moved, size_t total
driver_acct().update_last_time_spent(time_spent);
}

void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) {
void PipelineDriver::_update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved,
size_t time_spent) {
_update_driver_acct(total_chunks_moved, total_rows_moved, time_spent);

// Update statistics of scan operator
_update_scan_statistics();
_update_scan_statistics(state);

// Update cpu cost of this query
int64_t runtime_ns = driver_acct().get_last_time_spent();
Expand All @@ -792,15 +793,17 @@ void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_
query_ctx()->incr_cpu_cost(accounted_cpu_cost);
}

void PipelineDriver::_update_scan_statistics() {
void PipelineDriver::_update_scan_statistics(RuntimeState* state) {
if (ScanOperator* scan = source_scan_operator()) {
int64_t scan_rows = scan->get_last_scan_rows_num();
int64_t scan_bytes = scan->get_last_scan_bytes();
int64_t table_id = scan->get_scan_table_id();
if (scan_rows > 0 || scan_bytes > 0) {
query_ctx()->incr_cur_scan_rows_num(scan_rows);
query_ctx()->incr_cur_scan_bytes(scan_bytes);
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
if (state->enable_collect_table_level_scan_stats()) {
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ class PipelineDriver {

// Update metrics when the driver yields.
void _update_driver_acct(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics();
void _update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics(RuntimeState* state);
void _update_overhead_timer();

RuntimeState* _runtime_state = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/stream_pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ StatusOr<DriverState> StreamPipelineDriver::process(RuntimeState* runtime_state,
StatusOr<DriverState> resturn_state = Status::OK();
DeferOp defer([&]() {
if (return_status.ok()) {
_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
}
});

Expand Down Expand Up @@ -186,7 +186,7 @@ StatusOr<DriverState> StreamPipelineDriver::_handle_finish_operators(RuntimeStat
StatusOr<DriverState> resturn_state = Status::OK();
DeferOp defer([&]() {
if (return_status.ok()) {
_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
}
});

Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ class RuntimeState {

bool use_page_cache();

bool enable_collect_table_level_scan_stats() const {
return _query_options.__isset.enable_collect_table_level_scan_stats &&
_query_options.enable_collect_table_level_scan_stats;
}

private:
// Set per-query state.
void _init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String CONSISTENT_HASH_VIRTUAL_NUMBER = "consistent_hash_virtual_number";

public static final String ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS = "enable_collect_table_level_scan_stats";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -1225,6 +1227,11 @@ public void setEnableParallelMerge(boolean enableParallelMerge) {
@VarAttr(name = ENABLE_COUNT_STAR_OPTIMIZATION, flag = VariableMgr.INVISIBLE)
private boolean enableCountStarOptimization = true;

// This variable is introduced to solve compatibility issues/
// see more details: https://github.com/StarRocks/starrocks/pull/29678
@VarAttr(name = ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS)
private boolean enableCollectTableLevelScanStats = true;

private int exprChildrenLimit = -1;

public int getExprChildrenLimit() {
Expand Down Expand Up @@ -2489,6 +2496,7 @@ public TQueryOptions toThrift() {
tResult.setConnector_io_tasks_slow_io_latency_ms(connectorIoTasksSlowIoLatency);
tResult.setConnector_scan_use_query_mem_ratio(connectorScanUseQueryMemRatio);
tResult.setScan_use_query_mem_ratio(scanUseQueryMemRatio);
tResult.setEnable_collect_table_level_scan_stats(enableCollectTableLevelScanStats);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ struct TQueryOptions {
97: optional i64 load_profile_collect_second;

101: optional i64 runtime_profile_report_interval = 30;

102: optional bool enable_collect_table_level_scan_stats;
}


Expand Down

0 comments on commit be2f71b

Please sign in to comment.