Skip to content

Commit

Permalink
Storage: Fix max-id being mis-reused cause data corruption after chan…
Browse files Browse the repository at this point in the history
…ging tiflash replica number (#8698) (#8733)

close #8695
  • Loading branch information
ti-chi-bot authored Jan 26, 2024
1 parent bdc1e9d commit 4335916
Show file tree
Hide file tree
Showing 51 changed files with 843 additions and 430 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
#include <Debug/dbgFuncMisc.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>

#include <fstream>
#include <regex>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/KVStore/BackgroundService.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathCapacityMetrics.h>
Expand Down Expand Up @@ -167,6 +170,7 @@ struct ContextShared
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalPageIdAllocatorPtr global_page_id_allocator;
DM::GlobalStoragePoolPtr global_storage_pool;

/// The PS instance available on Write Node.
Expand Down Expand Up @@ -1719,6 +1723,22 @@ void Context::setPageStorageRunMode(PageStorageRunMode run_mode) const
shared->storage_run_mode = run_mode;
}

bool Context::initializeGlobalPageIdAllocator()
{
auto lock = getLock();
if (!shared->global_page_id_allocator)
{
shared->global_page_id_allocator = std::make_shared<DM::GlobalPageIdAllocator>();
}
return true;
}

DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const
{
auto lock = getLock();
return shared->global_page_id_allocator;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
class GlobalPageIdAllocator;
using GlobalPageIdAllocatorPtr = std::shared_ptr<GlobalPageIdAllocator>;
} // namespace DM

/// (database name, table name)
Expand Down Expand Up @@ -446,6 +448,10 @@ class Context
void initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version);
void setPageStorageRunMode(PageStorageRunMode run_mode) const;
PageStorageRunMode getPageStorageRunMode() const;

bool initializeGlobalPageIdAllocator();
DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/DTTool/DTTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ImitativeEnv
global_context->getPathCapacity(),
global_context->getFileProvider());
TiFlashRaftConfig raft_config;
global_context->initializeGlobalPageIdAllocator();
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
raft_config.ignore_databases = {"default", "system"};
raft_config.engine = TiDB::StorageEngine::DT;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/FormatVersion.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/PathPool.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
global_context->initializeGlobalPageIdAllocator();
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
{
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/PathPool.h>
#include <TestUtils/TiFlashStorageTestBasic.h>
#include <gtest/gtest.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
#include <Poco/Logger.h>
#include <Server/StorageConfigParser.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/MultiRaft/RegionManager.h>
#include <Storages/KVStore/MultiRaft/RegionPersister.h>
#include <Storages/KVStore/Region.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_headers_and_sources(delta_merge ./ReadThread)
add_headers_and_sources(delta_merge ./Remote)
add_headers_and_sources(delta_merge ./Remote/DataStore)
add_headers_and_sources(delta_merge ./Decode)
add_headers_and_sources(delta_merge ./StoragePool)

add_library(delta_merge ${delta_merge_headers} ${delta_merge_sources})
target_link_libraries(delta_merge PRIVATE dbms page)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool_fwd.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
Expand Down Expand Up @@ -130,7 +130,8 @@ class ColumnFile
virtual ColumnFileReaderPtr getReader(
const DMContext & context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const = 0;
const ColumnDefinesPtr & col_defs) const
= 0;

/// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have
/// been persisted in the disk and their data will be immutable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>

namespace DB::DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/StoragePool_fwd.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageDefinesBase.h>

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>

namespace DB::DM
Expand Down
30 changes: 11 additions & 19 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,16 @@ DeltaMergeStore::DeltaMergeStore(
try
{
page_storage_run_mode = storage_pool->restore(); // restore from disk
// If there is meta of `DELTA_MERGE_FIRST_SEGMENT_ID`, restore all segments
// If there is no `DELTA_MERGE_FIRST_SEGMENT_ID`, the first segment will be created by `createFirstSegment` later
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
first_segment_entry.isValid())
{
// Restore all existing segments
auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;

// parallel restore segment to speed up
if (thread_pool)
{
// parallel restore segment to speed up
auto wait_group = thread_pool->waitGroup();
auto segment_ids = Segment::getAllSegmentIds(*dm_context, segment_id);
for (auto & segment_id : segment_ids)
Expand All @@ -285,6 +287,7 @@ DeltaMergeStore::DeltaMergeStore(
}
else
{
// restore segment one by one
while (segment_id != 0)
{
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
Expand Down Expand Up @@ -2124,9 +2127,10 @@ std::pair<SegmentPtr, bool> DeltaMergeStore::getSegmentByStartKey(
if (create_if_empty && is_empty)
{
auto dm_context = newDMContext(global_context, global_context.getSettingsRef());
auto page_storage_run_mode = storage_pool->getPageStorageRunMode();
createFirstSegment(*dm_context, page_storage_run_mode);
createFirstSegment(*dm_context);
}
// The first segment may be created in this thread or another thread,
// try again to get the new created segment.
} while (create_if_empty && is_empty);

if (throw_if_notfound)
Expand All @@ -2142,30 +2146,18 @@ std::pair<SegmentPtr, bool> DeltaMergeStore::getSegmentByStartKey(
}
}

void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode)
void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context)
{
std::unique_lock lock(read_write_mutex);
if (!segments.empty())
{
return;
}

auto segment_id = storage_pool->newMetaPageId();
if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID)
{
RUNTIME_CHECK_MSG(
page_storage_run_mode != PageStorageRunMode::ONLY_V2,
"The first segment id should be {}, but get {}, run_mode={}",
DELTA_MERGE_FIRST_SEGMENT_ID,
segment_id,
magic_enum::enum_name(page_storage_run_mode));

// In ONLY_V3 or MIX_MODE, If create a new DeltaMergeStore
// Should used fixed DELTA_MERGE_FIRST_SEGMENT_ID to create first segment
segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
}
const auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
LOG_INFO(log, "creating the first segment with segment_id={}", segment_id);

// newSegment will also commit the segment meta to PageStorage
auto first_segment = Segment::newSegment( //
log,
dm_context,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ class DeltaMergeStore : private boost::noncopyable
const RowKeyValueRef & start_key,
bool create_if_empty,
bool throw_if_notfound);
void createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode);
void createFirstSegment(DM::DMContext & dm_context);

Context & global_context;
std::shared_ptr<StoragePathPool> path_pool;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/GCOptions.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/PathPool.h>
Expand Down Expand Up @@ -266,7 +266,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
// that callbacks is called after the `DeltaMergeStore` shutdown or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.prefix = storage_pool->getNamespaceID();
callbacks.prefix = storage_pool->getTableID();
if (auto data_store = dm_context->db_context.getSharedContextDisagg()->remote_data_store; !data_store)
{
callbacks.scanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>

namespace DB
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#include <Storages/DeltaMerge/RowKeyOrderedBlockInputStream.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StableValueSpace.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/Page/PageDefinesBase.h>
#include <common/logger_useful.h>

#include <atomic>

namespace DB::DM
{

void GlobalPageIdAllocator::raiseTargetByLowerBound(std::atomic<PageIdU64> & target, PageIdU64 lower_bound)
{
PageIdU64 old_value = target.load();
while (true)
{
// already satisfied the lower_bound, done.
if (old_value >= lower_bound)
break;
SYNC_FOR("before_GlobalPageIdAllocator::raiseLowerBoundCAS_1");
// try raise to the lower_bound
if (target.compare_exchange_strong(old_value, lower_bound))
{
// raise success, done.
break;
}
// else the `old_value` is updated, try again
}
}

} // namespace DB::DM
Loading

0 comments on commit 4335916

Please sign in to comment.