Skip to content

Commit

Permalink
Storage: Refactor for fixing recreate Storage instance (#8701)
Browse files Browse the repository at this point in the history
ref #8695
  • Loading branch information
JaySon-Huang authored Jan 18, 2024
1 parent a4df770 commit 5839a41
Show file tree
Hide file tree
Showing 36 changed files with 322 additions and 247 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
#include <Debug/dbgFuncMisc.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>

#include <fstream>
#include <regex>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/KVStore/BackgroundService.h>
#include <Storages/KVStore/TMTContext.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/FormatVersion.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/PathPool.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/PathPool.h>
#include <TestUtils/TiFlashStorageTestBasic.h>
#include <gtest/gtest.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
#include <Poco/Logger.h>
#include <Server/StorageConfigParser.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/MultiRaft/RegionManager.h>
#include <Storages/KVStore/MultiRaft/RegionPersister.h>
#include <Storages/KVStore/Region.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_headers_and_sources(delta_merge ./ReadThread)
add_headers_and_sources(delta_merge ./Remote)
add_headers_and_sources(delta_merge ./Remote/DataStore)
add_headers_and_sources(delta_merge ./Decode)
add_headers_and_sources(delta_merge ./StoragePool)

add_library(delta_merge ${delta_merge_headers} ${delta_merge_sources})
target_link_libraries(delta_merge PRIVATE dbms page)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool_fwd.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
Expand Down Expand Up @@ -130,7 +130,8 @@ class ColumnFile
virtual ColumnFileReaderPtr getReader(
const DMContext & context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const = 0;
const ColumnDefinesPtr & col_defs) const
= 0;

/// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have
/// been persisted in the disk and their data will be immutable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>

namespace DB::DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/StoragePool_fwd.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageDefinesBase.h>

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>

namespace DB::DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/GCOptions.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/PathPool.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/Page/PageStorage.h>

namespace DB
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/DeltaMerge/Segment_fwd.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <Storages/DeltaMerge/RowKeyFilter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StableValueSpace.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
Expand Down
116 changes: 116 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePoolConfig.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/PathPool.h>

namespace DB::DM
{

GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings)
: log_storage(PageStorage::create(
"__global__.log",
path_pool.getPSDiskDelegatorGlobalMulti(PathPool::log_path_prefix),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
global_ctx,
true))
, data_storage(PageStorage::create(
"__global__.data",
path_pool.getPSDiskDelegatorGlobalMulti(PathPool::data_path_prefix),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
global_ctx,
true))
, meta_storage(PageStorage::create(
"__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti(PathPool::meta_path_prefix),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
global_ctx,
true))
, global_context(global_ctx)
{}


GlobalStoragePool::~GlobalStoragePool()
{
shutdown();
}

void GlobalStoragePool::restore()
{
log_storage->restore();
data_storage->restore();
meta_storage->restore();

gc_handle = global_context.getBackgroundPool().addTask(
[this] { return this->gc(global_context.getSettingsRef()); },
false);
}

void GlobalStoragePool::shutdown()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = {};
}
}

FileUsageStatistics GlobalStoragePool::getLogFileUsage() const
{
return log_storage->getFileUsageStatistics();
}

bool GlobalStoragePool::gc()
{
return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD);
}

bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period)
{
Timepoint now = Clock::now();
if (!immediately)
{
// No need lock
if (now < (last_try_gc_time.load() + try_gc_period))
return false;
}

last_try_gc_time = now;

bool done_anything = false;
auto write_limiter = global_context.getWriteLimiter();
auto read_limiter = global_context.getReadLimiter();
auto config = extractConfig(settings, StorageType::Meta);
meta_storage->reloadSettings(config);
done_anything |= meta_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

config = extractConfig(settings, StorageType::Data);
data_storage->reloadSettings(config);
done_anything |= data_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

config = extractConfig(settings, StorageType::Log);
log_storage->reloadSettings(config);
done_anything |= log_storage->gc(/*not_skip*/ false, write_limiter, read_limiter);

return done_anything;
}

} // namespace DB::DM
82 changes: 82 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool/GlobalStoragePool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/KVStore/Types.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageStorage_fwd.h>

#include <atomic>
#include <chrono>

namespace DB
{
class WriteLimiter;
using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
class ReadLimiter;
using ReadLimiterPtr = std::shared_ptr<ReadLimiter>;

struct Settings;
class PathPool;
class StableDiskDelegator;
class AsynchronousMetrics;
} // namespace DB

namespace DB::DM
{

class GlobalStoragePool : private boost::noncopyable
{
public:
using Clock = std::chrono::system_clock;
using Timepoint = Clock::time_point;
using Seconds = std::chrono::seconds;

GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings);

~GlobalStoragePool();

void restore();

void shutdown();

friend class StoragePool;
friend class ::DB::AsynchronousMetrics;

// GC immediately
// Only used on dbgFuncMisc
bool gc();

FileUsageStatistics getLogFileUsage() const;

private:
bool gc(const Settings & settings, bool immediately = false, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

private:
PageStoragePtr log_storage;
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

Context & global_context;
BackgroundProcessingPool::TaskHandle gc_handle;
};

} // namespace DB::DM
Loading

0 comments on commit 5839a41

Please sign in to comment.