Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve ps gc speed by dumping snapshot from memory directly #7668

Merged
merged 15 commits into from
Jun 30, 2023
4 changes: 2 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ namespace DB
F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \
F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), F(type_read_page_dir, {"type", "read_page_dir"}), \
F(type_read_blob, {"type", "read_blob"}), F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \
F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \
M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,6 @@ template <typename Trait>
typename BlobStore<Trait>::PageMap
BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
if (entries.empty())
{
return {};
Expand Down Expand Up @@ -986,7 +985,6 @@ BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l
template <typename Trait>
Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
const auto & [page_id_v3, entry] = id_entry;
const size_t buf_size = entry.size;

Expand Down Expand Up @@ -1046,7 +1044,7 @@ Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPt
template <typename Trait>
BlobFilePtr BlobStore<Trait>::read(const typename BlobStore<Trait>::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read_blob).Increment();
assert(buffers != nullptr);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
Expand Down
17 changes: 9 additions & 8 deletions dbms/src/Storages/Page/V3/GCDefines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,6 @@ GCTimeStatistics ExternalPageCallbacksManager<Trait>::doGC(

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
statistics.compact_wal_happen = page_directory.tryDumpSnapshot(read_limiter, write_limiter, force_wal_compact);
if (statistics.compact_wal_happen)
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0);

typename Trait::PageDirectory::InMemGCOption options;
if constexpr (std::is_same_v<Trait, universal::ExternalPageCallbacksManagerTrait>)
{
Expand All @@ -254,6 +246,15 @@ GCTimeStatistics ExternalPageCallbacksManager<Trait>::doGC(
statistics.compact_directory_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_directory).Observe(statistics.compact_directory_ms / 1000.0);

// Compact WAL after in-memory GC in PageDirectory in order to reduce the overhead of dumping useless entries
statistics.compact_wal_happen = page_directory.tryDumpSnapshot(write_limiter, force_wal_compact);
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
if (statistics.compact_wal_happen)
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0);

SYNC_FOR("before_PageStorageImpl::doGC_fullGC_prepare");

// 2. Remove the expired entries in BlobStore.
Expand Down
105 changes: 39 additions & 66 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void VersionedPageEntries<Trait>::createNewEntry(const PageVersion & ver, const
assert(last_iter->second.isEntry());
// It is ok to replace the entry with the same sequence and a newer epoch, but it is not valid
// to replace the entry with a newer sequence.
if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence))
if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence))
{
throw Exception(
fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]",
Expand Down Expand Up @@ -151,7 +151,7 @@ typename VersionedPageEntries<Trait>::PageId VersionedPageEntries<Trait>::create
assert(last_iter->second.isEntry());
// It is ok to replace the entry with the same sequence and a newer epoch, but it is not valid
// to replace the entry with a newer sequence.
if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence))
if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence))
{
throw Exception(
fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]",
Expand Down Expand Up @@ -218,7 +218,6 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
is_deleted = false;
create_ver = ver;
delete_ver = PageVersion(0);
being_ref_count = 1;
RUNTIME_CHECK(entries.empty());
entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry));
// return the new created holder to caller to set the page_id
Expand All @@ -236,7 +235,6 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
is_deleted = false;
create_ver = ver;
delete_ver = PageVersion(0);
being_ref_count = 1;
entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry));
// return the new created holder to caller to set the page_id
external_holder = std::make_shared<typename Trait::PageId>();
Expand Down Expand Up @@ -402,15 +400,15 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
type = EditRecordType::VAR_EXTERNAL;
is_deleted = false;
create_ver = rec.version;
being_ref_count = rec.being_ref_count;
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count));
being_ref_count.restoreFrom(rec.version, rec.being_ref_count);
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, 1 /* meaningless */));
external_holder = std::make_shared<typename Trait::PageId>(rec.page_id);
return external_holder;
}
case EditRecordType::VAR_ENTRY:
{
type = EditRecordType::VAR_ENTRY;
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count));
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, rec.being_ref_count));
return nullptr;
}
default:
Expand Down Expand Up @@ -613,12 +611,12 @@ bool VersionedPageEntries<Trait>::isVisible(UInt64 seq) const
}

template <typename Trait>
Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & ver)
Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & target_ver, const PageVersion & ref_ver)
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
if (auto iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1));
if (auto iter = MapUtils::findMutLess(entries, PageVersion(target_ver.sequence + 1));
iter != entries.end())
{
// ignore all "delete"
Expand All @@ -631,23 +629,27 @@ Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & ver)
// Then `iter` point to an entry or the `entries.begin()`, return if entry found
if (iter->second.isEntry())
{
if (unlikely(met_delete && iter->second.being_ref_count == 1))
auto ref_count_value = iter->second.being_ref_count.getLatestRefCount();
if (unlikely(met_delete && ref_count_value == 1))
{
throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, ver), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, target_ver), ErrorCodes::LOGICAL_ERROR);
}
return ++iter->second.being_ref_count;
iter->second.being_ref_count.incrRefCount(ref_ver, 1);
return ref_count_value + 1;
}
} // fallthrough to FAIL
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
if (create_ver <= ver)
if (create_ver <= target_ver)
{
// We may add reference to an external id even if it is logically deleted.
return ++being_ref_count;
auto ref_count_value = being_ref_count.getLatestRefCount();
being_ref_count.incrRefCount(ref_ver, 1);
return ref_count_value + 1;
}
}
throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", ver, toDebugString()), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", target_ver, toDebugString()), ErrorCodes::LOGICAL_ERROR);
}

