Skip to content

Commit

Permalink
[fix](move-memtable) pass num local sink to backends (apache#26897)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored and 胥剑旭 committed Dec 14, 2023
1 parent 195842b commit 4a610dd
Show file tree
Hide file tree
Showing 19 changed files with 177 additions and 76 deletions.
3 changes: 3 additions & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
// build tablet schema in request level
if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
}
_build_current_tablet_schema(_req.index_id, _req.table_schema_param,
*_streams[0]->tablet_schema(_req.index_id));
RowsetWriterContext context;
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
_runtime_state->set_num_per_fragment_instances(params.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

// set up sink, if required
if (request.fragment.__isset.output_sink) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ class RuntimeState {

// Accessor for the stored _total_load_streams value.
int total_load_streams() const {
    return _total_load_streams;
}

// Records the local sink count on this runtime state; read back through
// num_local_sink().
void set_num_local_sink(int num_local_sink) {
    _num_local_sink = num_local_sink;
}

// Accessor for the value last stored by set_num_local_sink().
int num_local_sink() const {
    return _num_local_sink;
}

// Reports the disable_stream_preaggregations field carried in the query options.
bool disable_stream_preaggregations() const {
    const auto& options = _query_options;
    return options.disable_stream_preaggregations;
}
Expand Down Expand Up @@ -553,6 +557,7 @@ class RuntimeState {
int _num_per_fragment_instances = 0;
int _load_stream_per_node = 0;
int _total_load_streams = 0;
int _num_local_sink = 0;

// The backend id on which this fragment instance runs
int64_t _backend_id = -1;
Expand Down
35 changes: 24 additions & 11 deletions be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class TExpr;

namespace vectorized {

DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {}
// Builds a writer map for `load_id`.
//
// `num_use` seeds the use counter that close() and cancel() decrement.
// `pool` may be null; when it is set, the map erases its own entry from that
// pool once the counter reaches zero.
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool)
        : _load_id(load_id),
          _use_cnt(num_use),
          _pool(pool) {
}

// No custom teardown is defined here; the compiler-generated destructor is used.
DeltaWriterV2Map::~DeltaWriterV2Map() = default;

Expand All @@ -38,9 +39,15 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
}

Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
if (--_use_cnt > 0) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , use_cnt = " << num_use;
return Status::OK();
}
LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
if (_pool != nullptr) {
_pool->erase(_load_id);
}
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
Expand All @@ -59,6 +66,11 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
}

void DeltaWriterV2Map::cancel(Status status) {
int num_use = --_use_cnt;
LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " << num_use;
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
_map.for_each([&status](auto& entry) {
static_cast<void>(entry.second->cancel_with_status(status));
});
Expand All @@ -68,23 +80,24 @@ DeltaWriterV2Pool::DeltaWriterV2Pool() = default;

// No custom teardown is defined here; the compiler-generated destructor is used.
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;

std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id) {
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id,
int num_sink) {
UniqueId id {load_id};
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
if (map) {
map->grab();
return map;
}
auto deleter = [this](DeltaWriterV2Map* m) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(m->unique_id());
delete m;
};
map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
_pool[id] = map;
return map;
}

// Removes the entry for `load_id` from the pool, if one exists.
// The pool mutex is held for both the log line and the removal.
void DeltaWriterV2Pool::erase(UniqueId load_id) {
    std::lock_guard<std::mutex> guard {_mutex};
    LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
    _pool.erase(load_id);
}

} // namespace vectorized
} // namespace doris
17 changes: 9 additions & 8 deletions be/src/vec/sink/delta_writer_v2_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,24 @@ class RuntimeProfile;

namespace vectorized {

class DeltaWriterV2Pool;

class DeltaWriterV2Map {
public:
DeltaWriterV2Map(UniqueId load_id);
DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool* pool = nullptr);

~DeltaWriterV2Map();

void grab() { ++_use_cnt; }

// get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
DeltaWriterV2* get_or_create(int64_t tablet_id,
std::function<std::unique_ptr<DeltaWriterV2>()> creator);

// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile);
Status close(RuntimeProfile* profile = nullptr);

// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);

UniqueId unique_id() const { return _load_id; }

// Number of entries currently held in the tablet-to-writer map.
size_t size() const {
    return _map.size();
}

private:
Expand All @@ -89,6 +87,7 @@ class DeltaWriterV2Map {
UniqueId _load_id;
TabletToDeltaWriterV2Map _map;
std::atomic<int> _use_cnt;
DeltaWriterV2Pool* _pool;
};

class DeltaWriterV2Pool {
Expand All @@ -97,7 +96,9 @@ class DeltaWriterV2Pool {

~DeltaWriterV2Pool();

std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id, int num_sink = 1);

void erase(UniqueId load_id);

size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -106,7 +107,7 @@ class DeltaWriterV2Pool {

private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
std::unordered_map<UniqueId, std::shared_ptr<DeltaWriterV2Map>> _pool;
};

} // namespace vectorized
Expand Down
23 changes: 16 additions & 7 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
_close_cv.notify_all();
}

LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
: _load_id(load_id),
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
: _use_cnt(num_use),
_load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};

LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
: _load_id(stub._load_id),
: _use_cnt(stub._use_cnt.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
Expand All @@ -107,7 +109,6 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
return Status::OK();
Expand Down Expand Up @@ -190,15 +191,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64

// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (--_num_open > 0) {
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
}
if (--_use_cnt > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
for (const auto& tablet : _tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
}
}
return _encode_and_send(header);
}
Expand Down
9 changes: 6 additions & 3 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class LoadStreamStub {

public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id);
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);

// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub);
Expand Down Expand Up @@ -177,7 +177,7 @@ class LoadStreamStub {
}

std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
return _tablet_schema_for_index->at(index_id);
return (*_tablet_schema_for_index)[index_id];
}

bool enable_unique_mow(int64_t index_id) const {
Expand All @@ -203,7 +203,10 @@ class LoadStreamStub {
std::atomic<bool> _is_init;
bthread::Mutex _mutex;

std::atomic<int> _num_open;
std::atomic<int> _use_cnt;

std::mutex _tablets_to_commit_mutex;
std::vector<PTabletID> _tablets_to_commit;

std::mutex _buffer_mutex;
std::mutex _send_mutex;
Expand Down
41 changes: 29 additions & 12 deletions be/src/vec/sink/load_stream_stub_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,50 @@ class TExpr;

namespace stream_load {

// Creates an empty stream group for the (load_id, dst_id) pair.
//
// `num_use` seeds the counter that release() decrements; `pool` is the pool
// whose erase() is called when that counter reaches zero.
LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
        : _load_id(load_id),
          _dst_id(dst_id),
          _use_cnt(num_use),
          _pool(pool) {
}

// Drops one use of this stream group.
//
// The counter is decremented atomically. Only the call that brings it to
// exactly zero erases the (load_id, dst_id) entry from the pool; every other
// call just logs the counter value it observed.
void LoadStreams::release() {
    const int remaining = --_use_cnt;
    if (remaining != 0) {
        LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id
                  << ", use_cnt = " << remaining;
        return;
    }
    LOG(INFO) << "releasing streams for load_id = " << _load_id << ", dst_id = " << _dst_id;
    _pool->erase(_load_id, _dst_id);
}

// No custom setup is defined here; the compiler-generated default constructor is used.
LoadStreamStubPool::LoadStreamStubPool() = default;

// No custom teardown is defined here; the compiler-generated destructor is used.
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams) {

std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams,
int num_sink) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _pool[key].lock();
std::shared_ptr<LoadStreams> streams = _pool[key];
if (streams) {
return streams;
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
auto deleter = [this, key](Streams* s) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(key);
_template_stubs.erase(key.first);
delete s;
};
streams = std::shared_ptr<Streams>(new Streams(), deleter);
DCHECK(num_sink > 0) << "sink num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->emplace_back(new LoadStreamStub {*it->second});
streams->streams().emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
return streams;
}

// Removes the stream group stored under (load_id, dst_id) and the template
// stub stored under load_id, both while holding the pool mutex.
void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
    std::lock_guard<std::mutex> guard {_mutex};
    const auto key = std::make_pair(load_id, dst_id);
    _pool.erase(key);
    _template_stubs.erase(load_id);
}

} // namespace stream_load
} // namespace doris
26 changes: 23 additions & 3 deletions be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,36 @@ class LoadStreamStub;

namespace stream_load {

class LoadStreamStubPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

// A group of load stream stubs for one (load_id, dst_id) pair, carrying a use
// counter. release() decrements the counter and, when it reaches zero, erases
// this group's entry from the pool it was created with.
class LoadStreams {
public:
    // `num_use` is the initial value of the use counter; `pool` is the pool
    // that release() calls erase() on.
    LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);

    // Mutable access to the underlying vector of stubs.
    Streams& streams() {
        return _streams;
    }

    // Drops one use; see the class comment for what happens at zero.
    void release();

private:
    Streams _streams;
    UniqueId _load_id;
    int64_t _dst_id;
    // Remaining uses; decremented by release().
    std::atomic<int> _use_cnt;
    // Pool whose erase() is invoked by the final release().
    LoadStreamStubPool* _pool;
};

class LoadStreamStubPool {
public:
LoadStreamStubPool();

~LoadStreamStubPool();

std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams);
std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams, int num_sink);

void erase(UniqueId load_id, int64_t dst_id);

size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -95,7 +115,7 @@ class LoadStreamStubPool {
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>> _pool;
std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
};

} // namespace stream_load
Expand Down
Loading

0 comments on commit 4a610dd

Please sign in to comment.