Skip to content

Commit

Permalink
[Refactor] refactor on chunk source and scan operator (StarRocks#21405)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirtysalt authored Apr 12, 2023
1 parent b2d80a3 commit 2d341ed
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 91 deletions.
1 change: 1 addition & 0 deletions be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class Operator {
// the memory that can be freed by the current operator
size_t revocable_mem_bytes() { return _revocable_mem_bytes; }
void set_revocable_mem_bytes(size_t bytes) { _revocable_mem_bytes = bytes; }
// Returns the value stored in _driver_sequence.
int32_t get_driver_sequence() const { return _driver_sequence; }

protected:
OperatorFactory* _factory;
Expand Down
23 changes: 5 additions & 18 deletions be/src/exec/pipeline/scan/chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

#include "common/statusor.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/workgroup/work_group.h"
#include "runtime/runtime_state.h"
namespace starrocks::pipeline {

ChunkSource::ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ChunkSource::ChunkSource(ScanOperator* scan_op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
BalancedChunkBuffer& chunk_buffer)
: _scan_operator_seq(scan_operator_id),
: _scan_op(scan_op),
_scan_operator_seq(scan_op->get_driver_sequence()),
_runtime_profile(runtime_profile),
_morsel(std::move(morsel)),
_chunk_buffer(chunk_buffer),
Expand All @@ -37,20 +39,6 @@ Status ChunkSource::prepare(RuntimeState* state) {
return Status::OK();
}

StatusOr<ChunkPtr> ChunkSource::get_next_chunk_from_buffer() {
ChunkPtr chunk = nullptr;
_chunk_buffer.try_get(_scan_operator_seq, &chunk);
return chunk;
}

bool ChunkSource::has_output() const {
return !_chunk_buffer.empty(_scan_operator_seq);
}

bool ChunkSource::has_shared_output() const {
return !_chunk_buffer.all_empty();
}

// Takes ownership of `chunk_token` by moving it into _chunk_token.
// Whatever _chunk_token held before the call is replaced by the move-assignment.
void ChunkSource::pin_chunk_token(ChunkBufferTokenPtr chunk_token) {
_chunk_token = std::move(chunk_token);
}
Expand All @@ -72,7 +60,7 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
SCOPED_RAW_TIMER(&time_spent_ns);

if (_chunk_token == nullptr && (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) {
return _status;
break;
}

ChunkPtr chunk;
Expand Down Expand Up @@ -108,7 +96,6 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
break;
}
}

return _status;
}

Expand Down
7 changes: 3 additions & 4 deletions be/src/exec/pipeline/scan/chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ class RuntimeProfile;

namespace pipeline {

class ScanOperator;
class BalancedChunkBuffer;
class ChunkBufferToken;
using ChunkBufferTokenPtr = std::unique_ptr<ChunkBufferToken>;

class ChunkSource {
public:
ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
BalancedChunkBuffer& chunk_buffer);

virtual ~ChunkSource() = default;
Expand All @@ -47,10 +48,7 @@ class ChunkSource {
// Return true if eos is not reached
// Return false if eos is reached or error occurred
bool has_next_chunk() const { return _status.ok(); }
bool has_output() const;
bool has_shared_output() const;

StatusOr<ChunkPtr> get_next_chunk_from_buffer();
Status buffer_next_batch_chunks_blocking(RuntimeState* state, size_t batch_size,
const workgroup::WorkGroup* running_wg);

Expand Down Expand Up @@ -82,6 +80,7 @@ class ChunkSource {
// if it runs in the worker thread owned by other workgroup, which has running drivers.
static constexpr int64_t YIELD_PREEMPT_MAX_TIME_SPENT = 5'000'000L;

ScanOperator* _scan_op;
const int32_t _scan_operator_seq;
RuntimeProfile* _runtime_profile;
// The morsel will be owned by the pipeline driver
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ void ConnectorScanOperator::do_close(RuntimeState* state) {}
ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
auto* factory = down_cast<ConnectorScanOperatorFactory*>(_factory);
return std::make_shared<ConnectorChunkSource>(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), this, scan_node, factory->get_chunk_buffer());
return std::make_shared<ConnectorChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), scan_node, factory->get_chunk_buffer());
}

void ConnectorScanOperator::attach_chunk_source(int32_t source_index) {
Expand Down Expand Up @@ -153,10 +153,9 @@ connector::ConnectorType ConnectorScanOperator::connector_type() {
}

// ==================== ConnectorChunkSource ====================
ConnectorChunkSource::ConnectorChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile,
MorselPtr&& morsel, ScanOperator* op, ConnectorScanNode* scan_node,
BalancedChunkBuffer& chunk_buffer)
: ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), chunk_buffer),
ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer)
: ChunkSource(op, runtime_profile, std::move(morsel), chunk_buffer),
_scan_node(scan_node),
_limit(scan_node->limit()),
_runtime_in_filters(op->runtime_in_filters()),
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class ConnectorScanOperator : public ScanOperator {
Status do_prepare(RuntimeState* state) override;
void do_close(RuntimeState* state) override;
ChunkSourcePtr create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) override;

