Skip to content

Commit

Permalink
Merge branch 'master' into lock_service
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 28, 2023
2 parents 994c65b + 9ea9cf7 commit b29591e
Show file tree
Hide file tree
Showing 29 changed files with 2,192 additions and 11 deletions.
16 changes: 9 additions & 7 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ target_link_libraries (tiflash_common_io
target_include_directories (tiflash_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include)
target_compile_definitions(tiflash_common_io PUBLIC -DTIFLASH_SOURCE_PREFIX=\"${TiFlash_SOURCE_DIR}\")
target_link_libraries (dbms
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::aws_s3

etcdpb
tiflash_parsers
tiflash_common_config
tiflash_common_io
Expand All @@ -193,13 +201,7 @@ target_link_libraries (dbms
kv_client
tipb
dtpb
etcdpb
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
tiflash_contrib::aws_s3
PSCheckpointProto
)

# always add GmSSL include dir to the include path for static analysis
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)

add_subdirectory (./V3/CheckpointFile/Proto)

add_headers_and_sources(page .)
add_headers_and_sources(page ./V1)
add_headers_and_sources(page ./V1/mvcc)
Expand All @@ -28,6 +30,7 @@ add_headers_and_sources(page ./V3/LogFile)
add_headers_and_sources(page ./V3/PageDirectory)
add_headers_and_sources(page ./V3/spacemap)
add_headers_and_sources(page ./V3/Universal)
add_headers_and_sources(page ./V3/CheckpointFile)
add_headers_and_sources(page ./V3/WAL)

add_library(page ${page_headers} ${page_sources})
Expand Down
77 changes: 77 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/V3/CheckpointFile/CPDataFileWriter.h>

namespace DB::PS::V3
{

void CPDataFileWriter::writePrefix(const CheckpointProto::DataFilePrefix & prefix)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage));

std::string json;
google::protobuf::util::MessageToJsonString(prefix, &json);
writeStringBinary(json, *file_writer);

write_stage = WriteStage::WritingRecords;
}

CheckpointLocation CPDataFileWriter::write(UniversalPageId page_id, PageVersion version, const char * data, size_t n)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingRecords, "unexpected write stage {}", magic_enum::enum_name(write_stage));

// Every record is prefixed with the length, so that this data file can be parsed standalone.
writeIntBinary(n, *file_writer);

// TODO: getMaterializedBytes only works for FramedChecksumWriteBuffer, but does not work for a normal WriteBufferFromFile.
// There must be something wrong and should be fixed.
// uint64_t file_offset = file_writer->getMaterializedBytes();
// file_writer->write(data, n);
// uint64_t write_n = file_writer->getMaterializedBytes() - file_offset;

uint64_t file_offset = file_writer->count();
file_writer->write(data, n);
uint64_t write_n = file_writer->count() - file_offset;
RUNTIME_CHECK(write_n == n, write_n, n); // Note: When we add compression later, write_n == n may be false.

auto * suffix_record = file_suffix.add_records();
suffix_record->set_page_id(page_id.asStr());
suffix_record->set_version_sequence(version.sequence);
suffix_record->set_version_epoch(version.epoch);
suffix_record->set_offset_in_file(file_offset);
suffix_record->set_size_in_file(write_n);

return CheckpointLocation{
.data_file_id = file_id,
.offset_in_file = file_offset,
.size_in_file = write_n,
};
}

void CPDataFileWriter::writeSuffix()
{
if (write_stage == WriteStage::WritingFinished)
return; // writeSuffix can be called multiple times without causing issues.
if (write_stage != WriteStage::WritingRecords)
RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage));

std::string json;
google::protobuf::util::MessageToJsonString(file_suffix, &json);
writeStringBinary(json, *file_writer);

write_stage = WriteStage::WritingFinished;
}

} // namespace DB::PS::V3
95 changes: 95 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <IO/WriteBufferFromFile.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Storages/Page/V3/CheckpointFile/Proto/data_file.pb.h>
#include <Storages/Page/V3/CheckpointFile/ProtoHelper.h>
#include <Storages/Page/V3/CheckpointFile/fwd.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
#include <google/protobuf/util/json_util.h>

#include <magic_enum.hpp>
#include <string>

