Skip to content

Commit

Permalink
[BugFix] fix lake pk table compaction when state cache not exist
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha committed Nov 14, 2023
1 parent 852240f commit a52958b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
11 changes: 9 additions & 2 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op
uint32_t max_src_rssid = max_rowset_id + input_rowset->segments_size() - 1;

// 2. update primary index, and generate delete info.
TRACE_COUNTER_INCREMENT("output_rowsets_size", compaction_state.pk_cols.size());
for (size_t i = 0; i < compaction_state.pk_cols.size(); i++) {
TRACE_COUNTER_INCREMENT("output_rowsets_size", output_rowset->num_segments());
for (size_t i = 0; i < output_rowset->num_segments(); i++) {
RETURN_IF_ERROR(compaction_state.load_segments(output_rowset.get(), this, tablet_schema, i));
TRACE_COUNTER_INCREMENT("state_bytes", compaction_state.memory_usage());
auto& pk_col = compaction_state.pk_cols[i];
Expand Down Expand Up @@ -680,6 +680,13 @@ bool UpdateManager::TEST_check_compaction_cache_absent(uint32_t tablet_id, int64
}
}

void UpdateManager::TEST_remove_compaction_cache(uint32_t tablet_id, int64_t txn_id) {
auto compaction_entry = _compaction_cache.get(cache_key(tablet_id, txn_id));
if (compaction_entry != nullptr) {
_compaction_cache.remove(compaction_entry);
}
}

void UpdateManager::preload_update_state(const TxnLog& txnlog, Tablet* tablet) {
// use tabletid-txnid as update state cache's key, so it can retry safe.
auto state_entry = _update_state_cache.get_or_create(cache_key(tablet->id(), txnlog.txn_id()));
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class UpdateManager {

bool TEST_check_update_state_cache_absent(uint32_t tablet_id, int64_t txn_id);
bool TEST_check_compaction_cache_absent(uint32_t tablet_id, int64_t txn_id);
void TEST_remove_compaction_cache(uint32_t tablet_id, int64_t txn_id);

Status update_primary_index_memory_limit(int32_t update_memory_limit_percent) {
int64_t byte_limits = ParseUtil::parse_mem_spec(config::mem_limit, MemInfo::physical_mem());
Expand Down
79 changes: 79 additions & 0 deletions be/test/storage/lake/primary_key_compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,85 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) {
EXPECT_EQ(_update_mgr->compaction_state_mem_tracker()->consumption(), 0);
}

TEST_P(LakePrimaryKeyCompactionTest, test_remove_compaction_state) {
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize, 0);
auto indexes = std::vector<uint32_t>(kChunkSize);
for (int i = 0; i < kChunkSize; i++) {
indexes[i] = i;
}

auto version = 1;
auto tablet_id = _tablet_metadata->id();
for (int i = 0; i < 3; i++) {
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();
// Publish version
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
}
ASSERT_EQ(kChunkSize, read(version));
ASSIGN_OR_ABORT(auto new_tablet_metadata1, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata1->rowsets_size(), 3);

ExecEnv::GetInstance()->delete_file_thread_pool()->wait();
// make sure delvecs have been generated
for (int i = 0; i < 2; i++) {
auto itr = new_tablet_metadata1->delvec_meta().version_to_file().find(version - i);
EXPECT_TRUE(itr != new_tablet_metadata1->delvec_meta().version_to_file().end());
auto delvec_file = itr->second;
EXPECT_TRUE(fs::path_exist(_lp->delvec_location(tablet_id, delvec_file.name())));
}

ASSIGN_OR_ABORT(auto tablet, _tablet_mgr->get_tablet(tablet_id));

auto txn_id = next_id();
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(_tablet_metadata->id(), version, txn_id));
check_task(task);
CompactionTask::Progress progress;
ASSERT_OK(task->execute(&progress, CompactionTask::kNoCancelFn));
EXPECT_EQ(100, progress.value());
// remove compaction state
_update_mgr->TEST_remove_compaction_cache(tablet_id, txn_id);
EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id));
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
EXPECT_TRUE(_update_mgr->TEST_check_compaction_cache_absent(tablet_id, txn_id));
version++;
ASSERT_EQ(kChunkSize, read(version));
// write again
{
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();
// Publish version
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
}
// check again
ASSERT_EQ(kChunkSize, read(version));
}

INSTANTIATE_TEST_SUITE_P(LakePrimaryKeyCompactionTest, LakePrimaryKeyCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5, false},
CompactionParam{VERTICAL_COMPACTION, 1, false},
Expand Down

0 comments on commit a52958b

Please sign in to comment.