Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add checkpoint utilities #6900

Merged
merged 22 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
add_headers_and_sources(dbms src/Storages/Page/V3/PageDirectory)
add_headers_and_sources(dbms src/Storages/Page/V3/Blob)
add_headers_and_sources(dbms src/Storages/Page/V3/Universal)
add_headers_and_sources(dbms src/Storages/Page/V3/CheckpointFile)
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/Storages/S3)
add_headers_and_sources(dbms src/TiDB)
Expand Down Expand Up @@ -213,6 +214,13 @@ target_link_libraries (tiflash_common_io
target_include_directories (tiflash_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include)
target_compile_definitions(tiflash_common_io PUBLIC -DTIFLASH_SOURCE_PREFIX=\"${TiFlash_SOURCE_DIR}\")
target_link_libraries (dbms
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::aws_s3

tiflash_parsers
tiflash_common_config
tiflash_common_io
Expand All @@ -221,12 +229,7 @@ target_link_libraries (dbms
kv_client
tipb
dtpb
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::aws_s3
PSCheckpointProto
)

# always add GmSSL include dir to the include path for static analysis
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_subdirectory (Page)
add_subdirectory (DeltaMerge/File/dtpb)
add_subdirectory (DeltaMerge/workload)
add_subdirectory (Page/workload)
add_subdirectory (Page/V3/CheckpointFile/Proto)

if (ENABLE_TESTS)
add_subdirectory (tests EXCLUDE_FROM_ALL)
Expand Down
73 changes: 73 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/V3/CheckpointFile/CPDataFileWriter.h>

namespace DB::PS::V3
{

void CPDataFileWriter::writePrefix(const CheckpointProto::DataFilePrefix & prefix)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage));

details::writeMessageWithLength(*file_writer, prefix);
write_stage = WriteStage::WritingRecords;
}

CheckpointLocation CPDataFileWriter::write(UniversalPageId page_id, PageVersion version, const char * data, size_t n)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingRecords, "unexpected write stage {}", magic_enum::enum_name(write_stage));

// Every record is prefixed with the length, so that this data file can be parsed standalone.
writeIntBinary(n, *file_writer);

// TODO: getMaterializedBytes only works for FramedChecksumWriteBuffer, but does not work for a normal WriteBufferFromFile.
// There must be something wrong and should be fixed.
// uint64_t file_offset = file_writer->getMaterializedBytes();
// file_writer->write(data, n);
// uint64_t write_n = file_writer->getMaterializedBytes() - file_offset;

uint64_t file_offset = file_writer->count();
file_writer->write(data, n);
uint64_t write_n = file_writer->count() - file_offset;
breezewish marked this conversation as resolved.
Show resolved Hide resolved

auto * suffix_record = file_suffix.add_records();
suffix_record->set_page_id(page_id.asStr());
suffix_record->set_version_sequence(version.sequence);
suffix_record->set_version_epoch(version.epoch);
suffix_record->set_offset_in_file(file_offset);
suffix_record->set_size_in_file(write_n);

return CheckpointLocation{
.data_file_id = file_id,
.offset_in_file = file_offset,
.size_in_file = write_n,
};
}

void CPDataFileWriter::writeSuffix()
{
if (write_stage == WriteStage::WritingFinished)
return; // writeSuffix can be called multiple times without causing issues.
if (write_stage != WriteStage::WritingRecords)
RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage));

std::string json;
google::protobuf::util::MessageToJsonString(file_suffix, &json);
writeStringBinary(json, *file_writer);
breezewish marked this conversation as resolved.
Show resolved Hide resolved

write_stage = WriteStage::WritingFinished;
}

} // namespace DB::PS::V3
95 changes: 95 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <IO/WriteBufferFromFile.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Storages/Page/V3/CheckpointFile/Proto/data_file.pb.h>
#include <Storages/Page/V3/CheckpointFile/ProtoHelper.h>
#include <Storages/Page/V3/CheckpointFile/fwd.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
#include <google/protobuf/util/json_util.h>

#include <magic_enum.hpp>
#include <string>