namespace DB::PS::V3
{

class CPDataFileWriter
{
public:
struct Options
{
const std::string & file_path;
const std::string & file_id;
};

static CPDataFileWriterPtr create(Options options)
{
return std::make_unique<CPDataFileWriter>(options);
}

explicit CPDataFileWriter(Options options)
: file_writer(std::make_unique<WriteBufferFromFile>(options.file_path))
, file_id(std::make_shared<std::string>(options.file_id))
{
// TODO: FramedChecksumWriteBuffer does not support random access for arbitrary frame sizes.
// So currently we use checksum = false.
// Need to update FramedChecksumWriteBuffer first.
// TODO: Support compressed data file.
}

~CPDataFileWriter()
{
flush();
}

void writePrefix(const CheckpointProto::DataFilePrefix & prefix);

CheckpointLocation write(UniversalPageId page_id, PageVersion version, const char * data, size_t n);

void writeSuffix();

void flush()
{
file_writer->next();
file_writer->sync();
}

size_t writtenRecords() const
{
return file_suffix.records_size();
}

private:
enum class WriteStage
{
WritingPrefix,
WritingRecords,
WritingFinished,
};

const std::unique_ptr<WriteBufferFromFile> file_writer;
const std::shared_ptr<const std::string> file_id; // Shared in each write result

CheckpointProto::DataFileSuffix file_suffix;
WriteStage write_stage = WriteStage::WritingPrefix;
};

} // namespace DB::PS::V3
131 changes: 131 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>

namespace DB::PS::V3
{

CPFilesWriter::CPFilesWriter(CPFilesWriter::Options options)
: manifest_file_id(options.manifest_file_id)
, data_writer(CPDataFileWriter::create({
.file_path = options.data_file_path,
.file_id = options.data_file_id,
}))
, manifest_writer(CPManifestFileWriter::create({
.file_path = options.manifest_file_path,
}))
, data_source(options.data_source)
, locked_files(options.must_locked_files)
, log(Logger::get())
{
}

void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage));

auto create_at_ms = Poco::Timestamp().epochMicroseconds() / 1000;

CheckpointProto::DataFilePrefix data_prefix;
data_prefix.set_file_format(1);
data_prefix.set_local_sequence(info.sequence);
data_prefix.set_create_at_ms(create_at_ms);
data_prefix.mutable_writer_info()->CopyFrom(info.writer);
data_prefix.set_manifest_file_id(manifest_file_id);
data_prefix.set_sub_file_index(0);
data_writer->writePrefix(data_prefix);

CheckpointProto::ManifestFilePrefix manifest_prefix;
manifest_prefix.set_file_format(1);
manifest_prefix.set_local_sequence(info.sequence);
manifest_prefix.set_last_local_sequence(info.last_sequence);
manifest_prefix.set_create_at_ms(create_at_ms);
manifest_prefix.mutable_writer_info()->CopyFrom(info.writer);
manifest_writer->writePrefix(manifest_prefix);

write_stage = WriteStage::WritingEdits;
}

bool CPFilesWriter::writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edits)
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

auto & records = edits.getMutRecords();
if (records.empty())
return false;

// 1. Iterate all edits, find these entry edits without the checkpoint info
// and collect the lock files from applied entries.
for (auto & rec_edit : records)
{
if (rec_edit.type == EditRecordType::VAR_EXTERNAL)
{
RUNTIME_CHECK(
rec_edit.entry.checkpoint_info.has_value() && //
rec_edit.entry.checkpoint_info->data_location.data_file_id && //
!rec_edit.entry.checkpoint_info->data_location.data_file_id->empty());
// for example, the s3 fullpath of external id
locked_files.emplace(*rec_edit.entry.checkpoint_info->data_location.data_file_id);
continue;
}

if (rec_edit.type != EditRecordType::VAR_ENTRY)
continue;

if (rec_edit.entry.checkpoint_info.has_value())
{
// for example, the s3 fullpath that was written in the previous uploaded CheckpointDataFile
locked_files.emplace(*rec_edit.entry.checkpoint_info->data_location.data_file_id);
continue;
}

// 2. For entry edits without the checkpoint info, write them to the data file,
// and assign a new checkpoint info.
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK(page.isValid());
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
rec_edit.entry.checkpoint_info = {
.data_location = data_location,
.is_local_data_reclaimed = false,
};
locked_files.emplace(*data_location.data_file_id);
}

// 3. Write down everything to the manifest.
manifest_writer->writeEdits(edits);

return data_writer->writtenRecords() > 0;
}

void CPFilesWriter::writeSuffix()
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

manifest_writer->writeEditsFinish();
manifest_writer->writeLocks(locked_files);
manifest_writer->writeLocksFinish();

data_writer->writeSuffix();
manifest_writer->writeSuffix();

write_stage = WriteStage::WritingFinished;
}

} // namespace DB::PS::V3
Loading

0 comments on commit b29591e

Please sign in to comment.