Skip to content

Commit

Permalink
[Feature] Support pk table migration (StarRocks#25521) (StarRocks#25522)
Browse files Browse the repository at this point in the history
Support primary key table disk migration

Fixes StarRocks#25521
  • Loading branch information
srlch authored Aug 22, 2023
1 parent be2f71b commit 1341138
Show file tree
Hide file tree
Showing 17 changed files with 698 additions and 66 deletions.
3 changes: 3 additions & 0 deletions be/src/storage/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ Status SnapshotManager::assign_new_rowset_id(SnapshotMeta* snapshot_meta, const
RETURN_IF_ERROR(FileSystem::Default()->link_file(old_path, new_path));
}
rowset_meta_pb.set_rowset_id(new_rowset_id.to_string());
// reset rowsetid means that it is different from the rowset in snapshot meta.
// It is reasonable that reset the creation time here.
rowset_meta_pb.set_creation_time(UnixSeconds());
}
return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,17 @@ Status StorageEngine::_perform_update_compaction(DataDir* data_dir) {
}
TRACE("found best tablet $0", best_tablet->tablet_id());

// The concurrency of migration and compaction will lead to inconsistency between the meta and
// primary index cache of the new tablet. So we should abort the compaction for the old tablet
// when executing migration.
std::shared_lock rlock(best_tablet->get_migration_lock(), std::try_to_lock);
if (!rlock.owns_lock()) {
return Status::InternalError("Fail to get migration lock, tablet_id: {}", best_tablet->tablet_id());
}
if (Tablet::check_migrate(best_tablet)) {
return Status::InternalError("Fail to check migrate tablet, tablet_id: {}", best_tablet->tablet_id());
}

Status res;
int64_t duration_ns = 0;
{
Expand Down
42 changes: 30 additions & 12 deletions be/src/storage/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,28 @@ Status TabletManager::_add_tablet_unlocked(const TabletSharedPtr& new_tablet, bo
LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << new_tablet->tablet_id();
return Status::InternalError(fmt::format("tablet already exists, tablet_id: {}", old_tablet->tablet_id()));
}
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
auto old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
auto new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
auto old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
auto new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();

int64_t old_time = 0;
int64_t new_time = 0;
int64_t old_version = 0;
int64_t new_version = 0;
if (new_tablet->updates() != nullptr) {
old_time = old_tablet->updates()->max_rowset_creation_time();
new_time = new_tablet->updates()->max_rowset_creation_time();
old_version = old_tablet->updates()->max_version();
new_version = new_tablet->updates()->max_version();
} else {
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();
}
bool replace_old = (new_version > old_version) || (new_version == old_version && new_time > old_time);

if (replace_old) {
RETURN_IF_ERROR(_drop_tablet_unlocked(old_tablet->tablet_id(), kMoveFilesToTrash));
RETURN_IF_ERROR(_update_tablet_map_and_partition_info(new_tablet));
Expand Down Expand Up @@ -1509,11 +1522,14 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
auto shard_str = shard_path.substr(shard_path.find_last_of('/') + 1);
auto shard = stol(shard_str);

auto snapshot_meta = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!snapshot_meta.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << snapshot_meta.status();
return snapshot_meta.status();
auto meta_file = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!meta_file.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << meta_file.status();
return meta_file.status();
}
auto val = std::move(meta_file).value();
auto snapshot_meta = &val;

if (snapshot_meta->snapshot_type() != SNAPSHOT_TYPE_FULL) {
return Status::InternalError("not full snapshot");
}
Expand All @@ -1529,6 +1545,8 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
LOG(INFO) << strings::Substitute("create tablet from snapshot tablet:$0 version:$1 path:$2", tablet_id,
snapshot_meta->snapshot_version(), schema_hash_path);

RETURN_IF_ERROR(SnapshotManager::instance()->assign_new_rowset_id(snapshot_meta, schema_hash_path));

// Set of rowset id collected from rowset meta.
std::set<uint32_t> set1;
for (const auto& rowset_meta_pb : snapshot_meta->rowset_metas()) {
Expand Down
31 changes: 31 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2734,6 +2734,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
info->__set_row_count(total_row);
info->__set_data_size(total_size);
info->__set_is_error_state(_error);
info->__set_max_rowset_creation_time(max_rowset_creation_time());
}

int64_t TabletUpdates::get_average_row_size() {
Expand Down Expand Up @@ -4579,4 +4580,34 @@ Status TabletUpdates::get_rowset_and_segment_idx_by_rssid(uint32_t rssid, Rowset
return Status::NotFound(strings::Substitute("rowset for rssid $0 not found", rssid));
}

int64_t TabletUpdates::max_rowset_creation_time() {
std::lock_guard l1(_lock);
std::lock_guard l2(_rowsets_lock);

if (_edit_version_infos.empty()) {
LOG(WARNING) << "tablet deleted when call max_rowset_creation_time() tablet:" << _tablet.tablet_id();
return 0;
}

int cur_max_major_idx = 0;
int cur_max_major = 0;
for (int i = 0; i < _edit_version_infos.size(); ++i) {
if (_edit_version_infos[i]->version.major_number() > cur_max_major) {
cur_max_major = _edit_version_infos[i]->version.major_number();
cur_max_major_idx = i;
}
}

const std::vector<uint32_t>& rowsets = _edit_version_infos[cur_max_major_idx]->rowsets;
int64_t max_rowset_creation_time = 0;
for (uint32_t rowsetid : rowsets) {
auto itr = _rowsets.find(rowsetid);
if (itr == _rowsets.end()) {
continue;
}
max_rowset_creation_time = std::max(max_rowset_creation_time, itr->second->creation_time());
}
return max_rowset_creation_time;
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ class TabletUpdates {

Status pk_index_major_compaction();

// get the max rowset creation time for largest major version
int64_t max_rowset_creation_time();

private:
friend class Tablet;
friend class PrimaryIndex;
Expand Down
Loading

0 comments on commit 1341138

Please sign in to comment.