Skip to content

Commit

Permalink
Add dumpIncrementalCheckpoint utility (#6928)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
breezewish authored Mar 3, 2023
1 parent e42d690 commit 7949001
Show file tree
Hide file tree
Showing 8 changed files with 821 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info)
write_stage = WriteStage::WritingEdits;
}

bool CPFilesWriter::writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edits)
bool CPFilesWriter::writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edits)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CPFilesWriter : private boost::noncopyable
* The list of lock files that will be always appended to the checkpoint file.
*
* Note: In addition to the specified lock files, the checkpoint file will also contain
* lock files from `writeEditsAndApplyRemoteInfo`.
* lock files from `writeEditsAndApplyCheckpointInfo`.
*/
const std::unordered_set<String> & must_locked_files = {};
};
Expand Down Expand Up @@ -71,7 +71,7 @@ class CPFilesWriter : private boost::noncopyable
*
* You must call `writeSuffix` finally, if you don't plan to write edits anymore.
*/
bool /* has_new_data */ writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edit);
bool /* has_new_data */ writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edit);

/**
* This function must be called, and must be called last, after other `writeXxx`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CheckpointFileTest : public DB::base::TiFlashStorageTestBasic
void SetUp() override
{
dir = getTemporaryPath();
DB::tests::TiFlashTestEnv::tryRemovePath(dir);
dropDataOnDisk(dir);
createIfNotExist(dir);
}

Expand Down Expand Up @@ -99,7 +99,7 @@ try
edits.appendRecord({.type = EditRecordType::DEL});

ASSERT_THROW({
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
},
DB::Exception);
}
Expand All @@ -124,7 +124,7 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -180,15 +180,15 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "abc", .entry = {.size = 29, .offset = 5}});
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "rain"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -290,7 +290,7 @@ try
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "sun"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -379,7 +379,7 @@ try
.sequence = 5,
.last_sequence = 3,
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
writer->writeSuffix();
writer.reset();

Expand Down Expand Up @@ -461,12 +461,12 @@ try
});
{
auto edits = universal::PageEntriesEdit{};
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "snow"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -562,7 +562,7 @@ try
},
});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -621,7 +621,7 @@ try
.page_id = fmt::format("record_{}", i),
.entry = {.size = 22, .offset = 10},
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down
90 changes: 89 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ std::optional<PageEntryV3> VersionedPageEntries<Trait>::getLastEntry(std::option
auto page_lock = acquireLock();
if (type == EditRecordType::VAR_ENTRY)
{
for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++)
for (auto it_r = entries.rbegin(); it_r != entries.rend(); it_r++) // NOLINT(modernize-loop-convert)
{
if (seq.has_value() && it_r->first.sequence > seq.value())
continue;
Expand All @@ -486,6 +486,55 @@ std::optional<PageEntryV3> VersionedPageEntries<Trait>::getLastEntry(std::option
return std::nullopt;
}

template <typename Trait>
void VersionedPageEntries<Trait>::copyCheckpointInfoFromEdit(const typename PageEntriesEdit::EditRecord & edit)
{
// We have a running PageStorage instance, and did a checkpoint dump. The checkpoint dump is encoded using
// PageEntriesEdit. During the checkpoint dump, this function is invoked so that we can write back where
// (the checkpoint info) each page's data was dumped.
// In this case, there is a living snapshot protecting the data.

// Pre-check: All ENTRY edit record must contain checkpoint info for copying.
RUNTIME_CHECK(edit.type == EditRecordType::VAR_ENTRY);
RUNTIME_CHECK(edit.entry.checkpoint_info.has_value());

auto page_lock = acquireLock();

if (type != EditRecordType::VAR_ENTRY)
{
// For example, Put X -> Delete X -> dumpSnapshotToEdit -> Full GC -> Delete X -> copyCheckpointInfoFromEdit.
// In this case, we have X=VAR_ENTRY in the `edit`, but will get X=VAR_DELETE in the page directory.
return;
}

// Due to GC movement, (sequence, epoch) may be changed to (sequence, epoch+x), so
// we search within [ (sequence, 0), (sequence+1, 0) ), and assign checkpoint info for all of it.
auto iter = MapUtils::findMutLess(entries, PageVersion(edit.version.sequence + 1));
if (iter == entries.end())
{
// The referenced version may be GCed so that we cannot find any matching entry.
return;
}

// TODO: Not sure if there is a full GC this may be false? Let's keep it here for now.
RUNTIME_CHECK(
iter->first.sequence == edit.version.sequence,
iter->first.sequence,
edit.version.sequence);

// Discard epoch, and only check sequence.
while (iter->first.sequence == edit.version.sequence)
{
// We will never meet the same Version mapping to one entry and one delete, so let's verify it is an entry.
RUNTIME_CHECK(iter->second.isEntry());
iter->second.entry.checkpoint_info = edit.entry.checkpoint_info;

if (iter == entries.begin())
break;
--iter;
}
}

// Returns true when **this id** is "visible" by `seq`.
// If this page id is marked as deleted or not created, it is "not visible".
// Note that not visible does not means this id can be GC.
Expand Down Expand Up @@ -1623,6 +1672,45 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter,
return done_any_io;
}

template <typename Trait>
void PageDirectory<Trait>::copyCheckpointInfoFromEdit(PageEntriesEdit & edit)
{
const auto & records = edit.getRecords();
if (records.empty())
return;

// Pre-check: All ENTRY edit record must contain checkpoint info.
// We do the pre-check before copying any remote info to avoid partial completion.
for (const auto & rec : records)
{
if (rec.type == EditRecordType::VAR_ENTRY)
RUNTIME_CHECK(rec.entry.checkpoint_info.has_value());
}

for (const auto & rec : records)
{
// Only VAR_ENTRY will contain checkpoint info.
if (rec.type != EditRecordType::VAR_ENTRY)
continue;

// TODO: Improve from O(nlogn) to O(n).

typename MVCCMapType::iterator iter;
{
std::shared_lock read_lock(table_rw_mutex);
iter = mvcc_table_directory.find(rec.page_id);
if (iter == mvcc_table_directory.end())
// There may be obsolete entries deleted.
// For example, if there is a `Put 1` with sequence 10, `Del 1` with sequence 11,
// and the snapshot sequence is 12, Page with id 1 may be deleted by the gc process.
continue;
}

auto & entries = iter->second;
entries->copyCheckpointInfoFromEdit(rec);
}
}

template <typename Trait>
typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(bool return_removed_entries)
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern const Metric PSMVCCNumSnapshots;

namespace DB::PS::V3
{

class PageDirectorySnapshot : public DB::PageStorageSnapshot
{
public:
Expand Down Expand Up @@ -186,6 +187,8 @@ class VersionedPageEntries

std::optional<PageEntryV3> getLastEntry(std::optional<UInt64> seq) const;

void copyCheckpointInfoFromEdit(const typename PageEntriesEdit::EditRecord & edit);

bool isVisible(UInt64 seq) const;

/**
Expand Down Expand Up @@ -353,6 +356,8 @@ class PageDirectory
/// And we don't restore the entries in blob store, because this PageDirectory is just read only for its entries.
bool tryDumpSnapshot(const ReadLimiterPtr & read_limiter = nullptr, const WriteLimiterPtr & write_limiter = nullptr, bool force = false);

void copyCheckpointInfoFromEdit(PageEntriesEdit & edit);

// Perform a GC for in-memory entries and return the removed entries.
// If `return_removed_entries` is false, then just return an empty set.
PageEntries gcInMemEntries(bool return_removed_entries = true);
Expand Down
78 changes: 78 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Common/TiFlashMetrics.h>
#include <Storages/Page/V3/Blob/BlobConfig.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPWriteDataSource.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/Page/V3/Universal/UniversalWriteBatchImpl.h>
Expand Down Expand Up @@ -202,10 +204,86 @@ void UniversalPageStorage::registerUniversalExternalPagesCallbacks(const Univers
{
manager.registerExternalPagesCallbacks(callbacks);
}

void UniversalPageStorage::unregisterUniversalExternalPagesCallbacks(const String & prefix)
{
manager.unregisterExternalPagesCallbacks(prefix);
// clean all external ids ptrs
page_directory->unregisterNamespace(prefix);
}

UniversalPageStorage::DumpCheckpointResult
UniversalPageStorage::dumpIncrementalCheckpoint(const UniversalPageStorage::DumpCheckpointOptions & options)
{
std::scoped_lock lock(checkpoint_mu);

// Let's keep this snapshot until all finished, so that blob data will not be GCed.
auto snap = page_directory->createSnapshot(/*tracing_id*/ "dumpIncrementalCheckpoint");

