Skip to content

Commit

Permalink
[BugFix] fix delete flag lost in persistent index minor compact
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha committed Dec 8, 2023
1 parent 0f22dff commit 15e1582
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 4 deletions.
9 changes: 5 additions & 4 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3979,8 +3979,8 @@ Status PersistentIndex::flush_advance() {
}

// Flushes the current l0 into an immutable index via `_l0->flush_to_immutable_index`.
//
// The last argument tells the flush to keep Delete Flags. They must be kept whenever
// an older layer (l1 or l2) exists: checking only l2 let the delete flag get lost in
// minor compaction while an l1 was still present.
Status PersistentIndex::_flush_l0() {
    // when l1 or l2 exist, must flush l0 with Delete Flag
    const bool keep_delete_flag = !_l1_vec.empty() || !_l2_vec.empty();
    return _l0->flush_to_immutable_index(_path, _version, false, keep_delete_flag);
}

Status PersistentIndex::_reload(const PersistentIndexMetaPB& index_meta) {
Expand Down Expand Up @@ -4552,7 +4552,7 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
need_snapshot = true;
}
LOG(INFO) << "PersistentIndex minor compaction, link from tmp-l1: " << tmp_l1_filename
<< " to l1: " << new_l1_filename;
<< " to l1: " << new_l1_filename << " snapshot: " << need_snapshot;
} else if (tmp_l1_cnt > 1) {
// step 1.b
auto writer = std::make_unique<ImmutableIndexWriter>();
Expand All @@ -4561,8 +4561,9 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
// 1, remove delete key when l2 not exist
// 2. skip merge l1, only merge tmp-l1 and l0
RETURN_IF_ERROR(_reload_usage_and_size_by_key_length(_has_l1 ? 1 : 0, _l1_vec.size(), false));
// keep delete flag when l2 or older l1 exist
RETURN_IF_ERROR(_merge_compaction_internal(writer.get(), _has_l1 ? 1 : 0, _l1_vec.size(),
_usage_and_size_by_key_length, !_l2_vec.empty()));
_usage_and_size_by_key_length, !_l2_vec.empty() || _has_l1));
RETURN_IF_ERROR(writer->finish());
LOG(INFO) << "PersistentIndex minor compaction, merge tmp l1, merge cnt: " << _l1_vec.size()
<< ", output: " << new_l1_filename;
Expand Down
259 changes: 259 additions & 0 deletions be/test/storage/persistent_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2549,6 +2549,265 @@ TEST_P(PersistentIndexTest, test_l0_append_load_small_data) {
ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}

