Skip to content

Commit

Permalink
[BugFix] check partial update segment file exist before rewrite (Star…
Browse files Browse the repository at this point in the history
…Rocks#40609)

Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Feb 2, 2024
1 parent f43901e commit 0831e5b
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 16 deletions.
10 changes: 4 additions & 6 deletions be/src/storage/lake/meta_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,17 @@ void MetaFileBuilder::apply_opwrite(const TxnLogPB_OpWrite& op_write, const std:

auto segment_size_size = rowset->segment_size_size();
auto segment_file_size = rowset->segments_size();
bool upgrage_from_old_version = (segment_size_size != segment_file_size);
LOG_IF(ERROR, segment_size_size > 0 && segment_size_size != segment_file_size)
<< "segment_size size != segment file size, tablet: " << _tablet.id() << ", rowset: " << rowset->id()
<< ", segment file size: " << segment_file_size << ", segment_size size: " << segment_size_size;

for (const auto& replace_seg : replace_segments) {
// when handling a partial update, replace the old segments with the newly rewritten segments
rowset->set_segments(replace_seg.first, replace_seg.second.path);

// update new rewrite segments size
if (LIKELY(!upgrage_from_old_version)) {
rowset->set_segment_size(replace_seg.first, replace_seg.second.size.value());
}
}
if (!replace_segments.empty()) {
// Do NOT record segment sizes in a rowset generated by partial update
rowset->clear_segment_size();
}

rowset->set_id(_tablet_meta->next_rowset_id());
Expand Down
44 changes: 34 additions & 10 deletions be/src/storage/lake/rowset_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,18 @@ Status RowsetUpdateState::_prepare_partial_update_states(const TxnLogPB_OpWrite&
return Status::OK();
}

// Reports whether `full_path` exists.
//
// The filesystem is resolved from the path string itself. Returns true when
// the existence probe succeeds, false when it reports "not found", and
// forwards any other status (including a failure to create the filesystem)
// to the caller as an error.
StatusOr<bool> RowsetUpdateState::file_exist(const std::string& full_path) {
    ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(full_path));
    auto probe_status = fs->path_exists(full_path);
    if (probe_status.ok()) {
        return true;
    }
    if (probe_status.is_not_found()) {
        return false;
    }
    // Any other failure is a genuine error rather than an answer.
    return probe_status;
}

Status RowsetUpdateState::rewrite_segment(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata,
Tablet* tablet, std::map<int, FileInfo>* replace_segments,
std::vector<std::string>* orphan_files) {
Expand Down Expand Up @@ -491,34 +503,46 @@ Status RowsetUpdateState::rewrite_segment(const TxnLogPB_OpWrite& op_write, cons
const auto& dest_path = op_write.rewrite_segments(i);
DCHECK(src_path != dest_path);

bool skip_because_file_exist = false;
int64_t t_rewrite_start = MonotonicMillis();
if (op_write.txn_meta().has_auto_increment_partial_update_column_id() &&
!_auto_increment_partial_update_states[i].skip_rewrite) {
FileInfo file_info{.path = tablet->segment_location(dest_path)};
RETURN_IF_ERROR(SegmentRewriter::rewrite(
tablet->segment_location(src_path), &file_info, tablet_schema,
_auto_increment_partial_update_states[i], read_column_ids,
_partial_update_states.size() != 0 ? &_partial_update_states[i].write_columns : nullptr, op_write,
tablet));
ASSIGN_OR_RETURN(bool skip_rewrite, file_exist(file_info.path));
if (!skip_rewrite) {
RETURN_IF_ERROR(SegmentRewriter::rewrite(
tablet->segment_location(src_path), &file_info, tablet_schema,
_auto_increment_partial_update_states[i], read_column_ids,
_partial_update_states.size() != 0 ? &_partial_update_states[i].write_columns : nullptr,
op_write, tablet));
} else {
skip_because_file_exist = true;
}
file_info.path = dest_path;
(*replace_segments)[i] = file_info;
} else if (_partial_update_states.size() != 0) {
const FooterPointerPB& partial_rowset_footer = txn_meta.partial_rowset_footers(i);
FileInfo file_info{.path = tablet->segment_location(dest_path)};
// if the rewrite fails, let segment gc clean up the dest segment file
RETURN_IF_ERROR(SegmentRewriter::rewrite(tablet->segment_location(src_path), &file_info, tablet_schema,
read_column_ids, _partial_update_states[i].write_columns, i,
partial_rowset_footer));
ASSIGN_OR_RETURN(bool skip_rewrite, file_exist(file_info.path));
if (!skip_rewrite) {
RETURN_IF_ERROR(SegmentRewriter::rewrite(tablet->segment_location(src_path), &file_info, tablet_schema,
read_column_ids, _partial_update_states[i].write_columns, i,
partial_rowset_footer));
} else {
skip_because_file_exist = true;
}
file_info.path = dest_path;
(*replace_segments)[i] = file_info;
} else {
need_rename[i] = false;
}
int64_t t_rewrite_end = MonotonicMillis();
LOG(INFO) << strings::Substitute(
"lake apply partial segment tablet:$0 rowset:$1 seg:$2 #column:$3 #rewrite:$4ms [$5 -> $6]",
"lake apply partial segment tablet:$0 rowset:$1 seg:$2 #column:$3 #rewrite:$4ms [$5 -> $6] "
"skip_because_file_exist:$7",
tablet->id(), rowset_meta.id(), i, read_column_ids.size(), t_rewrite_end - t_rewrite_start, src_path,
dest_path);
dest_path, skip_because_file_exist);
}

// rename segment file
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/lake/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class RowsetUpdateState {

const std::vector<std::unique_ptr<Column>>& auto_increment_deletes() const;

static StatusOr<bool> file_exist(const std::string& full_path);

private:
Status _do_load(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata, Tablet* tablet, bool need_lock);

Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/rowset/segment_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "storage/rowset/segment.h"
#include "storage/rowset/segment_options.h"
#include "storage/rowset/segment_writer.h"
#include "testutil/sync_point.h"
#include "util/filesystem_util.h"
#include "util/raw_container.h"
#include "util/slice.h"
Expand Down Expand Up @@ -62,6 +63,7 @@ Status SegmentRewriter::rewrite(const std::string& src_path, FileInfo* dest_path
uint64_t segment_file_size;
RETURN_IF_ERROR(writer.append_chunk(*chunk));
RETURN_IF_ERROR(writer.finalize_columns(&index_size));
TEST_ERROR_POINT("SegmentRewriter::rewrite1");
RETURN_IF_ERROR(writer.finalize_footer(&segment_file_size));

dest_path->size = segment_file_size;
Expand Down Expand Up @@ -152,6 +154,7 @@ Status SegmentRewriter::rewrite(const std::string& src_path, const std::string&
uint64_t segment_file_size;
RETURN_IF_ERROR(writer.append_chunk(*chunk));
RETURN_IF_ERROR(writer.finalize_columns(&index_size));
TEST_ERROR_POINT("SegmentRewriter::rewrite2");
RETURN_IF_ERROR(writer.finalize_footer(&segment_file_size));

return Status::OK();
Expand Down Expand Up @@ -248,6 +251,7 @@ Status SegmentRewriter::rewrite(const std::string& src_path, FileInfo* dest_path
uint64_t segment_file_size;
RETURN_IF_ERROR(writer.append_chunk(*chunk));
RETURN_IF_ERROR(writer.finalize_columns(&index_size));
TEST_ERROR_POINT("SegmentRewriter::rewrite3");
RETURN_IF_ERROR(writer.finalize_footer(&segment_file_size));

dest_path->size = segment_file_size;
Expand Down Expand Up @@ -282,6 +286,7 @@ Status SegmentRewriter::rewrite(const std::string& src_path, const TabletSchemaC
uint64_t segment_file_size;
RETURN_IF_ERROR(writer.append_chunk(*chunk));
RETURN_IF_ERROR(writer.finalize_columns(&index_size));
TEST_ERROR_POINT("SegmentRewriter::rewrite4");
RETURN_IF_ERROR(writer.finalize_footer(&segment_file_size));

return Status::OK();
Expand Down
76 changes: 76 additions & 0 deletions be/test/storage/lake/partial_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1023,4 +1023,80 @@ TEST_P(LakePartialUpdateTest, test_partial_update_retry_rewrite_check) {
}
}

// Verifies that the rewritten segment file of a partial update is present on
// storage after a publish attempt fails while writing tablet metadata, and is
// still present after a second failed attempt.
TEST_P(LakePartialUpdateTest, test_partial_update_retry_check_file_exist) {
    // This case only runs when the persistent index is disabled.
    if (GetParam().enable_persistent_index) return;
    auto chunk0 = generate_data(kChunkSize, 0, false, 3);
    auto chunk1 = generate_data(kChunkSize, 0, true, 5);
    auto indexes = std::vector<uint32_t>(kChunkSize);
    for (int i = 0; i < kChunkSize; i++) {
        indexes[i] = i;
    }

    auto version = 1;
    auto tablet_id = _tablet_metadata->id();
    // normal write
    {
        auto txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_index_id(_tablet_schema->id())
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish());
        delta_writer->close();
        // Publish version
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        version++;
    }
    ASSERT_EQ(kChunkSize, check(version, [](int c0, int c1, int c2) { return (c0 * 3 == c1) && (c0 * 4 == c2); }));
    ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    EXPECT_EQ(new_tablet_metadata->rowsets_size(), 1);

    // partial update: write the same chunk three times in one transaction, but do not publish yet
    auto txn_id = next_id();
    {
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_index_id(_tablet_schema->id())
                                                   .set_slot_descriptors(&_slot_pointers)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish());
        delta_writer->close();
    }
    // retry because put meta fail: publish the same txn twice with an injected metadata write error
    for (int i = 0; i < 2; i++) {
        TEST_ENABLE_ERROR_POINT("TabletManager::put_tablet_metadata",
                                Status::IOError("injected put tablet metadata error"));

        SyncPoint::GetInstance()->EnableProcessing();

        // Always undo the injection, even if an assertion below aborts the test body.
        DeferOp defer([]() {
            TEST_DISABLE_ERROR_POINT("TabletManager::put_tablet_metadata");
            SyncPoint::GetInstance()->DisableProcessing();
        });
        _tablet_mgr->prune_metacache();
        ASSERT_ERROR(publish_single_version(tablet_id, version + 1, txn_id));
        auto txn_log_st = _tablet_mgr->get_txn_log(tablet_id, txn_id);
        // Must be fatal: value() is read right below and is only valid when the lookup succeeded.
        ASSERT_TRUE(txn_log_st.ok());
        auto& txn_log = txn_log_st.value();
        const auto& segment = txn_log->op_write().rewrite_segments(0);
        std::string filename = _tablet_mgr->segment_location(tablet_id, segment);
        // The rewritten segment must be on storage after every failed publish attempt.
        ASSIGN_OR_ABORT(bool file_exist, RowsetUpdateState::file_exist(filename));
        EXPECT_TRUE(file_exist);
    }
}

} // namespace starrocks::lake

0 comments on commit 0831e5b

Please sign in to comment.