if (snap->sequence == last_checkpoint_sequence)
return {};

auto edit_from_mem = page_directory->dumpSnapshotToEdit(snap);

// As a checkpoint, we write both entries (in manifest) and its data.
// Some entries' data may be already written by a previous checkpoint. These data will not be written again.

auto data_file_id = fmt::format(
fmt::runtime(options.data_file_id_pattern),
fmt::arg("sequence", snap->sequence),
fmt::arg("sub_file_index", 0));
auto data_file_path = fmt::format(
fmt::runtime(options.data_file_path_pattern),
fmt::arg("sequence", snap->sequence),
fmt::arg("sub_file_index", 0));

auto manifest_file_id = fmt::format(
fmt::runtime(options.manifest_file_id_pattern),
fmt::arg("sequence", snap->sequence));
auto manifest_file_path = fmt::format(
fmt::runtime(options.manifest_file_path_pattern),
fmt::arg("sequence", snap->sequence));

RUNTIME_CHECK(
data_file_path != manifest_file_path,
data_file_path,
manifest_file_path);

auto writer = PS::V3::CPFilesWriter::create({
.data_file_path = data_file_path,
.data_file_id = data_file_id,
.manifest_file_path = manifest_file_path,
.manifest_file_id = manifest_file_id,
.data_source = PS::V3::CPWriteDataSourceBlobStore::create(*blob_store),
});

writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
bool has_new_data = writer->writeEditsAndApplyCheckpointInfo(edit_from_mem);
writer->writeSuffix();
writer.reset();

SYNC_FOR("before_PageStorage::dumpIncrementalCheckpoint_copyInfo");

// TODO: Currently, even when has_new_data == false,
// something will be written to DataFile (i.e., the file prefix).
// This can be avoided, as its content is useless.
if (has_new_data)
{
// Copy back the checkpoint info to the current PageStorage.
// New checkpoint infos are attached in `writeEditsAndApplyCheckpointInfo`.
page_directory->copyCheckpointInfoFromEdit(edit_from_mem);
}

last_checkpoint_sequence = snap->sequence;

return DumpCheckpointResult{
.new_data_files = {{.id = data_file_id, .path = data_file_path}},
.new_manifest_files = {{.id = manifest_file_id, .path = manifest_file_path}},
};
}

} // namespace DB
Loading

0 comments on commit 7949001

Please sign in to comment.