From 8098958c8714a9ec253242ca22d0338bc853155a Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 23 Feb 2023 19:05:06 +0800 Subject: [PATCH] Add S3 key utils (#6878) ref pingcap/tiflash#6827 --- dbms/src/Flash/FlashService.h | 6 +- dbms/src/Flash/Mpp/MPPTunnel.h | 1 + dbms/src/Flash/Mpp/MPPTunnelSet.h | 1 + dbms/src/Storages/S3/S3Common.cpp | 226 ++++++++++--- dbms/src/Storages/S3/S3Common.h | 56 +++- dbms/src/Storages/S3/S3Filename.cpp | 313 ++++++++++++++++++ dbms/src/Storages/S3/S3Filename.h | 165 +++++++++ .../Storages/S3/tests/gtest_s3filename.cpp | 171 ++++++++++ dbms/src/Storages/Transaction/KVStore.cpp | 2 +- dbms/src/Storages/Transaction/KVStore.h | 2 +- dbms/src/Storages/Transaction/Types.h | 4 + libs/libcommon/include/common/StringRef.h | 2 +- 12 files changed, 891 insertions(+), 58 deletions(-) create mode 100644 dbms/src/Storages/S3/S3Filename.cpp create mode 100644 dbms/src/Storages/S3/S3Filename.h create mode 100644 dbms/src/Storages/S3/tests/gtest_s3filename.cpp diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index c1b38d3d8f5..01b6399de2b 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include @@ -35,6 +35,8 @@ class IServer; class IAsyncCallData; class EstablishCallData; class MockStorage; +class Context; +using ContextPtr = std::shared_ptr; using MockMPPServerInfo = tests::MockMPPServerInfo; namespace Management @@ -114,4 +116,4 @@ class AsyncFlashService final : public tikvpb::Tikv::WithAsyncMethod_EstablishMP /// Return non-OK grpc::Status when the connection can not be established. grpc::Status establishMPPConnectionAsync(grpc::ServerContext * context, const mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data); }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 635a4b83f12..c857506901b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index e0a8d4115d2..4f2ff4e7b33 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB { diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index 25decfefc21..84fd12cc25d 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -21,14 +21,18 @@ #include #include #include +#include +#include #include #include #include +#include #include #include #include #include +#include namespace ProfileEvents { @@ -103,10 +107,20 @@ class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface } // namespace -namespace DB +namespace DB::S3 { -namespace S3 + +TiFlashS3Client::TiFlashS3Client( + const String & bucket_name_, + const Aws::Auth::AWSCredentials & credentials, + const Aws::Client::ClientConfiguration & clientConfiguration, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads, + bool useVirtualAddressing) + : Aws::S3::S3Client(credentials, clientConfiguration, signPayloads, useVirtualAddressing) + , bucket_name(bucket_name_) { +} + void ClientFactory::init(const StorageS3Config & config_) { config = config_; @@ -130,15 +144,31 @@ ClientFactory & ClientFactory::instance() std::unique_ptr ClientFactory::create() const { - auto schema = parseScheme(config.endpoint); + auto scheme = parseScheme(config.endpoint); return create( config.endpoint, - schema, - schema == Aws::Http::Scheme::HTTPS, + scheme, + scheme == Aws::Http::Scheme::HTTPS, config.access_key_id, config.secret_access_key); } +std::unique_ptr ClientFactory::createWithBucket() const +{ + auto scheme = parseScheme(config.endpoint); + Aws::Client::ClientConfiguration cfg; + cfg.endpointOverride = config.endpoint; + cfg.scheme = scheme; + cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS; + Aws::Auth::AWSCredentials cred(config.access_key_id, config.secret_access_key); + return std::make_unique( + config.bucket, + cred, + cfg, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + /*useVirtualAddressing*/ true); +} + std::unique_ptr ClientFactory::create( const String & endpoint, Aws::Http::Scheme scheme, @@ -168,6 +198,18 @@ bool isNotFoundError(Aws::S3::S3Errors error) return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY; } +namespace details +{ +template +Exception fromS3Error(const Aws::S3::S3Error & e, const std::string & fmt, Args &&... args) +{ + return DB::Exception( + ErrorCodes::S3_ERROR, + fmt + fmt::format(" s3error={} s3msg={}", magic_enum::enum_name(e.GetErrorType()), e.GetMessage()), + args...); +} +} // namespace details + Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id) { ProfileEvents::increment(ProfileEvents::S3HeadObject); @@ -192,11 +234,7 @@ S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bu } else if (throw_on_error) { - const auto & error = outcome.GetError(); - throw DB::Exception(ErrorCodes::S3_ERROR, - "Failed to HEAD object: {}. HTTP response code: {}", - error.GetMessage(), - static_cast(error.GetResponseCode())); + throw details::fromS3Error(outcome.GetError(), "Failed to HEAD object, key={}", key); } return {}; } @@ -218,30 +256,40 @@ bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const { return false; } - throw Exception(ErrorCodes::S3_ERROR, - "Failed to check existence of key {} in bucket {}: {}", - key, - bucket, - error.GetMessage()); + throw details::fromS3Error(outcome.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); +} + +void uploadEmptyFile(const Aws::S3::S3Client & client, const String & bucket, const String & key) +{ + Stopwatch sw; + Aws::S3::Model::PutObjectRequest req; + req.WithBucket(bucket).WithKey(key); + req.SetContentType("binary/octet-stream"); + auto istr = Aws::MakeShared("EmptyObjectInputStream", "", std::ios_base::in | std::ios_base::binary); + req.SetBody(istr); + auto result = client.PutObject(req); + if (!result.IsSuccess()) + { + throw details::fromS3Error(result.GetError(), "S3 PutEmptyObject failed, bucket={} key={}", bucket, key); + } + static auto * log = &Poco::Logger::get("S3UploadFile"); + LOG_DEBUG(log, "remote_fname={}, cost={}ms", key, sw.elapsedMilliseconds()); } void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname) { Stopwatch sw; Aws::S3::Model::PutObjectRequest req; - req.SetBucket(bucket); - req.SetKey(remote_fname); + req.WithBucket(bucket).WithKey(remote_fname); + req.SetContentType("binary/octet-stream"); auto istr = Aws::MakeShared("PutObjectInputStream", local_fname, std::ios_base::in | std::ios_base::binary); req.SetBody(istr); - req.SetContentType("binary/octet-stream"); auto result = client.PutObject(req); - RUNTIME_CHECK_MSG(result.IsSuccess(), - "S3 PutObject failed, local_fname={}, remote_fname={}, exception={}, message={}", - local_fname, - remote_fname, - result.GetError().GetExceptionName(), - result.GetError().GetMessage()); - static auto * log = &Poco::Logger::get("S3PutObject"); + if (!result.IsSuccess()) + { + throw details::fromS3Error(result.GetError(), "S3 PutObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname); + } + static auto log = Logger::get(); LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); } @@ -252,42 +300,118 @@ void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const req.SetBucket(bucket); req.SetKey(remote_fname); auto result = client.GetObject(req); - RUNTIME_CHECK_MSG(result.IsSuccess(), - "S3 GetObject failed, local_fname={}, remote_fname={}, exception={}, message={}", - local_fname, - remote_fname, - result.GetError().GetExceptionName(), - result.GetError().GetMessage()); + if (!result.IsSuccess()) + { + throw details::fromS3Error(result.GetError(), "S3 GetObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname); + } Aws::OFStream ostr(local_fname, std::ios_base::out | std::ios_base::binary); ostr << result.GetResult().GetBody().rdbuf(); - static auto * log = &Poco::Logger::get("S3GetObject"); + static auto log = Logger::get(); LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); } -std::unordered_map listPrefix(const Aws::S3::S3Client & client, const String & bucket, const String & prefix) +void listPrefix( + const Aws::S3::S3Client & client, + const String & bucket, + const String & prefix, + std::function pager) +{ + // Usually we don't need to set delimiter. + // Check the docs here for Delimiter && CommonPrefixes when you really need it. + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html + listPrefix(client, bucket, prefix, /*delimiter*/ "", pager); +} + +void listPrefix( + const Aws::S3::S3Client & client, + const String & bucket, + const String & prefix, + std::string_view delimiter, + std::function pager) { Stopwatch sw; - Aws::S3::Model::ListObjectsRequest req; - req.SetBucket(bucket); - req.SetPrefix(prefix); - auto result = client.ListObjects(req); - RUNTIME_CHECK_MSG(result.IsSuccess(), - "S3 ListObjects failed, prefix={}, exception={}, message={}", - prefix, - result.GetError().GetExceptionName(), - result.GetError().GetMessage()); - const auto & objects = result.GetResult().GetContents(); - std::unordered_map keys_with_size; - keys_with_size.reserve(objects.size()); - for (const auto & object : objects) + Aws::S3::Model::ListObjectsV2Request req; + req.WithBucket(bucket).WithPrefix(prefix); + if (!delimiter.empty()) + { + req.SetDelimiter(String(delimiter)); + } + + static auto log = Logger::get("S3ListPrefix"); + + bool done = false; + size_t num_keys = 0; + while (!done) { - keys_with_size.emplace(object.GetKey().substr(prefix.size()), object.GetSize()); // Cut prefix + auto outcome = client.ListObjectsV2(req); + if (!outcome.IsSuccess()) + { + throw details::fromS3Error(outcome.GetError(), "S3 ListObjects failed, bucket={} prefix={} delimiter={}", bucket, prefix, delimiter); + } + + const auto & result = outcome.GetResult(); + PageResult page_res = pager(result); + num_keys += page_res.num_keys; + + // handle the result size over max size + done = !result.GetIsTruncated(); + if (!done && page_res.more) + { + const auto & next_token = result.GetNextContinuationToken(); + req.SetContinuationToken(next_token); + LOG_DEBUG(log, "prefix={}, keys={}, total_keys={}, next_token={}", prefix, page_res.num_keys, num_keys, next_token); + } } - static auto * log = &Poco::Logger::get("S3ListObjects"); - LOG_DEBUG(log, "prefix={}, keys={}, cost={}", prefix, keys_with_size, sw.elapsedMilliseconds()); + LOG_DEBUG(log, "prefix={}, total_keys={}, cost={}", prefix, num_keys, sw.elapsedMilliseconds()); +} + +std::unordered_map listPrefixWithSize(const Aws::S3::S3Client & client, const String & bucket, const String & prefix) +{ + std::unordered_map keys_with_size; + listPrefix(client, bucket, prefix, "", [&](const Aws::S3::Model::ListObjectsV2Result & result) { + const auto & objects = result.GetContents(); + keys_with_size.reserve(keys_with_size.size() + objects.size()); + for (const auto & object : objects) + { + keys_with_size.emplace(object.GetKey().substr(prefix.size()), object.GetSize()); // Cut prefix + } + return PageResult{.num_keys = objects.size(), .more = true}; + }); + return keys_with_size; } -} // namespace S3 +std::pair tryGetObjectModifiedTime( + const Aws::S3::S3Client & client, + const String & bucket, + const String & key) +{ + auto o = headObject(client, bucket, key); + if (!o.IsSuccess()) + { + if (const auto & err = o.GetError(); isNotFoundError(err.GetErrorType())) + { + return {false, {}}; + } + throw details::fromS3Error(o.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); + } + // Else the object still exist + const auto & res = o.GetResult(); + // "DeleteMark" of S3 service, don't know what will lead to this + RUNTIME_CHECK(!res.GetDeleteMarker(), bucket, key); + return {true, res.GetLastModified()}; +} + +void deleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key) +{ + Aws::S3::Model::DeleteObjectRequest req; + req.SetBucket(bucket); + req.SetKey(key); + auto o = client.DeleteObject(req); + RUNTIME_CHECK(o.IsSuccess(), o.GetError().GetMessage()); + const auto & res = o.GetResult(); + // "DeleteMark" of S3 service, don't know what will lead to this + RUNTIME_CHECK(!res.GetDeleteMarker(), bucket, key); +} -} // namespace DB +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index b4d2bb824ea..df8698f4a79 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -25,6 +25,26 @@ namespace DB::S3 { +class TiFlashS3Client : public Aws::S3::S3Client +{ +public: + // Usually one tiflash instance only need access one bucket. + // Store the bucket name to simpilfy some param passing. + + TiFlashS3Client( + const String & bucket_name_, + const Aws::Auth::AWSCredentials & credentials, + const Aws::Client::ClientConfiguration & clientConfiguration, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads, + bool useVirtualAddressing); + + const String & bucket() { return bucket_name; } + +private: + const String bucket_name; +}; + + class ClientFactory { public: @@ -36,6 +56,8 @@ class ClientFactory void shutdown(); std::unique_ptr create() const; + std::unique_ptr createWithBucket() const; + static std::unique_ptr create( const String & endpoint, Aws::Http::Scheme scheme, @@ -71,7 +93,37 @@ bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname); +void uploadEmptyFile(const Aws::S3::S3Client & client, const String & bucket, const String & key); + void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname); -std::unordered_map listPrefix(const Aws::S3::S3Client & client, const String & bucket, const String & prefix); -} // namespace DB::S3 \ No newline at end of file +struct PageResult +{ + size_t num_keys; + // true - continue to call next `LIST` when available + // false - stop `LIST` + bool more; +}; +void listPrefix( + const Aws::S3::S3Client & client, + const String & bucket, + const String & prefix, + std::function pager); +void listPrefix( + const Aws::S3::S3Client & client, + const String & bucket, + const String & prefix, + std::string_view delimiter, + std::function pager); + +std::unordered_map listPrefixWithSize(const Aws::S3::S3Client & client, const String & bucket, const String & prefix); + + +std::pair tryGetObjectModifiedTime( + const Aws::S3::S3Client & client, + const String & bucket, + const String & key); + +void deleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key); + +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3Filename.cpp b/dbms/src/Storages/S3/S3Filename.cpp new file mode 100644 index 00000000000..bec35a2e886 --- /dev/null +++ b/dbms/src/Storages/S3/S3Filename.cpp @@ -0,0 +1,313 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB::S3 +{ + +//==== Serialize/Deserialize ====// + +namespace details +{ + +/// parsing LockFile +const static re2::RE2 rgx_lock("^lock/s(?P[0-9]+)/(?P.+)$"); +const static re2::RE2 rgx_lock_suffix(".lock_s(?P[0-9]+)_(?P[0-9]+)"); + +/// parsing CheckpointManifest or DataFile +const static re2::RE2 rgx_store_prefix("^s(?P[0-9]+)/$"); +const static re2::RE2 rgx_data_or_manifest("^s(?P[0-9]+)/(data|manifest)/(?P.+)$"); +const static re2::RE2 rgx_subpath_manifest("mf_(?P[0-9]+)"); + +constexpr static std::string_view DELMARK_SUFFIX = ".del"; + +// clang-format off + +constexpr static std::string_view fmt_store_prefix = "s{store_id}/"; +constexpr static std::string_view fmt_manifest_prefix = "s{store_id}/manifest/"; +constexpr static std::string_view fmt_manifest = "s{store_id}/manifest/{subpath}"; +constexpr static std::string_view fmt_subpath_manifest = "mf_{seq}"; + +constexpr static std::string_view fmt_datafile_prefix = "s{store_id}/data/"; +constexpr static std::string_view fmt_data_file = "s{store_id}/data/{subpath}"; +constexpr static std::string_view fmt_subpath_checkpoint_data = "dat_{seq}_{index}"; +constexpr static std::string_view fmt_subpath_dtfile = "t_{table_id}/dmf_{id}"; +constexpr static std::string_view fmt_subpath_keyspace_dtfile = "ks_{keyspace}_t_{table_id}/dmf_{id}"; + +// lock prefix for all stores +constexpr static std::string_view fmt_lock_prefix = "lock/"; +// lock prefix for a specific data file +constexpr static std::string_view fmt_lock_datafile_prefix = "lock/s{store_id}/{subpath}.lock_"; +constexpr static std::string_view fmt_lock_file = "lock/s{store_id}/{subpath}.lock_s{lock_store}_{lock_seq}"; + +// clang-format on + + +String toFullKey(const S3FilenameType type, const StoreID store_id, const std::string_view data_subpath) +{ + switch (type) + { + case S3FilenameType::DataFile: + return fmt::format(fmt_data_file, fmt::arg("store_id", store_id), fmt::arg("subpath", data_subpath)); + case S3FilenameType::CheckpointManifest: + return fmt::format(fmt_manifest, fmt::arg("store_id", store_id), fmt::arg("subpath", data_subpath)); + case S3FilenameType::StorePrefix: + return fmt::format(fmt_store_prefix, fmt::arg("store_id", store_id)); + default: + throw Exception(fmt::format("Not support type! type={}", magic_enum::enum_name(type))); + } + __builtin_unreachable(); +} + +} // namespace details + +String S3FilenameView::toFullKey() const +{ + return details::toFullKey(type, store_id, data_subpath); +} + +String S3Filename::toFullKey() const +{ + return details::toFullKey(type, store_id, data_subpath); +} + +String S3Filename::toManifestPrefix() const +{ + RUNTIME_CHECK(type == S3FilenameType::StorePrefix); + return fmt::format(details::fmt_manifest_prefix, fmt::arg("store_id", store_id)); +} + +String S3Filename::toDataPrefix() const +{ + RUNTIME_CHECK(type == S3FilenameType::StorePrefix); + return fmt::format(details::fmt_datafile_prefix, fmt::arg("store_id", store_id)); +} + +String S3Filename::getLockPrefix() +{ + return String(details::fmt_lock_prefix); +} + +S3FilenameView S3FilenameView::fromKey(const std::string_view fullpath) +{ + S3FilenameView res{.type = S3FilenameType::Invalid}; + re2::StringPiece fullpath_sp{fullpath.data(), fullpath.size()}; + re2::StringPiece type_view, datafile_path; + // lock/s${store_id}/${data_subpath}.lock_s${lock_store_id}_${lock_seq} + if (startsWith(fullpath, "lock/")) + { + if (!re2::RE2::FullMatch(fullpath_sp, details::rgx_lock, &res.store_id, &datafile_path)) + return res; + + const auto lock_start_npos = datafile_path.find(".lock_"); + if (lock_start_npos == re2::StringPiece::npos) + { + res.type = S3FilenameType::Invalid; + return res; + } + + // ${data_subpath}.lock_s${lock_store_id}_${lock_seq} + if (datafile_path.starts_with("dat_") || datafile_path.starts_with("t_") || datafile_path.starts_with("ks_")) + res.type = S3FilenameType::LockFile; + else + { + res.type = S3FilenameType::Invalid; + return res; + } + // .lock_s${lock_store_id}_${lock_seq} + res.lock_suffix = std::string_view(datafile_path.begin() + lock_start_npos, datafile_path.size() - lock_start_npos); + datafile_path.remove_suffix(res.lock_suffix.size()); + res.data_subpath = std::string_view(datafile_path.data(), datafile_path.size()); + return res; + } + + if (!re2::RE2::FullMatch(fullpath_sp, details::rgx_data_or_manifest, &res.store_id, &type_view, &datafile_path)) + return res; // invalid + + if (type_view == "manifest") + res.type = S3FilenameType::CheckpointManifest; + else if (type_view == "data") + { + bool is_delmark = datafile_path.ends_with(re2::StringPiece(details::DELMARK_SUFFIX.data(), details::DELMARK_SUFFIX.size())); + if (is_delmark) + { + datafile_path.remove_suffix(details::DELMARK_SUFFIX.size()); + res.type = S3FilenameType::DelMark; + } + else + { + if (datafile_path.starts_with("dat_")) + { + // "dat_${upload_seq}_${idx}" + res.type = S3FilenameType::DataFile; + } + else if (datafile_path.starts_with("t_") || datafile_path.starts_with("ks_")) + { + // "t_${table_id}/dmf_${id}" + // "ks_${table_id}/dmf_${id}" + res.type = S3FilenameType::DataFile; + } + else + { + res.type = S3FilenameType::Invalid; + } + } + } + res.data_subpath = std::string_view(datafile_path.data(), datafile_path.size()); + return res; +} + +S3FilenameView S3FilenameView::fromStoreKeyPrefix(const std::string_view prefix) +{ + S3FilenameView res{.type = S3FilenameType::Invalid}; + re2::StringPiece prefix_sp{prefix.data(), prefix.size()}; + if (!re2::RE2::FullMatch(prefix_sp, details::rgx_store_prefix, &res.store_id)) + return res; + + res.type = S3FilenameType::StorePrefix; + return res; +} + +//==== Data file utils ====// + +String S3FilenameView::getLockPrefix() const +{ + RUNTIME_CHECK(isDataFile()); + return fmt::format(details::fmt_lock_datafile_prefix, fmt::arg("store_id", store_id), fmt::arg("subpath", data_subpath)); +} + +String S3FilenameView::getLockKey(StoreID lock_store_id, UInt64 lock_seq) const +{ + RUNTIME_CHECK(isDataFile()); + return fmt::format( + details::fmt_lock_file, + fmt::arg("store_id", store_id), + fmt::arg("subpath", data_subpath), + fmt::arg("lock_store", lock_store_id), + fmt::arg("lock_seq", lock_seq)); +} + +String S3FilenameView::getDelMarkKey() const +{ + switch (type) + { + case S3FilenameType::DataFile: + return fmt::format(details::fmt_data_file, fmt::arg("store_id", store_id), fmt::arg("subpath", data_subpath)) + String(details::DELMARK_SUFFIX); + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupport type: {}", magic_enum::enum_name(type)); + } + __builtin_unreachable(); +} + +UInt64 S3FilenameView::getUploadSequence() const +{ + UInt64 upload_seq = 0; + switch (type) + { + case S3FilenameType::CheckpointManifest: + { + re2::StringPiece path_sp{data_subpath.data(), data_subpath.size()}; + if (!re2::RE2::FullMatch(path_sp, details::rgx_subpath_manifest, &upload_seq)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid {}, path={}", magic_enum::enum_name(type), data_subpath); + return upload_seq; + } + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupport type: {}", magic_enum::enum_name(type)); + } + __builtin_unreachable(); +} + +//==== Lock file utils ====// + +S3FilenameView S3FilenameView::asDataFile() const +{ + switch (type) + { + case S3FilenameType::LockFile: + case S3FilenameType::DelMark: + return S3FilenameView{.type = S3FilenameType::DataFile, .store_id = store_id, .data_subpath = data_subpath}; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupport type: {}", magic_enum::enum_name(type)); + } + __builtin_unreachable(); +} + +S3FilenameView::LockInfo S3FilenameView::getLockInfo() const +{ + LockInfo lock_info; + switch (type) + { + case S3FilenameType::LockFile: + { + RUNTIME_CHECK(!lock_suffix.empty()); + re2::StringPiece lock_suffix_sp{lock_suffix.data(), lock_suffix.size()}; + if (!re2::RE2::FullMatch(lock_suffix_sp, details::rgx_lock_suffix, &lock_info.store_id, &lock_info.sequence)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid {}, lock_suffix={}", magic_enum::enum_name(type), lock_suffix); + return lock_info; + } + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupport type: {}", magic_enum::enum_name(type)); + } + __builtin_unreachable(); +} + +//==== Generate S3 key from raw parts ====// + +S3Filename S3Filename::fromStoreId(StoreID store_id) +{ + return S3Filename{ + .type = S3FilenameType::StorePrefix, + .store_id = store_id, + }; +} + +S3Filename S3Filename::fromDMFileOID(const DMFileOID & oid) +{ + // TODO: adapt with keyspace + UNUSED(details::fmt_subpath_keyspace_dtfile); + return S3Filename{ + .type = S3FilenameType::DataFile, + .store_id = oid.store_id, + .data_subpath = fmt::format(details::fmt_subpath_dtfile, fmt::arg("table_id", oid.table_id), fmt::arg("id", oid.file_id)), + }; +} + +S3Filename S3Filename::newCheckpointData(StoreID store_id, UInt64 upload_seq, UInt64 file_idx) +{ + return S3Filename{ + .type = S3FilenameType::DataFile, + .store_id = store_id, + .data_subpath = fmt::format(details::fmt_subpath_checkpoint_data, fmt::arg("seq", upload_seq), fmt::arg("index", file_idx)), + }; +} + +S3Filename S3Filename::newCheckpointManifest(StoreID store_id, UInt64 upload_seq) +{ + return S3Filename{ + .type = S3FilenameType::CheckpointManifest, + .store_id = store_id, + .data_subpath = fmt::format(details::fmt_subpath_manifest, fmt::arg("seq", upload_seq)), + }; +} + +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3Filename.h b/dbms/src/Storages/S3/S3Filename.h new file mode 100644 index 00000000000..4545d2057a3 --- /dev/null +++ b/dbms/src/Storages/S3/S3Filename.h @@ -0,0 +1,165 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include + +namespace DB::S3 +{ + +struct DMFileOID +{ + StoreID store_id = 0; + Int64 table_id = 0; + UInt64 file_id = 0; +}; + +enum class S3FilenameType +{ + Invalid, + CheckpointManifest, + DataFile, // StableDTFile or CheckpointDataFile, + LockFile, // Its key point to a DataFile that is hold by a tiflash node + DelMark, // Its key point to a DataFile that is marked as deleted + StorePrefix, // The key prefix for a store +}; + +/// S3Filename and S3FilenameView are utils class for generating the S3 object key +/// from a local file or parsing the info from S3 object key. They define how objects +/// are organized on S3. +/// +/// Specifically, there are 4 kinds of objects stored on S3 +/// - CheckpointManifest +/// - DataFile, including CheckpointDataFile and DTFile in the Stable layer +/// - LockFile, its key point to a DataFile that is held by a tiflash node +/// - DelMark, its key point to a DataFile that is marked as deleted +/// +/// CheckpointManifest are stored with a store_id prefix: "s${store_id}/manifest/mf_${upload_seq}". +/// +/// DataFile are also stored with a store_id prefix: "s${store_id}/data/${data_subpath}" +/// - for CheckpointDataFile, data_subpath is "dat_${upload_seq}_{dat_index}" +/// - for StableDTFile, data_subpath is "t_${table_id}/dmf_${dmf_id}" +/// - for StableDTFile under keyspace, data_subpath is "ks_${ks_id}_t_${table_id}/dmf_${dmf_id}" +/// However, thus DataFile is generated and upload to S3 by the ${store_id}, it could be shared +/// by multiple TiFlash instances. We need LockFile to specify the DataFile is being used. +/// +/// The pattern of LockFile is "lock/s${store_id}/${data_subpath}.lock_s${lock_store}_${lock_seq}" +/// All LockFiles are store with the same prefix "lock/s${store_id}/". So that we can use +/// less S3 LIST request for the S3 data GC. +/// +/// Delmark are stored along with its DataFile: "s${store_id}/data/${data_subpath}.del". +/// It is a mark that will be upload during S3 data GC, it means the file is not held by any +/// TiFlash instance. + + +// Usually use for parsing key from S3 Object. And it can also generate related keys to the object. +// Note that it is only a "view". Caller should ensure the key is valid during this view's lifetime. +struct S3FilenameView +{ + S3FilenameType type{S3FilenameType::Invalid}; + StoreID store_id{0}; + std::string_view data_subpath; + std::string_view lock_suffix; + + String toFullKey() const; + + /// CheckpointDataFile/StableFile utils /// + + ALWAYS_INLINE bool isDataFile() const { return type == S3FilenameType::DataFile; } + // Return the lock key prefix for finding any locks on this data file through `S3::LIST` + String getLockPrefix() const; + // Return the lock key for writing lock file on S3 + String getLockKey(StoreID lock_store_id, UInt64 lock_seq) const; + // Return the delmark key for writing delmark file on S3 + String getDelMarkKey() const; + + /// CheckpointManifest/CheckpointDataFile utils /// + + // Get the upload sequence from Manifest S3 key. + // Exception will be throw if the key is invalid or the type is not + // one of CheckpointManifest + UInt64 getUploadSequence() const; + + /// LockFile utils /// + + ALWAYS_INLINE inline bool isLockFile() const { return type == S3FilenameType::LockFile; } + struct LockInfo + { + StoreID store_id{0}; + UInt64 sequence{0}; + }; + LockInfo getLockInfo() const; + S3FilenameView asDataFile() const; + + /// DelMark + + ALWAYS_INLINE bool isDelMark() const { return type == S3FilenameType::DelMark; } + +public: + // The result return a view from the `fullpath`. + // If parsing from a raw char ptr, do NOT create a temporary String object. + static S3FilenameView fromKey(std::string_view fullpath); + + static S3FilenameView fromStoreKeyPrefix(std::string_view prefix); +}; + +// Use for generating the S3 object key +struct S3Filename +{ + S3FilenameType type{S3FilenameType::Invalid}; + StoreID store_id{0}; + String data_subpath; + + static S3Filename fromStoreId(StoreID store_id); + static S3Filename fromDMFileOID(const DMFileOID & oid); + static S3Filename newCheckpointData(StoreID store_id, UInt64 upload_seq, UInt64 file_idx); + static S3Filename newCheckpointManifest(StoreID store_id, UInt64 upload_seq); + + String toFullKey() const; + + String toManifestPrefix() const; + + String toDataPrefix() const; + + static String getLockPrefix(); + + S3FilenameView toView() const + { + return S3FilenameView{ + .type = type, + .store_id = store_id, + .data_subpath = data_subpath, + }; + } +}; + +} // namespace DB::S3 + +template <> +struct fmt::formatter +{ + static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } + + template + auto format(const DB::S3::DMFileOID & value, FormatContext & ctx) const -> decltype(ctx.out()) + { + return format_to(ctx.out(), "{}_{}_{}", value.store_id, value.table_id, value.file_id); + } +}; diff --git a/dbms/src/Storages/S3/tests/gtest_s3filename.cpp b/dbms/src/Storages/S3/tests/gtest_s3filename.cpp new file mode 100644 index 00000000000..b5e7a9d350d --- /dev/null +++ b/dbms/src/Storages/S3/tests/gtest_s3filename.cpp @@ -0,0 +1,171 @@ + +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +namespace DB::S3::tests +{ +TEST(S3FilenameTest, Manifest) +{ + UInt64 test_store_id = 1027; + UInt64 test_seq = 20; + String fullkey = "s1027/manifest/mf_20"; + auto check = [&](const S3FilenameView & view) { + ASSERT_EQ(view.type, S3FilenameType::CheckpointManifest) << magic_enum::enum_name(view.type); + ASSERT_EQ(view.store_id, test_store_id); + ASSERT_EQ(view.data_subpath, "mf_20"); + + ASSERT_EQ(view.toFullKey(), fullkey); + + ASSERT_FALSE(view.isDataFile()); + ASSERT_FALSE(view.isLockFile()); + + ASSERT_EQ(view.getUploadSequence(), test_seq); + }; + + auto view = S3FilenameView::fromKey(fullkey); + check(view); + + { + auto r = S3Filename::newCheckpointManifest(test_store_id, test_seq); + ASSERT_EQ(r.toFullKey(), fullkey); + check(r.toView()); + } +} + +TEST(S3FilenameTest, CheckpointDataFile) +{ + UInt64 test_store_id = 2077; + UInt64 test_seq = 99; + UInt64 test_file_idx = 1; + String fullkey = "s2077/data/dat_99_1"; + auto check = [&](const S3FilenameView & view) { + ASSERT_EQ(view.type, S3FilenameType::DataFile) << magic_enum::enum_name(view.type); + ASSERT_EQ(view.store_id, test_store_id); + ASSERT_EQ(view.data_subpath, "dat_99_1"); + + ASSERT_EQ(view.toFullKey(), fullkey); + + ASSERT_TRUE(view.isDataFile()); + ASSERT_EQ(view.getLockKey(1234, 50), "lock/s2077/dat_99_1.lock_s1234_50"); + ASSERT_EQ(view.getLockPrefix(), "lock/s2077/dat_99_1.lock_"); // prefix for S3 LIST + ASSERT_EQ(view.getDelMarkKey(), "s2077/data/dat_99_1.del"); + // ASSERT_EQ(view.getUploadSequence(), test_seq); not used + + ASSERT_FALSE(view.isLockFile()); + + // test lockkey for checkpoint data file + const auto lockkey = view.getLockKey(1234, 50); + const auto lock_view = S3FilenameView::fromKey(lockkey); + ASSERT_EQ(lock_view.type, S3FilenameType::LockFile) << magic_enum::enum_name(view.type); + ASSERT_EQ(lock_view.store_id, test_store_id); + ASSERT_EQ(String(lock_view.data_subpath), "dat_99_1"); + + ASSERT_FALSE(lock_view.isDataFile()); + ASSERT_TRUE(lock_view.isLockFile()); + const auto lock_info = lock_view.getLockInfo(); + ASSERT_EQ(lock_info.store_id, 1234); + ASSERT_EQ(lock_info.sequence, 50); + + // test delmark + auto delmark_view = S3FilenameView::fromKey(view.getDelMarkKey()); + ASSERT_TRUE(delmark_view.isDelMark()); + }; + + auto view = S3FilenameView::fromKey(fullkey); + check(view); + + auto r = S3Filename::newCheckpointData(test_store_id, test_seq, test_file_idx); + ASSERT_EQ(r.toFullKey(), fullkey); + check(r.toView()); +} + +TEST(S3FilenameTest, StableFile) +{ + UInt64 test_store_id = 2077; + String fullkey = "s2077/data/t_44/dmf_57"; + auto check = [&](const S3FilenameView & view) { + ASSERT_EQ(view.type, S3FilenameType::DataFile) << magic_enum::enum_name(view.type); + ASSERT_EQ(view.store_id, test_store_id); + ASSERT_EQ(view.data_subpath, "t_44/dmf_57"); + + ASSERT_EQ(view.toFullKey(), fullkey); + + ASSERT_TRUE(view.isDataFile()); + ASSERT_EQ(view.getLockKey(1234, 50), "lock/s2077/t_44/dmf_57.lock_s1234_50"); + ASSERT_EQ(view.getLockPrefix(), "lock/s2077/t_44/dmf_57.lock_"); // prefix for S3 LIST + ASSERT_EQ(view.getDelMarkKey(), "s2077/data/t_44/dmf_57.del"); + + ASSERT_FALSE(view.isLockFile()); + + // test lockkey for stable file + const auto lockkey = view.getLockKey(1234, 50); + const auto lock_view = S3FilenameView::fromKey(lockkey); + ASSERT_EQ(lock_view.type, S3FilenameType::LockFile) << magic_enum::enum_name(view.type); + ASSERT_EQ(lock_view.store_id, test_store_id); + ASSERT_EQ(String(lock_view.data_subpath), "t_44/dmf_57"); + + ASSERT_FALSE(lock_view.isDataFile()); + ASSERT_TRUE(lock_view.isLockFile()); + const auto lock_info = lock_view.getLockInfo(); + ASSERT_EQ(lock_info.store_id, 1234); + ASSERT_EQ(lock_info.sequence, 50); + + // test delmark + auto delmark_view = S3FilenameView::fromKey(view.getDelMarkKey()); + ASSERT_TRUE(delmark_view.isDelMark()); + }; + auto view = S3FilenameView::fromKey(fullkey); + check(view); + + DMFileOID oid{.store_id = test_store_id, .table_id = 44, .file_id = 57}; + auto r = S3Filename::fromDMFileOID(oid); + ASSERT_EQ(r.toFullKey(), fullkey); + check(r.toView()); +} + +TEST(S3FilenameTest, StorePrefix) +{ + { + auto r = S3FilenameView::fromStoreKeyPrefix("s5/"); + ASSERT_EQ(r.type, S3FilenameType::StorePrefix); + ASSERT_EQ(r.store_id, 5); + ASSERT_EQ(r.toFullKey(), "s5/"); + } + { + auto r = S3Filename::fromStoreId(5); + ASSERT_EQ(r.toFullKey(), "s5/"); + ASSERT_EQ(r.toManifestPrefix(), "s5/manifest/"); + ASSERT_EQ(r.toDataPrefix(), "s5/data/"); + } + + { + auto r = S3FilenameView::fromStoreKeyPrefix("s1024/"); + ASSERT_EQ(r.type, S3FilenameType::StorePrefix); + ASSERT_EQ(r.store_id, 1024); + ASSERT_EQ(r.toFullKey(), "s1024/"); + } + { + auto r = S3Filename::fromStoreId(1024); + ASSERT_EQ(r.toFullKey(), "s1024/"); + ASSERT_EQ(r.toManifestPrefix(), "s1024/manifest/"); + ASSERT_EQ(r.toDataPrefix(), "s1024/data/"); + } +} + +} // namespace DB::S3::tests diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 72ec8b70726..48738cb31af 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -828,7 +828,7 @@ void KVStore::setStore(metapb::Store store_) LOG_INFO(log, "Set store info {}", getStore().base.ShortDebugString()); } -uint64_t KVStore::getStoreID(std::memory_order memory_order) const +StoreID KVStore::getStoreID(std::memory_order memory_order) const { return getStore().store_id.load(memory_order); } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index ac8aa285a20..9b3f3286695 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -141,7 +141,7 @@ class KVStore final : private boost::noncopyable void setStore(metapb::Store); // May return 0 if uninitialized - uint64_t getStoreID(std::memory_order = std::memory_order_relaxed) const; + StoreID getStoreID(std::memory_order = std::memory_order_relaxed) const; BatchReadIndexRes batchReadIndex(const std::vector & req, uint64_t timeout_ms) const; diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h index 59622321510..9bb338b2d33 100644 --- a/dbms/src/Storages/Transaction/Types.h +++ b/dbms/src/Storages/Transaction/Types.h @@ -15,12 +15,16 @@ #pragma once #include +#include #include #include namespace DB { +using StoreID = UInt64; +static constexpr StoreID InvalidStoreID = 0; + using TableID = Int64; using TableIDSet = std::unordered_set; diff --git a/libs/libcommon/include/common/StringRef.h b/libs/libcommon/include/common/StringRef.h index d03bd643786..5852a1bd0f6 100644 --- a/libs/libcommon/include/common/StringRef.h +++ b/libs/libcommon/include/common/StringRef.h @@ -60,7 +60,7 @@ struct StringRef : data(s.data()) , size(s.size()) {} - constexpr explicit StringRef(std::string_view s) + constexpr StringRef(std::string_view s) // NOLINT(google-explicit-constructor) : data(s.data()) , size(s.size()) {}