connector::ConnectorType connector_type();

// TODO: refactor it into the base class
void attach_chunk_source(int32_t source_index) override;
void detach_chunk_source(int32_t source_index) override;
bool has_shared_chunk_source() const override;
Expand All @@ -81,8 +81,8 @@ class ConnectorScanOperator : public ScanOperator {

class ConnectorChunkSource : public ChunkSource {
public:
ConnectorChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ScanOperator* op, ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer);
ConnectorChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer);

~ConnectorChunkSource() override;

Expand All @@ -91,7 +91,7 @@ class ConnectorChunkSource : public ChunkSource {
const std::string get_custom_coredump_msg() const override;

protected:
virtual bool _reach_eof() { return _limit != -1 && _rows_read >= _limit; }
virtual bool _reach_eof() const { return _limit != -1 && _rows_read >= _limit; }
Status _open_data_source(RuntimeState* state);

connector::DataSourcePtr _data_source;
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/pipeline/scan/meta_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

namespace starrocks::pipeline {

MetaChunkSource::MetaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
MetaChunkSource::MetaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
MetaScanContextPtr scan_ctx)
: ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()),
_scan_ctx(scan_ctx) {}
: ChunkSource(op, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()), _scan_ctx(scan_ctx) {}

MetaChunkSource::~MetaChunkSource() {}

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/meta_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ namespace starrocks::pipeline {

class MetaChunkSource final : public ChunkSource {
public:
MetaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
MetaScanContextPtr scan_ctx);
MetaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, MetaScanContextPtr scan_ctx);

~MetaChunkSource() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/meta_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Status MetaScanOperator::do_prepare(RuntimeState* state) {
void MetaScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr MetaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
return std::make_shared<MetaChunkSource>(_driver_sequence, _runtime_profile.get(), std::move(morsel), _ctx);
return std::make_shared<MetaChunkSource>(this, _runtime_profile.get(), std::move(morsel), _ctx);
}

ChunkPtr MetaScanOperator::get_chunk_from_buffer() {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@

namespace starrocks::pipeline {

OlapChunkSource::OlapChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
OlapChunkSource::OlapChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
OlapScanNode* scan_node, OlapScanContext* scan_ctx)
: ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()),
: ChunkSource(op, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()),
_scan_node(scan_node),
_scan_ctx(scan_ctx),
_limit(scan_node->limit()),
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class OlapScanContext;

class OlapChunkSource final : public ChunkSource {
public:
OlapChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
OlapScanNode* scan_node, OlapScanContext* scan_ctx);
OlapChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, OlapScanNode* scan_node,
OlapScanContext* scan_ctx);

~OlapChunkSource() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/olap_meta_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Status OlapMetaScanOperator::do_prepare(RuntimeState* state) {
void OlapMetaScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr OlapMetaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
return std::make_shared<OlapMetaChunkSource>(_driver_sequence, _runtime_profile.get(), std::move(morsel), _ctx);
return std::make_shared<OlapMetaChunkSource>(this, _runtime_profile.get(), std::move(morsel), _ctx);
}

ChunkPtr OlapMetaScanOperator::get_chunk_from_buffer() {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ void OlapScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* olap_scan_node = down_cast<OlapScanNode*>(_scan_node);
return std::make_shared<OlapChunkSource>(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), olap_scan_node, _ctx.get());
return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel),
olap_scan_node, _ctx.get());
}

void OlapScanOperator::attach_chunk_source(int32_t source_index) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace starrocks::pipeline {

OlapSchemaChunkSource::OlapSchemaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile,
MorselPtr&& morsel, const OlapSchemaScanContextPtr& ctx)
: ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {}
OlapSchemaChunkSource::OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const OlapSchemaScanContextPtr& ctx)
: ChunkSource(op, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {}

OlapSchemaChunkSource::~OlapSchemaChunkSource() = default;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/olap_schema_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace pipeline {

class OlapSchemaChunkSource final : public ChunkSource {
public:
OlapSchemaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
const OlapSchemaScanContextPtr& ctx);

~OlapSchemaChunkSource() override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status OlapSchemaScanOperator::do_prepare(RuntimeState* state) {
void OlapSchemaScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr OlapSchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
return std::make_shared<OlapSchemaChunkSource>(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(),
return std::make_shared<OlapSchemaChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), _ctx);
}

Expand Down
18 changes: 7 additions & 11 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ Status ScanOperator::prepare(RuntimeState* state) {
"PeakScanTaskQueueSize", TUnit::UNIT, RuntimeProfile::Counter::create_strategy(TUnit::UNIT));

RETURN_IF_ERROR(do_prepare(state));

return Status::OK();
}

