Skip to content

Commit

Permalink
Storage: Fix max-id being mis-reused cause data corruption after chan…
Browse files Browse the repository at this point in the history
…ging tiflash replica number (pingcap#8698)

close pingcap#8695
  • Loading branch information
JaySon-Huang committed Jan 25, 2024
1 parent 707537a commit 0663cd4
Show file tree
Hide file tree
Showing 27 changed files with 504 additions and 166 deletions.
19 changes: 19 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/KVStore/BackgroundService.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathCapacityMetrics.h>
Expand Down Expand Up @@ -168,6 +170,7 @@ struct ContextShared
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalPageIdAllocatorPtr global_page_id_allocator;
DM::GlobalStoragePoolPtr global_storage_pool;

/// The PS instance available on Write Node.
Expand Down Expand Up @@ -1720,6 +1723,22 @@ void Context::setPageStorageRunMode(PageStorageRunMode run_mode) const
shared->storage_run_mode = run_mode;
}

/// Create the `DM::GlobalPageIdAllocator` stored in `shared` if it has not
/// been created yet. The check and the assignment both happen while holding
/// the lock returned by `getLock()`, and an already existing allocator is
/// left untouched.
///
/// @return always `true`.
bool Context::initializeGlobalPageIdAllocator()
{
    auto lock = getLock();
    auto & allocator = shared->global_page_id_allocator;
    if (allocator == nullptr)
        allocator = std::make_shared<DM::GlobalPageIdAllocator>();
    return true;
}

/// Return a copy of the allocator pointer stored in `shared`, read while
/// holding the lock returned by `getLock()`.
/// NOTE(review): the pointer is presumably null until
/// `initializeGlobalPageIdAllocator()` has run — callers should confirm.
DM::GlobalPageIdAllocatorPtr Context::getGlobalPageIdAllocator() const
{
    auto lock = getLock();
    DM::GlobalPageIdAllocatorPtr allocator = shared->global_page_id_allocator;
    return allocator;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
class GlobalPageIdAllocator;
using GlobalPageIdAllocatorPtr = std::shared_ptr<GlobalPageIdAllocator>;
} // namespace DM

/// (database name, table name)
Expand Down Expand Up @@ -446,6 +448,10 @@ class Context
void initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version);
void setPageStorageRunMode(PageStorageRunMode run_mode) const;
PageStorageRunMode getPageStorageRunMode() const;

