Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (release-8.1) #9045

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 49 additions & 37 deletions dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/Page/ConfigSettings.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V2/PageStorage.h>
Expand All @@ -31,6 +32,7 @@
#include <Storages/PathPool.h>
#include <fmt/format.h>

#include <magic_enum.hpp>

namespace CurrentMetrics
{
Expand Down Expand Up @@ -222,9 +224,9 @@ StoragePool::StoragePool(
{
LOG_INFO(
logger,
"PageStorage V2 is already mark deleted. Current pagestorage change from {} to {} [table_id={}]", //
static_cast<UInt8>(PageStorageRunMode::MIX_MODE), //
static_cast<UInt8>(PageStorageRunMode::ONLY_V3), //
"PageStorage V2 is already mark deleted. Current pagestorage change from {} to {}, table_id={}",
magic_enum::enum_name(PageStorageRunMode::MIX_MODE),
magic_enum::enum_name(PageStorageRunMode::ONLY_V3),
table_id);
log_storage_v2 = nullptr;
data_storage_v2 = nullptr;
Expand Down Expand Up @@ -368,11 +370,10 @@ StoragePool::StoragePool(

void StoragePool::forceTransformMetaV2toV3()
{
if (unlikely(run_mode != PageStorageRunMode::MIX_MODE))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform meta must run under mix mode [run_mode={}]",
static_cast<Int32>(run_mode));
RUNTIME_CHECK_MSG(
run_mode == PageStorageRunMode::MIX_MODE,
"Transform meta must run under mix mode, run_mode={}",
static_cast<Int32>(run_mode));

assert(meta_storage_v2 != nullptr);
assert(meta_storage_v3 != nullptr);
Expand Down Expand Up @@ -411,7 +412,7 @@ void StoragePool::forceTransformMetaV2toV3()
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't transform meta from V2 to V3, [page_id={}] {}", //
"Can't transform meta from V2 to V3, page_id={} {}", //
page_transform.page_id,
page_transform_entry.toDebugString());
}
Expand Down Expand Up @@ -440,11 +441,11 @@ static inline DB::PS::V2::PageEntriesVersionSetWithDelta::Snapshot * toV2Concret

void StoragePool::forceTransformDataV2toV3()
{
if (unlikely(run_mode != PageStorageRunMode::MIX_MODE))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Transform meta must run under mix mode [run_mode={}]",
static_cast<Int32>(run_mode));
RUNTIME_CHECK_MSG(
run_mode == PageStorageRunMode::MIX_MODE,
"Transform data must run under mix mode, run_mode={}",
static_cast<Int32>(run_mode));

assert(data_storage_v2 != nullptr);
assert(data_storage_v3 != nullptr);
auto data_transform_storage_writer = std::make_shared<PageWriter>(
Expand Down Expand Up @@ -520,10 +521,12 @@ PageStorageRunMode StoragePool::restore()
{
case PageStorageRunMode::ONLY_V2:
{
// Restore the PSV2 instances from disk
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

// ONLY_V2, make sure the page_ids is larger than that in PSV2
global_id_allocator->raiseLogPageIdLowerBound(log_storage_v2->getMaxId());
global_id_allocator->raiseDataPageIdLowerBound(data_storage_v2->getMaxId());
global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v2->getMaxId());
Expand All @@ -533,6 +536,9 @@ PageStorageRunMode StoragePool::restore()
}
case PageStorageRunMode::ONLY_V3:
{
// ONLY_V3
// - StoragePool is simply a wrapper for the PS instances in GlobalStoragePool
// - Make sure the page_ids is larger than that in PSV2
global_id_allocator->raiseLogPageIdLowerBound(log_storage_v3->getMaxId());
global_id_allocator->raiseDataPageIdLowerBound(data_storage_v3->getMaxId());
global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v3->getMaxId());
Expand All @@ -542,6 +548,7 @@ PageStorageRunMode StoragePool::restore()
}
case PageStorageRunMode::MIX_MODE:
{
// Restore the PSV2 instances from disk
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();
Expand All @@ -552,59 +559,71 @@ PageStorageRunMode StoragePool::restore()
{
LOG_INFO(
logger,
"Current pool.meta transform to V3 begin [table_id={}] [pages_before_transform={}]",
"Current pool.meta transform to V3 begin, table_id={} pages_before_transform={}",
table_id,
meta_remain_pages);
forceTransformMetaV2toV3();
const auto & meta_remain_pages_after_transform = meta_storage_v2->getNumberOfPages();
LOG_INFO(
logger,
"Current pool.meta transform to V3 finished [table_id={}] [done={}] [pages_before_transform={}], "
"[pages_after_transform={}]", //
"Current pool.meta transform to V3 finished, table_id={} done={} pages_before_transform={} "
"pages_after_transform={}", //
table_id,
meta_remain_pages_after_transform == 0,
meta_remain_pages,
meta_remain_pages_after_transform);
}
else
{
LOG_INFO(logger, "Current pool.meta transform already done before restored [table_id={}] ", table_id);
LOG_INFO(logger, "Current pool.meta transform already done before restored, table_id={}", table_id);
}

if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0)
{
LOG_INFO(
logger,
"Current pool.data transform to V3 begin [table_id={}] [pages_before_transform={}]",
"Current pool.data transform to V3 begin, table_id={} pages_before_transform={}",
table_id,
data_remain_pages);
forceTransformDataV2toV3();
const auto & data_remain_pages_after_transform = data_storage_v2->getNumberOfPages();
LOG_INFO(
logger,
"Current pool.data transform to V3 finished [table_id={}] [done={}] [pages_before_transform={}], "
"[pages_after_transform={}]", //
"Current pool.data transform to V3 finished, table_id={} done={} pages_before_transform={} "
"pages_after_transform={}", //
table_id,
data_remain_pages_after_transform == 0,
data_remain_pages,
data_remain_pages_after_transform);
}
else
{
LOG_INFO(logger, "Current pool.data transform already done before restored [table_id={}]", table_id);
LOG_INFO(logger, "Current pool.data transform already done before restored, table_id={}", table_id);
}

// Though all the pages may have been transformed into PageStoage V3 format, we still need
// to ensure the following allocated page_ids is larger than that in both v2 and v3.
// Because `PageStorageV3->getMaxId` is not accurate after the previous "meta" and "data"
// transformed from v2 to v3.
auto max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId());
auto max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId());
auto max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId());
global_id_allocator->raiseLogPageIdLowerBound(max_log_page_id);
global_id_allocator->raiseDataPageIdLowerBound(max_data_page_id);
global_id_allocator->raiseMetaPageIdLowerBound(max_meta_page_id);

