Skip to content

Commit

Permalink
Revert "improve ps gc speed by dumping snapshot from memory directly (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Aug 10, 2023
1 parent 11e9a74 commit dd87ab9
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 447 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ namespace DB
F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), F(type_read_page_dir, {"type", "read_page_dir"}), \
F(type_read_blob, {"type", "read_blob"}), F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \
F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \
F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \
M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ template <typename Trait>
typename BlobStore<Trait>::PageMap
BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
if (entries.empty())
{
return {};
Expand Down Expand Up @@ -1001,6 +1002,7 @@ BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l
template <typename Trait>
Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
const auto & [page_id_v3, entry] = id_entry;
const size_t buf_size = entry.size;

Expand Down Expand Up @@ -1060,7 +1062,7 @@ Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPt
template <typename Trait>
BlobFilePtr BlobStore<Trait>::read(const typename BlobStore<Trait>::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
GET_METRIC(tiflash_storage_page_command_count, type_read_blob).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
assert(buffers != nullptr);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
Expand Down
17 changes: 8 additions & 9 deletions dbms/src/Storages/Page/V3/GCDefines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ GCTimeStatistics ExternalPageCallbacksManager<Trait>::doGC(

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
statistics.compact_wal_happen = page_directory.tryDumpSnapshot(read_limiter, write_limiter, force_wal_compact);
if (statistics.compact_wal_happen)
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0);

typename Trait::PageDirectory::InMemGCOption options;
if constexpr (std::is_same_v<Trait, universal::ExternalPageCallbacksManagerTrait>)
{
Expand All @@ -247,15 +255,6 @@ GCTimeStatistics ExternalPageCallbacksManager<Trait>::doGC(
statistics.compact_directory_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_directory).Observe(statistics.compact_directory_ms / 1000.0);

// Compact WAL after in-memory GC in PageDirectory in order to reduce the overhead of dumping useless entries
statistics.compact_wal_happen = page_directory.tryDumpSnapshot(write_limiter, force_wal_compact);
if (statistics.compact_wal_happen)
{
GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment();
}
statistics.compact_wal_ms = gc_watch.elapsedMillisecondsFromLastTime();
GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_compact_wal).Observe(statistics.compact_wal_ms / 1000.0);

SYNC_FOR("before_PageStorageImpl::doGC_fullGC_prepare");

// 2. Remove the expired entries in BlobStore.
Expand Down
106 changes: 66 additions & 40 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void VersionedPageEntries<Trait>::createNewEntry(const PageVersion & ver, const
assert(last_iter->second.isEntry());
// It is ok to replace the entry with same sequence and newer epoch, but not valid
// to replace the entry with newer sequence.
if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence))
if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence))
{
throw Exception(
fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]",
Expand Down Expand Up @@ -151,7 +151,7 @@ typename VersionedPageEntries<Trait>::PageId VersionedPageEntries<Trait>::create
assert(last_iter->second.isEntry());
// It is ok to replace the entry with same sequence and newer epoch, but not valid
// to replace the entry with newer sequence.
if (unlikely(last_iter->second.being_ref_count.getLatestRefCount() != 1 && last_iter->first.sequence < ver.sequence))
if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence))
{
throw Exception(
fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]",
Expand Down Expand Up @@ -218,6 +218,7 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
is_deleted = false;
create_ver = ver;
delete_ver = PageVersion(0);
being_ref_count = 1;
RUNTIME_CHECK(entries.empty());
entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry));
// return the new created holder to caller to set the page_id
Expand All @@ -235,6 +236,7 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
is_deleted = false;
create_ver = ver;
delete_ver = PageVersion(0);
being_ref_count = 1;
entries.emplace(create_ver, EntryOrDelete::newNormalEntry(entry));
// return the new created holder to caller to set the page_id
external_holder = std::make_shared<typename Trait::PageId>();
Expand Down Expand Up @@ -400,15 +402,15 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
type = EditRecordType::VAR_EXTERNAL;
is_deleted = false;
create_ver = rec.version;
being_ref_count.restoreFrom(rec.version, rec.being_ref_count);
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, 1 /* meaningless */));
being_ref_count = rec.being_ref_count;
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count));
external_holder = std::make_shared<typename Trait::PageId>(rec.page_id);
return external_holder;
}
case EditRecordType::VAR_ENTRY:
{
type = EditRecordType::VAR_ENTRY;
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.version, rec.being_ref_count));
entries.emplace(rec.version, EntryOrDelete::newFromRestored(rec.entry, rec.being_ref_count));
return nullptr;
}
default:
Expand Down Expand Up @@ -612,12 +614,12 @@ bool VersionedPageEntries<Trait>::isVisible(UInt64 seq) const
}

