From 5a63caca117e2d167ce30214fb83cc074db49a3a Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Sun, 26 Feb 2023 01:13:27 +0800 Subject: [PATCH] Add mock s3 lock client Signed-off-by: JaySon-Huang --- .../Flash/Disaggregated/MockS3LockClient.h | 72 +++++++++++++++++++ dbms/src/Flash/Disaggregated/S3LockClient.cpp | 11 +-- dbms/src/Flash/Disaggregated/S3LockClient.h | 37 ++++++++-- .../src/Flash/Disaggregated/S3LockService.cpp | 3 +- dbms/src/Flash/Disaggregated/S3LockService.h | 4 +- .../tests/gtest_s3_lock_service.cpp | 2 +- dbms/src/Storages/S3/S3Common.cpp | 9 ++- dbms/src/Storages/S3/S3Common.h | 6 +- dbms/src/Storages/Transaction/TMTContext.cpp | 7 +- dbms/src/Storages/Transaction/TMTContext.h | 9 +++ 10 files changed, 141 insertions(+), 19 deletions(-) create mode 100644 dbms/src/Flash/Disaggregated/MockS3LockClient.h diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h b/dbms/src/Flash/Disaggregated/MockS3LockClient.h new file mode 100644 index 00000000000..320eda7d98a --- /dev/null +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -0,0 +1,72 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB::S3 +{ + +class MockS3LockClient : public IS3LockClient +{ +public: + explicit MockS3LockClient(std::shared_ptr c) + : s3_client(std::move(c)) + { + } + + std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override + { + auto view = S3FilenameView::fromKey(data_file_key); + if (!objectExists(*s3_client, s3_client->bucket(), data_file_key)) + { + return {false, ""}; + } + auto delmark_key = view.getDelMarkKey(); + if (objectExists(*s3_client, s3_client->bucket(), delmark_key)) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq)); + return {true, ""}; + } + + std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64) override + { + auto view = S3FilenameView::fromKey(data_file_key); + auto lock_prefix = view.getLockPrefix(); + bool any_lock_exist = false; + listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult { + if (!result.GetContents().empty()) + any_lock_exist = true; + return S3::PageResult{.num_keys = result.GetContents().size(), .more = false}; + }); + if (any_lock_exist) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey()); + return {true, ""}; + } + +private: + std::shared_ptr s3_client; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockClient.cpp b/dbms/src/Flash/Disaggregated/S3LockClient.cpp index 835b490f49b..42f1ba033e4 100644 --- a/dbms/src/Flash/Disaggregated/S3LockClient.cpp +++ b/dbms/src/Flash/Disaggregated/S3LockClient.cpp @@ -76,12 +76,13 @@ extern const int TIMEOUT_EXCEEDED; namespace DB::S3 { -S3LockClient::S3LockClient(Context & context_) - : log(Logger::get()) +S3LockClient::S3LockClient( + pingcap::kv::Cluster * kv_cluster_, + OwnerManagerPtr s3gc_owner_) + : kv_cluster(kv_cluster_) + , s3gc_owner(s3gc_owner_) + , log(Logger::get()) { - auto & tmt = context_.getTMTContext(); - kv_cluster = tmt.getKVCluster(); - s3gc_owner = tmt.getS3GCOwnerManager(); } std::pair diff --git a/dbms/src/Flash/Disaggregated/S3LockClient.h b/dbms/src/Flash/Disaggregated/S3LockClient.h index 9ba60d02f38..569e5950220 100644 --- a/dbms/src/Flash/Disaggregated/S3LockClient.h +++ b/dbms/src/Flash/Disaggregated/S3LockClient.h @@ -30,10 +30,39 @@ using LoggerPtr = std::shared_ptr; namespace DB::S3 { -class S3LockClient +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; + +class IS3LockClient +{ +public: + virtual ~IS3LockClient() = default; + + // Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq` + // If the file is locked successfully, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0; + + // Try mark the `data_file_key` as deleted + // If the file is marked as deleted, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0; +}; + +class S3LockClient : public IS3LockClient { public: - explicit S3LockClient(Context & context_); + explicit S3LockClient( + pingcap::kv::Cluster * kv_cluster_, + OwnerManagerPtr s3gc_owner_); // Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq` // If the file is locked successfully, return @@ -42,7 +71,7 @@ class S3LockClient // If deadline exceed or failed to get the owner info within // `timeour_s`, it will throw exception. std::pair - sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s); + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) override; // Try mark the `data_file_key` as deleted // If the file is marked as deleted, return @@ -51,7 +80,7 @@ class S3LockClient // If deadline exceed or failed to get the owner info within // `timeour_s`, it will throw exception. std::pair - sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s); + sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) override; private: template diff --git a/dbms/src/Flash/Disaggregated/S3LockService.cpp b/dbms/src/Flash/Disaggregated/S3LockService.cpp index 181c9973603..799549720b3 100644 --- a/dbms/src/Flash/Disaggregated/S3LockService.cpp +++ b/dbms/src/Flash/Disaggregated/S3LockService.cpp @@ -47,7 +47,7 @@ S3LockService::S3LockService(Context & context_) { } -S3LockService::S3LockService(OwnerManagerPtr owner_mgr_, std::unique_ptr && s3_cli_) +S3LockService::S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr && s3_cli_) : gc_owner(std::move(owner_mgr_)) , s3_client(std::move(s3_cli_)) , log(Logger::get()) @@ -251,7 +251,6 @@ std::optional S3LockService::anyLockExist(const String & lock_prefix) co *s3_client, s3_client->bucket(), lock_prefix, - "", [&lock_key](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult { const auto & contents = result.GetContents(); if (!contents.empty()) diff --git a/dbms/src/Flash/Disaggregated/S3LockService.h b/dbms/src/Flash/Disaggregated/S3LockService.h index dd694fce860..ce413cf1c86 100644 --- a/dbms/src/Flash/Disaggregated/S3LockService.h +++ b/dbms/src/Flash/Disaggregated/S3LockService.h @@ -54,7 +54,7 @@ class S3LockService final : private boost::noncopyable public: explicit S3LockService(Context & context_); - S3LockService(OwnerManagerPtr owner_mgr_, std::unique_ptr && s3_cli_); + S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr && s3_cli_); ~S3LockService() = default; @@ -107,7 +107,7 @@ class S3LockService final : private boost::noncopyable std::mutex file_latch_map_mutex; OwnerManagerPtr gc_owner; - const std::unique_ptr s3_client; + const std::shared_ptr s3_client; LoggerPtr log; }; diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp index 257c23b1288..3410d42a71b 100644 --- a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -83,7 +83,7 @@ class S3LockServiceTest std::shared_ptr owner_manager; std::unique_ptr s3_lock_service; - std::unique_ptr s3_client; + std::shared_ptr s3_client; const UInt64 store_id = 1; const Int64 physical_table_id = 1; UInt64 dm_file_id = 1; diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index 95f2c37c20f..e5b8cbb0e28 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -121,6 +121,11 @@ TiFlashS3Client::TiFlashS3Client( { } +bool ClientFactory::isEnabled() const +{ + return config.isS3Enabled(); +} + void ClientFactory::init(const StorageS3Config & config_) { config = config_; @@ -195,7 +200,7 @@ std::unique_ptr ClientFactory::create(const StorageS3Config & } } -std::unique_ptr ClientFactory::createWithBucket() const +std::shared_ptr ClientFactory::createWithBucket() const { auto scheme = parseScheme(config.endpoint); Aws::Client::ClientConfiguration cfg; @@ -203,7 +208,7 @@ std::unique_ptr ClientFactory::createWithBucket() const cfg.scheme = scheme; cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS; Aws::Auth::AWSCredentials cred(config.access_key_id, config.secret_access_key); - return std::make_unique( + return std::make_shared( config.bucket, cred, cfg, diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index 655a7bd6bc4..63848f0ec47 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -38,7 +38,7 @@ class TiFlashS3Client : public Aws::S3::S3Client Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads, bool useVirtualAddressing); - const String & bucket() { return bucket_name; } + const String & bucket() const { return bucket_name; } private: const String bucket_name; @@ -52,13 +52,15 @@ class ClientFactory static ClientFactory & instance(); + bool isEnabled() const; + void init(const StorageS3Config & config_); void shutdown(); const String & bucket() const; std::shared_ptr sharedClient() const; - std::unique_ptr createWithBucket() const; + std::shared_ptr createWithBucket() const; private: ClientFactory() = default; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index ed99919f83b..3c6ccabe4f0 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -13,11 +13,13 @@ // limitations under the License. #include +#include #include #include #include #include #include +#include #include #include #include @@ -90,13 +92,16 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config , read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS) , wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC) { - if (!raft_config.pd_addrs.empty()) + if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()) { etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config); s3gc_owner = OwnerManager::createS3GCOwner(context, /*id*/ raft_config.flash_server_addr, etcd_client); + s3_lock_client = std::make_shared(cluster.get(), s3gc_owner); } } +TMTContext::~TMTContext() = default; + void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config) { if (!raft_config.pd_addrs.empty()) diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index d51fb4b4bbe..818d0841e22 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -55,6 +55,13 @@ using ClientPtr = std::shared_ptr; } // namespace Etcd class OwnerManager; using OwnerManagerPtr = std::shared_ptr; +namespace S3 +{ +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; +class S3GCManagerService; +using S3GCManagerServicePtr = std::unique_ptr; +} // namespace S3 class TMTContext : private boost::noncopyable { @@ -92,6 +99,7 @@ class TMTContext : private boost::noncopyable explicit TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config_); + ~TMTContext(); SchemaSyncerPtr getSchemaSyncer() const; @@ -142,6 +150,7 @@ class TMTContext : private boost::noncopyable Etcd::ClientPtr etcd_client; OwnerManagerPtr s3gc_owner; + S3::S3LockClientPtr s3_lock_client; mutable std::mutex mutex;