// Check number of valid pages in v2
// If V2 already have no any data in disk, Then change run_mode to ONLY_V3
if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0
&& meta_storage_v2->getNumberOfPages() == 0)
{
LOG_INFO(
logger,
"Current pagestorage change from {} to {} [table_id={}]", //
static_cast<UInt8>(PageStorageRunMode::MIX_MODE),
static_cast<UInt8>(PageStorageRunMode::ONLY_V3),
"Current pagestorage change from {} to {}, table_id={}",
magic_enum::enum_name(PageStorageRunMode::MIX_MODE),
magic_enum::enum_name(PageStorageRunMode::ONLY_V3),
table_id);

if (storage_path_pool.createPSV2DeleteMarkFile())
{
log_storage_v2->drop();
Expand Down Expand Up @@ -663,27 +682,19 @@ PageStorageRunMode StoragePool::restore()
meta_storage_v3,
/*uni_ps_*/ nullptr);

global_id_allocator->raiseLogPageIdLowerBound(log_storage_v3->getMaxId());
global_id_allocator->raiseDataPageIdLowerBound(data_storage_v3->getMaxId());
global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v3->getMaxId());

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
auto max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId());
auto max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId());
auto max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId());
global_id_allocator->raiseLogPageIdLowerBound(max_log_page_id);
global_id_allocator->raiseDataPageIdLowerBound(max_data_page_id);
global_id_allocator->raiseMetaPageIdLowerBound(max_meta_page_id);
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
}
case PageStorageRunMode::UNI_PS:
{
// UNI_PS
// - StoragePool is simply a wrapper for the uni_ps
auto max_page_id = uni_ps->getMaxIdAfterRestart();
global_id_allocator->raiseLogPageIdLowerBound(max_page_id);
global_id_allocator->raiseDataPageIdLowerBound(max_page_id);
Expand All @@ -695,7 +706,8 @@ PageStorageRunMode StoragePool::restore()
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode));
}
auto [max_log_page_id, max_data_page_id, max_meta_page_id] = global_id_allocator->getCurrentIds();