template <typename Trait>
Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & target_ver, const PageVersion & ref_ver)
Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & ver)
{
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
if (auto iter = MapUtils::findMutLess(entries, PageVersion(target_ver.sequence + 1));
if (auto iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1));
iter != entries.end())
{
// ignore all "delete"
Expand All @@ -630,27 +632,23 @@ Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & target_ver,
// Then `iter` point to an entry or the `entries.begin()`, return if entry found
if (iter->second.isEntry())
{
auto ref_count_value = iter->second.being_ref_count.getLatestRefCount();
if (unlikely(met_delete && ref_count_value == 1))
if (unlikely(met_delete && iter->second.being_ref_count == 1))
{
throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, target_ver), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second, ver), ErrorCodes::LOGICAL_ERROR);
}
iter->second.being_ref_count.incrRefCount(ref_ver, 1);
return ref_count_value + 1;
return ++iter->second.being_ref_count;
}
} // fallthrough to FAIL
}
else if (type == EditRecordType::VAR_EXTERNAL)
{
if (create_ver <= target_ver)
if (create_ver <= ver)
{
// We may add reference to an external id even if it is logically deleted.
auto ref_count_value = being_ref_count.getLatestRefCount();
being_ref_count.incrRefCount(ref_ver, 1);
return ref_count_value + 1;
return ++being_ref_count;
}
}
throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", target_ver, toDebugString()), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", ver, toDebugString()), ErrorCodes::LOGICAL_ERROR);
}

