Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dumpIncrementalCheckpoint utility #6928

Merged
merged 14 commits into from
Mar 3, 2023
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info)
write_stage = WriteStage::WritingEdits;
}

bool CPFilesWriter::writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edits)
bool CPFilesWriter::writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edits)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CPFilesWriter : private boost::noncopyable
* The list of lock files that will be always appended to the checkpoint file.
*
* Note: In addition to the specified lock files, the checkpoint file will also contain
* lock files from `writeEditsAndApplyRemoteInfo`.
* lock files from `writeEditsAndApplyCheckpointInfo`.
*/
const std::unordered_set<String> & must_locked_files = {};
};
Expand Down Expand Up @@ -71,7 +71,7 @@ class CPFilesWriter : private boost::noncopyable
*
* You must call `writeSuffix` finally, if you don't plan to write edits anymore.
*/
bool /* has_new_data */ writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edit);
bool /* has_new_data */ writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edit);

/**
* This function must be called, and must be called last, after other `writeXxx`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CheckpointFileTest : public DB::base::TiFlashStorageTestBasic
void SetUp() override
{
dir = getTemporaryPath();
DB::tests::TiFlashTestEnv::tryRemovePath(dir);
dropDataOnDisk(dir);
createIfNotExist(dir);
}

Expand Down Expand Up @@ -99,7 +99,7 @@ try
edits.appendRecord({.type = EditRecordType::DEL});

ASSERT_THROW({
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
},
DB::Exception);
}
Expand All @@ -124,7 +124,7 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -180,15 +180,15 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "abc", .entry = {.size = 29, .offset = 5}});
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "rain"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -290,7 +290,7 @@ try
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "sun"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -379,7 +379,7 @@ try
.sequence = 5,
.last_sequence = 3,
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
writer->writeSuffix();
writer.reset();

Expand Down Expand Up @@ -461,12 +461,12 @@ try
});
{
auto edits = universal::PageEntriesEdit{};
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "snow"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -562,7 +562,7 @@ try
},
});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -621,7 +621,7 @@ try
.page_id = fmt::format("record_{}", i),
.entry = {.size = 22, .offset = 10},
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down
90 changes: 89 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ std::optional<PageEntryV3> VersionedPageEntries<Trait>::getLastEntry(std::option
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++)
for (const auto & entrie : std::ranges::reverse_view(entries))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (const auto & entrie : std::ranges::reverse_view(entries))
for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++)

Keep this unchanged for now, because our CI does not support clang 15 yet.

{
if (seq.has_value() && it_r->first.sequence > seq.value())
continue;
Expand All @@ -486,6 +486,55 @@ std::optional<PageEntryV3> VersionedPageEntries<Trait>::getLastEntry(std::option
return std::nullopt;
}

/// Write the checkpoint info carried by `edit` back into the in-memory versions of this page.
///
/// Context: a running PageStorage instance has dumped a checkpoint, encoded as a PageEntriesEdit.
/// While dumping, this function is called so that every page remembers where (the checkpoint info)
/// its data was written. A living snapshot protects the data during this process.
///
/// `edit` must be a VAR_ENTRY record that carries checkpoint info; otherwise the runtime checks fail.
template <typename Trait>
void VersionedPageEntries<Trait>::copyCheckpointInfoFromEdit(const typename PageEntriesEdit::EditRecord & edit)
{
    // Preconditions on the incoming record, verified before the page lock is taken.
    RUNTIME_CHECK(edit.type == EditRecordType::VAR_ENTRY);
    RUNTIME_CHECK(edit.entry.checkpoint_info.has_value());

    auto page_lock = acquireLock();

    // The page may no longer be an entry in the directory. For example:
    //   Put X -> Delete X -> dumpSnapshotToEdit -> Full GC -> Delete X -> copyCheckpointInfoFromEdit
    // leaves X=VAR_ENTRY in `edit` while the page directory holds X=VAR_DELETE. Nothing to copy then.
    if (type != EditRecordType::VAR_ENTRY)
        return;

    // GC movement may turn (sequence, epoch) into (sequence, epoch+x). Therefore every version
    // inside [ (sequence, 0), (sequence+1, 0) ) receives the checkpoint info. Start from the
    // greatest version below (sequence+1, 0) and walk backwards.
    auto iter = MapUtils::findMutLess(entries, PageVersion(edit.version.sequence + 1));
    if (iter == entries.end())
    {
        // No candidate at all: the referenced version may already have been GCed.
        return;
    }

    // TODO: It is unclear whether a full GC can make this check fail. Keep it for now.
    RUNTIME_CHECK(
        iter->first.sequence == edit.version.sequence,
        iter->first.sequence,
        edit.version.sequence);

    // Only the sequence is compared; the epoch is deliberately ignored.
    for (;; --iter)
    {
        if (iter->first.sequence != edit.version.sequence)
            break;

        // One version never maps to both an entry and a delete, so this must be an entry.
        RUNTIME_CHECK(iter->second.isEntry());
        iter->second.entry.checkpoint_info = edit.entry.checkpoint_info;

        // Stop before stepping past the first element of the map.
        if (iter == entries.begin())
            break;
    }
}

// Returns true when **this id** is "visible" by `seq`.
// If this page id is marked as deleted or not created, it is "not visible".
// Note that not visible does not means this id can be GC.
Expand Down Expand Up @@ -1623,6 +1672,45 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter,
return done_any_io;
}

/// Copy the checkpoint info attached to the VAR_ENTRY records of `edit` back into this directory.
///
/// - Returns immediately when `edit` has no records.
/// - Every VAR_ENTRY record must carry checkpoint info. This is verified for ALL records before
///   anything is copied, so a failing check never leaves the directory partially updated.
/// - Records of other types, and records whose page id is no longer in the directory, are skipped.
template <typename Trait>
void PageDirectory<Trait>::copyCheckpointInfoFromEdit(PageEntriesEdit & edit)
{
    const auto & records = edit.getRecords();
    if (records.empty())
        return;

    // Pre-check: All ENTRY edit record must contain checkpoint info.
    // We do the pre-check before copying any checkpoint info to avoid partial completion.
    for (const auto & rec : records)
    {
        if (rec.type == EditRecordType::VAR_ENTRY)
        {
            RUNTIME_CHECK(rec.entry.checkpoint_info.has_value());
        }
    }

    for (const auto & rec : records)
    {
        // Only VAR_ENTRY will contain checkpoint info.
        if (rec.type != EditRecordType::VAR_ENTRY)
            continue;

        // TODO: Improve from O(nlogn) to O(n).

        // Take our own handle to the versioned entries while the table lock is held.
        // Dereferencing a map iterator after the lock has been released is unsafe if the
        // node can be erased concurrently (the case described below), so no iterator
        // is kept beyond the locked scope.
        // NOTE(review): assumes the mapped type is a copyable owning pointer — confirm.
        typename MVCCMapType::mapped_type versioned_entries;
        {
            std::shared_lock read_lock(table_rw_mutex);
            auto iter = mvcc_table_directory.find(rec.page_id);
            if (iter == mvcc_table_directory.end())
            {
                // There may be obsolete entries deleted.
                // For example, if there is a `Put 1` with sequence 10, `Del 1` with sequence 11,
                // and the snapshot sequence is 12, Page with id 1 may be deleted by the gc process.
                continue;
            }
            versioned_entries = iter->second;
        }

        // The per-page update takes the page's own lock; the table lock is not needed here.
        versioned_entries->copyCheckpointInfoFromEdit(rec);
    }
}

template <typename Trait>
typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(bool return_removed_entries)
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern const Metric PSMVCCNumSnapshots;

namespace DB::PS::V3
{

class PageDirectorySnapshot : public DB::PageStorageSnapshot
{
public:
Expand Down Expand Up @@ -186,6 +187,8 @@ class VersionedPageEntries

std::optional<PageEntryV3> getLastEntry(std::optional<UInt64> seq) const;

void copyCheckpointInfoFromEdit(const typename PageEntriesEdit::EditRecord & edit);

bool isVisible(UInt64 seq) const;

/**
Expand Down Expand Up @@ -353,6 +356,8 @@ class PageDirectory
/// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries.
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false);

void copyCheckpointInfoFromEdit(PageEntriesEdit & edit);

// Perform a GC for in-memory entries and return the removed entries.
// If `return_removed_entries` is false, then just return an empty set.
PageEntries gcInMemEntries(bool return_removed_entries = true);
Expand Down
78 changes: 78 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Common/TiFlashMetrics.h>
#include <Storages/Page/V3/Blob/BlobConfig.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPWriteDataSource.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/Page/V3/Universal/UniversalWriteBatchImpl.h>
Expand Down Expand Up @@ -202,10 +204,86 @@ void UniversalPageStorage::registerUniversalExternalPagesCallbacks(const Univers
{
manager.registerExternalPagesCallbacks(callbacks);
}

// Unregister the external-pages callbacks that were registered under `prefix`.
// The callbacks are first removed from `manager`, then the page directory is asked to
// unregister the namespace identified by the same `prefix`.
void UniversalPageStorage::unregisterUniversalExternalPagesCallbacks(const String & prefix)
{
manager.unregisterExternalPagesCallbacks(prefix);
// Clean all external id pointers kept for this prefix.
page_directory->unregisterNamespace(prefix);
}

/// Dump an incremental checkpoint of this page storage: one data file plus one manifest file.
///
/// File ids and paths are rendered from the patterns in `options`, using the named format
/// arguments `sequence` (and, for data files, `sub_file_index`, which is always 0 here).
/// Returns an empty result when the snapshot sequence equals the last checkpointed sequence;
/// otherwise returns the id and path of the newly written data file and manifest file.
/// The whole call is serialized by `checkpoint_mu`.
UniversalPageStorage::DumpCheckpointResult
UniversalPageStorage::dumpIncrementalCheckpoint(const UniversalPageStorage::DumpCheckpointOptions & options)
{
    std::scoped_lock lock(checkpoint_mu);

    // The snapshot is held until the very end, so that blob data will not be GCed meanwhile.
    auto snap = page_directory->createSnapshot(/*tracing_id*/ "dumpIncrementalCheckpoint");

    // Same sequence as the previous checkpoint: nothing to do.
    if (snap->sequence == last_checkpoint_sequence)
        return {};

    auto edit_from_mem = page_directory->dumpSnapshotToEdit(snap);

    // A checkpoint consists of the entries (in the manifest) and their data.
    // Data already written by a previous checkpoint is not written again.

    // Renders a data-file pattern for the current snapshot.
    const auto render_data_file = [&snap](const auto & pattern) {
        return fmt::format(
            fmt::runtime(pattern),
            fmt::arg("sequence", snap->sequence),
            fmt::arg("sub_file_index", 0));
    };
    // Renders a manifest-file pattern for the current snapshot.
    const auto render_manifest_file = [&snap](const auto & pattern) {
        return fmt::format(
            fmt::runtime(pattern),
            fmt::arg("sequence", snap->sequence));
    };

    auto data_file_id = render_data_file(options.data_file_id_pattern);
    auto data_file_path = render_data_file(options.data_file_path_pattern);
    auto manifest_file_id = render_manifest_file(options.manifest_file_id_pattern);
    auto manifest_file_path = render_manifest_file(options.manifest_file_path_pattern);

    // The two files must not be rendered to the same path.
    RUNTIME_CHECK(
        data_file_path != manifest_file_path,
        data_file_path,
        manifest_file_path);

    auto writer = PS::V3::CPFilesWriter::create({
        .data_file_path = data_file_path,
        .data_file_id = data_file_id,
        .manifest_file_path = manifest_file_path,
        .manifest_file_id = manifest_file_id,
        .data_source = PS::V3::CPWriteDataSourceBlobStore::create(*blob_store),
    });

    // Write order: prefix, then edits, then suffix. The writer is released right afterwards.
    writer->writePrefix({
        .writer = options.writer_info,
        .sequence = snap->sequence,
        .last_sequence = last_checkpoint_sequence,
    });
    const bool has_new_data = writer->writeEditsAndApplyCheckpointInfo(edit_from_mem);
    writer->writeSuffix();
    writer.reset();

    SYNC_FOR("before_PageStorage::dumpIncrementalCheckpoint_copyInfo");

    // TODO: Currently, even when has_new_data == false,
    // something will be written to DataFile (i.e., the file prefix).
    // This can be avoided, as its content is useless.
    if (has_new_data)
    {
        // `writeEditsAndApplyCheckpointInfo` attached the new checkpoint infos to the edit;
        // copy them back into the live page directory.
        page_directory->copyCheckpointInfoFromEdit(edit_from_mem);
    }

    last_checkpoint_sequence = snap->sequence;

    return DumpCheckpointResult{
        .new_data_files = {{.id = data_file_id, .path = data_file_path}},
        .new_manifest_files = {{.id = manifest_file_id, .path = manifest_file_path}},
    };
}

} // namespace DB
Loading