// Regression test for "delete flag lost in persistent index minor compact".
//
// Scenario: insert N keys in one version, then erase all of them (in 10 batches) in the
// next version, and finally verify that every key reads back as NullIndexValue.
TEST_P(PersistentIndexTest, test_keep_del_in_minor_compact) {
    // Restore the global config on every exit path: a fatal ASSERT_* returns from the
    // test body immediately, so a trailing assignment alone would leak the override
    // into the tests that run afterwards.
    const int64_t old_config = config::l0_max_mem_usage;
    struct L0MaxMemUsageRestorer {
        int64_t saved;
        ~L0MaxMemUsageRestorer() { config::l0_max_mem_usage = saved; }
    } restore_l0_max_mem_usage{old_config};
    // NOTE(review): presumably small enough to force l0 flushes / minor compaction
    // while the batches below are applied -- confirm against PersistentIndex.
    config::l0_max_mem_usage = 100;

    FileSystem* fs = FileSystem::Default();
    const std::string kPersistentIndexDir = "./PersistentIndexTest_test_test_keep_del_in_minor_compact";
    const std::string kIndexFile = kPersistentIndexDir + "/index.l0.0.0";
    bool created = false;
    ASSERT_OK(fs->create_dir_if_missing(kPersistentIndexDir, &created));

    using Key = std::string;
    PersistentIndexMetaPB index_meta;
    const int N = 1000;
    const int kEraseBatches = 10;
    const int kBatchSize = N / kEraseBatches;
    int64_t cur_version = 0;

    // Keys/values to insert. `keys` is fully sized up front so that the Slices, which
    // are built from the strings, never see their backing vector reallocate.
    vector<Key> keys(N);
    vector<Slice> key_slices;
    vector<IndexValue> values;
    key_slices.reserve(N);
    values.reserve(N);
    for (int i = 0; i < N; i++) {
        keys[i] = "test_varlen_" + std::to_string(i);
        values.emplace_back(i);
        key_slices.emplace_back(keys[i]);
    }

    // The same N keys again, split into kEraseBatches batches for erase().
    vector<vector<Key>> erase_keys(kEraseBatches);
    vector<vector<Slice>> erase_key_slices(kEraseBatches);
    for (int i = 0; i < kEraseBatches; i++) {
        erase_keys[i].resize(kBatchSize);
        erase_key_slices[i].reserve(kBatchSize);
        for (int j = 0; j < kBatchSize; j++) {
            erase_keys[i][j] = "test_varlen_" + std::to_string(i * kBatchSize + j);
            erase_key_slices[i].emplace_back(erase_keys[i][j]);
        }
    }

    {
        // Create the (empty) l0 file the index is loaded from.
        ASSIGN_OR_ABORT(auto wfile, fs->new_writable_file(kIndexFile));
        ASSERT_OK(wfile->close());
    }

    {
        // Build the initial, empty index meta at version (0, 0).
        EditVersion version(cur_version++, 0);
        index_meta.set_key_size(0);
        index_meta.set_size(0);
        version.to_pb(index_meta.mutable_version());
        MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
        l0_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
        IndexSnapshotMetaPB* snapshot_meta = l0_meta->mutable_snapshot();
        version.to_pb(snapshot_meta->mutable_version());

        PersistentIndex index(kPersistentIndexDir);

        // insert
        ASSERT_OK(index.load(index_meta));
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
        ASSERT_OK(index.insert(N, key_slices.data(), values.data(), false));
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // delete every inserted key, one batch at a time, within a single version
        vector<IndexValue> erase_old_values(kBatchSize);
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
        for (int i = 0; i < kEraseBatches; i++) {
            ASSERT_OK(index.erase(kBatchSize, erase_key_slices[i].data(), erase_old_values.data()));
        }
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // every key must now be reported as deleted
        std::vector<IndexValue> get_values(keys.size());
        ASSERT_OK(index.get(keys.size(), key_slices.data(), get_values.data()));
        ASSERT_EQ(keys.size(), get_values.size());
        for (int i = 0; i < N; i++) {
            ASSERT_EQ(NullIndexValue, get_values[i].get_value());
        }
    }
    // Remove the on-disk state created by this test (the index object is already destroyed).
    ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}