template <typename Trait>
Expand Down Expand Up @@ -708,7 +710,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
{
if (type == EditRecordType::VAR_EXTERNAL)
{
return (being_ref_count == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
return (being_ref_count.getLatestRefCount() == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
}
else if (type == EditRecordType::VAR_REF)
{
Expand Down Expand Up @@ -786,7 +788,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
{
if (last_entry_is_delete)
{
if (iter->second.being_ref_count == 1)
if (iter->second.being_ref_count.getLatestRefCount() == 1)
{
if (entries_removed)
{
Expand Down Expand Up @@ -828,12 +830,8 @@ bool VersionedPageEntries<Trait>::derefAndClean(
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_EXTERNAL)
{
if (being_ref_count <= deref_count)
{
throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count));
}
being_ref_count -= deref_count;
return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count == 1);
being_ref_count.decrRefCountInSnap(lowest_seq, deref_count);
return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count.getLatestRefCount() == 1);
}
else if (type == EditRecordType::VAR_ENTRY)
{
Expand All @@ -856,11 +854,7 @@ bool VersionedPageEntries<Trait>::derefAndClean(
throw Exception(fmt::format("Can not find entry for decreasing ref count till the begin [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count));
}
assert(iter->second.isEntry());
if (iter->second.being_ref_count <= deref_count)
{
throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}] [entry={}]", page_id, deref_ver, deref_count, iter->second));
}
iter->second.being_ref_count -= deref_count;
iter->second.being_ref_count.decrRefCountInSnap(lowest_seq, deref_count);

if (lowest_seq == 0)
return false;
Expand Down Expand Up @@ -897,7 +891,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
return;
auto iter = entries.find(create_ver);
RUNTIME_CHECK(iter != entries.end());
edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count);
edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count.getRefCountInSnap(seq));
if (is_deleted && delete_ver.sequence <= seq)
{
edit.varDel(page_id, delete_ver);
Expand All @@ -915,7 +909,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
if (last_iter->second.isEntry())
{
const auto & entry = last_iter->second;
edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count);
edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count.getRefCountInSnap(seq));
return;
}
else if (last_iter->second.isDelete())
Expand All @@ -929,11 +923,12 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore
if (prev_iter->second.isEntry())
{
if (prev_iter->second.being_ref_count == 1)
auto ref_count_value = prev_iter->second.being_ref_count.getRefCountInSnap(seq);
if (ref_count_value == 1)
return;
// It is still being referenced by another id, so persist both the entry and the delete
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varEntry(page_id, prev_iter->first, entry.entry, ref_count_value);
edit.varDel(page_id, last_version);
}
}
Expand Down Expand Up @@ -1020,7 +1015,7 @@ SnapshotsStatistics PageDirectory<Trait>::getSnapshotsStat() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdAndEntry PageDirectory<Trait>::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment();
PageEntryV3 entry_got;

// After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is:
Expand Down Expand Up @@ -1115,7 +1110,7 @@ template <typename Trait>
std::pair<typename PageDirectory<Trait>::PageIdAndEntries, typename PageDirectory<Trait>::PageIds>
PageDirectory<Trait>::getByIDsImpl(const typename PageDirectory<Trait>::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment();
PageEntryV3 entry_got;
PageIds page_not_found = {};

Expand Down Expand Up @@ -1441,7 +1436,7 @@ void PageDirectory<Trait>::applyRefEditRecord(
// Add the ref-count of being-ref entry
if (auto resolved_iter = mvcc_table_directory.find(resolved_id); resolved_iter != mvcc_table_directory.end())
{
resolved_iter->second->incrRefCount(resolved_ver);
resolved_iter->second->incrRefCount(resolved_ver, version);
}
else
{
Expand Down Expand Up @@ -1818,7 +1813,7 @@ PageDirectory<Trait>::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_i
}

template <typename Trait>
bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force)
bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force)
{
// Only apply compact logs when files snapshot is valid
auto files_snap = wal->tryGetFilesSnapshot(max_persisted_log_files, force);
Expand All @@ -1834,43 +1829,19 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter,
auto identifier = fmt::format("{}.dump_{}", wal->name(), log_num);

Stopwatch watch;
auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter);
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
auto collapsed_dir = [&]() {
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
static_assert(std::is_same_v<Trait, u128::PageDirectoryTrait> || std::is_same_v<Trait, universal::PageDirectoryTrait>,
"unknown impl");
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
u128::PageDirectoryFactory factory;
return factory.createFromReader(
identifier,
std::move(snapshot_reader),
/* wal */ nullptr);
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
universal::PageDirectoryFactory factory;
return factory.createFromReader(
identifier,
std::move(snapshot_reader),
/* wal */ nullptr);
}
}();
// The records persisted in `files_snap` are older than or equal to all records in `edit`
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
files_snap.num_records = edit_from_disk.size();
files_snap.read_elapsed_ms = watch.elapsedMilliseconds();
auto snap = createSnapshot(identifier);
auto edit = dumpSnapshotToEdit(snap);
files_snap.num_records = edit.size();
files_snap.dump_elapsed_ms = watch.elapsedMilliseconds();
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit_from_disk), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit), write_limiter);
return done_any_io;
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit_from_disk), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit), write_limiter);
return done_any_io;
}
}
Expand Down Expand Up @@ -1959,6 +1930,8 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(
}
}

SYNC_FOR("after_PageDirectory::doGC_getLowestSeq");

PageEntriesV3 all_del_entries;
typename MVCCMapType::iterator iter;
{
Expand Down
Loading