Skip to content

Commit

Permalink
[BugFix] fix compatibility issue when collecting query statistics (St…
Browse files Browse the repository at this point in the history
…arRocks#29678)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
  • Loading branch information
silverbullet233 committed Aug 23, 2023
1 parent 8c6d9e4 commit 4e0eca5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
17 changes: 10 additions & 7 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
scan->end_driver_process(this);
}

_update_statistics(total_chunks_moved, total_rows_moved, time_spent);
_update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
});

if (ScanOperator* scan = source_scan_operator()) {
Expand Down Expand Up @@ -220,7 +220,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
Expand Down Expand Up @@ -301,7 +301,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
Expand Down Expand Up @@ -657,11 +657,12 @@ void PipelineDriver::_update_driver_acct(size_t total_chunks_moved, size_t total
driver_acct().update_last_time_spent(time_spent);
}

void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) {
void PipelineDriver::_update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved,
size_t time_spent) {
_update_driver_acct(total_chunks_moved, total_rows_moved, time_spent);

// Update statistics of scan operator
_update_scan_statistics();
_update_scan_statistics(state);

// Update cpu cost of this query
int64_t runtime_ns = driver_acct().get_last_time_spent();
Expand All @@ -671,15 +672,17 @@ void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_
query_ctx()->incr_cpu_cost(accounted_cpu_cost);
}

void PipelineDriver::_update_scan_statistics() {
void PipelineDriver::_update_scan_statistics(RuntimeState* state) {
if (ScanOperator* scan = source_scan_operator()) {
int64_t scan_rows = scan->get_last_scan_rows_num();
int64_t scan_bytes = scan->get_last_scan_bytes();
int64_t table_id = scan->get_scan_table_id();
if (scan_rows > 0 || scan_bytes > 0) {
query_ctx()->incr_cur_scan_rows_num(scan_rows);
query_ctx()->incr_cur_scan_bytes(scan_bytes);
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
if (state->enable_collect_table_level_scan_stats()) {
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ class PipelineDriver {

// Update metrics when the driver yields.
void _update_driver_acct(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics();
void _update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics(RuntimeState* state);
void _update_overhead_timer();

RuntimeState* _runtime_state = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ class RuntimeState {
return _query_options.__isset.rpc_http_min_size ? _query_options.rpc_http_min_size : kRpcHttpMinSize;
}

// Reports whether table-level scan statistics should be recorded for this query.
// The result is true only when the query options explicitly carry the
// enable_collect_table_level_scan_stats field AND that field is true; an unset
// field is treated the same as "disabled".
bool enable_collect_table_level_scan_stats() const {
    // Field absent from the received query options: report disabled.
    if (!_query_options.__isset.enable_collect_table_level_scan_stats) {
        return false;
    }
    return _query_options.enable_collect_table_level_scan_stats;
}

private:
// Set per-query state.
void _init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String ENABLE_SIMPLIFY_CASE_WHEN = "enable_simplify_case_when";

public static final String ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS = "enable_collect_table_level_scan_stats";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(ENABLE_SPILLING)
Expand Down Expand Up @@ -964,6 +966,11 @@ public int getConnectorIoTasksPerScanOperator() {
@VarAttr(name = ENABLE_SIMPLIFY_CASE_WHEN, flag = VariableMgr.INVISIBLE)
private boolean enableSimplifyCaseWhen = true;

// This variable was introduced to solve a compatibility issue.
// For more details, see: https://github.com/StarRocks/starrocks/pull/29678
@VarAttr(name = ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS)
private boolean enableCollectTableLevelScanStats = true;

private int exprChildrenLimit = -1;

public int getExprChildrenLimit() {
Expand Down Expand Up @@ -1905,6 +1912,7 @@ public TQueryOptions toThrift() {
tResult.setConnector_io_tasks_slow_io_latency_ms(connectorIoTasksSlowIoLatency);
tResult.setConnector_scan_use_query_mem_ratio(connectorScanUseQueryMemRatio);
tResult.setScan_use_query_mem_ratio(scanUseQueryMemRatio);
tResult.setEnable_collect_table_level_scan_stats(enableCollectTableLevelScanStats);
return tResult;
}

Expand Down
4 changes: 3 additions & 1 deletion gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ struct TQueryOptions {
92: optional bool enable_connector_adaptive_io_tasks = true;
93: optional i32 connector_io_tasks_slow_io_latency_ms = 50;
94: optional double scan_use_query_mem_ratio = 0.25;
95: optional double connector_scan_use_query_mem_ratio = 0.3;
95: optional double connector_scan_use_query_mem_ratio = 0.3;

102: optional bool enable_collect_table_level_scan_stats;
}


Expand Down

0 comments on commit 4e0eca5

Please sign in to comment.