Skip to content

Commit

Permalink
[BugFix] fix partial update column mode execute concurrent with incre…
Browse files Browse the repository at this point in the history
…mental clone issue (#34555)

This PR contains these parts:
1. remove `clear_txn_meta` after applying finish. Because if we clear the txn meta and then this rowset has been incremental clone, then the source tablet can't apply this rowset to generate dcg, which will cause data loss.
2. Add a meta tool to print dcg.
  • Loading branch information
luohaha authored Nov 16, 2023
1 parent 035e033 commit dd32eb6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
3 changes: 2 additions & 1 deletion be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class RowsetMeta {
// return semgent_footer position and size if rowset is partial_rowset
const FooterPointerPB* partial_rowset_footer(uint32_t segment_id) const {
if (!_rowset_meta_pb->has_txn_meta() || _rowset_meta_pb->txn_meta().has_merge_condition() ||
_rowset_meta_pb->txn_meta().has_auto_increment_partial_update_column_id()) {
_rowset_meta_pb->txn_meta().has_auto_increment_partial_update_column_id() ||
_rowset_meta_pb->num_update_files() > 0) {
return nullptr;
}
return &_rowset_meta_pb->txn_meta().partial_rowset_footers(segment_id);
Expand Down
13 changes: 9 additions & 4 deletions be/src/storage/rowset_column_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ Status RowsetColumnUpdateState::_update_rowset_meta(const RowsetSegmentStat& sta
if (stat.num_segment <= 1) {
rowset->rowset_meta()->set_segments_overlap_pb(NONOVERLAPPING);
}
rowset->rowset_meta()->clear_txn_meta();
(void)rowset->reload();
return Status::OK();
}

Expand Down Expand Up @@ -617,6 +617,8 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
watch.start();

DCHECK(rowset->num_update_files() == _partial_update_states.size());
DCHECK(rowset->rowset_meta()->get_meta_pb().has_txn_meta())
<< fmt::format("tablet_id: {} rowset_id: {}", tablet->tablet_id(), rowset_id);
const auto& txn_meta = rowset->rowset_meta()->get_meta_pb().txn_meta();

// 1. resolve conflicts and generate `ColumnPartialUpdateState` finally.
Expand Down Expand Up @@ -668,6 +670,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
latest_applied_version.major_number() + 1, idx);
};
// 2. getter all rss_rowid_to_update_rowid, and prepare .col writer by the way
int64_t insert_rows = 0;
// rss_id -> rowid -> <update file id, update_rowids>
std::map<uint32_t, RowidsToUpdateRowids> rss_rowid_to_update_rowid;
for (int upt_id = 0; upt_id < _partial_update_states.size(); upt_id++) {
Expand All @@ -676,6 +679,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
auto rowid = (uint32_t)(each.first & ROWID_MASK);
rss_rowid_to_update_rowid[rssid][rowid] = std::make_pair(upt_id, each.second);
}
insert_rows += _partial_update_states[upt_id].insert_rowids.size();
}
cost_str << " [generate delta column group writer] " << watch.elapsed_time();
watch.reset();
Expand Down Expand Up @@ -774,9 +778,10 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
"avg_finalize_dcg_time(ms):$3 ",
total_seek_source_segment_time, total_read_column_from_update_time, total_merge_column_time,
total_finalize_dcg_time);
cost_str << strings::Substitute("rss_cnt:$0 update_cnt:$1 column_cnt:$2 update_rows:$3 handle_cnt:$4",
rss_rowid_to_update_rowid.size(), _partial_update_states.size(),
update_column_ids.size(), update_rows, handle_cnt);
cost_str << strings::Substitute(
"rss_cnt:$0 update_cnt:$1 column_cnt:$2 update_rows:$3 handle_cnt:$4 insert_rows:$5",
rss_rowid_to_update_rowid.size(), _partial_update_states.size(), update_column_ids.size(), update_rows,
handle_cnt, insert_rows);

LOG(INFO) << "RowsetColumnUpdateState tablet_id: " << tablet->tablet_id() << ", txn_id: " << rowset->txn_id()
<< ", finalize cost:" << cost_str.str();
Expand Down
22 changes: 20 additions & 2 deletions be/src/tools/meta_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "json2pb/pb_to_json.h"
#include "storage/chunk_helper.h"
#include "storage/data_dir.h"
#include "storage/delta_column_group.h"
#include "storage/key_coder.h"
#include "storage/olap_common.h"
#include "storage/olap_define.h"
Expand Down Expand Up @@ -85,6 +86,7 @@ using starrocks::PageHandle;
using starrocks::PagePointer;
using starrocks::ColumnIteratorOptions;
using starrocks::PageFooterPB;
using starrocks::DeltaColumnGroupList;

