diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 249e651c8a7..60e8c2a5896 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -56,12 +56,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -166,6 +168,7 @@ struct ContextShared FileProviderPtr file_provider; /// File provider. IORateLimiter io_rate_limiter; PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3; + DM::GlobalPageIdAllocatorPtr global_page_id_allocator; DM::GlobalStoragePoolPtr global_storage_pool; /// The PS instance available on Write Node. @@ -1697,6 +1700,22 @@ void Context::setPageStorageRunMode(PageStorageRunMode run_mode) const shared->storage_run_mode = run_mode; } +bool Context::initializeGlobalPageIdAllocator() +{ + auto lock = getLock(); + if (!shared->global_page_id_allocator) + { + shared->global_page_id_allocator = std::make_shared(); + } + return true; +} + +DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const +{ + auto lock = getLock(); + return shared->global_page_id_allocator; +} + bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 2e856642adf..29aa3c94b21 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -110,6 +110,8 @@ class DeltaIndexManager; class GlobalStoragePool; class SharedBlockSchemas; using GlobalStoragePoolPtr = std::shared_ptr; +class GlobalPageIdAllocator; +using GlobalPageIdAllocatorPtr = std::shared_ptr; } // namespace DM /// (database name, table name) @@ -444,6 +446,10 @@ class Context void initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version); void setPageStorageRunMode(PageStorageRunMode run_mode) const; PageStorageRunMode getPageStorageRunMode() const; + + bool initializeGlobalPageIdAllocator(); + DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const; + bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool); DM::GlobalStoragePoolPtr getGlobalStoragePool() const; diff --git a/dbms/src/Server/DTTool/DTTool.h b/dbms/src/Server/DTTool/DTTool.h index 5b16c912a63..0ef5fd84144 100644 --- a/dbms/src/Server/DTTool/DTTool.h +++ b/dbms/src/Server/DTTool/DTTool.h @@ -113,6 +113,7 @@ class ImitativeEnv global_context->getPathCapacity(), global_context->getFileProvider()); TiFlashRaftConfig raft_config; + global_context->initializeGlobalPageIdAllocator(); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); raft_config.ignore_databases = {"default", "system"}; raft_config.engine = TiDB::StorageEngine::DT; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index c36a3eaea16..69d0d3dc309 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1297,6 +1297,7 @@ int Server::main(const std::vector & /*args*/) settings.bytes_that_rss_larger_than_limit); /// PageStorage run mode has been determined above + global_context->initializeGlobalPageIdAllocator(); if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) { global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 7e77f95531b..c56e034206d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -261,14 +261,16 @@ DeltaMergeStore::DeltaMergeStore( try { page_storage_run_mode = storage_pool->restore(); // restore from disk + // If there is meta of `DELTA_MERGE_FIRST_SEGMENT_ID`, restore all segments + // If there is no `DELTA_MERGE_FIRST_SEGMENT_ID`, the first segment will be created by `createFirstSegment` later if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID); first_segment_entry.isValid()) { + // Restore all existing segments auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; - - // parallel restore segment to speed up if (thread_pool) { + // parallel restore segment to speed up auto wait_group = thread_pool->waitGroup(); auto segment_ids = Segment::getAllSegmentIds(*dm_context, segment_id); for (auto & segment_id : segment_ids) @@ -286,6 +288,7 @@ DeltaMergeStore::DeltaMergeStore( } else { + // restore segment one by one while (segment_id != 0) { auto segment = Segment::restoreSegment(log, *dm_context, segment_id); @@ -2129,9 +2132,10 @@ std::pair DeltaMergeStore::getSegmentByStartKey( if (create_if_empty && is_empty) { auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); - auto page_storage_run_mode = storage_pool->getPageStorageRunMode(); - createFirstSegment(*dm_context, page_storage_run_mode); + createFirstSegment(*dm_context); } + // The first segment may be created in this thread or another thread, + // try again to get the new created segment. } while (create_if_empty && is_empty); if (throw_if_notfound) @@ -2147,7 +2151,7 @@ std::pair DeltaMergeStore::getSegmentByStartKey( } } -void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode) +void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context) { std::unique_lock lock(read_write_mutex); if (!segments.empty()) @@ -2155,22 +2159,10 @@ void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context, PageStorage return; } - auto segment_id = storage_pool->newMetaPageId(); - if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID) - { - RUNTIME_CHECK_MSG( - page_storage_run_mode != PageStorageRunMode::ONLY_V2, - "The first segment id should be {}, but get {}, run_mode={}", - DELTA_MERGE_FIRST_SEGMENT_ID, - segment_id, - magic_enum::enum_name(page_storage_run_mode)); - - // In ONLY_V3 or MIX_MODE, If create a new DeltaMergeStore - // Should used fixed DELTA_MERGE_FIRST_SEGMENT_ID to create first segment - segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; - } + const auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; LOG_INFO(log, "creating the first segment with segment_id={}", segment_id); + // newSegment will also commit the segment meta to PageStorage auto first_segment = Segment::newSegment( // log, dm_context, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 2ad8159ee64..81cc89c19d3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -788,7 +788,7 @@ class DeltaMergeStore : private boost::noncopyable const RowKeyValueRef & start_key, bool create_if_empty, bool throw_if_notfound); - void createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode); + void createFirstSegment(DM::DMContext & dm_context); Context & global_context; std::shared_ptr path_pool; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 6b21fefe326..4d0cf78ce26 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -272,7 +272,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) // that callbacks is called after the `DeltaMergeStore` shutdown or dropped, // we must make the callbacks safe. ExternalPageCallbacks callbacks; - callbacks.prefix = storage_pool->getNamespaceID(); + callbacks.prefix = storage_pool->getTableID(); if (auto data_store = dm_context->global_context.getSharedContextDisagg()->remote_data_store; !data_store) { callbacks.scanner diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.cpp b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.cpp new file mode 100644 index 00000000000..993018e8b90 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.cpp @@ -0,0 +1,45 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include + +namespace DB::DM +{ + +void GlobalPageIdAllocator::raiseTargetByLowerBound(std::atomic & target, PageIdU64 lower_bound) +{ + PageIdU64 old_value = target.load(); + while (true) + { + // already satisfied the lower_bound, done. + if (old_value >= lower_bound) + break; + SYNC_FOR("before_GlobalPageIdAllocator::raiseLowerBoundCAS_1"); + // try raise to the lower_bound + if (target.compare_exchange_strong(old_value, lower_bound)) + { + // raise success, done. + break; + } + // else the `old_value` is updated, try again + } +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h new file mode 100644 index 00000000000..543f800e739 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h @@ -0,0 +1,54 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include + +namespace DB::DM +{ + +class GlobalPageIdAllocator : private boost::noncopyable +{ +public: + GlobalPageIdAllocator() = default; + + void raiseDataPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_data_page_id, lower_bound); } + void raiseLogPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_log_page_id, lower_bound); } + void raiseMetaPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_meta_page_id, lower_bound); } + + PageIdU64 newDataPageIdForDTFile() { return ++max_data_page_id; } + PageIdU64 newLogPageId() { return ++max_log_page_id; } + PageIdU64 newMetaPageId() { return ++max_meta_page_id; } + + std::tuple getCurrentIds() const + { + return std::make_tuple(max_log_page_id.load(), max_data_page_id.load(), max_meta_page_id.load()); + } + +private: + static void raiseTargetByLowerBound(std::atomic & target, PageIdU64 lower_bound); + +private: + std::atomic max_log_page_id = 0; + std::atomic max_data_page_id = 0; + // The meta_page_id == 1 is reserved for first segment in each physical table + std::atomic max_meta_page_id = 1; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.cpp index a8389926c51..fd5f52bc51b 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp index c522bcdfbcc..1d5495c0c0b 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -58,18 +59,32 @@ namespace DB::DM StoragePool::StoragePool( Context & global_ctx, KeyspaceID keyspace_id_, - NamespaceID ns_id_, + NamespaceID table_id_, StoragePathPool & storage_path_pool_, const String & name) - : logger(Logger::get(!name.empty() ? name : DB::toString(ns_id_))) + : StoragePool(global_ctx, keyspace_id_, table_id_, storage_path_pool_, global_ctx.getGlobalPageIdAllocator(), name) +{ + // +} + +StoragePool::StoragePool( + Context & global_ctx, + KeyspaceID keyspace_id_, + NamespaceID table_id_, + StoragePathPool & storage_path_pool_, + GlobalPageIdAllocatorPtr page_id_allocator_, + const String & name) + : logger(Logger::get(!name.empty() ? name : DB::toString(table_id_))) , run_mode(global_ctx.getPageStorageRunMode()) , keyspace_id(keyspace_id_) - , ns_id(ns_id_) + , table_id(table_id_) , storage_path_pool(storage_path_pool_) , uni_ps(run_mode == PageStorageRunMode::UNI_PS ? global_ctx.getWriteNodePageStorage() : nullptr) , global_context(global_ctx) + , global_id_allocator(std::move(page_id_allocator_)) , storage_pool_metrics(CurrentMetrics::StoragePoolV3Only, 0) { + assert(global_id_allocator != nullptr); const auto & global_storage_pool = global_context.getGlobalStoragePool(); switch (run_mode) { @@ -97,7 +112,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, log_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, @@ -106,7 +121,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Data, - ns_id, + table_id, data_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, @@ -115,7 +130,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, meta_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, @@ -152,7 +167,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, /*storage_v2_*/ nullptr, log_storage_v3, /*uni_ps_*/ nullptr, @@ -161,7 +176,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Data, - ns_id, + table_id, /*storage_v2_*/ nullptr, data_storage_v3, /*uni_ps_*/ nullptr, @@ -170,7 +185,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, /*storage_v2_*/ nullptr, meta_storage_v3, /*uni_ps_*/ nullptr, @@ -207,10 +222,10 @@ StoragePool::StoragePool( { LOG_INFO( logger, - "PageStorage V2 is already mark deleted. Current pagestorage change from {} to {} [ns_id={}]", // + "PageStorage V2 is already mark deleted. Current pagestorage change from {} to {} [table_id={}]", // static_cast(PageStorageRunMode::MIX_MODE), // static_cast(PageStorageRunMode::ONLY_V3), // - ns_id); + table_id); log_storage_v2 = nullptr; data_storage_v2 = nullptr; meta_storage_v2 = nullptr; @@ -252,7 +267,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, log_storage_v2, log_storage_v3, /*uni_ps_*/ nullptr, @@ -261,7 +276,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Data, - ns_id, + table_id, data_storage_v2, data_storage_v3, /*uni_ps_*/ nullptr, @@ -270,7 +285,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr, @@ -302,7 +317,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, @@ -311,7 +326,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Data, - ns_id, + table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, @@ -320,7 +335,7 @@ StoragePool::StoragePool( run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, @@ -371,7 +386,7 @@ void StoragePool::forceTransformMetaV2toV3() run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr, @@ -384,8 +399,8 @@ void StoragePool::forceTransformMetaV2toV3() meta_transform_storage_reader->traverse(meta_transform_acceptor, /*only_v2*/ true, /*only_v3*/ false); - WriteBatch write_batch_transform{ns_id}; - WriteBatch write_batch_del_v2{ns_id}; + WriteBatch write_batch_transform{table_id}; + WriteBatch write_batch_del_v2{table_id}; for (const auto & page_transform : pages_transform) { @@ -461,8 +476,8 @@ void StoragePool::forceTransformDataV2toV3() // The page ids that can be accessed by DeltaTree const auto all_page_ids = v2_snap->view.validPageIds(); - WriteBatch write_batch_transform{ns_id}; - WriteBatch write_batch_del_v2{ns_id}; + WriteBatch write_batch_transform{table_id}; + WriteBatch write_batch_del_v2{table_id}; PageIdU64Set created_dt_file_id; for (const auto page_id : all_page_ids) @@ -509,18 +524,18 @@ PageStorageRunMode StoragePool::restore() data_storage_v2->restore(); meta_storage_v2->restore(); - max_log_page_id = log_storage_v2->getMaxId(); - max_data_page_id = data_storage_v2->getMaxId(); - max_meta_page_id = meta_storage_v2->getMaxId(); + global_id_allocator->raiseLogPageIdLowerBound(log_storage_v2->getMaxId()); + global_id_allocator->raiseDataPageIdLowerBound(data_storage_v2->getMaxId()); + global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v2->getMaxId()); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only}; break; } case PageStorageRunMode::ONLY_V3: { - max_log_page_id = log_storage_v3->getMaxId(); - max_data_page_id = data_storage_v3->getMaxId(); - max_meta_page_id = meta_storage_v3->getMaxId(); + global_id_allocator->raiseLogPageIdLowerBound(log_storage_v3->getMaxId()); + global_id_allocator->raiseDataPageIdLowerBound(data_storage_v3->getMaxId()); + global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v3->getMaxId()); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; break; @@ -537,46 +552,46 @@ PageStorageRunMode StoragePool::restore() { LOG_INFO( logger, - "Current pool.meta transform to V3 begin [ns_id={}] [pages_before_transform={}]", - ns_id, + "Current pool.meta transform to V3 begin [table_id={}] [pages_before_transform={}]", + table_id, meta_remain_pages); forceTransformMetaV2toV3(); const auto & meta_remain_pages_after_transform = meta_storage_v2->getNumberOfPages(); LOG_INFO( logger, - "Current pool.meta transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], " + "Current pool.meta transform to V3 finished [table_id={}] [done={}] [pages_before_transform={}], " "[pages_after_transform={}]", // - ns_id, + table_id, meta_remain_pages_after_transform == 0, meta_remain_pages, meta_remain_pages_after_transform); } else { - LOG_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id); + LOG_INFO(logger, "Current pool.meta transform already done before restored [table_id={}] ", table_id); } if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) { LOG_INFO( logger, - "Current pool.data transform to V3 begin [ns_id={}] [pages_before_transform={}]", - ns_id, + "Current pool.data transform to V3 begin [table_id={}] [pages_before_transform={}]", + table_id, data_remain_pages); forceTransformDataV2toV3(); const auto & data_remain_pages_after_transform = data_storage_v2->getNumberOfPages(); LOG_INFO( logger, - "Current pool.data transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], " + "Current pool.data transform to V3 finished [table_id={}] [done={}] [pages_before_transform={}], " "[pages_after_transform={}]", // - ns_id, + table_id, data_remain_pages_after_transform == 0, data_remain_pages, data_remain_pages_after_transform); } else { - LOG_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id); + LOG_INFO(logger, "Current pool.data transform already done before restored [table_id={}]", table_id); } // Check number of valid pages in v2 @@ -586,10 +601,10 @@ PageStorageRunMode StoragePool::restore() { LOG_INFO( logger, - "Current pagestorage change from {} to {} [ns_id={}]", // + "Current pagestorage change from {} to {} [table_id={}]", // static_cast(PageStorageRunMode::MIX_MODE), static_cast(PageStorageRunMode::ONLY_V3), - ns_id); + table_id); if (storage_path_pool.createPSV2DeleteMarkFile()) { log_storage_v2->drop(); @@ -605,7 +620,7 @@ PageStorageRunMode StoragePool::restore() PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Log, - ns_id, + table_id, /*storage_v2_*/ nullptr, log_storage_v3, /*uni_ps_*/ nullptr, @@ -614,7 +629,7 @@ PageStorageRunMode StoragePool::restore() PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Data, - ns_id, + table_id, /*storage_v2_*/ nullptr, data_storage_v3, /*uni_ps_*/ nullptr, @@ -623,7 +638,7 @@ PageStorageRunMode StoragePool::restore() PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Meta, - ns_id, + table_id, /*storage_v2_*/ nullptr, meta_storage_v3, /*uni_ps_*/ nullptr, @@ -648,27 +663,31 @@ PageStorageRunMode StoragePool::restore() meta_storage_v3, /*uni_ps_*/ nullptr); - max_log_page_id = log_storage_v3->getMaxId(); - max_data_page_id = data_storage_v3->getMaxId(); - max_meta_page_id = meta_storage_v3->getMaxId(); + global_id_allocator->raiseLogPageIdLowerBound(log_storage_v3->getMaxId()); + global_id_allocator->raiseDataPageIdLowerBound(data_storage_v3->getMaxId()); + global_id_allocator->raiseMetaPageIdLowerBound(meta_storage_v3->getMaxId()); run_mode = PageStorageRunMode::ONLY_V3; storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; } else // Still running Mix Mode { - max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId()); - max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId()); - max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId()); + auto max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId()); + auto max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId()); + auto max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId()); + global_id_allocator->raiseLogPageIdLowerBound(max_log_page_id); + global_id_allocator->raiseDataPageIdLowerBound(max_data_page_id); + global_id_allocator->raiseMetaPageIdLowerBound(max_meta_page_id); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode}; } break; } case PageStorageRunMode::UNI_PS: { - max_log_page_id = uni_ps->getMaxIdAfterRestart(); - max_data_page_id = uni_ps->getMaxIdAfterRestart(); - max_meta_page_id = uni_ps->getMaxIdAfterRestart(); + auto max_page_id = uni_ps->getMaxIdAfterRestart(); + global_id_allocator->raiseLogPageIdLowerBound(max_page_id); + global_id_allocator->raiseDataPageIdLowerBound(max_page_id); + global_id_allocator->raiseMetaPageIdLowerBound(max_page_id); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolUniPS}; break; @@ -676,12 +695,14 @@ PageStorageRunMode StoragePool::restore() default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown PageStorageRunMode {}", static_cast(run_mode)); } - LOG_TRACE( + auto [max_log_page_id, max_data_page_id, max_meta_page_id] = global_id_allocator->getCurrentIds(); + LOG_INFO( logger, - "Finished StoragePool restore. [current_run_mode={}] [ns_id={}]" - " [max_log_page_id={}] [max_data_page_id={}] [max_meta_page_id={}]", + "Finished StoragePool restore. run_mode={} keyspace_id={} table_id={}" + " max_log_page_id={} max_data_page_id={} max_meta_page_id={}", magic_enum::enum_name(run_mode), - ns_id, + keyspace_id, + table_id, max_log_page_id, max_data_page_id, max_meta_page_id); @@ -728,7 +749,7 @@ void StoragePool::startup(ExternalPageCallbacks && callbacks) UniversalExternalPageCallbacks us_callbacks; us_callbacks.remover = std::move(callbacks.remover); us_callbacks.scanner = std::move(callbacks.scanner); - us_callbacks.prefix = UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, ns_id); + us_callbacks.prefix = UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, table_id); uni_ps->registerUniversalExternalPagesCallbacks(us_callbacks); break; } @@ -753,12 +774,12 @@ void StoragePool::shutdown() meta_storage_v2->shutdown(); log_storage_v2->shutdown(); data_storage_v2->shutdown(); - data_storage_v2->unregisterExternalPagesCallbacks(ns_id); + data_storage_v2->unregisterExternalPagesCallbacks(table_id); break; } case PageStorageRunMode::ONLY_V3: { - data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + data_storage_v3->unregisterExternalPagesCallbacks(table_id); break; } case PageStorageRunMode::MIX_MODE: @@ -768,13 +789,13 @@ void StoragePool::shutdown() data_storage_v2->shutdown(); // We have transformed all external pages from V2 to V3 in `restore`, so // only need to unregister callbacks for V3. - data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + data_storage_v3->unregisterExternalPagesCallbacks(table_id); break; } case PageStorageRunMode::UNI_PS: { uni_ps->unregisterUniversalExternalPagesCallbacks( - UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, ns_id)); + UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, table_id)); break; } default: @@ -834,7 +855,7 @@ void StoragePool::drop() } } -PageIdU64 StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who) +PageIdU64 StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who) const { // In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID. // After TiFlash process restored, the ID will be inserted into the stable delegator, but we may @@ -842,7 +863,7 @@ PageIdU64 StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, c PageIdU64 dtfile_id; do { - dtfile_id = ++max_data_page_id; + dtfile_id = global_id_allocator->newDataPageIdForDTFile(); auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false); fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, { @@ -868,12 +889,22 @@ PageIdU64 StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, c return dtfile_id; } +PageIdU64 StoragePool::newLogPageId() const +{ + return global_id_allocator->newLogPageId(); +} + +PageIdU64 StoragePool::newMetaPageId() const +{ + return global_id_allocator->newMetaPageId(); +} + template inline static PageReaderPtr newReader( const PageStorageRunMode run_mode, KeyspaceID keyspace_id, StorageType tag, - const NamespaceID ns_id, + const NamespaceID table_id, T & storage_v2, T & storage_v3, UniversalPageStoragePtr uni_ps, @@ -888,7 +919,7 @@ inline static PageReaderPtr newReader( run_mode, keyspace_id, tag, - ns_id, + table_id, storage_v2, nullptr, /*uni_ps*/ nullptr, @@ -899,7 +930,7 @@ inline static PageReaderPtr newReader( run_mode, keyspace_id, tag, - ns_id, + table_id, nullptr, storage_v3, /*uni_ps*/ nullptr, @@ -910,7 +941,7 @@ inline static PageReaderPtr newReader( run_mode, keyspace_id, tag, - ns_id, + table_id, storage_v2, storage_v3, /*uni_ps*/ nullptr, @@ -924,7 +955,7 @@ inline static PageReaderPtr newReader( run_mode, keyspace_id, tag, - ns_id, + table_id, nullptr, nullptr, uni_ps, @@ -941,7 +972,7 @@ PageReaderPtr StoragePool::newLogReader(ReadLimiterPtr read_limiter, bool snapsh run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, log_storage_v2, log_storage_v3, uni_ps, @@ -956,7 +987,7 @@ PageReaderPtr StoragePool::newLogReader(ReadLimiterPtr read_limiter, PageStorage run_mode, keyspace_id, StorageType::Log, - ns_id, + table_id, log_storage_v2, log_storage_v3, uni_ps, @@ -970,7 +1001,7 @@ PageReaderPtr StoragePool::newDataReader(ReadLimiterPtr read_limiter, bool snaps run_mode, keyspace_id, StorageType::Data, - ns_id, + table_id, data_storage_v2, data_storage_v3, uni_ps, @@ -985,7 +1016,7 @@ PageReaderPtr StoragePool::newMetaReader(ReadLimiterPtr read_limiter, bool snaps run_mode, keyspace_id, StorageType::Meta, - ns_id, + table_id, meta_storage_v2, meta_storage_v3, uni_ps, diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.h index 33bf28af65b..5fda34fe9be 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool.h @@ -16,6 +16,8 @@ #include #include +#include +#include #include #include #include @@ -32,8 +34,6 @@ using WriteLimiterPtr = std::shared_ptr; class ReadLimiter; using ReadLimiterPtr = std::shared_ptr; -struct Settings; -class Context; class StoragePathPool; class PathPool; class StableDiskDelegator; @@ -53,9 +53,18 @@ class StoragePool : private boost::noncopyable StoragePool( Context & global_ctx, KeyspaceID keyspace_id_, - NamespaceID ns_id_, + NamespaceID table_id_, StoragePathPool & storage_path_pool_, - const String & name = ""); + const String & name); + + // For test + StoragePool( + Context & global_ctx, + KeyspaceID keyspace_id_, + NamespaceID table_id_, + StoragePathPool & storage_path_pool_, + GlobalPageIdAllocatorPtr page_id_allocator_, + const String & name); PageStorageRunMode restore(); @@ -63,7 +72,7 @@ class StoragePool : private boost::noncopyable KeyspaceID getKeyspaceID() const { return keyspace_id; } - NamespaceID getNamespaceID() const { return ns_id; } + NamespaceID getTableID() const { return table_id; } PageStorageRunMode getPageStorageRunMode() const { return run_mode; } @@ -131,9 +140,9 @@ class StoragePool : private boost::noncopyable // StoragePool will assign the max_log_page_id/max_meta_page_id/max_data_page_id by the global max id // regardless of ns_id while being restored. This causes the ids in a table to not be continuously incremented. - PageIdU64 newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); - PageIdU64 newLogPageId() { return ++max_log_page_id; } - PageIdU64 newMetaPageId() { return ++max_meta_page_id; } + PageIdU64 newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who) const; + PageIdU64 newLogPageId() const; + PageIdU64 newMetaPageId() const; #ifndef DBMS_PUBLIC_GTEST private: @@ -149,11 +158,8 @@ class StoragePool : private boost::noncopyable LoggerPtr logger; PageStorageRunMode run_mode; - const KeyspaceID keyspace_id; - - // whether the three storage instance is owned by this StoragePool - const NamespaceID ns_id; + const NamespaceID table_id; StoragePathPool & storage_path_pool; @@ -181,9 +187,7 @@ class StoragePool : private boost::noncopyable Context & global_context; - std::atomic max_log_page_id = 0; - std::atomic max_data_page_id = 0; - std::atomic max_meta_page_id = 0; + GlobalPageIdAllocatorPtr global_id_allocator; BackgroundProcessingPool::TaskHandle gc_handle = nullptr; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool_fwd.h b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool_fwd.h index 4dc7aa17166..3342860631a 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool_fwd.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool/StoragePool_fwd.h @@ -18,6 +18,8 @@ namespace DB::DM { +class GlobalPageIdAllocator; +using GlobalPageIdAllocatorPtr = std::shared_ptr; class StoragePool; using StoragePoolPtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h b/dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h index 1fec5fe7e9e..680789aad10 100644 --- a/dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h +++ b/dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h @@ -46,7 +46,7 @@ struct WriteBatches : private boost::noncopyable explicit WriteBatches(StoragePool & storage_pool_, const WriteLimiterPtr & write_limiter_ = nullptr) : keyspace_id(storage_pool_.getKeyspaceID()) - , ns_id(storage_pool_.getNamespaceID()) + , ns_id(storage_pool_.getTableID()) , run_mode(storage_pool_.getPageStorageRunMode()) , log(run_mode, keyspace_id, StorageType::Log, ns_id) , data(run_mode, keyspace_id, StorageType::Data, ns_id) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index dc6fa1b822c..bb3b05cdc58 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include #include @@ -36,6 +38,7 @@ #include #include + namespace CurrentMetrics { extern const Metric DT_SnapshotOfRead; @@ -71,17 +74,25 @@ class SegmentTest : public DB::base::TiFlashStorageTestBasic TiFlashStorageTestBasic::SetUp(); table_columns = std::make_shared(); - segment = reload(); + segment = buildFirstSegment(); ASSERT_EQ(segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); } protected: - SegmentPtr reload(const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()) + SegmentPtr buildFirstSegment( + const ColumnDefinesPtr & pre_define_columns = {}, + DB::Settings && db_settings = DB::Settings()) { TiFlashStorageTestBasic::reload(std::move(db_settings)); storage_path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); - storage_pool - = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 100, *storage_path_pool, "test.t1"); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + /*ns_id*/ 100, + *storage_path_pool, + page_id_allocator, + "test.t1"); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); @@ -91,7 +102,7 @@ class SegmentTest : public DB::base::TiFlashStorageTestBasic *dm_context, table_columns, RowKeyRange::newAll(false, 1), - storage_pool->newMetaPageId(), + DELTA_MERGE_FIRST_SEGMENT_ID, 0); } @@ -118,6 +129,7 @@ class SegmentTest : public DB::base::TiFlashStorageTestBasic protected: /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; std::shared_ptr storage_path_pool; std::shared_ptr storage_pool; ColumnDefinesPtr table_columns; @@ -436,7 +448,7 @@ try Settings my_settings; const auto enable_relevant_place = GetParam(); my_settings.dt_enable_relevant_place = enable_relevant_place; - this->reload({}, std::move(my_settings)); + this->buildFirstSegment({}, std::move(my_settings)); const size_t num_rows_write = 300; { @@ -990,7 +1002,7 @@ try settings.dt_segment_limit_rows = 11; settings.dt_segment_delta_limit_rows = 7; - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + segment = buildFirstSegment(DMTestEnv::getDefaultColumns(), std::move(settings)); size_t num_batches_written = 0; const size_t num_rows_per_write = 5; @@ -1306,7 +1318,7 @@ try auto columns_before_ddl = DMTestEnv::getDefaultColumns(); columns_before_ddl->emplace_back(column_i8_before_ddl); DB::Settings db_settings; - segment = reload(columns_before_ddl, std::move(db_settings)); + segment = buildFirstSegment(columns_before_ddl, std::move(db_settings)); /// write to segment Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); @@ -1431,7 +1443,7 @@ try auto columns_before_ddl = DMTestEnv::getDefaultColumns(); // Not cache any rows DB::Settings db_settings; - segment = reload(columns_before_ddl, std::move(db_settings)); + segment = buildFirstSegment(columns_before_ddl, std::move(db_settings)); } const size_t num_rows_write = 100; @@ -1551,7 +1563,7 @@ try Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_stable_pack_rows = 10; - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + segment = buildFirstSegment(DMTestEnv::getDefaultColumns(), std::move(settings)); const size_t num_rows_write_every_round = 100; const size_t write_round = 3; @@ -1592,7 +1604,7 @@ try Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_stable_pack_rows = 10; - segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); + segment = buildFirstSegment(DMTestEnv::getDefaultColumns(), std::move(settings)); const size_t num_rows_write_every_round = 100; const size_t write_round = 3; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index d6530f0a9c3..65f3e4c7ae2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -44,40 +46,46 @@ class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic void SetUp() override { TiFlashStorageTestBasic::SetUp(); - table_columns_ = std::make_shared(); + table_columns = std::make_shared(); - segment = reload(); + segment = buildFirstSegment(); ASSERT_EQ(segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); } protected: - SegmentPtr reload(ColumnDefinesPtr cols = {}, DB::Settings && db_settings = DB::Settings()) + SegmentPtr buildFirstSegment(ColumnDefinesPtr cols = {}, DB::Settings && db_settings = DB::Settings()) { TiFlashStorageTestBasic::reload(std::move(db_settings)); path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t", false)); - storage_pool = std::make_shared(*db_context, NullspaceID, /*table_id*/ 100, *path_pool, "test.t1"); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + /*table_id*/ 100, + *path_pool, + page_id_allocator, + "test.t1"); storage_pool->restore(); if (!cols) cols = DMTestEnv::getDefaultColumns( is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID); setColumns(cols); - auto segment_id = storage_pool->newMetaPageId(); return Segment::newSegment( Logger::get(), - *dm_context_, - table_columns_, + *dm_context, + table_columns, RowKeyRange::newAll(is_common_handle, rowkey_column_size), - segment_id, + DELTA_MERGE_FIRST_SEGMENT_ID, 0); } // setColumns should update dm_context at the same time void setColumns(const ColumnDefinesPtr & columns) { - *table_columns_ = *columns; + *table_columns = *columns; - dm_context_ = DMContext::createUnique( + dm_context = DMContext::createUnique( *db_context, path_pool, storage_pool, @@ -89,18 +97,19 @@ class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic db_context->getSettingsRef()); } - const ColumnDefinesPtr & tableColumns() const { return table_columns_; } + const ColumnDefinesPtr & tableColumns() const { return table_columns; } - DMContext & dmContext() { return *dm_context_; } + DMContext & dmContext() { return *dm_context; } private: /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; std::shared_ptr path_pool; std::shared_ptr storage_pool; - ColumnDefinesPtr table_columns_; + ColumnDefinesPtr table_columns; DM::DeltaMergeStore::Settings settings; /// dm_context - std::unique_ptr dm_context_; + std::unique_ptr dm_context; protected: // the segment we are going to test @@ -306,13 +315,13 @@ try } CATCH -class SegmentDeletion_Common_Handle_test +class SegmentDeletionCommonHandleTest : public SegmentCommonHandleTest , public testing::WithParamInterface> { }; -TEST_P(SegmentDeletion_Common_Handle_test, DeleteDataInDelta) +TEST_P(SegmentDeletionCommonHandleTest, DeleteDataInDelta) try { const size_t num_rows_write = 100; @@ -382,7 +391,7 @@ try } CATCH -TEST_P(SegmentDeletion_Common_Handle_test, DeleteDataInStable) +TEST_P(SegmentDeletionCommonHandleTest, DeleteDataInStable) try { const size_t num_rows_write = 100; @@ -460,7 +469,7 @@ try } CATCH -TEST_P(SegmentDeletion_Common_Handle_test, DeleteDataInStableAndDelta) +TEST_P(SegmentDeletionCommonHandleTest, DeleteDataInStableAndDelta) try { const size_t num_rows_write = 100; @@ -550,7 +559,7 @@ CATCH INSTANTIATE_TEST_CASE_P( WhetherReadOrMergeDeltaBeforeDeleteRange, - SegmentDeletion_Common_Handle_test, + SegmentDeletionCommonHandleTest, testing::Combine(testing::Bool(), testing::Bool())); TEST_F(SegmentCommonHandleTest, DeleteRead) @@ -929,7 +938,7 @@ try settings.dt_segment_limit_rows = 11; settings.dt_segment_delta_limit_rows = 7; - segment = reload(DMTestEnv::getDefaultColumns(DMTestEnv::PkType::CommonHandle), std::move(settings)); + segment = buildFirstSegment(DMTestEnv::getDefaultColumns(DMTestEnv::PkType::CommonHandle), std::move(settings)); size_t num_batches_written = 0; const size_t num_rows_per_write = 5; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp index e7e47c0705d..22ad30f0dd0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include #include #include @@ -96,7 +98,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic kvstore->setStore(meta_store); } - segment = reload(); + segment = buildFirstSegment(); ASSERT_EQ(segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); } @@ -118,11 +120,20 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic } protected: - SegmentPtr reload(const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()) + SegmentPtr buildFirstSegment( + const ColumnDefinesPtr & pre_define_columns = {}, + DB::Settings && db_settings = DB::Settings()) { TiFlashStorageTestBasic::reload(std::move(db_settings)); storage_path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); - storage_pool = std::make_shared(*db_context, NullspaceID, ns_id, *storage_path_pool, "test.t1"); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + ns_id, + *storage_path_pool, + page_id_allocator, + "test.t1"); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); @@ -132,7 +143,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic *dm_context, table_columns, RowKeyRange::newAll(false, 1), - storage_pool->newMetaPageId(), + DELTA_MERGE_FIRST_SEGMENT_ID, 0); } @@ -159,6 +170,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic protected: /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; std::shared_ptr storage_path_pool; std::shared_ptr storage_pool; ColumnDefinesPtr table_columns; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 756e6025f4e..9e1d0089d39 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -763,6 +763,7 @@ try table_info.pk_is_handle = false; // max page id is only updated at restart, so we need recreate page v3 before recreate table + ctx->getGlobalContext().initializeGlobalPageIdAllocator(); ctx->getGlobalContext().initializeGlobalStoragePoolIfNeed(ctx->getPathPool()); storage = StorageDeltaMerge::create( "TiFlash", diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_global_page_id_allocator.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_global_page_id_allocator.cpp new file mode 100644 index 00000000000..3365e442604 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_global_page_id_allocator.cpp @@ -0,0 +1,104 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include + +namespace DB::DM::tests +{ + +TEST(GlobalPageIdAllocatorTest, Normal) +{ + GlobalPageIdAllocator allocator; + + // Note the meta page id is allocated begin with 2 + // 1 is reserved for `DELTA_MERGE_FIRST_SEGMENT_ID` for each + // physical table + EXPECT_EQ(allocator.newMetaPageId(), 2); + EXPECT_EQ(allocator.newMetaPageId(), 3); + EXPECT_EQ(allocator.newMetaPageId(), 4); + allocator.raiseMetaPageIdLowerBound(1024); + EXPECT_EQ(allocator.newMetaPageId(), 1025); + EXPECT_EQ(allocator.newMetaPageId(), 1026); + EXPECT_EQ(allocator.newMetaPageId(), 1027); + + EXPECT_EQ(allocator.newLogPageId(), 1); + EXPECT_EQ(allocator.newLogPageId(), 2); + EXPECT_EQ(allocator.newLogPageId(), 3); + allocator.raiseLogPageIdLowerBound(65536); + EXPECT_EQ(allocator.newLogPageId(), 65537); + EXPECT_EQ(allocator.newLogPageId(), 65538); + + EXPECT_EQ(allocator.newDataPageIdForDTFile(), 1); + EXPECT_EQ(allocator.newDataPageIdForDTFile(), 2); + EXPECT_EQ(allocator.newDataPageIdForDTFile(), 3); + allocator.raiseDataPageIdLowerBound(114); + EXPECT_EQ(allocator.newDataPageIdForDTFile(), 115); + EXPECT_EQ(allocator.newDataPageIdForDTFile(), 116); +} + +TEST(GlobalPageIdAllocatorTest, IdChangedBeforeRaiseLowerBound) +{ + GlobalPageIdAllocator allocator; + + EXPECT_EQ(allocator.newLogPageId(), 1); + EXPECT_EQ(allocator.newLogPageId(), 2); + EXPECT_EQ(allocator.newLogPageId(), 3); + + auto sp_raise_bound = SyncPointCtl::enableInScope("before_GlobalPageIdAllocator::raiseLowerBoundCAS_1"); + auto th_raise_lower_bound = std::async([&]() { + allocator.raiseLogPageIdLowerBound(1024); + EXPECT_EQ(allocator.newLogPageId(), 1025); + }); + + sp_raise_bound.waitAndPause(); + EXPECT_EQ(allocator.newLogPageId(), 4); + sp_raise_bound.next(); + EXPECT_EQ(allocator.newLogPageId(), 5); + + // use `disable` instead of `next` to break the loop inside `raiseTargetByLowerBound` + sp_raise_bound.disable(); + th_raise_lower_bound.get(); + EXPECT_EQ(allocator.newLogPageId(), 1026); +} + +TEST(GlobalPageIdAllocatorTest, IdRaisedBeforeRaiseLowerBound) +{ + GlobalPageIdAllocator allocator; + + EXPECT_EQ(allocator.newLogPageId(), 1); + EXPECT_EQ(allocator.newLogPageId(), 2); + EXPECT_EQ(allocator.newLogPageId(), 3); + + auto sp_raise_bound = SyncPointCtl::enableInScope("before_GlobalPageIdAllocator::raiseLowerBoundCAS_1"); + auto th_raise_lower_bound_1 = std::async([&]() { allocator.raiseLogPageIdLowerBound(1024); }); + auto th_raise_lower_bound_2 = std::async([&]() { allocator.raiseLogPageIdLowerBound(3000); }); + + sp_raise_bound.waitAndPause(); + + // use `disable` instead of `next` to break the loop inside `raiseTargetByLowerBound` + sp_raise_bound.disable(); + + th_raise_lower_bound_2.get(); + EXPECT_EQ(allocator.newLogPageId(), 3001); + th_raise_lower_bound_1.get(); + EXPECT_EQ(allocator.newLogPageId(), 3002); +} + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 387c4af06e0..757442de688 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -832,12 +832,13 @@ SegmentPtr SegmentTestBasic::reload( : pre_define_columns; setColumns(cols); + // Always return the first segment return Segment::newSegment( Logger::get(), *dm_context, table_columns, RowKeyRange::newAll(is_common_handle, 1), - storage_pool->newMetaPageId(), + DELTA_MERGE_FIRST_SEGMENT_ID, 0); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 9481ee291b1..4aff76f7554 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -138,8 +138,6 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic // std::map operation_statistics; - SegmentPtr reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns, DB::Settings && db_settings); - // setColumns should update dm_context at the same time void setColumns(const ColumnDefinesPtr & columns); @@ -154,6 +152,9 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic std::pair getSegmentForRead(PageIdU64 segment_id); +private: + SegmentPtr reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns, DB::Settings && db_settings); + protected: inline static constexpr PageIdU64 NAMESPACE_ID = 100; diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index dd5028057a2..14da643e73b 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -56,6 +56,7 @@ DTWorkload::DTWorkload( key_gen = KeyGenerator::create(opts_); ts_gen = std::make_unique(); // max page id is only updated at restart, so we need recreate page v3 before recreate table + context->initializeGlobalPageIdAllocator(); context->initializeGlobalStoragePoolIfNeed(context->getPathPool()); Stopwatch sw; store = std::make_unique( diff --git a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp index 191a547d436..8b7ab602e24 100644 --- a/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/MainEntry.cpp @@ -236,6 +236,7 @@ void run(WorkloadOptions & opts, ContextPtr context) std::vector stats; try { + context->initializeGlobalPageIdAllocator(); // HandleTable is a unordered_map that stores handle->timestamp for data verified. auto handle_table = createHandleTable(opts); // Table Schema diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 5a82b5f7bd9..9dbcfc47bfa 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -95,7 +95,8 @@ typename PageDirectoryFactory::PageDirectoryPtr PageDirectoryFactory::PageDirectoryPtr PageDirectoryFactorysequence = max_applied_ver.sequence; RUNTIME_CHECK(dir->sequence, mock_sequence); @@ -204,9 +205,10 @@ void PageDirectoryFactory::loadEdit( // So we filter the REF record which is less than or equal to the `filter_seq` // Is this entry could be duplicated with the dumped snapshot bool filter = !force_apply && r.version.sequence <= filter_seq && r.type == EditRecordType::REF; - if (filter) + if (unlikely(filter)) { LOG_INFO(Logger::get(), "Not idempotent REF record is ignored during restart, record={}", r); + updateMaxIdByRecord(dir, r); continue; } @@ -245,21 +247,7 @@ void PageDirectoryFactory::applyRecord( } } - if constexpr (std::is_same_v) - { - // We only need page id under specific prefix after restart. - // If you want to add other prefix here, make sure the page id allocation space is still enough after adding it. - if (UniversalPageIdFormat::isType(r.page_id, StorageType::Data) - || UniversalPageIdFormat::isType(r.page_id, StorageType::Log) - || UniversalPageIdFormat::isType(r.page_id, StorageType::Meta)) - { - dir->max_page_id = std::max(dir->max_page_id, Trait::PageIdTrait::getU64ID(r.page_id)); - } - } - else - { - dir->max_page_id = std::max(dir->max_page_id, Trait::PageIdTrait::getU64ID(r.page_id)); - } + updateMaxIdByRecord(dir, r); const auto & version_list = iter->second; const auto & restored_version = r.version; @@ -362,6 +350,28 @@ void PageDirectoryFactory::applyRecord( } } +template +void PageDirectoryFactory::updateMaxIdByRecord( + const PageDirectoryPtr & dir, + const typename PageEntriesEdit::EditRecord & r) +{ + if constexpr (std::is_same_v) + { + // We only need page id under specific prefix after restart. + // If you want to add other prefix here, make sure the page id allocation space is still enough after adding it. + if (UniversalPageIdFormat::isType(r.page_id, StorageType::Data) + || UniversalPageIdFormat::isType(r.page_id, StorageType::Log) + || UniversalPageIdFormat::isType(r.page_id, StorageType::Meta)) + { + dir->max_page_id = std::max(dir->max_page_id, Trait::PageIdTrait::getU64ID(r.page_id)); + } + } + else + { + dir->max_page_id = std::max(dir->max_page_id, Trait::PageIdTrait::getU64ID(r.page_id)); + } +} + template void PageDirectoryFactory::loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader) { diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index 710cae20591..d6f6e57e0bf 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -67,7 +67,8 @@ class PageDirectoryFactory const String & storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, - PageEntriesEdit & edit); + PageEntriesEdit & edit, + UInt64 filter_seq = 0); // just for test PageDirectoryFactory & setBlobStats(BlobStats & blob_stats_) @@ -83,6 +84,7 @@ class PageDirectoryFactory const PageDirectoryPtr & dir, const typename PageEntriesEdit::EditRecord & r, bool strict_check); + static void updateMaxIdByRecord(const PageDirectoryPtr & dir, const typename PageEntriesEdit::EditRecord & r); void restoreBlobStats(const PageDirectoryPtr & dir); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 641402c5c31..50ef1132191 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -1299,6 +1300,31 @@ try } CATCH +TEST_F(PageDirectoryTest, RestoreWithIdempotentRef) +try +{ + auto provider = DB::tests::TiFlashTestEnv::getDefaultFileProvider(); + auto path = getTemporaryPath(); + PSDiskDelegatorPtr delegator = std::make_shared(path); + PageDirectoryFactory factory; + + // Generate an edit with idempotent REF operation + const UInt64 filter_seq = 1000; + u128::PageEntriesEdit edit; + edit.appendRecord({.type = EditRecordType::UPSERT, .page_id = UInt128(10000), .version = PageVersion(53, 1)}); + edit.appendRecord( + {.type = EditRecordType::REF, + .page_id = UInt128(90000), + .ori_page_id = UInt128(1), + .version = PageVersion(51)}); + edit.appendRecord({.type = EditRecordType::UPSERT, .page_id = UInt128(1), .version = PageVersion(52, 1)}); + edit.appendRecord({.type = EditRecordType::PUT, .page_id = UInt128(5), .version = PageVersion(1001)}); + + auto d = factory.createFromEditForTest(getCurrentTestName(), provider, delegator, edit, filter_seq); + EXPECT_EQ(d->getMaxIdAfterRestart(), 90000); +} +CATCH + TEST_F(PageDirectoryTest, IncrRefDuringDump) try { diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index d61ddb52edd..ea6d7cde338 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -155,6 +155,7 @@ void TiFlashTestEnv::addGlobalContext( global_context->getFileProvider()); global_context->setPageStorageRunMode(ps_run_mode); + global_context->initializeGlobalPageIdAllocator(); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool()); LOG_INFO(Logger::get(), "Storage mode : {}", static_cast(global_context->getPageStorageRunMode())); @@ -194,6 +195,7 @@ ContextPtr TiFlashTestEnv::getContext(const DB::Settings & settings, Strings tes context.setPath(root_path); auto paths = getPathPool(testdata_path); context.setPathPool(paths.first, paths.second, Strings{}, context.getPathCapacity(), context.getFileProvider()); + global_contexts[0]->initializeGlobalPageIdAllocator(); global_contexts[0]->initializeGlobalStoragePoolIfNeed(context.getPathPool()); global_contexts[0]->tryReleaseWriteNodePageStorageForTest(); global_contexts[0]->initializeWriteNodePageStorageIfNeed(context.getPathPool());