// Variant of the "delete flag lost in minor compact" regression test that additionally
// erases keys which were never inserted.
//
// Scenario: insert N keys, then in the next version erase those N keys (10 batches) plus
// one more batch of N / 10 keys that do not exist, and verify that all N inserted keys
// read back as NullIndexValue.
TEST_P(PersistentIndexTest, test_keep_del_in_minor_compact2) {
    // Restore the global config on every exit path: a fatal ASSERT_* returns from the
    // test body immediately, so a trailing assignment alone would leak the override
    // into the tests that run afterwards.
    const int64_t old_config = config::l0_max_mem_usage;
    struct L0MaxMemUsageRestorer {
        int64_t saved;
        ~L0MaxMemUsageRestorer() { config::l0_max_mem_usage = saved; }
    } restore_l0_max_mem_usage{old_config};
    // NOTE(review): presumably small enough to force l0 flushes / minor compaction
    // while the batches below are applied -- confirm against PersistentIndex.
    config::l0_max_mem_usage = 100;

    FileSystem* fs = FileSystem::Default();
    const std::string kPersistentIndexDir = "./PersistentIndexTest_test_test_keep_del_in_minor_compact2";
    const std::string kIndexFile = kPersistentIndexDir + "/index.l0.0.0";
    bool created = false;
    ASSERT_OK(fs->create_dir_if_missing(kPersistentIndexDir, &created));

    using Key = std::string;
    PersistentIndexMetaPB index_meta;
    const int N = 1000;
    const int kExistingBatches = 10;
    const int kBatchSize = N / kExistingBatches;
    // one extra batch made of keys that are never inserted
    const int kEraseBatches = kExistingBatches + 1;
    int64_t cur_version = 0;

    // Keys/values to insert. `keys` is fully sized up front so that the Slices, which
    // are built from the strings, never see their backing vector reallocate.
    vector<Key> keys(N);
    vector<Slice> key_slices;
    vector<IndexValue> values;
    key_slices.reserve(N);
    values.reserve(N);
    for (int i = 0; i < N; i++) {
        keys[i] = "test_varlen_" + std::to_string(i);
        values.emplace_back(i);
        key_slices.emplace_back(keys[i]);
    }

    // Batches [0, kExistingBatches) cover the inserted keys.
    vector<vector<Key>> erase_keys(kEraseBatches);
    vector<vector<Slice>> erase_key_slices(kEraseBatches);
    for (int i = 0; i < kExistingBatches; i++) {
        erase_keys[i].resize(kBatchSize);
        erase_key_slices[i].reserve(kBatchSize);
        for (int j = 0; j < kBatchSize; j++) {
            erase_keys[i][j] = "test_varlen_" + std::to_string(i * kBatchSize + j);
            erase_key_slices[i].emplace_back(erase_keys[i][j]);
        }
    }
    // append no exist delete keys: the last batch uses indexes N .. N + kBatchSize - 1
    vector<Key>& missing_keys = erase_keys[kExistingBatches];
    vector<Slice>& missing_key_slices = erase_key_slices[kExistingBatches];
    missing_keys.resize(kBatchSize);
    missing_key_slices.reserve(kBatchSize);
    for (int j = 0; j < kBatchSize; j++) {
        missing_keys[j] = "test_varlen_" + std::to_string(N + j);
        missing_key_slices.emplace_back(missing_keys[j]);
    }

    {
        // Create the (empty) l0 file the index is loaded from.
        ASSIGN_OR_ABORT(auto wfile, fs->new_writable_file(kIndexFile));
        ASSERT_OK(wfile->close());
    }

    {
        // Build the initial, empty index meta at version (0, 0).
        EditVersion version(cur_version++, 0);
        index_meta.set_key_size(0);
        index_meta.set_size(0);
        version.to_pb(index_meta.mutable_version());
        MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
        l0_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
        IndexSnapshotMetaPB* snapshot_meta = l0_meta->mutable_snapshot();
        version.to_pb(snapshot_meta->mutable_version());

        PersistentIndex index(kPersistentIndexDir);

        // insert
        ASSERT_OK(index.load(index_meta));
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
        ASSERT_OK(index.insert(N, key_slices.data(), values.data(), false));
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // delete: all inserted keys plus the batch of non-existent keys, in one version
        vector<IndexValue> erase_old_values(kBatchSize);
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N + kBatchSize));
        for (int i = 0; i < kEraseBatches; i++) {
            ASSERT_OK(index.erase(kBatchSize, erase_key_slices[i].data(), erase_old_values.data()));
        }
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // every inserted key must now be reported as deleted
        std::vector<IndexValue> get_values(keys.size());
        ASSERT_OK(index.get(keys.size(), key_slices.data(), get_values.data()));
        ASSERT_EQ(keys.size(), get_values.size());
        for (int i = 0; i < N; i++) {
            ASSERT_EQ(NullIndexValue, get_values[i].get_value());
        }
    }
    // Remove the on-disk state created by this test (the index object is already destroyed).
    ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}