DEFINE_string(root_path, "", "storage root path");
DEFINE_string(operation, "get_meta",
Expand Down Expand Up @@ -134,6 +136,8 @@ std::string get_usage(const std::string& progname) {
ss << "./meta_tool.sh --operation=calc_checksum [--column_index=xx] --file=/path/to/segment/file\n";
ss << "./meta_tool.sh --operation=check_table_meta_consistency --root_path=/path/to/storage/path "
"--table_id=tableid\n";
ss << "./meta_tool --operation=scan_dcgs --root_path=/path/to/storage/path "
"--tablet_id=tabletid\n";
ss << "cat 0001000000001394_0000000000000004.meta | ./meta_tool.sh --operation=print_lake_metadata\n";
ss << "cat 0001000000001391_0000000000000001.log | ./meta_tool.sh --operation=print_lake_txn_log\n";
ss << "cat SCHEMA_000000000004204C | ./meta_tool.sh --operation=print_lake_schema\n";
Expand Down Expand Up @@ -583,6 +587,18 @@ void check_meta_consistency(DataDir* data_dir) {
return;
}

void scan_dcgs(DataDir* data_dir) {
DeltaColumnGroupList dcgs;
Status st = TabletMetaManager::scan_tablet_delta_column_group(data_dir->get_meta(), FLAGS_tablet_id, &dcgs);
if (!st.ok()) {
std::cout << "scan delta column group, st: " << st.to_string() << std::endl;
return;
}
for (const auto& dcg : dcgs) {
std::cout << dcg->debug_string() << std::endl;
}
}

namespace starrocks {

class SegmentDump {
Expand Down Expand Up @@ -1103,15 +1119,15 @@ int meta_tool_main(int argc, char** argv) {
"get_meta_stats",
"ls",
"check_table_meta_consistency",
"calc_checksum"};
"scan_dcgs"};
if (valid_operations.find(FLAGS_operation) == valid_operations.end()) {
std::cout << "invalid operation:" << FLAGS_operation << std::endl;
return -1;
}

bool read_only = false;
if (FLAGS_operation == "get_meta" || FLAGS_operation == "get_meta_stats" || FLAGS_operation == "ls" ||
FLAGS_operation == "check_table_meta_consistency") {
FLAGS_operation == "check_table_meta_consistency" || FLAGS_operation == "scan_dcgs") {
read_only = true;
}

Expand Down Expand Up @@ -1140,6 +1156,8 @@ int meta_tool_main(int argc, char** argv) {
list_meta(data_dir.get());
} else if (FLAGS_operation == "check_table_meta_consistency") {
check_meta_consistency(data_dir.get());
} else if (FLAGS_operation == "scan_dcgs") {
scan_dcgs(data_dir.get());
} else {
std::cout << "invalid operation: " << FLAGS_operation << "\n" << usage << std::endl;
return -1;
Expand Down
28 changes: 28 additions & 0 deletions be/test/storage/rowset_column_partial_update_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ TEST_P(RowsetColumnPartialUpdateTest, test_upsert) {
int64_t version = 1;
int64_t version_before_partial_update = 1;
prepare_tablet(this, tablet, version, version_before_partial_update, N);
int64_t version_after_partial_update = version;
auto v1_func = [](int64_t k1) { return (int16_t)(k1 % 100 + 3); };
auto v2_func = [](int64_t k1) { return (int32_t)(k1 % 1000 + 4); };

Expand Down Expand Up @@ -837,6 +838,33 @@ TEST_P(RowsetColumnPartialUpdateTest, test_upsert) {
}
ASSERT_TRUE(StorageEngine::instance()->update_manager()->TEST_primary_index_refcnt(tablet->tablet_id(), 1));
}

{
// test clone after upsert
// 1. full clone with version after partial update
auto new_tablet = create_tablet(rand(), rand());
ASSERT_EQ(1, new_tablet->updates()->version_history_count());
ASSERT_OK(full_clone(tablet, version_after_partial_update, new_tablet));
ASSERT_TRUE(check_tablet(new_tablet, version_after_partial_update, N, [](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
// 2. increment clone, upsert v1 = k1 % 100 + 3
ASSERT_OK(increment_clone(tablet, {version_after_partial_update + 1}, new_tablet));
ASSERT_TRUE(check_tablet(new_tablet, version_after_partial_update + 1, 2 * N,
[](int64_t k1, int64_t v1, int32_t v2) {
if (k1 < N) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
} else {
return (int16_t)((k1 - N) % 100 + 3) == v1 && (int32_t)0 == v2;
}
}));
// 3. increment clone, update v2 = k1 % 100 + 4
ASSERT_OK(increment_clone(tablet, {version_after_partial_update + 2}, new_tablet));
ASSERT_TRUE(check_tablet(new_tablet, version_after_partial_update + 2, 2 * N,
[](int64_t k1, int64_t v1, int32_t v2) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
}));
}
}

TEST_P(RowsetColumnPartialUpdateTest, partial_update_two_rowset_and_check) {
Expand Down

0 comments on commit dd32eb6

Please sign in to comment.