Skip to content

Commit

Permalink
[Enhancement] Refine ScanOperator profile metrics (#9412)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyehcf authored Aug 1, 2022
1 parent 671b5a5 commit 12c19a2
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 6 deletions.
9 changes: 8 additions & 1 deletion be/src/exec/pipeline/scan/chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ ChunkSource::ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profi
_chunk_token(nullptr),
_executor_type(executor_type) {}

Status ChunkSource::prepare(RuntimeState* state) {
_scan_timer = ADD_TIMER(_runtime_profile, "ScanTime");
_io_task_wait_timer = ADD_TIMER(_runtime_profile, "IOTaskWaitTime");
_io_task_exec_timer = ADD_TIMER(_runtime_profile, "IOTaskExecTime");
return Status::OK();
}

StatusOr<vectorized::ChunkPtr> ChunkSource::get_next_chunk_from_buffer() {
vectorized::ChunkPtr chunk = nullptr;
_chunk_buffer.try_get(_scan_operator_seq, &chunk);
Expand Down Expand Up @@ -90,4 +97,4 @@ std::pair<Status, size_t> ChunkSource::buffer_next_batch_chunks_blocking(Runtime
}

using namespace vectorized;
} // namespace starrocks::pipeline
} // namespace starrocks::pipeline
12 changes: 11 additions & 1 deletion be/src/exec/pipeline/scan/chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ChunkSource {

virtual ~ChunkSource() = default;

virtual Status prepare(RuntimeState* state) = 0;
virtual Status prepare(RuntimeState* state);

virtual void close(RuntimeState* state) = 0;

Expand All @@ -48,6 +48,10 @@ class ChunkSource {
int64_t get_scan_rows() const { return _scan_rows_num; }
int64_t get_scan_bytes() const { return _scan_bytes; }

RuntimeProfile::Counter* scan_timer() { return _scan_timer; }
RuntimeProfile::Counter* io_task_wait_timer() { return _io_task_wait_timer; }
RuntimeProfile::Counter* io_task_exec_timer() { return _io_task_exec_timer; }

void pin_chunk_token(ChunkBufferTokenPtr chunk_token);
void unpin_chunk_token();

Expand Down Expand Up @@ -76,6 +80,12 @@ class ChunkSource {
ChunkBufferTokenPtr _chunk_token;

const workgroup::ScanExecutorType _executor_type;

private:
// _scan_timer = _io_task_wait_timer + _io_task_exec_timer
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _io_task_wait_timer = nullptr;
RuntimeProfile::Counter* _io_task_exec_timer = nullptr;
};

using ChunkSourcePtr = std::shared_ptr<ChunkSource>;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ ConnectorChunkSource::~ConnectorChunkSource() {
}

Status ConnectorChunkSource::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ChunkSource::prepare(state));
_runtime_state = state;
return Status::OK();
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void OlapChunkSource::close(RuntimeState* state) {
}

Status OlapChunkSource::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ChunkSource::prepare(state));
_runtime_state = state;
const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
const TupleDescriptor* tuple_desc = state->desc_tbl().get_tuple_descriptor(thrift_olap_scan_node.tuple_id);
Expand All @@ -73,7 +74,6 @@ Status OlapChunkSource::prepare(RuntimeState* state) {
}

void OlapChunkSource::_init_counter(RuntimeState* state) {
_scan_timer = ADD_TIMER(_runtime_profile, "ScanTime");
_bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES);
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);

Expand Down Expand Up @@ -313,7 +313,6 @@ Status OlapChunkSource::_read_chunk_from_storage(RuntimeState* state, vectorized
return Status::Cancelled("canceled state");
}

SCOPED_TIMER(_scan_timer);
do {
RETURN_IF_ERROR(state->check_mem_limit("read chunk from storage"));
RETURN_IF_ERROR(_prj_iter->get_next(chunk));
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class OlapChunkSource final : public ChunkSource {
RuntimeProfile::Counter* _rows_read_counter = nullptr;

RuntimeProfile::Counter* _expr_filter_timer = nullptr;
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _create_seg_iter_timer = nullptr;
RuntimeProfile::Counter* _tablet_counter = nullptr;
RuntimeProfile::Counter* _reader_init_timer = nullptr;
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "exec/pipeline/scan/scan_operator.h"

#include <util/time.h>

#include "column/chunk.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/limit_operator.h"
Expand Down Expand Up @@ -305,7 +307,9 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
task.workgroup = _workgroup;
// TODO: consider more factors, such as scan bytes and i/o time.
task.priority = vectorized::OlapScanNode::compute_priority(_submit_task_counter->value());
task.work_function = [wp = _query_ctx, this, state, chunk_source_index, query_trace_ctx, driver_id](int worker_id) {
const auto io_task_start_nano = MonotonicNanos();
task.work_function = [wp = _query_ctx, this, state, chunk_source_index, query_trace_ctx, driver_id,
io_task_start_nano](int worker_id) {
if (auto sp = wp.lock()) {
// Set driver_id here to share some driver-local contents.
// Current it's used by ExprContext's driver-local state
Expand All @@ -318,6 +322,14 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
QUERY_TRACE_ASYNC_START("io_task", category, query_trace_ctx);

auto& chunk_source = _chunk_sources[chunk_source_index];

DeferOp timer_defer([chunk_source]() {
COUNTER_UPDATE(chunk_source->scan_timer(), chunk_source->io_task_wait_timer()->value() +
chunk_source->io_task_exec_timer()->value());
});
COUNTER_UPDATE(chunk_source->io_task_wait_timer(), MonotonicNanos() - io_task_start_nano);
SCOPED_TIMER(chunk_source->io_task_exec_timer());

int64_t prev_cpu_time = chunk_source->get_cpu_time_spent();
int64_t prev_scan_rows = chunk_source->get_scan_rows();
int64_t prev_scan_bytes = chunk_source->get_scan_bytes();
Expand Down

0 comments on commit 12c19a2

Please sign in to comment.