From 4a610dd6dbb93a89d86d96ebf1e37aac0ec189ba Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 14 Nov 2023 08:28:49 +0800 Subject: [PATCH] [fix](move-memtable) pass num local sink to backends (#26897) --- be/src/olap/delta_writer_v2.cpp | 3 ++ be/src/pipeline/pipeline_fragment_context.cpp | 1 + .../pipeline_x_fragment_context.cpp | 1 + be/src/runtime/plan_fragment_executor.cpp | 1 + be/src/runtime/runtime_state.h | 5 +++ be/src/vec/sink/delta_writer_v2_pool.cpp | 35 +++++++++++----- be/src/vec/sink/delta_writer_v2_pool.h | 17 ++++---- be/src/vec/sink/load_stream_stub.cpp | 23 +++++++---- be/src/vec/sink/load_stream_stub.h | 9 ++-- be/src/vec/sink/load_stream_stub_pool.cpp | 41 +++++++++++++------ be/src/vec/sink/load_stream_stub_pool.h | 26 ++++++++++-- be/src/vec/sink/vtablet_sink_v2.cpp | 33 +++++++++------ be/src/vec/sink/vtablet_sink_v2.h | 13 ++++-- .../io/fs/stream_sink_file_writer_test.cpp | 2 +- .../vec/exec/delta_writer_v2_pool_test.cpp | 8 ++-- .../vec/exec/load_stream_stub_pool_test.cpp | 12 +++--- .../doris/planner/StreamLoadPlanner.java | 2 + .../java/org/apache/doris/qe/Coordinator.java | 18 +++++--- gensrc/thrift/PaloInternalService.thrift | 3 ++ 19 files changed, 177 insertions(+), 76 deletions(-) diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index c87cf7510a43fb..0a4108970a60c4 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -100,6 +100,9 @@ Status DeltaWriterV2::init() { return Status::OK(); } // build tablet schema in request level + if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { + return Status::InternalError("failed to find tablet schema for {}", _req.index_id); + } _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_streams[0]->tablet_schema(_req.index_id)); RowsetWriterContext context; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 02b991a7aa6654..9610122bc026c4 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -310,6 +310,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state->set_num_per_fragment_instances(request.num_senders); _runtime_state->set_load_stream_per_node(request.load_stream_per_node); _runtime_state->set_total_load_streams(request.total_load_streams); + _runtime_state->set_num_local_sink(request.num_local_sink); if (request.fragment.__isset.output_sink) { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index aa0297097174e0..47805a04772337 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_num_per_fragment_instances(request.num_senders); _runtime_state->set_load_stream_per_node(request.load_stream_per_node); _runtime_state->set_total_load_streams(request.total_load_streams); + _runtime_state->set_num_local_sink(request.num_local_sink); // 2. Build pipelines with operators in this fragment. 
auto root_pipeline = add_pipeline(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 9327f96a5d553b..3af870c7f07149 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -214,6 +214,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { _runtime_state->set_num_per_fragment_instances(params.num_senders); _runtime_state->set_load_stream_per_node(request.load_stream_per_node); _runtime_state->set_total_load_streams(request.total_load_streams); + _runtime_state->set_num_local_sink(request.num_local_sink); // set up sink, if required if (request.fragment.__isset.output_sink) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index aae2fb3cce401a..3b420511fa012a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -319,6 +319,10 @@ class RuntimeState { int total_load_streams() const { return _total_load_streams; } + void set_num_local_sink(int num_local_sink) { _num_local_sink = num_local_sink; } + + int num_local_sink() const { return _num_local_sink; } + bool disable_stream_preaggregations() const { return _query_options.disable_stream_preaggregations; } @@ -553,6 +557,7 @@ class RuntimeState { int _num_per_fragment_instances = 0; int _load_stream_per_node = 0; int _total_load_streams = 0; + int _num_local_sink = 0; // The backend id on which this fragment instance runs int64_t _backend_id = -1; diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp index b9057136d9709b..dc9a2765a55b17 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.cpp +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -25,7 +25,8 @@ class TExpr; namespace vectorized { -DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {} +DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool) + : _load_id(load_id), _use_cnt(num_use), _pool(pool) {} DeltaWriterV2Map::~DeltaWriterV2Map() = default; @@ -38,9 +39,15 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create( } Status DeltaWriterV2Map::close(RuntimeProfile* profile) { - if (--_use_cnt > 0) { + int num_use = --_use_cnt; + if (num_use > 0) { + LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , use_cnt = " << num_use; return Status::OK(); } + LOG(INFO) << "closing DeltaWriterV2Map " << _load_id; + if (_pool != nullptr) { + _pool->erase(_load_id); + } Status status = Status::OK(); _map.for_each([&status](auto& entry) { if (status.ok()) { @@ -59,6 +66,11 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) { } void DeltaWriterV2Map::cancel(Status status) { + int num_use = --_use_cnt; + LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " << num_use; + if (num_use == 0 && _pool != nullptr) { + _pool->erase(_load_id); + } _map.for_each([&status](auto& entry) { static_cast(entry.second->cancel_with_status(status)); }); @@ -68,23 +80,24 @@ DeltaWriterV2Pool::DeltaWriterV2Pool() = default; DeltaWriterV2Pool::~DeltaWriterV2Pool() = default; -std::shared_ptr DeltaWriterV2Pool::get_or_create(PUniqueId load_id) { +std::shared_ptr DeltaWriterV2Pool::get_or_create(PUniqueId load_id, + int num_sink) { UniqueId id {load_id}; std::lock_guard lock(_mutex); - std::shared_ptr map = _pool[id].lock(); + std::shared_ptr map = _pool[id]; if (map) { - map->grab(); return map; } - auto deleter = [this](DeltaWriterV2Map* m) { - std::lock_guard lock(_mutex); - 
_pool.erase(m->unique_id()); - delete m; - }; - map = std::shared_ptr(new DeltaWriterV2Map(id), deleter); + map = std::make_shared(id, num_sink, this); _pool[id] = map; return map; } +void DeltaWriterV2Pool::erase(UniqueId load_id) { + std::lock_guard lock(_mutex); + LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id; + _pool.erase(load_id); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h index 8439062440a5a4..f05b144200fe1f 100644 --- a/be/src/vec/sink/delta_writer_v2_pool.h +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -58,26 +58,24 @@ class RuntimeProfile; namespace vectorized { +class DeltaWriterV2Pool; + class DeltaWriterV2Map { public: - DeltaWriterV2Map(UniqueId load_id); + DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool* pool = nullptr); ~DeltaWriterV2Map(); - void grab() { ++_use_cnt; } - // get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map DeltaWriterV2* get_or_create(int64_t tablet_id, std::function()> creator); // close all delta writers in this DeltaWriterV2Map if there is no other users - Status close(RuntimeProfile* profile); + Status close(RuntimeProfile* profile = nullptr); // cancel all delta writers in this DeltaWriterV2Map void cancel(Status status); - UniqueId unique_id() const { return _load_id; } - size_t size() const { return _map.size(); } private: @@ -89,6 +87,7 @@ class DeltaWriterV2Map { UniqueId _load_id; TabletToDeltaWriterV2Map _map; std::atomic _use_cnt; + DeltaWriterV2Pool* _pool; }; class DeltaWriterV2Pool { @@ -97,7 +96,9 @@ class DeltaWriterV2Pool { ~DeltaWriterV2Pool(); - std::shared_ptr get_or_create(PUniqueId load_id); + std::shared_ptr get_or_create(PUniqueId load_id, int num_sink = 1); + + void erase(UniqueId load_id); size_t size() { std::lock_guard lock(_mutex); @@ -106,7 +107,7 @@ class DeltaWriterV2Pool { private: std::mutex _mutex; - std::unordered_map> _pool; + std::unordered_map> _pool; }; } // namespace vectorized diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index c2f7f246f30d60..75b814dc23f2de 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -83,14 +83,16 @@ void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) { _close_cv.notify_all(); } -LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id) - : _load_id(load_id), +LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use) + : _use_cnt(num_use), + _load_id(load_id), _src_id(src_id), _tablet_schema_for_index(std::make_shared()), _enable_unique_mow_for_index(std::make_shared()) {}; LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) - : _load_id(stub._load_id), + : _use_cnt(stub._use_cnt.load()), + _load_id(stub._load_id), _src_id(stub._src_id), _tablet_schema_for_index(stub._tablet_schema_for_index), _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}; @@ -107,7 +109,6 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, int total_streams, bool enable_profile) { - _num_open++; std::unique_lock lock(_mutex); if (_is_init.load()) { return Status::OK(); @@ -190,15 +191,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { - if (--_num_open > 0) { + { + std::lock_guard 
lock(_tablets_to_commit_mutex); + _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), + tablets_to_commit.end()); + } + if (--_use_cnt > 0) { return Status::OK(); } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - for (const auto& tablet : tablets_to_commit) { - *header.add_tablets_to_commit() = tablet; + { + std::lock_guard lock(_tablets_to_commit_mutex); + for (const auto& tablet : _tablets_to_commit) { + *header.add_tablets_to_commit() = tablet; + } } return _encode_and_send(header); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 3650b2aeae2188..b17acea4cb337c 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -134,7 +134,7 @@ class LoadStreamStub { public: // construct new stub - LoadStreamStub(PUniqueId load_id, int64_t src_id); + LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use); // copy constructor, shared_ptr members are shared LoadStreamStub(LoadStreamStub& stub); @@ -177,7 +177,7 @@ class LoadStreamStub { } std::shared_ptr tablet_schema(int64_t index_id) const { - return _tablet_schema_for_index->at(index_id); + return (*_tablet_schema_for_index)[index_id]; } bool enable_unique_mow(int64_t index_id) const { @@ -203,7 +203,10 @@ class LoadStreamStub { std::atomic _is_init; bthread::Mutex _mutex; - std::atomic _num_open; + std::atomic _use_cnt; + + std::mutex _tablets_to_commit_mutex; + std::vector _tablets_to_commit; std::mutex _buffer_mutex; std::mutex _send_mutex; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index 5848ca5a51aed9..ec7e53211fb41f 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -24,33 +24,50 @@ class TExpr; namespace stream_load { +LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) + : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} + +void LoadStreams::release() { + int num_use = --_use_cnt; + if (num_use == 0) { + LOG(INFO) << "releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id; + _pool->erase(_load_id, _dst_id); + } else { + LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id + << ", use_cnt = " << num_use; + } +} + LoadStreamStubPool::LoadStreamStubPool() = default; LoadStreamStubPool::~LoadStreamStubPool() = default; -std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, - int64_t dst_id, int num_streams) { + +std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, + int64_t dst_id, int num_streams, + int num_sink) { auto key = std::make_pair(UniqueId(load_id), dst_id); std::lock_guard lock(_mutex); - std::shared_ptr streams = _pool[key].lock(); + std::shared_ptr streams = _pool[key]; if (streams) { return streams; } DCHECK(num_streams > 0) << "stream num should be greater than 0"; - auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id}); - auto deleter = [this, key](Streams* s) { - std::lock_guard lock(_mutex); - _pool.erase(key); - _template_stubs.erase(key.first); - delete s; - }; - streams = std::shared_ptr(new Streams(), deleter); + DCHECK(num_sink > 0) << "sink num should be greater than 0"; + auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink}); + streams = 
std::make_shared(load_id, dst_id, num_sink, this); for (int32_t i = 0; i < num_streams; i++) { // copy construct, internal tablet schema map will be shared among all stubs - streams->emplace_back(new LoadStreamStub {*it->second}); + streams->streams().emplace_back(new LoadStreamStub {*it->second}); } _pool[key] = streams; return streams; } +void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) { + std::lock_guard lock(_mutex); + _pool.erase(std::make_pair(load_id, dst_id)); + _template_stubs.erase(load_id); +} + } // namespace stream_load } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index 73b41fdd61adc8..2cf55be4915cca 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -70,16 +70,36 @@ class LoadStreamStub; namespace stream_load { +class LoadStreamStubPool; + using Streams = std::vector>; +class LoadStreams { +public: + LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); + + void release(); + + Streams& streams() { return _streams; } + +private: + Streams _streams; + UniqueId _load_id; + int64_t _dst_id; + std::atomic _use_cnt; + LoadStreamStubPool* _pool; +}; + class LoadStreamStubPool { public: LoadStreamStubPool(); ~LoadStreamStubPool(); - std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, - int num_streams); + std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, + int num_streams, int num_sink); + + void erase(UniqueId load_id, int64_t dst_id); size_t size() { std::lock_guard lock(_mutex); @@ -95,7 +115,7 @@ class LoadStreamStubPool { private: std::mutex _mutex; std::unordered_map> _template_stubs; - std::unordered_map, std::weak_ptr> _pool; + std::unordered_map, std::shared_ptr> _pool; }; } // namespace stream_load diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 3aa23e10595197..4e93d11e2e8d3f 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -155,10 +155,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { _num_senders = state->num_per_fragment_instances(); _stream_per_node = state->load_stream_per_node(); _total_streams = state->total_load_streams(); + _num_local_sink = state->num_local_sink(); DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; + DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << _stream_per_node - << ", total_streams " << _total_streams; + << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; _is_high_priority = (state->execution_timeout() <= config::load_task_high_priority_threshold_second); @@ -197,8 +199,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { // Prepare the exprs to run. 
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); if (config::share_delta_writers) { - _delta_writer_for_tablet = - ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id); + _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( + _load_id, _num_local_sink); } else { _delta_writer_for_tablet = std::make_shared(_load_id); } @@ -226,18 +228,17 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) { if (node_info == nullptr) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } - std::shared_ptr streams; - streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id, _stream_per_node); + auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); // get tablet schema from each backend only in the 1st stream - for (auto& stream : *streams | std::ranges::views::take(1)) { + for (auto& stream : streams->streams() | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, _state->enable_profile())); } // for the rest streams, open without getting tablet schema - for (auto& stream : *streams | std::ranges::views::drop(1)) { + for (auto& stream : streams->streams() | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, _state->enable_profile())); @@ -300,7 +301,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) { return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); } for (auto& node_id : location->node_ids) { - streams.emplace_back(_streams_for_node[node_id]->at(_stream_index)); + streams.emplace_back(_streams_for_node[node_id]->streams().at(_stream_index)); } _stream_index = (_stream_index + 1) % _stream_per_node; return Status::OK(); @@ -393,6 +394,9 @@ Status VOlapTableSinkV2::_cancel(Status status) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); } + for (const auto& [_, streams] : _streams_for_node) { + streams->release(); + } return Status::OK(); } @@ -415,6 +419,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); + // release streams from the pool first, to prevent memory leak + for (const auto& [_, streams] : _streams_for_node) { + streams->release(); + } + { SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user @@ -425,14 +434,14 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { { // send CLOSE_LOAD to all streams, return ERROR if any for (const auto& [_, streams] : _streams_for_node) { - RETURN_IF_ERROR(_close_load(*streams)); + RETURN_IF_ERROR(_close_load(streams->streams())); } } { SCOPED_TIMER(_close_load_timer); for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : *streams) { + for (const auto& stream : streams->streams()) { RETURN_IF_ERROR(stream->close_wait()); } } @@ -440,7 +449,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { std::vector tablet_commit_infos; 
for (const auto& [node_id, streams] : _streams_for_node) { - for (const auto& stream : *streams) { + for (const auto& stream : streams->streams()) { for (auto tablet_id : stream->success_tablets()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet_id; diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 6f369286677b29..71f85c8156c146 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -77,6 +77,10 @@ class TExpr; class TabletSchema; class TupleDescriptor; +namespace stream_load { +class LoadStreams; +} + namespace vectorized { class OlapTableBlockConvertor; @@ -156,8 +160,9 @@ class VOlapTableSinkV2 final : public DataSink { // To support multiple senders, we maintain a channel for each sender. int _sender_id = -1; int _num_senders = -1; - int _stream_per_node = 0; - int _total_streams = 0; + int _stream_per_node = -1; + int _total_streams = -1; + int _num_local_sink = -1; bool _is_high_priority = false; bool _write_file_cache = false; @@ -204,7 +209,9 @@ class VOlapTableSinkV2 final : public DataSink { std::unordered_map> _tablets_for_node; std::unordered_map> _indexes_from_node; - std::unordered_map> _streams_for_node; + std::unordered_map> + _streams_for_node; + size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index c6ac4a3f501ff2..c52b59f01e401c 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -51,7 +51,7 @@ static std::atomic g_num_request; class StreamSinkFileWriterTest : public testing::Test { class MockStreamStub : public LoadStreamStub { public: - MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id) {}; + MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {}; virtual ~MockStreamStub() = default; diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp index 30b56b65d1150f..d44fd17a761b0e 100644 --- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -42,9 +42,9 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) { EXPECT_EQ(2, pool.size()); EXPECT_EQ(map, map3); EXPECT_NE(map, map2); - map.reset(); - map2.reset(); - map3.reset(); + EXPECT_TRUE(map->close().ok()); + EXPECT_TRUE(map2->close().ok()); + EXPECT_TRUE(map3->close().ok()); EXPECT_EQ(0, pool.size()); } @@ -62,7 +62,7 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3); EXPECT_NE(writer, writer2); - map.reset(); + static_cast(map->close()); EXPECT_EQ(0, pool.size()); } diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index 929b906aab7d66..73f0950889673b 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -36,16 +36,16 @@ TEST_F(LoadStreamStubPoolTest, test) { PUniqueId load_id; load_id.set_hi(1); load_id.set_hi(2); - auto streams1 = pool.get_or_create(load_id, src_id, 101, 5); - auto streams2 = pool.get_or_create(load_id, src_id, 102, 5); - auto streams3 = pool.get_or_create(load_id, src_id, 101, 5); + auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); + auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1); + auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1); EXPECT_EQ(2, 
pool.size()); EXPECT_EQ(1, pool.templates_size()); EXPECT_EQ(streams1, streams3); EXPECT_NE(streams1, streams2); - streams1.reset(); - streams2.reset(); - streams3.reset(); + streams1->release(); + streams2->release(); + streams3->release(); EXPECT_EQ(0, pool.size()); EXPECT_EQ(0, pool.templates_size()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index f4131235da359c..ce8b08086abb35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -300,6 +300,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde params.setParams(execParams); params.setLoadStreamPerNode(taskInfo.getStreamPerNode()); params.setTotalLoadStreams(taskInfo.getStreamPerNode()); + params.setNumLocalSink(1); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQueryType(TQueryType.LOAD); queryOptions.setQueryTimeout(timeout); @@ -503,6 +504,7 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns pipParams.setNumSenders(1); pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode()); pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode()); + pipParams.setNumLocalSink(1); TPipelineInstanceParams localParams = new TPipelineInstanceParams(); localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 90fb91304c2781..868a992822032e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -683,7 +683,7 @@ private void sendFragment() throws TException, RpcException, UserException { int backendIdx = 0; int profileFragmentId = 0; long memoryLimit = queryOptions.getMemLimit(); - Set backendsWithOlapTableSink = Sets.newHashSet(); + Map numSinkOnBackend = Maps.newHashMap(); beToExecStates.clear(); // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, // else use exec_plan_fragments directly. 
@@ -753,7 +753,7 @@ private void sendFragment() throws TException, RpcException, UserException { states.addState(execState); if (tParam.getFragment().getOutputSink() != null && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { - backendsWithOlapTableSink.add(execState.backend.getId()); + numSinkOnBackend.merge(execState.backend.getId(), 1, Integer::sum); } ++backendIdx; } @@ -765,7 +765,10 @@ private void sendFragment() throws TException, RpcException, UserException { if (tParam.getFragment().getOutputSink() != null && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { tParam.setLoadStreamPerNode(loadStreamPerNode); - tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode); + tParam.setTotalLoadStreams(numSinkOnBackend.size() * loadStreamPerNode); + tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId())); + LOG.info("num local sink for backend {} is {}", tParam.getBackendId(), + numSinkOnBackend.get(tParam.getBackendId())); } } profileFragmentId += 1; @@ -844,7 +847,7 @@ private void sendPipelineCtx() throws TException, RpcException, UserException { } } - Set backendsWithOlapTableSink = Sets.newHashSet(); + int numBackendsWithSink = 0; // 3. group PipelineExecContext by BE. // So that we can use one RPC to send all fragment instances of a BE. for (Map.Entry entry : tParams.entrySet()) { @@ -881,7 +884,7 @@ private void sendPipelineCtx() throws TException, RpcException, UserException { if (entry.getValue().getFragment().getOutputSink() != null && entry.getValue().getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { - backendsWithOlapTableSink.add(backendId); + numBackendsWithSink++; } ++backendIdx; } @@ -894,7 +897,10 @@ private void sendPipelineCtx() throws TException, RpcException, UserException { && entry.getValue().getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) { entry.getValue().setLoadStreamPerNode(loadStreamPerNode); - entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode); + entry.getValue().setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode); + entry.getValue().setNumLocalSink(entry.getValue().getLocalParams().size()); + LOG.info("num local sink for backend {} is {}", entry.getValue().getBackendId(), + entry.getValue().getNumLocalSink()); } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e55ab89e3285fb..7c55842735cdc9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -461,6 +461,8 @@ struct TExecPlanFragmentParams { // total num of load streams the downstream backend will see 27: optional i32 total_load_streams + + 28: optional i32 num_local_sink } struct TExecPlanFragmentParamsList { @@ -678,6 +680,7 @@ struct TPipelineFragmentParams { 30: optional bool group_commit = false; 31: optional i32 load_stream_per_node // num load stream for each sink backend 32: optional i32 total_load_streams // total num of load streams the downstream backend will see + 33: optional i32 num_local_sink } struct TPipelineFragmentParamsList {
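
Note: the change plumbs a new num_local_sink count from the FE coordinator (one per OLAP table sink instance scheduled on a backend, carried in TExecPlanFragmentParams/TPipelineFragmentParams and RuntimeState) into the BE-side shared pools, so DeltaWriterV2Pool and LoadStreamStubPool can hold their entries by shared_ptr and let the last local sink that calls close/cancel/release erase the entry, instead of relying on weak_ptr expiry and custom deleters. Below is a minimal sketch of that use-count pattern, using illustrative names (SharedLoadState, LoadStatePool) rather than the real classes in this patch:

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <unordered_map>

    class LoadStatePool;

    class SharedLoadState {
    public:
        SharedLoadState(int64_t load_id, int num_use, LoadStatePool* pool)
                : _load_id(load_id), _use_cnt(num_use), _pool(pool) {}

        // Each local sink instance calls close() exactly once; only the last
        // caller tears the state down and removes it from the owning pool.
        void close();

    private:
        int64_t _load_id;
        std::atomic<int> _use_cnt;
        LoadStatePool* _pool;
    };

    class LoadStatePool {
    public:
        // num_local_sink is how many sink instances on this backend will share
        // (and later close) the same state for this load id.
        std::shared_ptr<SharedLoadState> get_or_create(int64_t load_id, int num_local_sink) {
            std::lock_guard<std::mutex> lock(_mutex);
            auto& entry = _pool[load_id];
            if (!entry) {
                entry = std::make_shared<SharedLoadState>(load_id, num_local_sink, this);
            }
            return entry;
        }

        void erase(int64_t load_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            _pool.erase(load_id);
        }

        size_t size() {
            std::lock_guard<std::mutex> lock(_mutex);
            return _pool.size();
        }

    private:
        std::mutex _mutex;
        // shared_ptr (not weak_ptr): an entry now lives until its last user closes it.
        std::unordered_map<int64_t, std::shared_ptr<SharedLoadState>> _pool;
    };

    void SharedLoadState::close() {
        if (--_use_cnt > 0) {
            return; // other local sinks still hold this state
        }
        // last user: flush/commit work would happen here, then drop the pool entry
        _pool->erase(_load_id);
    }

    int main() {
        LoadStatePool pool;
        auto s1 = pool.get_or_create(/*load_id=*/42, /*num_local_sink=*/2);
        auto s2 = pool.get_or_create(42, 2); // second sink instance reuses the entry
        s1->close();                         // not the last user, entry stays
        std::cout << pool.size() << '\n';    // 1
        s2->close();                         // last user, entry erased
        std::cout << pool.size() << '\n';    // 0
        return 0;
    }

With num_local_sink passed explicitly, the pool no longer needs weak_ptr: the entry must outlive the sinks that close early and is removed deterministically by the last one, which is what close()/cancel() in DeltaWriterV2Map and release() in LoadStreams do in this patch.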