Skip to content

Commit

Permalink
PageStorage: background version compact for v2 (#6446)
Browse files Browse the repository at this point in the history
close #6407
  • Loading branch information
JaySon-Huang authored Dec 8, 2022
1 parent 1a69ba8 commit f248fac
Show file tree
Hide file tree
Showing 26 changed files with 604 additions and 191 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
M(MemoryCapacity) \
M(PSMVCCNumSnapshots) \
M(PSMVCCSnapshotsList) \
M(PSMVCCNumDelta) \
M(PSMVCCNumBase) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
M(PSMVCCCompactOnDelta) \
M(PSMVCCCompactOnDeltaRebaseRejected) \
M(PSMVCCCompactOnBase) \
M(PSMVCCCompactOnBaseCommit) \
\
M(DMWriteBlock) \
M(DMWriteBlockNS) \
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace DB
F(type_seg_split_bg, {"type", "seg_split_bg"}), \
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down Expand Up @@ -164,7 +164,8 @@ namespace DB
F(type_v3_bs_full_gc, {"type", "v3_bs_full_gc"})) \
M(tiflash_storage_page_gc_duration_seconds, "Bucketed histogram of page's gc task duration", Histogram, \
F(type_v2, {{"type", "v2"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_compact, {{"type", "v2_compact"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_data_compact, {{"type", "v2_data_compact"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_ver_compact, {{"type", "v2_ver_compact"}}, ExpBuckets{0.0005, 2, 20}), \
/* Below are metrics for PageStorage V3 */ \
F(type_compact_wal, {{"type", "compact_wal"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_compact_directory, {{"type", "compact_directory"}}, ExpBuckets{0.0005, 2, 20}), \
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
BackgroundProcessingPoolPtr blockable_background_pool; /// The thread pool for the blockable background work performed by the tables.
BackgroundProcessingPoolPtr ps_compact_background_pool; /// The thread pool for the background work performed by the ps v2.
mutable TMTContextPtr tmt_context; /// Context of TiFlash. Note that this should be free before background_pool.
MultiVersion<Macros> macros; /// Substitutions extracted from config.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
Expand Down Expand Up @@ -1374,6 +1375,15 @@ BackgroundProcessingPool & Context::getBlockableBackgroundPool()
return *shared->blockable_background_pool;
}

/// Returns the background pool used for PageStorage v2 work, creating it on first use.
/// The object returned by `getLock()` is kept alive until the function returns.
BackgroundProcessingPool & Context::getPSBackgroundPool()
{
    auto lock = getLock();
    auto & pool = shared->ps_compact_background_pool;
    if (pool == nullptr)
    {
        // Sized the same as the regular background pool (`background_pool_size`).
        pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size, "bg-page-");
    }
    return *pool;
}

void Context::createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config)
{
auto lock = getLock();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class Context
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();
BackgroundProcessingPool & getPSBackgroundPool();

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);

Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,13 @@ bool DeltaValueSpace::compact(DMContext & context)
log_storage_snap = context.storage_pool.logReader()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo()));
}

// do compaction task
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());
const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap);
compaction_task->prepare(context, wbs, reader);
{
// do compaction task
const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap);
compaction_task->prepare(context, wbs, reader);
log_storage_snap.reset(); // release the snapshot ASAP
}

