diff --git a/be/src/exec/pipeline/operator.h b/be/src/exec/pipeline/operator.h index 53f52e5869847..c9fac35928990 100644 --- a/be/src/exec/pipeline/operator.h +++ b/be/src/exec/pipeline/operator.h @@ -227,6 +227,7 @@ class Operator { // the memory that can be freed by the current operator size_t revocable_mem_bytes() { return _revocable_mem_bytes; } void set_revocable_mem_bytes(size_t bytes) { _revocable_mem_bytes = bytes; } + int32_t get_driver_sequence() const { return _driver_sequence; } protected: OperatorFactory* _factory; diff --git a/be/src/exec/pipeline/scan/chunk_source.cpp b/be/src/exec/pipeline/scan/chunk_source.cpp index 25a1638598b9b..e7d430a9ca0b1 100644 --- a/be/src/exec/pipeline/scan/chunk_source.cpp +++ b/be/src/exec/pipeline/scan/chunk_source.cpp @@ -18,13 +18,15 @@ #include "common/statusor.h" #include "exec/pipeline/scan/balanced_chunk_buffer.h" +#include "exec/pipeline/scan/scan_operator.h" #include "exec/workgroup/work_group.h" #include "runtime/runtime_state.h" namespace starrocks::pipeline { -ChunkSource::ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, +ChunkSource::ChunkSource(ScanOperator* scan_op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, BalancedChunkBuffer& chunk_buffer) - : _scan_operator_seq(scan_operator_id), + : _scan_op(scan_op), + _scan_operator_seq(scan_op->get_driver_sequence()), _runtime_profile(runtime_profile), _morsel(std::move(morsel)), _chunk_buffer(chunk_buffer), @@ -37,20 +39,6 @@ Status ChunkSource::prepare(RuntimeState* state) { return Status::OK(); } -StatusOr ChunkSource::get_next_chunk_from_buffer() { - ChunkPtr chunk = nullptr; - _chunk_buffer.try_get(_scan_operator_seq, &chunk); - return chunk; -} - -bool ChunkSource::has_output() const { - return !_chunk_buffer.empty(_scan_operator_seq); -} - -bool ChunkSource::has_shared_output() const { - return !_chunk_buffer.all_empty(); -} - void ChunkSource::pin_chunk_token(ChunkBufferTokenPtr chunk_token) { _chunk_token = std::move(chunk_token); } @@ -72,7 +60,7 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_ SCOPED_RAW_TIMER(&time_spent_ns); if (_chunk_token == nullptr && (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) { - return _status; + break; } ChunkPtr chunk; @@ -108,7 +96,6 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_ break; } } - return _status; } diff --git a/be/src/exec/pipeline/scan/chunk_source.h b/be/src/exec/pipeline/scan/chunk_source.h index c7759cc1e9763..d4d0e7ecd66eb 100644 --- a/be/src/exec/pipeline/scan/chunk_source.h +++ b/be/src/exec/pipeline/scan/chunk_source.h @@ -29,13 +29,14 @@ class RuntimeProfile; namespace pipeline { +class ScanOperator; class BalancedChunkBuffer; class ChunkBufferToken; using ChunkBufferTokenPtr = std::unique_ptr; class ChunkSource { public: - ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + ChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, BalancedChunkBuffer& chunk_buffer); virtual ~ChunkSource() = default; @@ -47,10 +48,7 @@ class ChunkSource { // Return true if eos is not reached // Return false if eos is reached or error occurred bool has_next_chunk() const { return _status.ok(); } - bool has_output() const; - bool has_shared_output() const; - StatusOr get_next_chunk_from_buffer(); Status buffer_next_batch_chunks_blocking(RuntimeState* state, size_t batch_size, const workgroup::WorkGroup* running_wg); @@ -82,6 +80,7 @@ class ChunkSource { // if it runs in the worker thread owned by other workgroup, which has running drivers. static constexpr int64_t YIELD_PREEMPT_MAX_TIME_SPENT = 5'000'000L; + ScanOperator* _scan_op; const int32_t _scan_operator_seq; RuntimeProfile* _runtime_profile; // The morsel will own by pipeline driver diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.cpp b/be/src/exec/pipeline/scan/connector_scan_operator.cpp index adbda557f3527..6873cc7e51030 100644 --- a/be/src/exec/pipeline/scan/connector_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/connector_scan_operator.cpp @@ -71,8 +71,8 @@ void ConnectorScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { auto* scan_node = down_cast(_scan_node); auto* factory = down_cast(_factory); - return std::make_shared(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(), - std::move(morsel), this, scan_node, factory->get_chunk_buffer()); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), scan_node, factory->get_chunk_buffer()); } void ConnectorScanOperator::attach_chunk_source(int32_t source_index) { @@ -153,10 +153,9 @@ connector::ConnectorType ConnectorScanOperator::connector_type() { } // ==================== ConnectorChunkSource ==================== -ConnectorChunkSource::ConnectorChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, - MorselPtr&& morsel, ScanOperator* op, ConnectorScanNode* scan_node, - BalancedChunkBuffer& chunk_buffer) - : ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), chunk_buffer), +ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer) + : ChunkSource(op, runtime_profile, std::move(morsel), chunk_buffer), _scan_node(scan_node), _limit(scan_node->limit()), _runtime_in_filters(op->runtime_in_filters()), diff --git a/be/src/exec/pipeline/scan/connector_scan_operator.h b/be/src/exec/pipeline/scan/connector_scan_operator.h index ecbd2fe399f3c..ec41130313b54 100644 --- a/be/src/exec/pipeline/scan/connector_scan_operator.h +++ b/be/src/exec/pipeline/scan/connector_scan_operator.h @@ -63,9 +63,9 @@ class ConnectorScanOperator : public ScanOperator { Status do_prepare(RuntimeState* state) override; void do_close(RuntimeState* state) override; ChunkSourcePtr create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) override; + connector::ConnectorType connector_type(); - // TODO: refactor it into the base class void attach_chunk_source(int32_t source_index) override; void detach_chunk_source(int32_t source_index) override; bool has_shared_chunk_source() const override; @@ -81,8 +81,8 @@ class ConnectorScanOperator : public ScanOperator { class ConnectorChunkSource : public ChunkSource { public: - ConnectorChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - ScanOperator* op, ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer); + ConnectorChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer); ~ConnectorChunkSource() override; @@ -91,7 +91,7 @@ class ConnectorChunkSource : public ChunkSource { const std::string get_custom_coredump_msg() const override; protected: - virtual bool _reach_eof() { return _limit != -1 && _rows_read >= _limit; } + virtual bool _reach_eof() const { return _limit != -1 && _rows_read >= _limit; } Status _open_data_source(RuntimeState* state); connector::DataSourcePtr _data_source; diff --git a/be/src/exec/pipeline/scan/meta_chunk_source.cpp b/be/src/exec/pipeline/scan/meta_chunk_source.cpp index 33447ed5790f4..481be7ef8fcb7 100644 --- a/be/src/exec/pipeline/scan/meta_chunk_source.cpp +++ b/be/src/exec/pipeline/scan/meta_chunk_source.cpp @@ -19,10 +19,9 @@ namespace starrocks::pipeline { -MetaChunkSource::MetaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, +MetaChunkSource::MetaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, MetaScanContextPtr scan_ctx) - : ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()), - _scan_ctx(scan_ctx) {} + : ChunkSource(op, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()), _scan_ctx(scan_ctx) {} MetaChunkSource::~MetaChunkSource() {} diff --git a/be/src/exec/pipeline/scan/meta_chunk_source.h b/be/src/exec/pipeline/scan/meta_chunk_source.h index ca12ca9c1a1d8..f35a675267df2 100644 --- a/be/src/exec/pipeline/scan/meta_chunk_source.h +++ b/be/src/exec/pipeline/scan/meta_chunk_source.h @@ -24,8 +24,7 @@ namespace starrocks::pipeline { class MetaChunkSource final : public ChunkSource { public: - MetaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - MetaScanContextPtr scan_ctx); + MetaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, MetaScanContextPtr scan_ctx); ~MetaChunkSource() override; diff --git a/be/src/exec/pipeline/scan/meta_scan_operator.cpp b/be/src/exec/pipeline/scan/meta_scan_operator.cpp index 8635f0602df66..a860f99d53189 100644 --- a/be/src/exec/pipeline/scan/meta_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/meta_scan_operator.cpp @@ -59,7 +59,7 @@ Status MetaScanOperator::do_prepare(RuntimeState* state) { void MetaScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr MetaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { - return std::make_shared(_driver_sequence, _runtime_profile.get(), std::move(morsel), _ctx); + return std::make_shared(this, _runtime_profile.get(), std::move(morsel), _ctx); } ChunkPtr MetaScanOperator::get_chunk_from_buffer() { diff --git a/be/src/exec/pipeline/scan/olap_chunk_source.cpp b/be/src/exec/pipeline/scan/olap_chunk_source.cpp index e9b63e0f9c841..7578291707b3f 100644 --- a/be/src/exec/pipeline/scan/olap_chunk_source.cpp +++ b/be/src/exec/pipeline/scan/olap_chunk_source.cpp @@ -33,9 +33,9 @@ namespace starrocks::pipeline { -OlapChunkSource::OlapChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, +OlapChunkSource::OlapChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, OlapScanNode* scan_node, OlapScanContext* scan_ctx) - : ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()), + : ChunkSource(op, runtime_profile, std::move(morsel), scan_ctx->get_chunk_buffer()), _scan_node(scan_node), _scan_ctx(scan_ctx), _limit(scan_node->limit()), diff --git a/be/src/exec/pipeline/scan/olap_chunk_source.h b/be/src/exec/pipeline/scan/olap_chunk_source.h index b96e30560694d..b4965e9dfb3db 100644 --- a/be/src/exec/pipeline/scan/olap_chunk_source.h +++ b/be/src/exec/pipeline/scan/olap_chunk_source.h @@ -40,8 +40,8 @@ class OlapScanContext; class OlapChunkSource final : public ChunkSource { public: - OlapChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - OlapScanNode* scan_node, OlapScanContext* scan_ctx); + OlapChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, OlapScanNode* scan_node, + OlapScanContext* scan_ctx); ~OlapChunkSource() override; diff --git a/be/src/exec/pipeline/scan/olap_meta_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_meta_scan_operator.cpp index d5058efe0760d..f3ff65924e1a3 100644 --- a/be/src/exec/pipeline/scan/olap_meta_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_meta_scan_operator.cpp @@ -64,7 +64,7 @@ Status OlapMetaScanOperator::do_prepare(RuntimeState* state) { void OlapMetaScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr OlapMetaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { - return std::make_shared(_driver_sequence, _runtime_profile.get(), std::move(morsel), _ctx); + return std::make_shared(this, _runtime_profile.get(), std::move(morsel), _ctx); } ChunkPtr OlapMetaScanOperator::get_chunk_from_buffer() { diff --git a/be/src/exec/pipeline/scan/olap_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_scan_operator.cpp index 732faf595881d..e127b615d2863 100644 --- a/be/src/exec/pipeline/scan/olap_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_scan_operator.cpp @@ -96,8 +96,8 @@ void OlapScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { auto* olap_scan_node = down_cast(_scan_node); - return std::make_shared(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(), - std::move(morsel), olap_scan_node, _ctx.get()); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel), + olap_scan_node, _ctx.get()); } void OlapScanOperator::attach_chunk_source(int32_t source_index) { diff --git a/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp b/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp index 19cfb174a14fb..f799f5ec7ea5f 100644 --- a/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp +++ b/be/src/exec/pipeline/scan/olap_schema_chunk_source.cpp @@ -21,9 +21,9 @@ namespace starrocks::pipeline { -OlapSchemaChunkSource::OlapSchemaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, - MorselPtr&& morsel, const OlapSchemaScanContextPtr& ctx) - : ChunkSource(scan_operator_id, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {} +OlapSchemaChunkSource::OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + const OlapSchemaScanContextPtr& ctx) + : ChunkSource(op, runtime_profile, std::move(morsel), ctx->get_chunk_buffer()), _ctx(ctx) {} OlapSchemaChunkSource::~OlapSchemaChunkSource() = default; diff --git a/be/src/exec/pipeline/scan/olap_schema_chunk_source.h b/be/src/exec/pipeline/scan/olap_schema_chunk_source.h index e75937b1fa33b..216655c4f00dc 100644 --- a/be/src/exec/pipeline/scan/olap_schema_chunk_source.h +++ b/be/src/exec/pipeline/scan/olap_schema_chunk_source.h @@ -29,7 +29,7 @@ namespace pipeline { class OlapSchemaChunkSource final : public ChunkSource { public: - OlapSchemaChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + OlapSchemaChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, const OlapSchemaScanContextPtr& ctx); ~OlapSchemaChunkSource() override; diff --git a/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp index f5a2a4523536b..f07671da5265f 100644 --- a/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_schema_scan_operator.cpp @@ -55,7 +55,7 @@ Status OlapSchemaScanOperator::do_prepare(RuntimeState* state) { void OlapSchemaScanOperator::do_close(RuntimeState* state) {} ChunkSourcePtr OlapSchemaScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { - return std::make_shared(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(), + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel), _ctx); } diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index 652f4a0de5401..30324220206e1 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -75,7 +75,6 @@ Status ScanOperator::prepare(RuntimeState* state) { "PeakScanTaskQueueSize", TUnit::UNIT, RuntimeProfile::Counter::create_strategy(TUnit::UNIT)); RETURN_IF_ERROR(do_prepare(state)); - return Status::OK(); } @@ -150,9 +149,11 @@ bool ScanOperator::has_output() const { if (buffer_full) { return chunk_number > 0; } + if (_num_running_io_tasks >= _io_tasks_per_scan_operator) { return false; } + // Can pick up more morsels or submit more tasks if (!_morsel_queue->empty()) { return true; @@ -220,16 +221,14 @@ StatusOr ScanOperator::pull_chunk(RuntimeState* state) { _peak_buffer_size_counter->set(buffer_size()); RETURN_IF_ERROR(_try_to_trigger_next_scan(state)); - ChunkPtr res = get_chunk_from_buffer(); - if (res == nullptr) { - return nullptr; + if (res != nullptr) { + // for query cache mechanism, we should emit EOS chunk when we receive the last chunk. + auto [tablet_id, is_eos] = _should_emit_eos(res); + eval_runtime_bloom_filters(res.get()); + res->owner_info().set_owner_id(tablet_id, is_eos); } - // for query cache mechanism, we should emit EOS chunk when we receive the last chunk. - auto [tablet_id, is_eos] = _should_emit_eos(res); - eval_runtime_bloom_filters(res.get()); - res->owner_info().set_owner_id(tablet_id, is_eos); return res; } @@ -250,7 +249,6 @@ int64_t ScanOperator::global_rf_wait_timeout_ns() const { return 1000'000L * global_rf_collector->scan_wait_timeout_ms(); } - Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) { if (_num_running_io_tasks >= _io_tasks_per_scan_operator) { return Status::OK(); @@ -258,7 +256,6 @@ Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) { if (_unpluging && num_buffered_chunks() >= _buffer_unplug_threshold()) { return Status::OK(); } - // Avoid uneven distribution when io tasks execute very fast, so we start // traverse the chunk_source array from last visit idx int cnt = _io_tasks_per_scan_operator; @@ -460,7 +457,6 @@ Status ScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_index) if (morsel != nullptr) { COUNTER_UPDATE(_morsels_counter, 1); - _chunk_sources[chunk_source_index] = create_chunk_source(std::move(morsel), chunk_source_index); auto status = _chunk_sources[chunk_source_index]->prepare(state); if (!status.ok()) { diff --git a/be/src/exec/stream/scan/stream_scan_operator.cpp b/be/src/exec/stream/scan/stream_scan_operator.cpp index 5143c8daebc46..7386d0853da82 100644 --- a/be/src/exec/stream/scan/stream_scan_operator.cpp +++ b/be/src/exec/stream/scan/stream_scan_operator.cpp @@ -118,8 +118,8 @@ Status StreamScanOperator::_pickup_morsel(RuntimeState* state, int chunk_source_ ChunkSourcePtr StreamScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) { auto* scan_node = down_cast(_scan_node); auto* factory = down_cast(_factory); - return std::make_shared(_driver_sequence, _chunk_source_profiles[chunk_source_index].get(), - std::move(morsel), this, scan_node, factory->get_chunk_buffer()); + return std::make_shared(this, _chunk_source_profiles[chunk_source_index].get(), + std::move(morsel), scan_node, factory->get_chunk_buffer()); } bool StreamScanOperator::is_finished() const { @@ -333,9 +333,9 @@ void StreamScanOperator::_close_chunk_source_unlocked(RuntimeState* state, int c } } -StreamChunkSource::StreamChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, - ScanOperator* op, ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer) - : ConnectorChunkSource(scan_operator_id, runtime_profile, std::move(morsel), op, scan_node, chunk_buffer) {} +StreamChunkSource::StreamChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, + ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer) + : ConnectorChunkSource(op, runtime_profile, std::move(morsel), scan_node, chunk_buffer) {} Status StreamChunkSource::prepare(RuntimeState* state) { RETURN_IF_ERROR(ConnectorChunkSource::prepare(state)); @@ -359,7 +359,7 @@ void StreamChunkSource::reset_status() { _get_stream_data_source()->reset_status(); } -bool StreamChunkSource::_reach_eof() { +bool StreamChunkSource::_reach_eof() const { connector::StreamDataSource* data_source = _get_stream_data_source(); return (_epoch_rows_limit != -1 && data_source->num_rows_read_in_epoch() >= _epoch_rows_limit) || (_epoch_time_limit != -1 && data_source->cpu_time_spent_in_epoch() >= _epoch_time_limit); diff --git a/be/src/exec/stream/scan/stream_scan_operator.h b/be/src/exec/stream/scan/stream_scan_operator.h index 04b3858f42892..b8996dd47bc52 100644 --- a/be/src/exec/stream/scan/stream_scan_operator.h +++ b/be/src/exec/stream/scan/stream_scan_operator.h @@ -102,7 +102,7 @@ class StreamScanOperator final : public ConnectorScanOperator { class StreamChunkSource : public ConnectorChunkSource { public: - StreamChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel, ScanOperator* op, + StreamChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel, ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer); Status prepare(RuntimeState* state) override; @@ -116,9 +116,9 @@ class StreamChunkSource : public ConnectorChunkSource { } protected: - bool _reach_eof() override; + bool _reach_eof() const override; - connector::StreamDataSource* _get_stream_data_source() { + connector::StreamDataSource* _get_stream_data_source() const { return down_cast(_data_source.get()); } diff --git a/be/test/exec/stream/stream_operators_test.cpp b/be/test/exec/stream/stream_operators_test.cpp index d91bb08f87e6e..b75bc45a51e28 100644 --- a/be/test/exec/stream/stream_operators_test.cpp +++ b/be/test/exec/stream/stream_operators_test.cpp @@ -27,6 +27,12 @@ namespace starrocks::stream { +#define ASSERT_IF_ERROR(stmt) \ + do { \ + auto&& st__ = (stmt); \ + ASSERT_TRUE(st__.ok()) << st__; \ + } while (0) + bool GeneratorStreamSourceOperator::is_trigger_finished(const EpochInfo& epoch_info) { auto trigger_mode = epoch_info.trigger_mode; switch (trigger_mode) { @@ -77,12 +83,12 @@ class StreamOperatorsTest : public StreamPipelineTest, public StreamTestBase { void CheckResult(std::vector epoch_results, std::vector>> expect_results) { - DCHECK(!epoch_results.empty()); + ASSERT_TRUE(!epoch_results.empty()); for (size_t i = 0; i < epoch_results.size(); i++) { auto result = epoch_results[i]; auto columns = result->columns(); auto expect = expect_results[i]; - DCHECK_EQ(columns.size(), expect.size()); + ASSERT_EQ(columns.size(), expect.size()); for (size_t j = 0; j < expect.size(); j++) { CheckColumn(columns[j], expect[j]); } @@ -177,12 +183,12 @@ void StreamOperatorsTest::_generate_morse_queue(ConnectorScanNode* scan_node, auto morsel_queue_factory = scan_node->convert_scan_range_to_morsel_queue_factory( scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), degree_of_parallelism, true, TTabletInternalParallelMode::type::AUTO); - DCHECK(morsel_queue_factory.ok()); + ASSERT_TRUE(morsel_queue_factory.ok()); morsel_queue_factories.emplace(scan_node->id(), std::move(morsel_queue_factory).value()); } TEST_F(StreamOperatorsTest, Dop_1) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 1; _pipeline_builder = [&](RuntimeState* state) { OpFactories op_factories{ @@ -197,15 +203,15 @@ TEST_F(StreamOperatorsTest, Dop_1) { })); EpochInfo epoch_info{.epoch_id = 0, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 4}, {5, 6, 7, 8}}}); stop_mv(); } TEST_F(StreamOperatorsTest, MultiDop_4) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 4; _pipeline_builder = [&](RuntimeState* state) { OpFactories op_factories; @@ -226,8 +232,8 @@ TEST_F(StreamOperatorsTest, MultiDop_4) { })); EpochInfo epoch_info{.epoch_id = 0, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 4}, {5, 6, 7, 0}}, // chunk 0 {{1, 2, 3, 4}, {5, 6, 7, 0}}, // chunk 1 {{1, 2, 3, 4}, {5, 6, 7, 0}}, @@ -237,7 +243,7 @@ TEST_F(StreamOperatorsTest, MultiDop_4) { } TEST_F(StreamOperatorsTest, Test_StreamAggregator_Dop1) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 1; _pipeline_builder = [&](RuntimeState* state) { _slot_infos = std::vector>{ @@ -280,8 +286,8 @@ TEST_F(StreamOperatorsTest, Test_StreamAggregator_Dop1) { for (auto i = 0; i < 3; i++) { EpochInfo epoch_info{.epoch_id = i, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 0}, {i + 1, i + 1, i + 1, i + 1}}}); } @@ -290,7 +296,7 @@ TEST_F(StreamOperatorsTest, Test_StreamAggregator_Dop1) { } TEST_F(StreamOperatorsTest, Test_StreamAggregator_MultiDop) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 4; _pipeline_builder = [&](RuntimeState* state) { _slot_infos = std::vector>{ @@ -337,8 +343,8 @@ TEST_F(StreamOperatorsTest, Test_StreamAggregator_MultiDop) { for (auto i = 0; i < 10; i++) { EpochInfo epoch_info{.epoch_id = i, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 4}, {(i + 1) * 4, (i + 1) * 4, (i + 1) * 4, (i + 1) * 4}}}); sleep(0.5); @@ -347,7 +353,7 @@ TEST_F(StreamOperatorsTest, Test_StreamAggregator_MultiDop) { } TEST_F(StreamOperatorsTest, binlog_dop_1) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 1; _pipeline_builder = [&](RuntimeState* state) { auto* descs = _create_table_desc(2, 4); @@ -371,14 +377,14 @@ TEST_F(StreamOperatorsTest, binlog_dop_1) { EpochInfo epoch_info{ .epoch_id = 0, .max_exec_millis = -1, .max_scan_rows = -1, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 4}, {5, 6, 7, 8}}}); stop_mv(); } TEST_F(StreamOperatorsTest, binlog_dop_1_multi_epoch) { - DCHECK_IF_ERROR(start_mv([&]() { + ASSERT_IF_ERROR(start_mv([&]() { _degree_of_parallelism = 1; _pipeline_builder = [&](RuntimeState* state) { auto* descs = _create_table_desc(2, 4); @@ -403,8 +409,8 @@ TEST_F(StreamOperatorsTest, binlog_dop_1_multi_epoch) { for (auto i = 0; i < 3; i++) { EpochInfo epoch_info{ .epoch_id = 0, .max_exec_millis = -1, .max_scan_rows = -1, .trigger_mode = TriggerMode::MANUAL}; - DCHECK_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); - DCHECK_IF_ERROR(wait_until_epoch_finished(epoch_info)); + ASSERT_IF_ERROR(start_epoch(_tablet_ids, epoch_info)); + ASSERT_IF_ERROR(wait_until_epoch_finished(epoch_info)); CheckResult(fetch_results(epoch_info), {{{1, 2, 3, 4}, {5, 6, 7, 8}}}); }