Expand Down Expand Up @@ -150,9 +149,11 @@ bool ScanOperator::has_output() const {
if (buffer_full) {
return chunk_number > 0;
}

if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
return false;
}

// Can pick up more morsels or submit more tasks
if (!_morsel_queue->empty()) {
return true;
Expand Down Expand Up @@ -220,16 +221,14 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
_peak_buffer_size_counter->set(buffer_size());

RETURN_IF_ERROR(_try_to_trigger_next_scan(state));

ChunkPtr res = get_chunk_from_buffer();
if (res == nullptr) {
return nullptr;
if (res != nullptr) {
// for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
auto [tablet_id, is_eos] = _should_emit_eos(res);
eval_runtime_bloom_filters(res.get());
res->owner_info().set_owner_id(tablet_id, is_eos);
}

// for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
auto [tablet_id, is_eos] = _should_emit_eos(res);
eval_runtime_bloom_filters(res.get());
res->owner_info().set_owner_id(tablet_id, is_eos);
return res;
}

Expand All @@ -250,15 +249,13 @@ int64_t ScanOperator::global_rf_wait_timeout_ns() const {

return 1000'000L * global_rf_collector->scan_wait_timeout_ms();
}

Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
return Status::OK();
}
if (_unpluging && num_buffered_chunks() >= _buffer_unplug_threshold()) {
return Status::OK();
}

// To avoid uneven distribution when io tasks execute very fast, we start
// traversing the chunk_source array from the last visited idx
int cnt = _io_tasks_per_scan_operator;
Expand Down Expand Up @@ -460,7 +457,6 @@ Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index)

if (morsel != nullptr) {
COUNTER_UPDATE(_morsels_counter, 1);

_chunk_sources[chunk_source_index] = create_chunk_source(std::move(morsel), chunk_source_index);
auto status = _chunk_sources[chunk_source_index]->prepare(state);
if (!status.ok()) {
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/stream/scan/stream_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ Status StreamScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_
ChunkSourcePtr StreamScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* scan_node = down_cast<ConnectorScanNode*>(_scan_node);
auto* factory = down_cast<StreamScanOperatorFactory*>(_factory);
return std::make_shared<StreamChunkSource>(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), this, scan_node, factory->get_chunk_buffer());
return std::make_shared<StreamChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), scan_node, factory->get_chunk_buffer());
}

bool StreamScanOperator::is_finished() const {
Expand Down Expand Up @@ -333,9 +333,9 @@ void StreamScanOperator::_close_chunk_source_unlocked(RuntimeState* state, int c
}
}

StreamChunkSource::StreamChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ScanOperator* op, ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer)
: ConnectorChunkSource(scan_operator_id, runtime_profile, std::move(morsel), op, scan_node, chunk_buffer) {}
StreamChunkSource::StreamChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer)
: ConnectorChunkSource(op, runtime_profile, std::move(morsel), scan_node, chunk_buffer) {}

Status StreamChunkSource::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ConnectorChunkSource::prepare(state));
Expand All @@ -359,7 +359,7 @@ void StreamChunkSource::reset_status() {
_get_stream_data_source()->reset_status();
}

bool StreamChunkSource::_reach_eof() {
bool StreamChunkSource::_reach_eof() const {
connector::StreamDataSource* data_source = _get_stream_data_source();
return (_epoch_rows_limit != -1 && data_source->num_rows_read_in_epoch() >= _epoch_rows_limit) ||
(_epoch_time_limit != -1 && data_source->cpu_time_spent_in_epoch() >= _epoch_time_limit);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/stream/scan/stream_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class StreamScanOperator final : public ConnectorScanOperator {

class StreamChunkSource : public ConnectorChunkSource {
public:
StreamChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, ScanOperator* op,
StreamChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer);

Status prepare(RuntimeState* state) override;
Expand All @@ -116,9 +116,9 @@ class StreamChunkSource : public ConnectorChunkSource {
}

protected:
bool _reach_eof() override;
bool _reach_eof() const override;

connector::StreamDataSource* _get_stream_data_source() {
connector::StreamDataSource* _get_stream_data_source() const {
return down_cast<connector::StreamDataSource*>(_data_source.get());
}

Expand Down
Loading

0 comments on commit 2d341ed

Please sign in to comment.