Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dumpIncrementalCheckpoint utility #6928

Merged
merged 14 commits into from
Mar 3, 2023
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info)
write_stage = WriteStage::WritingEdits;
}

bool CPFilesWriter::writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edits)
bool CPFilesWriter::writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edits)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CPFilesWriter : private boost::noncopyable
* The list of lock files that will be always appended to the checkpoint file.
*
* Note: In addition to the specified lock files, the checkpoint file will also contain
* lock files from `writeEditsAndApplyRemoteInfo`.
* lock files from `writeEditsAndApplyCheckpointInfo`.
*/
const std::unordered_set<String> & must_locked_files = {};
};
Expand Down Expand Up @@ -71,7 +71,7 @@ class CPFilesWriter : private boost::noncopyable
*
* You must call `writeSuffix` finally, if you don't plan to write edits anymore.
*/
bool /* has_new_data */ writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edit);
bool /* has_new_data */ writeEditsAndApplyCheckpointInfo(universal::PageEntriesEdit & edit);

/**
* This function must be called, and must be called last, after other `writeXxx`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ try
edits.appendRecord({.type = EditRecordType::DEL});

ASSERT_THROW({
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
},
DB::Exception);
}
Expand All @@ -124,7 +124,7 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -180,15 +180,15 @@ try
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "abc", .entry = {.size = 29, .offset = 5}});
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "rain"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -290,7 +290,7 @@ try
edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "sun"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -379,7 +379,7 @@ try
.sequence = 5,
.last_sequence = 3,
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
writer->writeSuffix();
writer.reset();

Expand Down Expand Up @@ -461,12 +461,12 @@ try
});
{
auto edits = universal::PageEntriesEdit{};
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
{
auto edits = universal::PageEntriesEdit{};
edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "snow"});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -562,7 +562,7 @@ try
},
});
edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down Expand Up @@ -621,7 +621,7 @@ try
.page_id = fmt::format("record_{}", i),
.entry = {.size = 22, .offset = 10},
});
writer->writeEditsAndApplyRemoteInfo(edits);
writer->writeEditsAndApplyCheckpointInfo(edits);
}
writer->writeSuffix();
writer.reset();
Expand Down
206 changes: 206 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Common/assert_cast.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/MapUtils.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageDirectory.h>
Expand Down Expand Up @@ -486,6 +487,102 @@ std::optional<PageEntryV3> VersionedPageEntries<Trait>::getLastEntry(std::option
return std::nullopt;
}

template <typename Trait>
void VersionedPageEntries<Trait>::copyCheckpointInfoFromEdit(const typename PageEntriesEdit::EditRecord & edit)
{
// We have a running PageStorage instance, and did a checkpoint dump. The checkpoint dump is encoded using
// PageEntriesEdit. During the checkpoint dump, this function is invoked so that we can write back where
// (the checkpoint info) each page's data was dumped.
// In this case, there is a living snapshot protecting the data.

// Pre-check: All ENTRY edit record must contain checkpoint info.
if (edit.type == EditRecordType::VAR_ENTRY)
RUNTIME_CHECK(edit.entry.checkpoint_info.has_value());

auto page_lock = acquireLock();

switch (type)
{
case EditRecordType::VAR_DELETE:
{
// For the same page this must not happen:
// Impossible case #A: Edit is delete and current is delete:
// VAR_DELETE will not create an edit.
// Impossible case #B: Edit is not delete and current is delete:
// VAR_EXTERNAL / VAR_REF / VAR_ENTRY page will not turn into VAR_DELETE.

// TODO: May be possible? Put X -> Delete X -> Full GC -> Delete X

// If this really happens, it means we are applying checkpoint info over a wrong PageStorage instance.
// May be we should provide a better message instead.
RUNTIME_CHECK(false,
toDebugString(),
edit);
break;
}
case EditRecordType::VAR_EXTERNAL:
{
// TODO: May be possible? Put X -> Delete X -> Full GC -> External X

RUNTIME_CHECK(
edit.type == EditRecordType::VAR_DELETE || edit.type == EditRecordType::VAR_EXTERNAL,
toDebugString(),
edit);
break;
}
case EditRecordType::VAR_REF:
{
// TODO: May be possible? Put X -> Delete X -> Full GC -> Ref X

RUNTIME_CHECK(
edit.type == EditRecordType::VAR_DELETE || edit.type == EditRecordType::VAR_REF,
toDebugString(),
edit);
break;
}
case EditRecordType::VAR_ENTRY:
{
// TODO: May be possible? Ref X -> Delete X -> Full GC -> Put X

RUNTIME_CHECK(
edit.type == EditRecordType::VAR_DELETE || edit.type == EditRecordType::VAR_ENTRY,
toDebugString(),
edit);

if (edit.type == EditRecordType::VAR_DELETE)
break;

// Due to GC movement, (sequence, epoch) may be changed to (sequence, epoch+x), so
// we search within [ (sequence, 0), (sequence+1, 0) ), and assign checkpoint info for all of it.
auto iter = MapUtils::findMutLess(entries, PageVersion(edit.version.sequence + 1));
RUNTIME_CHECK(iter != entries.end());
RUNTIME_CHECK(iter->first.sequence == edit.version.sequence); // TODO: If there is a full GC this may be false?

// Discard epoch, and only check sequence.
while (iter->first.sequence == edit.version.sequence)
{
// We will never meet the same Version mapping to one entry and one delete, so let's verify it is an entry.
RUNTIME_CHECK(iter->second.isEntry());
iter->second.entry.checkpoint_info = edit.entry.checkpoint_info;

if (iter == entries.begin())
break;
--iter;
}

// TODO: Check whether this is fine: Put X -> Delete X -> Full GC -> Put X
// ↑ A ↑ B
// The checkpoint info of A must not be recovered into B.

break;
}
default:
throw Exception(fmt::format(
"Calling VersionedPageEntries::copyCheckpointInfoFromEdit() with unexpected type: {}",
magic_enum::enum_name(type)));
}
}

// Returns true when **this id** is "visible" by `seq`.
// If this page id is marked as deleted or not created, it is "not visible".
// Note that not visible does not means this id can be GC.
Expand Down Expand Up @@ -1623,6 +1720,115 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter,
return done_any_io;
}

template <typename Trait>
void PageDirectory<Trait>::copyCheckpointInfoFromEdit(PageEntriesEdit & edit)
{
const auto & records = edit.getRecords();
if (records.empty())
return;

// Pre-check: All ENTRY edit record must contain remote info.
// We do the pre-check before copying any remote info to avoid partial completion.
for (const auto & rec : records)
{
if (rec.type == EditRecordType::VAR_ENTRY)
RUNTIME_CHECK(rec.entry.checkpoint_info.has_value());
}

for (const auto & rec : records)
{
// TODO: Improve from O(nlogn) to O(n).

typename MVCCMapType::iterator iter;
{
std::shared_lock read_lock(table_rw_mutex);
iter = mvcc_table_directory.find(rec.page_id);
if (iter == mvcc_table_directory.end())
// There may be obsolete entries deleted.
// For example, if there is a `Put 1` with sequence 10, `Del 1` with sequence 11,
// and the snapshot sequence is 12, Page with id 1 may be deleted by the gc process.
continue;
}

auto & entries = iter->second;
entries->copyCheckpointInfoFromEdit(rec);
}
}

template <>
typename PageDirectory<universal::PageDirectoryTrait>::DumpCheckpointResult
PageDirectory<universal::PageDirectoryTrait>::dumpIncrementalCheckpoint(const DumpCheckpointOptions & options)
Copy link
Contributor

@JaySon-Huang JaySon-Huang Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about moving this method to UniversalPageStorage? This method involves both getting snapshot entries from PageDirectory and reading page data from the BlobStore
And we don't need another DumpCheckpointOptions in PageDirectory

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion! I have moved to UniversalPageStorage.

{
std::scoped_lock lock(checkpoint_mu);

// Let's keep this snapshot until all finished, so that blob data will not be GCed.
auto snap = createSnapshot(/*tracing_id*/ "dumpIncrementalCheckpoint");

