Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: background version compact for v2 (#6446) #6465

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
M(MemoryCapacity) \
M(PSMVCCNumSnapshots) \
M(PSMVCCSnapshotsList) \
M(PSMVCCNumDelta) \
M(PSMVCCNumBase) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
M(PSMVCCCompactOnDelta) \
M(PSMVCCCompactOnDeltaRebaseRejected) \
M(PSMVCCCompactOnBase) \
M(PSMVCCCompactOnBaseCommit) \
\
M(DMWriteBlock) \
M(DMWriteBlockNS) \
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace DB
F(type_seg_split_bg, {"type", "seg_split_bg"}), \
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down Expand Up @@ -164,7 +164,8 @@ namespace DB
F(type_v3_bs_full_gc, {"type", "v3_bs_full_gc"})) \
M(tiflash_storage_page_gc_duration_seconds, "Bucketed histogram of page's gc task duration", Histogram, \
F(type_v2, {{"type", "v2"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_compact, {{"type", "v2_compact"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_data_compact, {{"type", "v2_data_compact"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v2_ver_compact, {{"type", "v2_ver_compact"}}, ExpBuckets{0.0005, 2, 20}), \
/* Below are metrics for PageStorage V3 */ \
F(type_compact_wal, {{"type", "compact_wal"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_compact_directory, {{"type", "compact_directory"}}, ExpBuckets{0.0005, 2, 20}), \
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
BackgroundProcessingPoolPtr blockable_background_pool; /// The thread pool for the blockable background work performed by the tables.
BackgroundProcessingPoolPtr ps_compact_background_pool; /// The thread pool for the background work performed by the ps v2.
mutable TMTContextPtr tmt_context; /// Context of TiFlash. Note that this should be free before background_pool.
MultiVersion<Macros> macros; /// Substitutions extracted from config.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
Expand Down Expand Up @@ -1374,6 +1375,15 @@ BackgroundProcessingPool & Context::getBlockableBackgroundPool()
return *shared->blockable_background_pool;
}

/// Returns the background pool dedicated to PageStorage work, creating it on
/// first use. Creation happens while holding the context lock.
BackgroundProcessingPool & Context::getPSBackgroundPool()
{
    auto lock = getLock();
    auto & pool = shared->ps_compact_background_pool;
    if (pool == nullptr)
    {
        // Sized the same as the regular background pool (`background_pool_size`).
        pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size, "bg-page-");
    }
    return *pool;
}

void Context::createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config)
{
auto lock = getLock();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class Context
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();
BackgroundProcessingPool & getPSBackgroundPool();

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);

Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,13 @@ bool DeltaValueSpace::compact(DMContext & context)
log_storage_snap = context.storage_pool.logReader()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo()));
}

// do compaction task
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());
const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap);
compaction_task->prepare(context, wbs, reader);
{
// do compaction task
const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap);
compaction_task->prepare(context, wbs, reader);
log_storage_snap.reset(); // release the snapshot ASAP
}

{
std::scoped_lock lock(mutex);
Expand Down
26 changes: 21 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V2/PageStorage.h>
#include <common/defines.h>
#include <fmt/format.h>


Expand Down Expand Up @@ -90,16 +91,19 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
global_ctx,
true))
, data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalMulti("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
global_ctx,
true))
, meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
global_ctx,
true))
, global_context(global_ctx)
{
Expand Down Expand Up @@ -184,15 +188,18 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
log_storage_v2 = PageStorage::create(name + ".log",
storage_path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider());
global_context.getFileProvider(),
global_context);
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorSingle("data"), // keep for behavior not changed
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider());
global_context.getFileProvider(),
global_context);
meta_storage_v2 = PageStorage::create(name + ".meta",
storage_path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(global_context.getSettingsRef(), StorageType::Meta),
global_ctx.getFileProvider());
global_context.getFileProvider(),
global_context);
log_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, log_storage_v2, /*storage_v3_*/ nullptr, nullptr);
data_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, data_storage_v2, /*storage_v3_*/ nullptr, nullptr);
meta_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, meta_storage_v2, /*storage_v3_*/ nullptr, nullptr);
Expand Down Expand Up @@ -246,18 +253,21 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
storage_path_pool.getPSDiskDelegatorMulti("log"),
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorMulti("data"),
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider(),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
meta_storage_v2 = PageStorage::create(name + ".meta",
storage_path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(global_context.getSettingsRef(), StorageType::Meta),
global_ctx.getFileProvider(),
global_context.getFileProvider(),
global_context,
/* use_v3 */ false,
/* no_more_write_to_v2 */ true);
}
Expand Down Expand Up @@ -626,6 +636,12 @@ void StoragePool::shutdown()
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
if (run_mode != PageStorageRunMode::ONLY_V3)
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
}
}