bool initializeGlobalPageIdAllocator();
DM::GlobalPageIdAllocatorPtr getGlobalPageIdAllocator() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/DTTool/DTTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ImitativeEnv
global_context->getPathCapacity(),
global_context->getFileProvider());
TiFlashRaftConfig raft_config;
global_context->initializeGlobalPageIdAllocator();
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
raft_config.ignore_databases = {"default", "system"};
raft_config.engine = TiDB::StorageEngine::DT;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
global_context->initializeGlobalPageIdAllocator();
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
{
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
Expand Down
30 changes: 11 additions & 19 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,16 @@ DeltaMergeStore::DeltaMergeStore(
try
{
page_storage_run_mode = storage_pool->restore(); // restore from disk
// If there is meta of `DELTA_MERGE_FIRST_SEGMENT_ID`, restore all segments
// If there is no `DELTA_MERGE_FIRST_SEGMENT_ID`, the first segment will be created by `createFirstSegment` later
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
first_segment_entry.isValid())
{
// Restore all existing segments
auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;

// parallel restore segment to speed up
if (thread_pool)
{
// parallel restore segment to speed up
auto wait_group = thread_pool->waitGroup();
auto segment_ids = Segment::getAllSegmentIds(*dm_context, segment_id);
for (auto & segment_id : segment_ids)
Expand All @@ -285,6 +287,7 @@ DeltaMergeStore::DeltaMergeStore(
}
else
{
// restore segment one by one
while (segment_id != 0)
{
auto segment = Segment::restoreSegment(log, *dm_context, segment_id);
Expand Down Expand Up @@ -2124,9 +2127,10 @@ std::pair<SegmentPtr, bool> DeltaMergeStore::getSegmentByStartKey(
if (create_if_empty && is_empty)
{
auto dm_context = newDMContext(global_context, global_context.getSettingsRef());
auto page_storage_run_mode = storage_pool->getPageStorageRunMode();
createFirstSegment(*dm_context, page_storage_run_mode);
createFirstSegment(*dm_context);
}
// The first segment may be created in this thread or another thread,
// try again to get the newly created segment.
} while (create_if_empty && is_empty);

if (throw_if_notfound)
Expand All @@ -2142,30 +2146,18 @@ std::pair<SegmentPtr, bool> DeltaMergeStore::getSegmentByStartKey(
}
}

void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode)
void DeltaMergeStore::createFirstSegment(DM::DMContext & dm_context)
{
std::unique_lock lock(read_write_mutex);
if (!segments.empty())
{
return;
}

auto segment_id = storage_pool->newMetaPageId();
if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID)
{
RUNTIME_CHECK_MSG(
page_storage_run_mode != PageStorageRunMode::ONLY_V2,
"The first segment id should be {}, but get {}, run_mode={}",
DELTA_MERGE_FIRST_SEGMENT_ID,
segment_id,
magic_enum::enum_name(page_storage_run_mode));

// In ONLY_V3 or MIX_MODE, when creating a new DeltaMergeStore, we
// should use the fixed DELTA_MERGE_FIRST_SEGMENT_ID to create the first segment
segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
}
const auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
LOG_INFO(log, "creating the first segment with segment_id={}", segment_id);

// newSegment will also commit the segment meta to PageStorage
auto first_segment = Segment::newSegment( //
log,
dm_context,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ class DeltaMergeStore : private boost::noncopyable
const RowKeyValueRef & start_key,
bool create_if_empty,
bool throw_if_notfound);
void createFirstSegment(DM::DMContext & dm_context, PageStorageRunMode page_storage_run_mode);
void createFirstSegment(DM::DMContext & dm_context);

Context & global_context;
std::shared_ptr<StoragePathPool> path_pool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
// that callbacks is called after the `DeltaMergeStore` shutdown or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.prefix = storage_pool->getNamespaceID();
callbacks.prefix = storage_pool->getTableID();
if (auto data_store = dm_context->db_context.getSharedContextDisagg()->remote_data_store; !data_store)
{
callbacks.scanner
Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/Page/PageDefinesBase.h>
#include <common/logger_useful.h>

#include <atomic>

namespace DB::DM
{

void GlobalPageIdAllocator::raiseTargetByLowerBound(std::atomic<PageIdU64> & target, PageIdU64 lower_bound)
{
PageIdU64 old_value = target.load();
while (true)
{
// already satisfied the lower_bound, done.
if (old_value >= lower_bound)
break;
SYNC_FOR("before_GlobalPageIdAllocator::raiseLowerBoundCAS_1");
// try raise to the lower_bound
if (target.compare_exchange_strong(old_value, lower_bound))
{
// raise success, done.
break;
}
// else the `old_value` is updated, try again
}
}

} // namespace DB::DM
54 changes: 54 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/KVStore/Types.h>
#include <Storages/Page/PageDefinesBase.h>
#include <Storages/Page/PageStorage_fwd.h>

#include <atomic>

namespace DB::DM
{

class GlobalPageIdAllocator : private boost::noncopyable
{
public:
GlobalPageIdAllocator() = default;

void raiseDataPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_data_page_id, lower_bound); }
void raiseLogPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_log_page_id, lower_bound); }
void raiseMetaPageIdLowerBound(PageIdU64 lower_bound) { raiseTargetByLowerBound(max_meta_page_id, lower_bound); }

PageIdU64 newDataPageIdForDTFile() { return ++max_data_page_id; }
PageIdU64 newLogPageId() { return ++max_log_page_id; }
PageIdU64 newMetaPageId() { return ++max_meta_page_id; }

std::tuple<PageIdU64, PageIdU64, PageIdU64> getCurrentIds() const
{
return std::make_tuple(max_log_page_id.load(), max_data_page_id.load(), max_meta_page_id.load());
}

private:
static void raiseTargetByLowerBound(std::atomic<PageIdU64> & target, PageIdU64 lower_bound);

private:
std::atomic<PageIdU64> max_log_page_id = 0;
std::atomic<PageIdU64> max_data_page_id = 0;
// The meta_page_id == 1 is reserved for first segment in each physical table
std::atomic<PageIdU64> max_meta_page_id = 1;
};

} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePoolConfig.h>
#include <Storages/Page/Page.h>
Expand Down
Loading

0 comments on commit 0663cd4

Please sign in to comment.