Skip to content

Commit

Permalink
[BugFix] fix compatibility issue when collecting query statistics (St…
Browse files Browse the repository at this point in the history
…arRocks#29678)

Signed-off-by: silverbullet233 <3675229+silverbullet233@users.noreply.github.com>
  • Loading branch information
silverbullet233 committed Aug 23, 2023
1 parent dc57906 commit 7c8232b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 10 deletions.
17 changes: 10 additions & 7 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
size_t total_rows_moved = 0;
int64_t time_spent = 0;
Status return_status = Status::OK();
DeferOp defer([&]() { _update_statistics(total_chunks_moved, total_rows_moved, time_spent); });
DeferOp defer([&]() { _update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent); });
while (true) {
RETURN_IF_LIMIT_EXCEEDED(runtime_state, "Pipeline");

Expand All @@ -138,7 +138,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
Expand Down Expand Up @@ -203,7 +203,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int w
// We rely on the exchange operator to pass query statistics,
// so when the scan operator finishes,
// we need to update the scan stats immediately to ensure that the exchange operator can send all the data before the end
_update_scan_statistics();
_update_scan_statistics(runtime_state);
RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
}
RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
Expand Down Expand Up @@ -515,11 +515,12 @@ void PipelineDriver::_update_driver_acct(size_t total_chunks_moved, size_t total
driver_acct().update_last_time_spent(time_spent);
}

void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent) {
void PipelineDriver::_update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved,
size_t time_spent) {
_update_driver_acct(total_chunks_moved, total_rows_moved, time_spent);

// Update statistics of scan operator
_update_scan_statistics();
_update_scan_statistics(state);

// Update cpu cost of this query
int64_t runtime_ns = driver_acct().get_last_time_spent();
Expand All @@ -529,15 +530,17 @@ void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_
query_ctx()->incr_cpu_cost(accounted_cpu_cost);
}

void PipelineDriver::_update_scan_statistics() {
void PipelineDriver::_update_scan_statistics(RuntimeState* state) {
if (ScanOperator* scan = source_scan_operator()) {
int64_t scan_rows = scan->get_last_scan_rows_num();
int64_t scan_bytes = scan->get_last_scan_bytes();
int64_t table_id = scan->get_scan_table_id();
if (scan_rows > 0 || scan_bytes > 0) {
query_ctx()->incr_cur_scan_rows_num(scan_rows);
query_ctx()->incr_cur_scan_bytes(scan_bytes);
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
if (state->enable_collect_table_level_scan_stats()) {
query_ctx()->update_scan_stats(table_id, scan_rows, scan_bytes);
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ class PipelineDriver {

// Update metrics when the driver yields.
void _update_driver_acct(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_statistics(size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics();
void _update_statistics(RuntimeState* state, size_t total_chunks_moved, size_t total_rows_moved, size_t time_spent);
void _update_scan_statistics(RuntimeState* state);
void _update_overhead_timer();

RuntimeState* _runtime_state = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ class RuntimeState {
std::shared_ptr<QueryStatistics> intermediate_query_statistic();
std::shared_ptr<QueryStatisticsRecvr> query_recv();

bool enable_collect_table_level_scan_stats() const {
return _query_options.__isset.enable_collect_table_level_scan_stats &&
_query_options.enable_collect_table_level_scan_stats;
}

private:
// Set per-query state.
void _init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String SELECT_RATIO_THRESHOLD = "SELECT_RATIO_THRESHOLD";

public static final String ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS = "enable_collect_table_level_scan_stats";

// Limitations
// mem limit can't smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
Expand Down Expand Up @@ -614,6 +616,9 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
@VarAttr(name = SELECT_RATIO_THRESHOLD, flag = VariableMgr.INVISIBLE)
private double selectRatioThreshold = 0.15;

@VarAttr(name = ENABLE_COLLECT_TABLE_LEVEL_SCAN_STATS)
private boolean enableCollectTableLevelScanStats = true;

public int getStatisticCollectParallelism() {
return statisticCollectParallelism;
}
Expand Down Expand Up @@ -1174,7 +1179,7 @@ public TQueryOptions toThrift() {

tResult.setEnable_tablet_internal_parallel(enableTabletInternalParallel);
tResult.setEnable_pipeline_query_statistic(enablePipelineQueryStatistic);

tResult.setEnable_collect_table_level_scan_stats(enableCollectTableLevelScanStats);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ struct TQueryOptions {

67: optional bool enable_pipeline_query_statistic = false;


102: optional bool enable_collect_table_level_scan_stats;
// The following params only exist on 2.2 2.3, to avoid upgrade inconsistency
// (if start from a low number, say 80, this id may be used by another param in the new version),
// start from 1000
Expand Down

0 comments on commit 7c8232b

Please sign in to comment.