Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support pk table migration (#25521) #25522

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Feature] Support pk table migration (#25521)
Signed-off-by: srlch <linzichao@starrocks.com>
  • Loading branch information
srlch committed Aug 22, 2023
commit 45b9e2d25c3e5bf91914fc640b29b02062d19968
3 changes: 3 additions & 0 deletions be/src/storage/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ Status SnapshotManager::assign_new_rowset_id(SnapshotMeta* snapshot_meta, const
RETURN_IF_ERROR(FileSystem::Default()->link_file(old_path, new_path));
}
rowset_meta_pb.set_rowset_id(new_rowset_id.to_string());
// reset rowsetid means that it is different from the rowset in snapshot meta.
// It is reasonable that reset the creation time here.
rowset_meta_pb.set_creation_time(UnixSeconds());
}
return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,17 @@ Status StorageEngine::_perform_update_compaction(DataDir* data_dir) {
}
TRACE("found best tablet $0", best_tablet->tablet_id());

// The concurrency of migration and compaction will lead to inconsistency between the meta and
// primary index cache of the new tablet. So we should abort the compaction for the old tablet
// when executing migration.
std::shared_lock rlock(best_tablet->get_migration_lock(), std::try_to_lock);
if (!rlock.owns_lock()) {
return Status::InternalError("Fail to get migration lock, tablet_id: {}", best_tablet->tablet_id());
}
if (Tablet::check_migrate(best_tablet)) {
return Status::InternalError("Fail to check migrate tablet, tablet_id: {}", best_tablet->tablet_id());
}

Status res;
int64_t duration_ns = 0;
{
Expand Down
42 changes: 30 additions & 12 deletions be/src/storage/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,28 @@ Status TabletManager::_add_tablet_unlocked(const TabletSharedPtr& new_tablet, bo
LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << new_tablet->tablet_id();
return Status::InternalError(fmt::format("tablet already exists, tablet_id: {}", old_tablet->tablet_id()));
}
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
auto old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
auto new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
auto old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
auto new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();

int64_t old_time = 0;
int64_t new_time = 0;
int64_t old_version = 0;
int64_t new_version = 0;
if (new_tablet->updates() != nullptr) {
old_time = old_tablet->updates()->max_rowset_creation_time();
new_time = new_tablet->updates()->max_rowset_creation_time();
old_version = old_tablet->updates()->max_version();
new_version = new_tablet->updates()->max_version();
} else {
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();
}
bool replace_old = (new_version > old_version) || (new_version == old_version && new_time > old_time);

if (replace_old) {
RETURN_IF_ERROR(_drop_tablet_unlocked(old_tablet->tablet_id(), kMoveFilesToTrash));
RETURN_IF_ERROR(_update_tablet_map_and_partition_info(new_tablet));
Expand Down Expand Up @@ -1509,11 +1522,14 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
auto shard_str = shard_path.substr(shard_path.find_last_of('/') + 1);
auto shard = stol(shard_str);

auto snapshot_meta = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!snapshot_meta.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << snapshot_meta.status();
return snapshot_meta.status();
auto meta_file = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!meta_file.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << meta_file.status();
return meta_file.status();
}
auto val = std::move(meta_file).value();
auto snapshot_meta = &val;

if (snapshot_meta->snapshot_type() != SNAPSHOT_TYPE_FULL) {
return Status::InternalError("not full snapshot");
}
Expand All @@ -1529,6 +1545,8 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
LOG(INFO) << strings::Substitute("create tablet from snapshot tablet:$0 version:$1 path:$2", tablet_id,
snapshot_meta->snapshot_version(), schema_hash_path);

RETURN_IF_ERROR(SnapshotManager::instance()->assign_new_rowset_id(snapshot_meta, schema_hash_path));

// Set of rowset id collected from rowset meta.
std::set<uint32_t> set1;
for (const auto& rowset_meta_pb : snapshot_meta->rowset_metas()) {
Expand Down
31 changes: 31 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2734,6 +2734,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
info->__set_row_count(total_row);
info->__set_data_size(total_size);
info->__set_is_error_state(_error);
info->__set_max_rowset_creation_time(max_rowset_creation_time());
}

int64_t TabletUpdates::get_average_row_size() {
Expand Down Expand Up @@ -4579,4 +4580,34 @@ Status TabletUpdates::get_rowset_and_segment_idx_by_rssid(uint32_t rssid, Rowset
return Status::NotFound(strings::Substitute("rowset for rssid $0 not found", rssid));
}

int64_t TabletUpdates::max_rowset_creation_time() {
std::lock_guard l1(_lock);
std::lock_guard l2(_rowsets_lock);

if (_edit_version_infos.empty()) {
LOG(WARNING) << "tablet deleted when call max_rowset_creation_time() tablet:" << _tablet.tablet_id();
return 0;
}

int cur_max_major_idx = 0;
int cur_max_major = 0;
for (int i = 0; i < _edit_version_infos.size(); ++i) {
if (_edit_version_infos[i]->version.major_number() > cur_max_major) {
cur_max_major = _edit_version_infos[i]->version.major_number();
cur_max_major_idx = i;
}
}

const std::vector<uint32_t>& rowsets = _edit_version_infos[cur_max_major_idx]->rowsets;
int64_t max_rowset_creation_time = 0;
for (uint32_t rowsetid : rowsets) {
auto itr = _rowsets.find(rowsetid);
if (itr == _rowsets.end()) {
continue;
}
max_rowset_creation_time = std::max(max_rowset_creation_time, itr->second->creation_time());
}
return max_rowset_creation_time;
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ class TabletUpdates {

Status pk_index_major_compaction();

// get the max rowset creation time for largest major version
int64_t max_rowset_creation_time();

private:
friend class Tablet;
friend class PrimaryIndex;
Expand Down
Loading