template <typename Trait>
Expand Down Expand Up @@ -711,7 +709,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
{
if (type == EditRecordType::VAR_EXTERNAL)
{
return (being_ref_count.getLatestRefCount() == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
return (being_ref_count == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
}
else if (type == EditRecordType::VAR_REF)
{
Expand Down Expand Up @@ -789,7 +787,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
{
if (last_entry_is_delete)
{
if (iter->second.being_ref_count.getLatestRefCount() == 1)
if (iter->second.being_ref_count == 1)
{
if (entries_removed)
{
Expand Down Expand Up @@ -831,8 +829,12 @@ bool VersionedPageEntries<Trait>::derefAndClean(
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_EXTERNAL)
{
being_ref_count.decrRefCountInSnap(lowest_seq, deref_count);
return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count.getLatestRefCount() == 1);
if (being_ref_count <= deref_count)
{
throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count));
}
being_ref_count -= deref_count;
return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count == 1);
}
else if (type == EditRecordType::VAR_ENTRY)
{
Expand All @@ -855,7 +857,11 @@ bool VersionedPageEntries<Trait>::derefAndClean(
throw Exception(fmt::format("Can not find entry for decreasing ref count till the begin [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count));
}
assert(iter->second.isEntry());
iter->second.being_ref_count.decrRefCountInSnap(lowest_seq, deref_count);
if (iter->second.being_ref_count <= deref_count)
{
throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}] [entry={}]", page_id, deref_ver, deref_count, iter->second));
}
iter->second.being_ref_count -= deref_count;

if (lowest_seq == 0)
return false;
Expand Down Expand Up @@ -892,7 +898,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
return;
auto iter = entries.find(create_ver);
RUNTIME_CHECK(iter != entries.end());
edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count.getRefCountInSnap(seq));
edit.varExternal(page_id, create_ver, iter->second.entry, being_ref_count);
if (is_deleted && delete_ver.sequence <= seq)
{
edit.varDel(page_id, delete_ver);
Expand All @@ -910,7 +916,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
if (last_iter->second.isEntry())
{
const auto & entry = last_iter->second;
edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count.getRefCountInSnap(seq));
edit.varEntry(page_id, /*ver*/ last_iter->first, entry.entry, entry.being_ref_count);
return;
}
else if (last_iter->second.isDelete())
Expand All @@ -924,12 +930,11 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore
if (prev_iter->second.isEntry())
{
auto ref_count_value = prev_iter->second.being_ref_count.getRefCountInSnap(seq);
if (ref_count_value == 1)
if (prev_iter->second.being_ref_count == 1)
return;
// It is being referenced by another id, so we should persist both the item and the delete
const auto & entry = prev_iter->second;
edit.varEntry(page_id, prev_iter->first, entry.entry, ref_count_value);
edit.varEntry(page_id, prev_iter->first, entry.entry, entry.being_ref_count);
edit.varDel(page_id, last_version);
}
}
Expand Down Expand Up @@ -1016,7 +1021,7 @@ SnapshotsStatistics PageDirectory<Trait>::getSnapshotsStat() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdAndEntry PageDirectory<Trait>::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;

// After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is:
Expand Down Expand Up @@ -1111,7 +1116,7 @@ template <typename Trait>
std::pair<typename PageDirectory<Trait>::PageIdAndEntries, typename PageDirectory<Trait>::PageIds>
PageDirectory<Trait>::getByIDsImpl(const typename PageDirectory<Trait>::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read_page_dir).Increment();
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;
PageIds page_not_found = {};

Expand Down Expand Up @@ -1437,7 +1442,7 @@ void PageDirectory<Trait>::applyRefEditRecord(
// Add the ref-count of being-ref entry
if (auto resolved_iter = mvcc_table_directory.find(resolved_id); resolved_iter != mvcc_table_directory.end())
{
resolved_iter->second->incrRefCount(resolved_ver, version);
resolved_iter->second->incrRefCount(resolved_ver);
}
else
{
Expand Down Expand Up @@ -1829,8 +1834,7 @@ PageDirectory<Trait>::getEntriesByBlobIdsForDifferentPageTypes(const typename Pa
return page_type_and_gc_info;
}

template <typename Trait>
bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force)
bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force)
{
// Only apply compact logs when files snapshot is valid
auto files_snap = wal->tryGetFilesSnapshot(max_persisted_log_files, force);
Expand All @@ -1846,19 +1850,43 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter
auto identifier = fmt::format("{}.dump_{}", wal->name(), log_num);

Stopwatch watch;
auto snapshot_reader = wal->createReaderForFiles(identifier, files_snap.persisted_log_files, read_limiter);
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
auto collapsed_dir = [&]() {
// we just use the `collapsed_dir` to dump edit of the snapshot, should never call functions like `apply` that
// persist new logs into disk. So we pass `nullptr` as `wal` to the factory.
static_assert(std::is_same_v<Trait, u128::PageDirectoryTrait> || std::is_same_v<Trait, universal::PageDirectoryTrait>,
"unknown impl");
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
u128::PageDirectoryFactory factory;
return factory.createFromReader(
identifier,
std::move(snapshot_reader),
/* wal */ nullptr);
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
universal::PageDirectoryFactory factory;
return factory.createFromReader(
identifier,
std::move(snapshot_reader),
/* wal */ nullptr);
}
}();
// The records persisted in `files_snap` are older than or equal to all records in `edit`
auto snap = createSnapshot(identifier);
auto edit = dumpSnapshotToEdit(snap);
files_snap.num_records = edit.size();
files_snap.dump_elapsed_ms = watch.elapsedMilliseconds();
auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit();
files_snap.num_records = edit_from_disk.size();
files_snap.read_elapsed_ms = watch.elapsedMilliseconds();
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit_from_disk), write_limiter);
return done_any_io;
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit_from_disk), write_limiter);
return done_any_io;
}
}
Expand Down Expand Up @@ -1939,8 +1967,6 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(
}
}

SYNC_FOR("after_PageDirectory::doGC_getLowestSeq");

PageEntriesV3 all_del_entries;
typename MVCCMapType::iterator iter;
{
Expand Down
Loading

0 comments on commit dd87ab9

Please sign in to comment.