void StoragePool::drop()
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/V3/PageStorageImpl.h>
Expand All @@ -23,13 +24,14 @@ PageStoragePtr PageStorage::create(
PSDiskDelegatorPtr delegator,
const PageStorageConfig & config,
const FileProviderPtr & file_provider,
Context & global_ctx,
bool use_v3,
bool no_more_insert_to_v2)
{
if (use_v3)
return std::make_shared<PS::V3::PageStorageImpl>(name, delegator, config, file_provider);
else
return std::make_shared<PS::V2::PageStorage>(name, delegator, config, file_provider, no_more_insert_to_v2);
return std::make_shared<PS::V2::PageStorage>(name, delegator, config, file_provider, global_ctx.getPSBackgroundPool(), no_more_insert_to_v2);
}

/***************************
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class PageStorage : private boost::noncopyable
PSDiskDelegatorPtr delegator,
const PageStorageConfig & config,
const FileProviderPtr & file_provider,
Context & global_ctx,
bool use_v3 = false,
bool no_more_insert_to_v2 = false);

Expand Down Expand Up @@ -198,6 +199,8 @@ class PageStorage : private boost::noncopyable
return gcImpl(not_skip, write_limiter, read_limiter);
}

// Shutdown hook. The base implementation is a no-op; a concrete storage provides its own
// when it has something to stop (PS::V2::PageStorage removes its version-compact task).
virtual void shutdown() {}

// Register and unregister external pages GC callbacks
// Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister.
virtual void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) = 0;
Expand Down
34 changes: 29 additions & 5 deletions dbms/src/Storages/Page/V2/PageEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/CurrentMetrics.h>
#include <Common/nocopyable.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/Config.h>
Expand All @@ -31,6 +32,11 @@
#include <unordered_set>


// Metric identifiers declared here and defined elsewhere. PageEntriesMixin adds to
// them in its constructor and subtracts in its destructor, so they follow the number
// of live base / delta entries objects.
namespace CurrentMetrics
{
extern const int PSMVCCNumDelta;
extern const int PSMVCCNumBase;
} // namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -118,12 +124,30 @@ class PageEntriesMixin
{
public:
/// Constructs the entries object and records it in CurrentMetrics:
/// adds to PSMVCCNumBase when `is_base_` is true, otherwise to PSMVCCNumDelta.
/// The destructor performs the matching subtraction.
explicit PageEntriesMixin(bool is_base_)
: normal_pages()
, page_ref()
, ref_deletions()
, max_page_id(0)
: max_page_id(0)
, is_base(is_base_)
{}
{
if (is_base)
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumBase);
}
else
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumDelta);
}
}

/// Undoes the constructor's metric bookkeeping: subtracts from PSMVCCNumBase
/// for a base object and from PSMVCCNumDelta for a delta object.
virtual ~PageEntriesMixin()
{
    CurrentMetrics::sub(is_base ? CurrentMetrics::PSMVCCNumBase : CurrentMetrics::PSMVCCNumDelta);
}

public:
static std::shared_ptr<T> createBase() { return std::make_shared<T>(true); }
Expand Down
37 changes: 32 additions & 5 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ PageStorage::PageStorage(String name,
PSDiskDelegatorPtr delegator_, //
const PageStorageConfig & config_,
const FileProviderPtr & file_provider_,
BackgroundProcessingPool & ver_compact_pool_,
bool no_more_insert_)
: DB::PageStorage(name, delegator_, config_, file_provider_)
, write_files(std::max(1UL, config_.num_write_slots.get()))
, page_file_log(&Poco::Logger::get("PageFile"))
, log(&Poco::Logger::get("PageStorage"))
, versioned_page_entries(storage_name, config.version_set_config, log)
, ver_compact_pool(ver_compact_pool_)
, no_more_insert(no_more_insert_)
{
// at least 1 write slots
Expand All @@ -184,6 +186,10 @@ PageStorage::PageStorage(String name,
config.num_write_slots = num_paths * 2;
}
write_files.resize(config.num_write_slots);

// If no snapshot has been released, the task runs at the default interval (10s) and exits quickly.
// When a snapshot is released, this handle is woken up to compact the version list.
ver_compact_handle = ver_compact_pool.addTask([this] { return compactInMemVersions(); }, /*multi*/ false);
}


