Skip to content

Commit

Permalink
[fix](move-memtable) fix commit may fail due to duplicated reports (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Apr 19, 2024
1 parent 64ceab4 commit 049729a
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 29 deletions.
12 changes: 12 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ class LoadStreamStub {

std::string to_string();

// for tests only
void add_success_tablet(int64_t tablet_id) {
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
_success_tablets.push_back(tablet_id);
}

// for tests only
void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
}

private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Expand Down
86 changes: 57 additions & 29 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,39 +575,28 @@ Status VTabletWriterV2::close(Status exec_status) {

// calculate and submit commit info
if (is_last_sink) {
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
std::vector<TTabletCommitInfo> tablet_commit_infos;

_load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
failed_tablets[tablet_id]++;
failed_reason[tablet_id] = reason;
}
for (auto tablet_id : stream->success_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
commit_info.backendId = dst_id;
tablet_commit_infos.emplace_back(std::move(commit_info));
DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
int64_t tablet_id = -1;
for (auto& stream : *streams) {
const auto& tablets = stream->success_tablets();
if (tablets.size() > 0) {
tablet_id = tablets[0];
break;
}
}
if (tablet_id != -1) {
LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
streams->front()->add_failed_tablet(tablet_id,
Status::InternalError("fault injection"));
} else {
LOG(INFO) << "fault injection: failed to inject failed tablet_id";
}
});

for (auto [tablet_id, replicas] : failed_tablets) {
if (replicas > (_num_replicas - 1) / 2) {
return failed_reason.at(tablet_id);
}
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
RETURN_IF_ERROR(
_create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas));
_state->tablet_commit_infos().insert(
_state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
Expand Down Expand Up @@ -660,4 +649,43 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
}

Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
std::shared_ptr<LoadStreamMap> load_stream_map,
int num_replicas) {
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
failed_tablets[tablet_id]++;
failed_reason[tablet_id] = reason;
}
for (auto tablet_id : stream->success_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
}
known_tablets.insert(tablet_id);
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
commit_info.backendId = dst_id;
tablet_commit_infos.emplace_back(std::move(commit_info));
}
}
});

for (auto [tablet_id, replicas] : failed_tablets) {
if (replicas > (num_replicas - 1) / 2) {
LOG(INFO) << "tablet " << tablet_id
<< " failed on majority backends: " << failed_reason[tablet_id];
return failed_reason.at(tablet_id);
}
}
return Status::OK();
}

} // namespace doris::vectorized
7 changes: 7 additions & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ class VTabletWriterV2 final : public AsyncResultWriter {

Status on_partitions_created(TCreatePartitionResult* result);

#ifndef BE_TEST
private:
#endif
static Status _create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
std::shared_ptr<LoadStreamMap> load_stream_map,
int num_replicas);

private:
Status _init_row_distribution();

Expand Down
239 changes: 239 additions & 0 deletions be/test/vec/sink/vtablet_writer_v2_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/writer/vtablet_writer_v2.h"

#include <gtest/gtest.h>

#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

class TestVTabletWriterV2 : public ::testing::Test {
public:
TestVTabletWriterV2() = default;
~TestVTabletWriterV2() = default;
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};

const int64_t src_id = 1000;

static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id,
std::vector<int64_t> success_tablets,
std::unordered_map<int64_t, Status> failed_tablets) {
auto stub = load_stream_map->get_or_create(node_id);
for (const auto& tablet_id : success_tablets) {
stub->at(0)->add_success_tablet(tablet_id);
}
for (const auto& [tablet_id, reason] : failed_tablets) {
stub->at(0)->add_failed_tablet(tablet_id, reason);
}
}

TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 1;
add_stream(load_stream_map, 1001, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 2);
}

TEST_F(TestVTabletWriterV2, one_replica_fail) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 1;
add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, two_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 2;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, two_replica_fail) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 2;
add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, normal) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 6);
}

TEST_F(TestVTabletWriterV2, miss_one) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, miss_two) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_one) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
// Duplicate tablets from same node should be ignored
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {},
{{1, Status::InternalError("test")}, {2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {2}, {{1, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1}, {});
add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map,
num_replicas);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}

} // namespace doris
Loading

0 comments on commit 049729a

Please sign in to comment.