diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h b/dbms/src/Flash/Disaggregated/MockS3LockClient.h new file mode 100644 index 00000000000..320eda7d98a --- /dev/null +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -0,0 +1,72 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB::S3 +{ + +class MockS3LockClient : public IS3LockClient +{ +public: + explicit MockS3LockClient(std::shared_ptr c) + : s3_client(std::move(c)) + { + } + + std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override + { + auto view = S3FilenameView::fromKey(data_file_key); + if (!objectExists(*s3_client, s3_client->bucket(), data_file_key)) + { + return {false, ""}; + } + auto delmark_key = view.getDelMarkKey(); + if (objectExists(*s3_client, s3_client->bucket(), delmark_key)) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq)); + return {true, ""}; + } + + std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64) override + { + auto view = S3FilenameView::fromKey(data_file_key); + auto lock_prefix = view.getLockPrefix(); + bool any_lock_exist = false; + listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult { + if (!result.GetContents().empty()) + any_lock_exist = true; + return S3::PageResult{.num_keys = result.GetContents().size(), .more = false}; + }); + if (any_lock_exist) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey()); + return {true, ""}; + } + +private: + std::shared_ptr s3_client; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockClient.h b/dbms/src/Flash/Disaggregated/S3LockClient.h new file mode 100644 index 00000000000..800f38cc748 --- /dev/null +++ b/dbms/src/Flash/Disaggregated/S3LockClient.h @@ -0,0 +1,56 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class Logger; +using LoggerPtr = std::shared_ptr; +} // namespace DB + +namespace DB::S3 +{ + +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; + +class IS3LockClient +{ +public: + virtual ~IS3LockClient() = default; + + // Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq` + // If the file is locked successfully, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0; + + // Try mark the `data_file_key` as deleted + // If the file is marked as deleted, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.cpp b/dbms/src/Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.cpp index 24eed97b437..93e8b6f6e69 100644 --- a/dbms/src/Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.cpp +++ b/dbms/src/Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.cpp @@ -20,7 +20,8 @@ #include #include -#include "magic_enum.hpp" + +#include #ifdef FIU_ENABLE #include diff --git a/dbms/src/Storages/Page/V3/Remote/RemoteDataLocation.h b/dbms/src/Storages/Page/V3/Remote/RemoteDataLocation.h index 12dec332a7a..dcd806b845c 100644 --- a/dbms/src/Storages/Page/V3/Remote/RemoteDataLocation.h +++ b/dbms/src/Storages/Page/V3/Remote/RemoteDataLocation.h @@ -10,4 +10,4 @@ struct RemoteDataLocation UInt64 size_in_file; }; -} // namespace DB::PS::V3::Remote +} // namespace DB::Remote diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h index 49be892e6f1..32cae5174c7 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h @@ -130,7 +130,7 @@ class UniversalPageStorage final friend class PageReaderImplUniversal; -// private: // TODO: make these private + // private: // TODO: make these private String storage_name; // Identify between different Storage PSDiskDelegatorPtr delegator; // Get paths for storing data PageStorageConfig config; diff --git a/dbms/src/Storages/S3/CheckpointManifestS3Set.cpp b/dbms/src/Storages/S3/CheckpointManifestS3Set.cpp index d9f7a39c4fd..002881c8a37 100644 --- a/dbms/src/Storages/S3/CheckpointManifestS3Set.cpp +++ b/dbms/src/Storages/S3/CheckpointManifestS3Set.cpp @@ -1,3 +1,17 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include #include #include diff --git a/dbms/src/Storages/S3/CheckpointManifestS3Set.h b/dbms/src/Storages/S3/CheckpointManifestS3Set.h index 41f1b36de2d..670c4e82ce5 100644 --- a/dbms/src/Storages/S3/CheckpointManifestS3Set.h +++ b/dbms/src/Storages/S3/CheckpointManifestS3Set.h @@ -1,3 +1,17 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once #include @@ -46,4 +60,4 @@ class CheckpointManifestS3Set std::map manifests; }; -} // namespace DB::Remote +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index 84fd12cc25d..7623adf0b41 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -121,6 +121,11 @@ TiFlashS3Client::TiFlashS3Client( { } +bool ClientFactory::isEnabled() const +{ + return config.isS3Enabled(); +} + void ClientFactory::init(const StorageS3Config & config_) { config = config_; diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index 5eda4a793dd..0d57661d4a6 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -56,6 +56,8 @@ class ClientFactory static ClientFactory & instance(); + bool isEnabled() const; + void init(const StorageS3Config & config_); void shutdown(); std::unique_ptr create() const; diff --git a/dbms/src/Storages/S3/S3GCManager.cpp b/dbms/src/Storages/S3/S3GCManager.cpp index 40f54c70b1c..88763f2506f 100644 --- a/dbms/src/Storages/S3/S3GCManager.cpp +++ b/dbms/src/Storages/S3/S3GCManager.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 PingCAP, Ltd. +// Copyright 2023 PingCAP, Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ bool S3GCManager::runOnAllStores() { const std::vector all_store_ids = getAllStoreIds(); LOG_TRACE(log, "all_store_ids: {}", all_store_ids); + // TODO: Get all store status from pd after getting the store ids from S3. for (const auto gc_store_id : all_store_ids) { runForStore(gc_store_id); @@ -311,22 +312,21 @@ String S3GCManager::getTemporaryDownloadFile(String s3_key) return fmt::format("{}/{}_{}", config.temp_path, s3_key, std::hash()(std::this_thread::get_id())); } -S3GCManagerService::S3GCManagerService(Context & context, Int64 interval_seconds) +S3GCManagerService::S3GCManagerService( + Context & context, + S3LockClientPtr lock_client, + const S3GCConfig & config) : global_ctx(context.getGlobalContext()) { - S3GCConfig config; - config.temp_path = global_ctx.getTemporaryPath(); - auto s3_client = S3::ClientFactory::instance().createWithBucket(); - S3LockClientPtr lock_client; // TODO: get lock_client from TMTContext - manager = std::make_unique(s3_client, lock_client, config); + manager = std::make_unique(std::move(s3_client), std::move(lock_client), config); timer = global_ctx.getBackgroundPool().addTask( [this]() { return manager->runOnAllStores(); }, false, - /*interval_ms*/ interval_seconds * 1000); + /*interval_ms*/ config.interval_seconds * 1000); } S3GCManagerService::~S3GCManagerService() diff --git a/dbms/src/Storages/S3/S3GCManager.h b/dbms/src/Storages/S3/S3GCManager.h index 747526823c0..593900474f7 100644 --- a/dbms/src/Storages/S3/S3GCManager.h +++ b/dbms/src/Storages/S3/S3GCManager.h @@ -1,4 +1,4 @@ -// Copyright 2022 PingCAP, Ltd. +// Copyright 2023 PingCAP, Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,8 +48,13 @@ struct S3FilenameView; class IS3LockClient; using S3LockClientPtr = std::shared_ptr; +// fwd +class S3GCManagerService; +using S3GCManagerServicePtr = std::unique_ptr; + struct S3GCConfig { + Int64 interval_seconds = 600; Int64 manifest_expired_hour = 1; Int64 delmark_expired_hour = 1; @@ -65,9 +70,11 @@ class S3GCManager public: explicit S3GCManager(std::shared_ptr client_, S3LockClientPtr lock_client_, S3GCConfig config_); + ~S3GCManager() = default; + bool runOnAllStores(); -// private: + // private: void runForStore(UInt64 gc_store_id); void cleanUnusedLocks( @@ -108,7 +115,7 @@ class S3GCManager class S3GCManagerService { public: - explicit S3GCManagerService(Context & context, Int64 interval_seconds); + explicit S3GCManagerService(Context & context, S3LockClientPtr lock_client, const S3GCConfig & config); ~S3GCManagerService(); private: @@ -116,6 +123,5 @@ class S3GCManagerService std::unique_ptr manager; BackgroundProcessingPool::TaskHandle timer; }; -using S3GCManagerServicePtr = std::unique_ptr; } // namespace DB::S3 diff --git a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp index 279022257e9..ec52aeece82 100644 --- a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp +++ b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp @@ -1,3 +1,17 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include #include #include diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index d37b8a46fb1..58adb67328a 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include #include #include @@ -85,7 +87,19 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config , wait_index_timeout_ms(DEFAULT_WAIT_INDEX_TIMEOUT_MS) , read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS) , wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC) -{} +{ + if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()) + { +#if 0 + // TODO depends on S3LockClient + S3::S3GCConfig gc_config; + gc_config.temp_path = context.getTemporaryPath(); + s3gc_manager = std::make_unique(context, s3_lock_client, gc_config); +#endif + } +} + +TMTContext::~TMTContext() = default; void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config) { diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 4441d1be2f7..6c3781dfda8 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -48,6 +48,14 @@ struct TiFlashRaftConfig; // `share` the resource of cluster. using KVClusterPtr = std::shared_ptr; +namespace S3 +{ +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; +class S3GCManagerService; +using S3GCManagerServicePtr = std::unique_ptr; +} // namespace S3 + class TMTContext : private boost::noncopyable { public: @@ -84,6 +92,7 @@ class TMTContext : private boost::noncopyable explicit TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config_); + ~TMTContext(); SchemaSyncerPtr getSchemaSyncer() const; @@ -130,6 +139,8 @@ class TMTContext : private boost::noncopyable KVClusterPtr cluster; + S3::S3GCManagerServicePtr s3gc_manager; + mutable std::mutex mutex; std::atomic store_status{StoreStatus::Idle}; diff --git a/dbms/src/TestUtils/MockS3Client.h b/dbms/src/TestUtils/MockS3Client.h index efdbf605669..bfca6320bad 100644 --- a/dbms/src/TestUtils/MockS3Client.h +++ b/dbms/src/TestUtils/MockS3Client.h @@ -23,7 +23,7 @@ class MockS3Client final : public S3::TiFlashS3Client Aws::S3::Model::DeleteObjectOutcome DeleteObject(const Aws::S3::Model::DeleteObjectRequest & r) const override; mutable Strings delete_keys; - Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request &r) const override; + Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request & r) const override; mutable Strings list_result; std::optional head_result_mtime;