// Checks that deletes and surviving keys are both read back correctly when two extra
// keys are kept in the index alongside the erased ones.
//
// Scenario: insert N keys plus 2 extra keys, erase only the first N keys (10 batches) in
// the next version, then verify that the N keys read back as NullIndexValue while the
// 2 extra keys still return their inserted values.
TEST_P(PersistentIndexTest, test_snapshot_with_minor_compact) {
    // Restore the global config on every exit path: a fatal ASSERT_* returns from the
    // test body immediately, so a trailing assignment alone would leak the override
    // into the tests that run afterwards.
    const int64_t old_config = config::l0_max_mem_usage;
    struct L0MaxMemUsageRestorer {
        int64_t saved;
        ~L0MaxMemUsageRestorer() { config::l0_max_mem_usage = saved; }
    } restore_l0_max_mem_usage{old_config};
    // NOTE(review): presumably small enough to force l0 flushes / minor compaction
    // while the batches below are applied -- confirm against PersistentIndex.
    config::l0_max_mem_usage = 100;

    FileSystem* fs = FileSystem::Default();
    const std::string kPersistentIndexDir = "./PersistentIndexTest_test_snapshot_with_minor_compact";
    const std::string kIndexFile = kPersistentIndexDir + "/index.l0.0.0";
    bool created = false;
    ASSERT_OK(fs->create_dir_if_missing(kPersistentIndexDir, &created));

    using Key = std::string;
    PersistentIndexMetaPB index_meta;
    const int N = 1000;
    const int kEraseBatches = 10;
    const int kBatchSize = N / kEraseBatches;
    const int kExtraKeys = 2;
    int64_t cur_version = 0;

    // Keys/values to insert. `keys` is fully sized up front so that the Slices, which
    // are built from the strings, never see their backing vector reallocate.
    vector<Key> keys(N);
    vector<Slice> key_slices;
    vector<IndexValue> values;
    key_slices.reserve(N);
    values.reserve(N);
    for (int i = 0; i < N; i++) {
        keys[i] = "test_varlen_" + std::to_string(i);
        values.emplace_back(i);
        key_slices.emplace_back(keys[i]);
    }

    // The same N keys again, split into kEraseBatches batches for erase().
    vector<vector<Key>> erase_keys(kEraseBatches);
    vector<vector<Slice>> erase_key_slices(kEraseBatches);
    for (int i = 0; i < kEraseBatches; i++) {
        erase_keys[i].resize(kBatchSize);
        erase_key_slices[i].reserve(kBatchSize);
        for (int j = 0; j < kBatchSize; j++) {
            erase_keys[i][j] = "test_varlen_" + std::to_string(i * kBatchSize + j);
            erase_key_slices[i].emplace_back(erase_keys[i][j]);
        }
    }

    // Extra keys that are inserted but never erased (indexes N and N + 1).
    vector<Key> extra_keys(kExtraKeys);
    vector<Slice> extra_key_slices;
    vector<IndexValue> extra_values;
    extra_key_slices.reserve(kExtraKeys);
    extra_values.reserve(kExtraKeys);
    for (int i = 0; i < kExtraKeys; i++) {
        extra_keys[i] = "test_varlen_" + std::to_string(N + i);
        extra_values.emplace_back(N + i);
        extra_key_slices.emplace_back(extra_keys[i]);
    }

    {
        // Create the (empty) l0 file the index is loaded from.
        ASSIGN_OR_ABORT(auto wfile, fs->new_writable_file(kIndexFile));
        ASSERT_OK(wfile->close());
    }

    {
        // Build the initial, empty index meta at version (0, 0).
        EditVersion version(cur_version++, 0);
        index_meta.set_key_size(0);
        index_meta.set_size(0);
        version.to_pb(index_meta.mutable_version());
        MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
        l0_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
        IndexSnapshotMetaPB* snapshot_meta = l0_meta->mutable_snapshot();
        version.to_pb(snapshot_meta->mutable_version());

        PersistentIndex index(kPersistentIndexDir);

        // insert
        ASSERT_OK(index.load(index_meta));
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
        ASSERT_OK(index.insert(N, key_slices.data(), values.data(), false));
        // insert extra keys, so we can trigger dump snapshot later
        ASSERT_OK(index.insert(kExtraKeys, extra_key_slices.data(), extra_values.data(), false));
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // delete the first N keys only, one batch at a time, within a single version
        vector<IndexValue> erase_old_values(kBatchSize);
        ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
        for (int i = 0; i < kEraseBatches; i++) {
            ASSERT_OK(index.erase(kBatchSize, erase_key_slices[i].data(), erase_old_values.data()));
        }
        ASSERT_OK(index.commit(&index_meta));
        ASSERT_OK(index.on_commited());

        // the erased keys must be reported as deleted
        std::vector<IndexValue> get_values(keys.size());
        ASSERT_OK(index.get(keys.size(), key_slices.data(), get_values.data()));
        ASSERT_EQ(keys.size(), get_values.size());
        for (int i = 0; i < N; i++) {
            ASSERT_EQ(NullIndexValue, get_values[i].get_value());
        }
        // check extra keys: they were never erased and must keep their values
        std::vector<IndexValue> get_extra_values(kExtraKeys);
        ASSERT_OK(index.get(extra_keys.size(), extra_key_slices.data(), get_extra_values.data()));
        ASSERT_EQ(kExtraKeys, get_extra_values.size());
        for (int i = 0; i < kExtraKeys; i++) {
            ASSERT_EQ(N + i, get_extra_values[i].get_value());
        }
    }
    // Remove the on-disk state created by this test (the index object is already destroyed).
    ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}

// Instantiates the value-parameterized PersistentIndexTest suite with two parameter
// values, PersistentIndexTestParam{true} and PersistentIndexTestParam{false}, so each
// TEST_P case of the suite is run once per value.
// NOTE(review): the meaning of the boolean field is defined by PersistentIndexTestParam,
// which is not visible here -- confirm before relying on it.
INSTANTIATE_TEST_SUITE_P(PersistentIndexTest, PersistentIndexTest,
::testing::Values(PersistentIndexTestParam{true}, PersistentIndexTestParam{false}));

Expand Down

0 comments on commit 15e1582

Please sign in to comment.