Expand Down Expand Up @@ -361,7 +367,7 @@ void PageStorage::restore()
PageId PageStorage::getMaxId()
{
std::lock_guard write_lock(write_mutex);
return versioned_page_entries.getSnapshot("")->version()->maxId();
return versioned_page_entries.getSnapshot("", ver_compact_handle)->version()->maxId();
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down Expand Up @@ -588,13 +594,13 @@ void PageStorage::writeImpl(DB::WriteBatch && wb, const WriteLimiterPtr & write_

DB::PageStorage::SnapshotPtr PageStorage::getSnapshot(const String & tracing_id)
{
return versioned_page_entries.getSnapshot(tracing_id);
return versioned_page_entries.getSnapshot(tracing_id, ver_compact_handle);
}

PageStorage::VersionedPageEntries::SnapshotPtr
PageStorage::getConcreteSnapshot()
{
return versioned_page_entries.getSnapshot(/*tracing_id*/ "");
return versioned_page_entries.getSnapshot(/*tracing_id*/ "", ver_compact_handle);
}

SnapshotsStatistics PageStorage::getSnapshotsStat() const
Expand Down Expand Up @@ -942,6 +948,28 @@ WriteBatch::SequenceID PageStorage::WritingFilesSnapshot::minPersistedSequence()
return seq;
}

/// Removes the in-memory version compaction task from its pool and clears the
/// handle. Does nothing when the handle is already empty.
void PageStorage::shutdown()
{
    if (!ver_compact_handle)
        return;

    ver_compact_pool.removeTask(ver_compact_handle);
    ver_compact_handle = nullptr;
}

/// Background task body: tries to compact the in-memory version list.
/// Returns whether tryCompact() reported doing any work; the elapsed time is
/// reported to the `type_v2_ver_compact` histogram only in that case.
bool PageStorage::compactInMemVersions()
{
    Stopwatch watch;
    if (!versioned_page_entries.tryCompact())
        return false;

    // Only runs that actually did something are observed.
    GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_ver_compact).Observe(watch.elapsedSeconds());
    return true;
}

bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
// If another thread is running gc, just return;
Expand All @@ -954,7 +982,6 @@ bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, c
gc_is_running.compare_exchange_strong(is_running, false);
});


/// Get all pending external pages and PageFiles. Note that we should get external pages before PageFiles.
ExternalPageCallbacks::PathAndIdsVec external_pages;
if (external_pages_scanner)
Expand Down Expand Up @@ -1185,7 +1212,7 @@ bool PageStorage::gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, c
// We only care about those time cost in actually doing compaction on page data.
if (gc_context.compact_result.do_compaction)
{
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_compact).Observe(watch_migrate.elapsedSeconds());
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v2_data_compact).Observe(watch_migrate.elapsedSeconds());
}
}

Expand Down
Loading