{
std::scoped_lock lock(mutex);
Expand Down
26 changes: 21 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V2/PageStorage.h>
#include <common/defines.h>
#include <fmt/format.h>


Expand Down Expand Up @@ -90,16 +91,19 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
global_ctx,
true))
, data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalMulti("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
global_ctx,
true))
, meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
global_ctx,
true))
, global_context(global_ctx)
{
Expand Down Expand Up @@ -184,15 +188,18 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
log_storage_v2 = PageStorage::create(name + ".log",
storage_path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider());
global_context.getFileProvider(),
global_context);
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorSingle("data"), // keep for behavior not changed
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider());
global_context.getFileProvider(),
global_context);
meta_storage_v2 = PageStorage::create(name + ".meta",
storage_path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(global_context.getSettingsRef(), StorageType::Meta),
global_ctx.getFileProvider());
global_context.getFileProvider(),
global_context);
log_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, log_storage_v2, /*storage_v3_*/ nullptr, nullptr);
data_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, data_storage_v2, /*storage_v3_*/ nullptr, nullptr);
meta_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, meta_storage_v2, /*storage_v3_*/ nullptr, nullptr);
Expand Down Expand Up @@ -246,18 +253,21 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
storage_path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorMulti("data"),
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider(),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
meta_storage_v2 = PageStorage::create(name + ".meta",
storage_path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(global_context.getSettingsRef(), StorageType::Meta),
global_ctx.getFileProvider(),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
}
Expand Down Expand Up @@ -626,6 +636,12 @@ void StoragePool::shutdown()
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
if (run_mode != PageStorageRunMode::ONLY_V3)
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
}
}

void StoragePool::drop()
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/V3/PageStorageImpl.h>
Expand All @@ -23,13 +24,14 @@ PageStoragePtr PageStorage::create(
PSDiskDelegatorPtr delegator,
const PageStorageConfig & config,
const FileProviderPtr & file_provider,
Context & global_ctx,
bool use_v3,
bool no_more_insert_to_v2)
{
if (use_v3)
return std::make_shared<PS::V3::PageStorageImpl>(name, delegator, config, file_provider);
else
return std::make_shared<PS::V2::PageStorage>(name, delegator, config, file_provider, no_more_insert_to_v2);
return std::make_shared<PS::V2::PageStorage>(name, delegator, config, file_provider, global_ctx.getPSBackgroundPool(), no_more_insert_to_v2);
}

/***************************
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class PageStorage : private boost::noncopyable
PSDiskDelegatorPtr delegator,
const PageStorageConfig & config,
const FileProviderPtr & file_provider,
Context & global_ctx,
bool use_v3 = false,
bool no_more_insert_to_v2 = false);

Expand Down Expand Up @@ -198,6 +199,8 @@ class PageStorage : private boost::noncopyable
return gcImpl(not_skip, write_limiter, read_limiter);
}

virtual void shutdown() {}

// Register and unregister external pages GC callbacks
// Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister.
virtual void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) = 0;
Expand Down
34 changes: 29 additions & 5 deletions dbms/src/Storages/Page/V2/PageEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/CurrentMetrics.h>
#include <Common/nocopyable.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/Config.h>
Expand All @@ -31,6 +32,11 @@
#include <unordered_set>


namespace CurrentMetrics
{
extern const int PSMVCCNumDelta;
extern const int PSMVCCNumBase;
} // namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -118,12 +124,30 @@ class PageEntriesMixin
{
public:
explicit PageEntriesMixin(bool is_base_)
: normal_pages()
, page_ref()
, ref_deletions()
, max_page_id(0)
: max_page_id(0)
, is_base(is_base_)
{}
{
if (is_base)
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumBase);
}
else
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumDelta);
}
}

/// Decrements the gauge for this instance's kind: `PSMVCCNumBase` for a base,
/// `PSMVCCNumDelta` for a delta.
virtual ~PageEntriesMixin()
{
    CurrentMetrics::sub(is_base ? CurrentMetrics::PSMVCCNumBase : CurrentMetrics::PSMVCCNumDelta);
}

public:
static std::shared_ptr<T> createBase() { return std::make_shared<T>(true); }
Expand Down
37 changes: 32 additions & 5 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ PageStorage::PageStorage(String name,
PSDiskDelegatorPtr delegator_, //
const PageStorageConfig & config_,
const FileProviderPtr & file_provider_,
BackgroundProcessingPool & ver_compact_pool_,
bool no_more_insert_)
: DB::PageStorage(name, delegator_, config_, file_provider_)
, write_files(std::max(1UL, config_.num_write_slots.get()))
, page_file_log(&Poco::Logger::get("PageFile"))
, log(&Poco::Logger::get("PageStorage"))
, versioned_page_entries(storage_name, config.version_set_config, log)
, ver_compact_pool(ver_compact_pool_)
, no_more_insert(no_more_insert_)
{
// at least 1 write slots
Expand All @@ -184,6 +186,10 @@ PageStorage::PageStorage(String name,
config.num_write_slots = num_paths * 2;
}
write_files.resize(config.num_write_slots);

// If no snapshot has been released, the task checks at the default interval (10s) and exits quickly.
// When a snapshot is released, this handle is woken up to compact the version list.
ver_compact_handle = ver_compact_pool.addTask([this] { return compactInMemVersions(); }, /*multi*/ false);
}


