Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: srlch <linzichao@starrocks.com>
  • Loading branch information
srlch committed Aug 11, 2023
1 parent 4a4fe7a commit 0905216
Show file tree
Hide file tree
Showing 16 changed files with 650 additions and 68 deletions.
3 changes: 3 additions & 0 deletions be/src/storage/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ Status SnapshotManager::assign_new_rowset_id(SnapshotMeta* snapshot_meta, const
RETURN_IF_ERROR(FileSystem::Default()->link_file(old_path, new_path));
}
rowset_meta_pb.set_rowset_id(new_rowset_id.to_string());
// Assigning a new rowset id makes this rowset different from the one recorded in the snapshot meta,
// so it is reasonable to reset its creation time here as well.
rowset_meta_pb.set_creation_time(UnixSeconds());
}
return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,17 @@ Status StorageEngine::_perform_update_compaction(DataDir* data_dir) {
}
TRACE("found best tablet $0", best_tablet->tablet_id());

// Running migration and compaction concurrently can leave the meta and the primary index cache
// of the new tablet inconsistent with each other. So the compaction of the old tablet must be
// aborted while a migration is executing.
std::shared_lock rlock(best_tablet->get_migration_lock(), std::try_to_lock);
if (!rlock.owns_lock()) {
return Status::InternalError("Fail to get migration lock");
}
if (Tablet::check_migrate(best_tablet)) {
return Status::Corruption("Fail to check migrate tablet");
}

