Skip to content

Commit

Permalink
[BugFix] Fix schemachange failed caused by storage migration (#45517)
Browse files Browse the repository at this point in the history
Signed-off-by: Binglin Chang <decstery@gmail.com>
(cherry picked from commit 97fd498)

# Conflicts:
#	be/test/storage/task/engine_storage_migration_task_test.cpp
  • Loading branch information
decster authored and mergify[bot] committed May 13, 2024
1 parent 5515c55 commit d877053
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
6 changes: 6 additions & 0 deletions be/src/storage/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ Status EngineStorageMigrationTask::execute() {
return Status::NotSupported(fmt::format("Not support to migrate updatable tablet: {}", _tablet_id));
}

if (tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "storage migrate failed, tablet is in schemachange process. tablet_id=" << _tablet_id;
return Status::InternalError(
fmt::format("storage migrate failed, tablet is in schemachange process. tablet_id: {}", _tablet_id));
}

// check tablet data dir
if (tablet->data_dir() == _dest_store) {
LOG(INFO) << "Already existed path. tablet_id=" << _tablet_id << ", dest_store=" << _dest_store->path();
Expand Down
104 changes: 104 additions & 0 deletions be/test/storage/task/engine_storage_migration_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,110 @@ TEST_F(EngineStorageMigrationTaskTest, test_concurrent_ingestion_and_migration)
ASSERT_EQ(3, max_version.first);
}

// The tests below were cherry-picked from commit 97fd498 (#45517); HEAD had no content on its side of the conflict.
// Attempts a storage migration while a load transaction is still open on the tablet.
// Expectations checked by this test:
//   1. the migration attempt fails and the tablet keeps its original TabletUid;
//   2. the load can still be written, closed and committed afterwards;
//   3. the committed txn survives trash sweeping / unused-txn cleanup and can be
//      published as version 5.
TEST_F(EngineStorageMigrationTaskTest, test_concurrent_ingestion_and_migration_pk) {
    constexpr int64_t kTabletId = 99999;
    constexpr int32_t kSchemaHash = 9999;
    constexpr int64_t kTxnId = 4444;
    constexpr int64_t kPartitionId = 90;
    constexpr uint32_t kNumRows = 1024;

    TabletManager* tablet_manager = starrocks::StorageEngine::instance()->tablet_manager();

    // Remember the tablet's identity before the migration attempt.
    TabletUid old_tablet_uid;
    {
        TabletSharedPtr tablet = tablet_manager->get_tablet(kTabletId);
        ASSERT_TRUE(tablet != nullptr);
        old_tablet_uid = tablet->tablet_uid();
    }

    DeltaWriterOptions writer_options;
    writer_options.tablet_id = kTabletId;
    writer_options.schema_hash = kSchemaHash;
    writer_options.txn_id = kTxnId;
    writer_options.partition_id = kPartitionId;
    writer_options.load_id.set_hi(3000);
    writer_options.load_id.set_lo(4444);
    TupleDescriptor* tuple_desc = _create_tuple_desc_pk();
    writer_options.slots = &tuple_desc->slots();

    {
        MemTracker mem_checker(1024 * 1024 * 1024);
        auto writer_status = DeltaWriter::open(writer_options, &mem_checker);
        ASSERT_TRUE(writer_status.ok());
        auto delta_writer = std::move(writer_status.value());
        ASSERT_TRUE(delta_writer != nullptr);

        // add sleep to add time of tablet create time gap
        sleep(2);

        // do migration check, migration will fail
        do_migration_fail(kTabletId, kSchemaHash);

        // the migration failed, so the tablet must be unchanged
        {
            TabletSharedPtr tablet = tablet_manager->get_tablet(kTabletId);
            ASSERT_TRUE(tablet != nullptr);
            const TabletUid new_tablet_uid = tablet->tablet_uid();
            ASSERT_TRUE(new_tablet_uid.hi == old_tablet_uid.hi && new_tablet_uid.lo == old_tablet_uid.lo);
        }

        // prepare chunk: row i is (i, i + 1, i + 2)
        auto chunk = ChunkHelper::new_chunk(tuple_desc->slots(), kNumRows);
        auto& cols = chunk->columns();
        std::vector<uint32_t> indexes;
        indexes.reserve(kNumRows);
        for (uint32_t i = 0; i < kNumRows; ++i) {
            indexes.push_back(i);
            cols[0]->append_datum(Datum(static_cast<int64_t>(i)));
            cols[1]->append_datum(Datum(static_cast<int16_t>(i + 1)));
            cols[2]->append_datum(Datum(static_cast<int32_t>(i + 2)));
        }

        auto st = delta_writer->write(*chunk, indexes.data(), 0, indexes.size());
        ASSERT_TRUE(st.ok());
        st = delta_writer->close();
        ASSERT_TRUE(st.ok());
        st = delta_writer->commit();
        ASSERT_TRUE(st.ok());
    }
    // make sure to release delta_writer from here
    // or it will not release the tablet in gc

    // clean trash and unused txns after commit
    // it will clean no tablet and txns
    tablet_manager->start_trash_sweep();
    starrocks::StorageEngine::instance()->_clean_unused_txns();

    // the committed txn must still reference exactly one tablet after the cleanup
    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(kTxnId, kPartitionId, &tablet_related_rs);
    ASSERT_EQ(1, tablet_related_rs.size());

    // publish version for txn
    TVersion version = 5;
    auto tablet = tablet_manager->get_tablet(kTabletId);
    ASSERT_TRUE(tablet != nullptr);
    for (auto& tablet_rs : tablet_related_rs) {
        const RowsetSharedPtr& rowset = tablet_rs.second;
        auto st = StorageEngine::instance()->txn_manager()->publish_txn(kPartitionId, tablet, kTxnId, version,
                                                                        rowset);
        // publishing must succeed: the txn was not removed by the cleanup above
        ASSERT_TRUE(st.ok());
    }
    ASSERT_EQ(5, tablet->updates()->max_version());
}

// Checks that EngineStorageMigrationTask::execute() is rejected while the tablet is in
// TABLET_NOTREADY and succeeds once the tablet is back in TABLET_RUNNING.
// NOTE(review): tablet 66669 is presumably an empty primary-key tablet created by the
// fixture (per the test name) - confirm against SetUpTestCase.
TEST_F(EngineStorageMigrationTaskTest, test_migrate_empty_pk_tablet) {
    int64_t empty_tablet_id = 66669;
    int32_t empty_schema_hash = 6669;
    TabletManager* tablet_manager = starrocks::StorageEngine::instance()->tablet_manager();
    TabletSharedPtr tablet = tablet_manager->get_tablet(empty_tablet_id);
    ASSERT_TRUE(tablet != nullptr);
    ASSERT_EQ(tablet->tablet_id(), empty_tablet_id);

    // Pick whichever of the first two stores the tablet does not currently live on.
    auto stores = starrocks::StorageEngine::instance()->get_stores();
    ASSERT_GE(stores.size(), 2U);
    DataDir* source_path = tablet->data_dir();
    DataDir* dest_path = (source_path == stores[0]) ? stores[1] : stores[0];

    // While the tablet is NOTREADY the migration must be refused.
    tablet->set_tablet_state(TabletState::TABLET_NOTREADY);
    EngineStorageMigrationTask migration_task_not_ready(empty_tablet_id, empty_schema_hash, dest_path);
    const auto not_ready_result = migration_task_not_ready.execute();
    // Restore the state before asserting: a fatal assertion returns from the test body,
    // which would otherwise leave the tablet NOTREADY for subsequent tests.
    tablet->set_tablet_state(TabletState::TABLET_RUNNING);
    ASSERT_ERROR(not_ready_result);

    // Drop this test's reference before running the real migration.
    tablet.reset();
    EngineStorageMigrationTask migration_task(empty_tablet_id, empty_schema_hash, dest_path);
    ASSERT_OK(migration_task.execute());
}

} // namespace starrocks

int main(int argc, char** argv) {
Expand Down

0 comments on commit d877053

Please sign in to comment.