Expand Down Expand Up @@ -361,7 +367,7 @@ void PageStorage::restore()
/// Returns the max page id recorded in the current version of the page entries.
/// Holds `write_mutex` while the snapshot is taken and read.
PageId PageStorage::getMaxId()
{
    std::lock_guard write_lock(write_mutex);
    // Pass the compaction task handle along with the (empty) tracing id.
    return versioned_page_entries.getSnapshot("", ver_compact_handle)->version()->maxId();
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down Expand Up @@ -588,13 +594,13 @@ void PageStorage::writeImpl(DB::WriteBatch && wb, const WriteLimiterPtr & write_

/// Takes a snapshot of the versioned page entries, tagged with `tracing_id`.
/// The version-compaction task handle is forwarded to the version set together with the id.
DB::PageStorage::SnapshotPtr PageStorage::getSnapshot(const String & tracing_id)
{
    return versioned_page_entries.getSnapshot(tracing_id, ver_compact_handle);
}

/// Same as `getSnapshot`, but returns the concrete `VersionedPageEntries` snapshot type
/// and uses an empty tracing id.
PageStorage::VersionedPageEntries::SnapshotPtr
PageStorage::getConcreteSnapshot()
{
    return versioned_page_entries.getSnapshot(/*tracing_id*/ "", ver_compact_handle);
}

SnapshotsStatistics PageStorage::getSnapshotsStat() const
Expand Down Expand Up @@ -942,6 +948,28 @@ WriteBatch::SequenceID PageStorage::WritingFilesSnapshot::minPersistedSequence()
return seq;
}

/// Removes the version-compaction task from its pool and clears the handle.
/// Does nothing when the handle is already null, so repeated calls are harmless.
void PageStorage::shutdown()
{
    if (!ver_compact_handle)
        return;

    ver_compact_pool.removeTask(ver_compact_handle);
    ver_compact_handle = nullptr;
}

/// Tries to compact the in-memory version list and returns the result of `tryCompact()`.
/// The elapsed time is reported to the `type_v2_ver_compact` histogram only when
/// something was actually compacted.
bool PageStorage::compactInMemVersions()
{
    Stopwatch watch;
    const bool compacted = versioned_page_entries.tryCompact();
    if (!compacted)
        return false;

    const auto elapsed_sec = watch.elapsedSeconds();
    GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_ver_compact).Observe(elapsed_sec);
    return true;
}

bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
// If another thread is running gc, just return;
Expand All @@ -954,7 +982,6 @@ bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, c
gc_is_running.compare_exchange_strong(is_running, false);
});


/// Get all pending external pages and PageFiles. Note that we should get external pages before PageFiles.
ExternalPageCallbacks::PathAndIdsVec external_pages;
if (external_pages_scanner)
Expand Down Expand Up @@ -1185,7 +1212,7 @@ bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, c
// We only care about those time cost in actually doing compaction on page data.
if (gc_context.compact_result.do_compaction)
{
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_compact).Observe(watch_migrate.elapsedSeconds());
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_data_compact).Observe(watch_migrate.elapsedSeconds());
}
}

Expand Down
Loading

0 comments on commit f248fac

Please sign in to comment.