Status res;
int64_t duration_ns = 0;
{
Expand Down
42 changes: 30 additions & 12 deletions be/src/storage/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,28 @@ Status TabletManager::_add_tablet_unlocked(const TabletSharedPtr& new_tablet, bo
LOG(WARNING) << "add tablet with same data dir twice! tablet_id=" << new_tablet->tablet_id();
return Status::InternalError(fmt::format("tablet already exists, tablet_id: {}", old_tablet->tablet_id()));
}
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
auto old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
auto new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
auto old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
auto new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();

int64_t old_time = 0;
int64_t new_time = 0;
int64_t old_version = 0;
int64_t new_version = 0;
if (new_tablet->updates() != nullptr) {
old_time = old_tablet->updates()->max_rowset_creation_time();
new_time = new_tablet->updates()->max_rowset_creation_time();
old_version = old_tablet->updates()->max_version();
new_version = new_tablet->updates()->max_version();
} else {
old_tablet->obtain_header_rdlock();
auto old_rowset = old_tablet->rowset_with_max_version();
auto new_rowset = new_tablet->rowset_with_max_version();
old_time = (old_rowset == nullptr) ? -1 : old_rowset->creation_time();
new_time = (new_rowset == nullptr) ? -1 : new_rowset->creation_time();
old_version = (old_rowset == nullptr) ? -1 : old_rowset->end_version();
new_version = (new_rowset == nullptr) ? -1 : new_rowset->end_version();
old_tablet->release_header_lock();
}
bool replace_old = (new_version > old_version) || (new_version == old_version && new_time > old_time);

if (replace_old) {
RETURN_IF_ERROR(_drop_tablet_unlocked(old_tablet->tablet_id(), kMoveFilesToTrash));
RETURN_IF_ERROR(_update_tablet_map_and_partition_info(new_tablet));
Expand Down Expand Up @@ -1509,11 +1522,14 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
auto shard_str = shard_path.substr(shard_path.find_last_of('/') + 1);
auto shard = stol(shard_str);

auto snapshot_meta = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!snapshot_meta.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << snapshot_meta.status();
return snapshot_meta.status();
auto meta_file = SnapshotManager::instance()->parse_snapshot_meta(meta_path);
if (!meta_file.ok()) {
LOG(WARNING) << "Fail to parse " << meta_path << ": " << meta_file.status();
return meta_file.status();
}
auto val = std::move(meta_file).value();
auto snapshot_meta = &val;

if (snapshot_meta->snapshot_type() != SNAPSHOT_TYPE_FULL) {
return Status::InternalError("not full snapshot");
}
Expand All @@ -1529,6 +1545,8 @@ Status TabletManager::create_tablet_from_meta_snapshot(DataDir* store, TTabletId
LOG(INFO) << strings::Substitute("create tablet from snapshot tablet:$0 version:$1 path:$2", tablet_id,
snapshot_meta->snapshot_version(), schema_hash_path);

RETURN_IF_ERROR(SnapshotManager::instance()->assign_new_rowset_id(snapshot_meta, schema_hash_path));

// Set of rowset id collected from rowset meta.
std::set<uint32_t> set1;
for (const auto& rowset_meta_pb : snapshot_meta->rowset_metas()) {
Expand Down
31 changes: 31 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2717,6 +2717,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
info->__set_row_count(total_row);
info->__set_data_size(total_size);
info->__set_is_error_state(_error);
info->__set_max_rowset_creation_time(max_rowset_creation_time());
}

int64_t TabletUpdates::get_average_row_size() {
Expand Down Expand Up @@ -4557,4 +4558,34 @@ Status TabletUpdates::get_rowset_and_segment_idx_by_rssid(uint32_t rssid, Rowset
return Status::NotFound(strings::Substitute("rowset for rssid $0 not found", rssid));
}

// Returns the largest creation time among the rowsets referenced by the edit
// version info that carries the highest major version.
//
// Returns 0 (after logging a warning) when there is no edit version info at all,
// and also when none of the referenced rowset ids can be found in _rowsets.
// Holds _lock and then _rowsets_lock for the duration of the call.
int64_t TabletUpdates::max_rowset_creation_time() {
    std::lock_guard l1(_lock);
    std::lock_guard l2(_rowsets_lock);

    if (_edit_version_infos.empty()) {
        LOG(WARNING) << "tablet deleted when call max_rowset_creation_time() tablet:" << _tablet.tablet_id();
        return 0;
    }

    // Find the entry with the largest major version. The strict '>' keeps the first
    // entry that reaches the maximum, and index 0 is used when no major exceeds 0.
    // The major is kept in 64 bits so a large version is not narrowed to int
    // (NOTE(review): assumes version.major() returns a 64-bit integer, as
    // max_version() does at its call sites -- confirm against EditVersion).
    size_t newest_idx = 0;
    int64_t newest_major = 0;
    for (size_t i = 0; i < _edit_version_infos.size(); ++i) {
        const int64_t candidate_major = _edit_version_infos[i]->version.major();
        if (candidate_major > newest_major) {
            newest_major = candidate_major;
            newest_idx = i;
        }
    }

    // Take the maximum creation time over that version's rowsets; ids that are
    // missing from _rowsets are skipped rather than treated as an error.
    const std::vector<uint32_t>& rowsets = _edit_version_infos[newest_idx]->rowsets;
    int64_t latest_creation_time = 0;
    for (uint32_t rowsetid : rowsets) {
        auto itr = _rowsets.find(rowsetid);
        if (itr == _rowsets.end()) {
            continue;
        }
        latest_creation_time = std::max(latest_creation_time, itr->second->creation_time());
    }
    return latest_creation_time;
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ class TabletUpdates {

Status pk_index_major_compaction();

// get the max rowset creation time for largest major version
int64_t max_rowset_creation_time();

private:
friend class Tablet;
friend class PrimaryIndex;
Expand Down
193 changes: 160 additions & 33 deletions be/src/storage/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "runtime/exec_env.h"
#include "storage/snapshot_manager.h"
#include "storage/tablet_meta_manager.h"
#include "storage/update_manager.h"
#include "util/defer_op.h"

namespace starrocks {
Expand All @@ -54,10 +55,6 @@ Status EngineStorageMigrationTask::execute() {
LOG(WARNING) << "Not found tablet: " << _tablet_id;
return Status::NotFound(fmt::format("Not found tablet: {}", _tablet_id));
}
if (tablet->updates() != nullptr) {
LOG(WARNING) << "Not support to migrate updatable tablet: " << _tablet_id;
return Status::NotSupported(fmt::format("Not support to migrate updatable tablet: {}", _tablet_id));
}

// check tablet data dir
if (tablet->data_dir() == _dest_store) {
Expand Down Expand Up @@ -116,16 +113,26 @@ Status EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) {
return Status::InternalError("could not migration because has unfinished txns.");
}

if (tablet->updates() != nullptr && tablet->updates()->num_pending() != 0) {
LOG(WARNING) << "could not migration because has pending txns.";
return Status::InternalError("could not migration because has pending txns.");
}

// get all versions to be migrate
{
std::shared_lock header_rdlock(tablet->get_header_lock());
RowsetSharedPtr max_version = tablet->rowset_with_max_version();
if (max_version == nullptr) {
LOG(WARNING) << "Not found version in tablet. tablet: " << tablet->tablet_id();
return Status::NotFound(fmt::format("Not found version in tablet. tablet: {}", tablet->tablet_id()));
}
if (tablet->updates() == nullptr) {
RowsetSharedPtr max_version = tablet->rowset_with_max_version();
if (max_version == nullptr) {
LOG(WARNING) << "Not found version in tablet. tablet: " << tablet->tablet_id();
return Status::NotFound(
fmt::format("Not found version in tablet. tablet: {}", tablet->tablet_id()));
}

end_version = max_version->end_version();
end_version = max_version->end_version();
} else {
end_version = tablet->updates()->max_version();
}
res = tablet->capture_consistent_rowsets(Version(0, end_version), &consistent_rowsets);
if (!res.ok() || consistent_rowsets.empty()) {
LOG(WARNING) << "Fail to capture consistent rowsets. version=" << end_version;
Expand Down Expand Up @@ -221,6 +228,15 @@ Status EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) {
break;
}

if (tablet->updates() != nullptr && tablet->updates()->num_pending() != 0) {
LOG(WARNING) << "could not migration because has pending txns.";
need_remove_new_path = true;
res = Status::InternalError("could not migration because has pending txns.");
break;
}

// can guarantee there are no new load txns in the following execution flow.

auto new_tablet_meta = std::make_shared<TabletMeta>();
Status st = TabletMetaManager::get_tablet_meta(_dest_store, _tablet_id, _schema_hash, new_tablet_meta.get());
if (st.ok()) {
Expand All @@ -235,6 +251,51 @@ Status EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) {
break;
}

if (tablet->updates() == nullptr) {
res = _finish_migration(tablet, end_version, shard, consistent_rowsets, new_tablet_meta, schema_hash_path,
new_meta_file, dcgs_snapshot_path, need_remove_new_path);
} else {
res = _finish_primary_key_migration(tablet, end_version, shard, consistent_rowsets, schema_hash_path,
new_meta_file, need_remove_new_path);
}
} while (false);

// 4. clear
if (!new_meta_file.empty()) {
// remove meta file
Status st = fs::remove(new_meta_file);
if (!st.ok()) {
LOG(WARNING) << "failed to remove meta file. tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
<< ", path=" << schema_hash_path << ", error=" << st.to_string();
}
}
if (!dcgs_snapshot_path.empty()) {
// remove dcg snapshot file
Status st = fs::remove(dcgs_snapshot_path);
if (!st.ok()) {
LOG(WARNING) << "failed to remove dcg file. tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
<< ", path=" << schema_hash_path << ", error=" << st.to_string();
}
}
if (!res.ok() && need_remove_new_path) {
// remove all index and data files if migration failed
Status st = fs::remove_all(schema_hash_path);
if (!st.ok()) {
LOG(WARNING) << "failed to remove storage migration path"
<< ". schema_hash_path=" << schema_hash_path << ", error=" << st.to_string();
}
}

return res;
}

Status EngineStorageMigrationTask::_finish_migration(TabletSharedPtr tablet, int64_t end_version, uint64_t shard,
const std::vector<RowsetSharedPtr>& consistent_rowsets,
std::shared_ptr<TabletMeta> new_tablet_meta,
const string& schema_hash_path, std::string& new_meta_file,
std::string& dcgs_snapshot_path, bool& need_remove_new_path) {
Status res = Status::OK();
do {
{
// check version
std::shared_lock header_rdlock(tablet->get_header_lock());
Expand Down Expand Up @@ -319,7 +380,7 @@ Status EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) {

// This changes each rowset's id and its creation time.
// The rowset creation time is used when loading tablets from meta to decide which tablet is the one to load.
st = SnapshotManager::instance()->convert_rowset_ids(schema_hash_path, _tablet_id, _schema_hash);
Status st = SnapshotManager::instance()->convert_rowset_ids(schema_hash_path, _tablet_id, _schema_hash);
if (!st.ok()) {
LOG(WARNING) << "Fail to convert rowset id. path=" << schema_hash_path;
need_remove_new_path = true;
Expand Down Expand Up @@ -397,31 +458,97 @@ Status EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) {
}
} while (false);

// 4. clear
if (!new_meta_file.empty()) {
// remove hdr meta file
Status st = fs::remove(new_meta_file);
if (!st.ok()) {
LOG(WARNING) << "failed to remove meta file. tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
<< ", path=" << schema_hash_path << ", error=" << st.to_string();
return res;
}

/*
The key idea is that migrating a Primary Key tablet needs the following steps:
1. copy the data files into the dest store path (done before this function)
2. snapshot the meta data
3. create a NEW tablet with the same tablet id from the meta data taken in step 2, and FORCE REPLACE
the old one. This is the same as for a non Primary Key tablet.
4. clear the primary index cache
*/
Status EngineStorageMigrationTask::_finish_primary_key_migration(TabletSharedPtr tablet, int64_t end_version,
uint64_t shard,
const std::vector<RowsetSharedPtr>& consistent_rowsets,
const string& schema_hash_path,
std::string& new_meta_file,
bool& need_remove_new_path) {
Status res = Status::OK();
do {
{
// check version
std::shared_lock header_rdlock(tablet->get_header_lock());
/*
Just make sure the major version is consistent.
There is no need to pay attention to minor here,
because when making the snapshot on tablet meta, the
corresponding delvector will be obtained according
to the segment file in consistent_rowsets, so
consistency can be maintained.
*/
int64_t new_end_version = tablet->updates()->max_version();
if (end_version != new_end_version) {
LOG(WARNING) << "Version does not match. src_version: " << end_version
<< ", dst_version: " << new_end_version;
need_remove_new_path = true;
res = Status::InternalError(fmt::format("Version does not match. src_version: {}, dst_version: {}",
end_version, new_end_version));
break;
}

std::vector<RowsetMetaSharedPtr> rowset_metas;
rowset_metas.reserve(consistent_rowsets.size());
for (const auto& rowset : consistent_rowsets) {
rowset_metas.emplace_back(rowset->rowset_meta());
}
res = SnapshotManager::instance()->make_snapshot_on_tablet_meta(SNAPSHOT_TYPE_FULL, schema_hash_path,
tablet, rowset_metas, new_end_version,
g_Types_constants.TSNAPSHOT_REQ_VERSION2);
if (!res.ok()) {
need_remove_new_path = true;
break;
}
}
}
if (!dcgs_snapshot_path.empty()) {
// remove dcg snapshot file
Status st = fs::remove(dcgs_snapshot_path);
if (!st.ok()) {
LOG(WARNING) << "failed to remove dcg file. tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
<< ", path=" << schema_hash_path << ", error=" << st.to_string();

auto tablet_manager = StorageEngine::instance()->tablet_manager();
res = tablet_manager->create_tablet_from_meta_snapshot(_dest_store, _tablet_id, tablet->schema_hash(),
schema_hash_path, false);
if (!res.ok()) {
LOG(WARNING) << "Fail to create tablet from meta snapshot " << _tablet_id << ", try to clear meta store";
WriteBatch wb;
RETURN_IF_ERROR(TabletMetaManager::clear_del_vector(_dest_store, &wb, _tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_delta_column_group(_dest_store, &wb, _tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_rowset(_dest_store, &wb, _tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_log(_dest_store, &wb, _tablet_id));
RETURN_IF_ERROR(TabletMetaManager::remove_tablet_meta(_dest_store, &wb, _tablet_id, tablet->schema_hash()));
auto st = _dest_store->get_meta()->write_batch(&wb);
LOG_IF(WARNING, !st.ok()) << "Fail to clear meta store: " << st;
need_remove_new_path = true;
break;
}
}
if (!res.ok() && need_remove_new_path) {
// remove all index and data files if migration failed
Status st = fs::remove_all(schema_hash_path);
if (!st.ok()) {
LOG(WARNING) << "failed to remove storage migration path"
<< ". schema_hash_path=" << schema_hash_path << ", error=" << st.to_string();

// clear index cache
auto manager = StorageEngine::instance()->update_manager();
auto& index_cache = manager->index_cache();
auto index_entry = index_cache.get_or_create(_tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
index_entry->value().unload();
index_cache.release(index_entry);

// if old tablet finished schema change, then the schema change status of the new tablet is DONE
// else the schema change status of the new tablet is FAILED
TabletSharedPtr new_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_tablet_id);
if (new_tablet == nullptr) {
// tablet already loaded success.
// just log, and not set need_remove_new_path.
LOG(WARNING) << "Not found tablet: " << _tablet_id;
res = Status::NotFound(fmt::format("Not found tablet: {}", _tablet_id));
break;
}
}
} while (false);

return res;
}
Expand Down
Loading

0 comments on commit 0905216

Please sign in to comment.