if (snap->sequence == last_checkpoint_sequence)
return {};

auto edit_from_mem = dumpSnapshotToEdit(snap);

// As a checkpoint, we write both entries (in manifest) and its data.
// Some entries' data may be already written by a previous checkpoint. These data will not be written again.

auto data_file_id = fmt::format(
options.data_file_id_pattern,
fmt::arg("sequence", snap->sequence),
fmt::arg("sub_file_index", 0));
auto data_file_path = fmt::format(
options.data_file_path_pattern,
fmt::arg("sequence", snap->sequence),
fmt::arg("sub_file_index", 0));

auto manifest_file_id = fmt::format(
options.manifest_file_id_pattern,
fmt::arg("sequence", snap->sequence));
auto manifest_file_path = fmt::format(
options.manifest_file_path_pattern,
fmt::arg("sequence", snap->sequence));

RUNTIME_CHECK(
data_file_path != manifest_file_path,
data_file_path,
manifest_file_path);

auto writer = CPFilesWriter::create(
{
.data_file_path = data_file_path,
.data_file_id = data_file_id,
.manifest_file_path = manifest_file_path,
.manifest_file_id = manifest_file_id,
.data_source = options.data_source,
});

writer->writePrefix({
.writer = options.writer_info,
.sequence = snap->sequence,
.last_sequence = last_checkpoint_sequence,
});
bool has_new_data = writer->writeEditsAndApplyCheckpointInfo(edit_from_mem);
writer->writeSuffix();
writer.reset();

// TODO: Currently, even when has_new_data == false,
// something will be written to DataFile (i.e., the file prefix).
// This can be avoided, as its content is useless.
if (has_new_data)
{
// Copy back the checkpoint info to the current PageStorage.
// New checkpoint infos are attached in `writeEditsAndApplyCheckpointInfo`.
copyCheckpointInfoFromEdit(edit_from_mem);
}

last_checkpoint_sequence = snap->sequence;

return DumpCheckpointResult{
.new_data_files = {{.id = data_file_id, .path = data_file_path}},
.new_manifest_files = {{.id = manifest_file_id, .path = manifest_file_path}},
};
}

template <typename Trait>
typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(bool return_removed_entries)
{
Expand Down
Loading