namespace DB::PS::V3
{

class CPDataFileWriter
{
public:
struct Options
{
const std::string & file_path;
const std::string & file_id;
};

static CPDataFileWriterPtr create(Options options)
{
return std::make_unique<CPDataFileWriter>(options);
}

explicit CPDataFileWriter(Options options)
: file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
, file_id(std::make_shared<std::string>(options.file_id))
{
// TODO: FramedChecksumWriteBuffer does not support random access for arbitrary frame sizes.
// So currently we use checksum = false.
// Need to update FramedChecksumWriteBuffer first.
// TODO: Support compressed data file.
}

~CPDataFileWriter()
{
flush();
}

void writePrefix(const CheckpointProto::DataFilePrefix & prefix);

CheckpointLocation write(UniversalPageId page_id, PageVersion version, const char * data, size_t n);

void writeSuffix();

void flush()
{
file_writer->next();
file_writer->sync();
}

size_t writtenRecords() const
{
return file_suffix.records_size();
}

private:
enum class WriteStage
{
WritingPrefix,
WritingRecords,
WritingFinished,
};

const std::unique_ptr<WriteBufferFromFile> file_writer;
const std::shared_ptr<const std::string> file_id; // Shared in each write result

CheckpointProto::DataFileSuffix file_suffix;
WriteStage write_stage = WriteStage::WritingPrefix;
};

} // namespace DB::PS::V3
34 changes: 34 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>

namespace DB::PS::V3
{

CPFilesWriter::CPFilesWriter(CPFilesWriter::Options options)
: manifest_file_id(options.manifest_file_id)
, data_writer(CPDataFileWriter::create({
.file_path = options.data_file_path,
.file_id = options.data_file_id,
}))
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
, manifest_writer(CPManifestFileWriter::create({
.file_path = options.manifest_file_path,
}))
, data_source(options.data_source)
, log(Logger::get())
{
}

} // namespace DB::PS::V3
140 changes: 140 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/CheckpointFile/CPDataFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPWriteDataSource.h>
#include <Storages/Page/V3/CheckpointFile/Proto/common.pb.h>
#include <Storages/Page/V3/CheckpointFile/fwd.h>
#include <Storages/Page/V3/PageEntriesEdit.h>

namespace DB::PS::V3
{

class CPFilesWriter : private boost::noncopyable
{
public:
struct Options
{
const std::string & data_file_path;
const std::string & data_file_id;
const std::string & manifest_file_path;
const std::string & manifest_file_id;
CPWriteDataSourcePtr data_source;
};

static CPFilesWriterPtr create(Options options)
{
return std::make_unique<CPFilesWriter>(std::move(options));
}

explicit CPFilesWriter(Options options);

struct PrefixInfo
{
const CheckpointProto::WriterInfo & writer;
const uint64_t sequence;
const uint64_t last_sequence;
};

void writePrefix(const PrefixInfo & info)
{
auto create_at_ms = Poco::Timestamp().epochMicroseconds() / 1000;

CheckpointProto::DataFilePrefix data_prefix;
data_prefix.set_local_sequence(info.sequence);
data_prefix.set_create_at_ms(create_at_ms);
data_prefix.mutable_writer_info()->CopyFrom(info.writer);
data_prefix.set_manifest_file_id(manifest_file_id);
data_prefix.set_sub_file_index(0);
data_writer->writePrefix(data_prefix);

CheckpointProto::ManifestFilePrefix manifest_prefix;
manifest_prefix.set_local_sequence(info.sequence);
manifest_prefix.set_last_local_sequence(info.last_sequence);
manifest_prefix.set_create_at_ms(create_at_ms);
manifest_prefix.mutable_writer_info()->CopyFrom(info.writer);
manifest_writer->writePrefix(manifest_prefix);
}

/// This function can be called multiple times.
bool /* has_new_data */ writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edit)
{
auto & records = edit.getMutRecords();
if (records.empty())
return false;

// 1. Iterate all edits, find these entry edits without the checkpoint info.
for (auto & rec_edit : records)
{
if (rec_edit.type != EditRecordType::VAR_ENTRY)
continue;
if (rec_edit.entry.checkpoint_info.has_value())
continue;

// 2. For entry edits without the checkpoint info, write them to the data file, and assign a new checkpoint info.
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK(page.isValid());
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
rec_edit.entry.checkpoint_info = {
.data_location = data_location,
.is_local_data_reclaimed = false,
};
}

// 3. Write down everything to the manifest.
manifest_writer->writeEdits(edit);

return data_writer->writtenRecords() > 0;
}

void writeEditsFinish()
{
manifest_writer->writeEditsFinish();
}

void writeSuffix()
{
data_writer->writeSuffix();
manifest_writer->writeSuffix();
}

void flush()
{
data_writer->flush();
manifest_writer->flush();
}

~CPFilesWriter()
{
flush();
}

private:
const std::string manifest_file_id;
const CPDataFileWriterPtr data_writer;
const CPManifestFileWriterPtr manifest_writer;
CPWriteDataSourcePtr data_source;

LoggerPtr log;
};

} // namespace DB::PS::V3
Loading