const auto [max_log_page_id, max_data_page_id, max_meta_page_id] = global_id_allocator->getCurrentIds();
LOG_INFO(
logger,
"Finished StoragePool restore. run_mode={} keyspace_id={} table_id={}"
Expand Down Expand Up @@ -881,7 +893,7 @@ PageIdU64 StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, c
// else there is a DTFile with that id, continue to acquire a new ID.
LOG_WARNING(
logger,
"The DTFile is already exists, continute to acquire another ID. [call={}][path={}] [id={}]",
"The DTFile is already exists, continute to acquire another ID. call={} path={} file_id={}",
who,
existed_path,
dtfile_id);
Expand Down
94 changes: 73 additions & 21 deletions dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageDefinesBase.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/WriteBatchImpl.h>
#include <Storages/Page/WriteBatchWrapperImpl.h>
Expand All @@ -27,6 +28,11 @@
#include <fmt/ranges.h>
#include <gtest/gtest.h>

namespace DB::FailPoints
{
extern const char force_set_dtfile_exist_when_acquire_id[];
} // namespace DB::FailPoints

namespace DB
{
using namespace tests;
Expand Down Expand Up @@ -706,6 +712,52 @@ try
}
CATCH

TEST_F(PageStorageMixedTest, GetMaxIdAfterUpgraded)
try
{
const size_t buf_sz = 1024;
char c_buff[buf_sz] = {0};

const PageIdU64 max_data_page_id_allocated = (1 << 30) + 1;
const PageIdU64 max_meta_page_id_allocated = (1 << 28) + 1;
{
// Prepare a StoragePool with
// - 0 pages in "log" (must be 0)
// - some pages in "data" with `max_data_page_id_allocated`
// - some pages in "meta" with `max_meta_page_id_allocated`
{
WriteBatch batch;
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
batch.putRefPage(2, 1);
ReadBufferPtr buff2 = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1 << 30, 0, buff2, buf_sz);
batch.putRefPage(max_data_page_id_allocated, 1 << 30);
storage_pool_v2->dataWriter()->write(std::move(batch), nullptr);
}
{
WriteBatch batch;
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(max_meta_page_id_allocated, 0, buff, buf_sz);
storage_pool_v2->metaWriter()->write(std::move(batch), nullptr);
}
}

// Mock that after upgraded, the run_mode is transformed to `ONLY_V3`
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3);

// Disable some failpoint to avoid it affect the allocated id
DB::FailPointHelper::disableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id);
SCOPE_EXIT({ DB::FailPointHelper::enableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id); });

// Allocate new id for "data" should be larger than `max_data_page_id_allocated`
auto d = storage_path_pool_v2->getStableDiskDelegator();
EXPECT_GT(storage_pool_mix->newDataPageIdForDTFile(d, "GetMaxIdAfterUpgraded"), max_data_page_id_allocated);

// Allocate new id for "meta" should be larger than `max_meta_page_id_allocated`
EXPECT_GT(storage_pool_mix->newMetaPageId(), max_meta_page_id_allocated);
}
CATCH

TEST_F(PageStorageMixedTest, ReuseV2ID)
try
Expand All @@ -714,40 +766,40 @@ try
char c_buff[buf_sz] = {0};

{
WriteBatch batch;
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
page_writer_v2->write(std::move(batch), nullptr);
}

{
WriteBatch batch;
batch.delPage(1);
page_writer_v2->write(std::move(batch), nullptr);
// prepare "put 1" && "del 1"
{
WriteBatch batch;
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
page_writer_v2->write(std::move(batch), nullptr);
}
{
WriteBatch batch;
batch.delPage(1);
page_writer_v2->write(std::move(batch), nullptr);
}
}

{
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3);
ASSERT_EQ(storage_pool_mix->newLogPageId(), 1);
}
// All pages in v2 are removed, it run on V3 after restart
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3);
ASSERT_EQ(storage_pool_mix->newLogPageId(), 2); // new allocated page id won't reuse page 1

// Mock that some new pages are written
{
WriteBatch batch;
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
batch.putPage(1, 0, buff, buf_sz);
batch.putPage(2, 0, buff, buf_sz);
page_writer_mix->write(std::move(batch), nullptr);
}

{
WriteBatch batch;
batch.delPage(1);
batch.delPage(2);
page_writer_mix->write(std::move(batch), nullptr);
}

{
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3);
// ASSERT_EQ(storage_pool_mix->newLogPageId(), 2); // max id for v3 will not be updated, ignore this check
}
// Mock restart
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3);
ASSERT_EQ(storage_pool_mix->newLogPageId(), 3);
}
CATCH

Expand Down