Skip to content

Commit

Permalink
Register s3gc manager to TMTContext
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang committed Feb 25, 2023
1 parent 71f379a commit 7dbf01a
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 18 deletions.
72 changes: 72 additions & 0 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>

namespace DB::S3
{

class MockS3LockClient : public IS3LockClient
{
public:
explicit MockS3LockClient(std::shared_ptr<TiFlashS3Client> c)
: s3_client(std::move(c))
{
}

std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override
{
auto view = S3FilenameView::fromKey(data_file_key);
if (!objectExists(*s3_client, s3_client->bucket(), data_file_key))
{
return {false, ""};
}
auto delmark_key = view.getDelMarkKey();
if (objectExists(*s3_client, s3_client->bucket(), delmark_key))
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq));
return {true, ""};
}

std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64) override
{
auto view = S3FilenameView::fromKey(data_file_key);
auto lock_prefix = view.getLockPrefix();
bool any_lock_exist = false;
listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult {
if (!result.GetContents().empty())
any_lock_exist = true;
return S3::PageResult{.num_keys = result.GetContents().size(), .more = false};
});
if (any_lock_exist)
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey());
return {true, ""};
}

private:
std::shared_ptr<TiFlashS3Client> s3_client;
};

} // namespace DB::S3
56 changes: 56 additions & 0 deletions dbms/src/Flash/Disaggregated/S3LockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Transaction/Types.h>
#include <common/types.h>

namespace DB
{
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;
} // namespace DB

namespace DB::S3
{

class IS3LockClient;
using S3LockClientPtr = std::shared_ptr<IS3LockClient>;

class IS3LockClient
{
public:
virtual ~IS3LockClient() = default;

// Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq`
// If the file is locked successfully, return <true, "">
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0;

// Try mark the `data_file_key` as deleted
// If the file is marked as deleted, return <true, "">
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0;
};

} // namespace DB::S3
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
#include <common/types.h>

#include <stack>
#include "magic_enum.hpp"

#include <magic_enum.hpp>

#ifdef FIU_ENABLE
#include <Common/randomSeed.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/Remote/RemoteDataLocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ struct RemoteDataLocation
UInt64 size_in_file;
};

} // namespace DB::PS::V3::Remote
} // namespace DB::Remote
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class UniversalPageStorage final

friend class PageReaderImplUniversal;

// private: // TODO: make these private
// private: // TODO: make these private
String storage_name; // Identify between different Storage
PSDiskDelegatorPtr delegator; // Get paths for storing data
PageStorageConfig config;
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/S3/CheckpointManifestS3Set.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/S3/CheckpointManifestS3Set.h>
#include <Storages/S3/S3Filename.h>
Expand Down
16 changes: 15 additions & 1 deletion dbms/src/Storages/S3/CheckpointManifestS3Set.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/Types.h>
Expand Down Expand Up @@ -46,4 +60,4 @@ class CheckpointManifestS3Set
std::map<UInt64, CheckpointManifestS3Object> manifests;
};

} // namespace DB::Remote
} // namespace DB::S3
5 changes: 5 additions & 0 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ TiFlashS3Client::TiFlashS3Client(
{
}

bool ClientFactory::isEnabled() const
{
return config.isS3Enabled();
}

void ClientFactory::init(const StorageS3Config & config_)
{
config = config_;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ClientFactory

static ClientFactory & instance();

bool isEnabled() const;

void init(const StorageS3Config & config_);
void shutdown();
std::unique_ptr<Aws::S3::S3Client> create() const;
Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Storages/S3/S3GCManager.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@ bool S3GCManager::runOnAllStores()
{
const std::vector<UInt64> all_store_ids = getAllStoreIds();
LOG_TRACE(log, "all_store_ids: {}", all_store_ids);
// TODO: Get all store status from pd after getting the store ids from S3.
for (const auto gc_store_id : all_store_ids)
{
runForStore(gc_store_id);
Expand Down Expand Up @@ -311,22 +312,21 @@ String S3GCManager::getTemporaryDownloadFile(String s3_key)
return fmt::format("{}/{}_{}", config.temp_path, s3_key, std::hash<std::thread::id>()(std::this_thread::get_id()));
}

S3GCManagerService::S3GCManagerService(Context & context, Int64 interval_seconds)
S3GCManagerService::S3GCManagerService(
Context & context,
S3LockClientPtr lock_client,
const S3GCConfig & config)
: global_ctx(context.getGlobalContext())
{
S3GCConfig config;
config.temp_path = global_ctx.getTemporaryPath();

auto s3_client = S3::ClientFactory::instance().createWithBucket();
S3LockClientPtr lock_client; // TODO: get lock_client from TMTContext
manager = std::make_unique<S3GCManager>(s3_client, lock_client, config);
manager = std::make_unique<S3GCManager>(std::move(s3_client), std::move(lock_client), config);

timer = global_ctx.getBackgroundPool().addTask(
[this]() {
return manager->runOnAllStores();
},
false,
/*interval_ms*/ interval_seconds * 1000);
/*interval_ms*/ config.interval_seconds * 1000);
}

S3GCManagerService::~S3GCManagerService()
Expand Down
14 changes: 10 additions & 4 deletions dbms/src/Storages/S3/S3GCManager.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,8 +48,13 @@ struct S3FilenameView;
class IS3LockClient;
using S3LockClientPtr = std::shared_ptr<IS3LockClient>;

// fwd
class S3GCManagerService;
using S3GCManagerServicePtr = std::unique_ptr<S3GCManagerService>;

struct S3GCConfig
{
Int64 interval_seconds = 600;
Int64 manifest_expired_hour = 1;
Int64 delmark_expired_hour = 1;

Expand All @@ -65,9 +70,11 @@ class S3GCManager
public:
explicit S3GCManager(std::shared_ptr<TiFlashS3Client> client_, S3LockClientPtr lock_client_, S3GCConfig config_);

~S3GCManager() = default;

bool runOnAllStores();

// private:
// private:
void runForStore(UInt64 gc_store_id);

void cleanUnusedLocks(
Expand Down Expand Up @@ -108,14 +115,13 @@ class S3GCManager
class S3GCManagerService
{
public:
explicit S3GCManagerService(Context & context, Int64 interval_seconds);
explicit S3GCManagerService(Context & context, S3LockClientPtr lock_client, const S3GCConfig & config);
~S3GCManagerService();

private:
Context & global_ctx;
std::unique_ptr<S3GCManager> manager;
BackgroundProcessingPool::TaskHandle timer;
};
using S3GCManagerServicePtr = std::unique_ptr<S3GCManagerService>;

} // namespace DB::S3
14 changes: 14 additions & 0 deletions dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Flash/Disaggregated/MockS3LockClient.h>
#include <Storages/S3/CheckpointManifestS3Set.h>
Expand Down
16 changes: 15 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Interpreters/Context.h>
#include <Server/RaftConfigParser.h>
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3GCManager.h>
#include <Storages/Transaction/BackgroundService.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/RegionExecutionResult.h>
Expand Down Expand Up @@ -85,7 +87,19 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
, wait_index_timeout_ms(DEFAULT_WAIT_INDEX_TIMEOUT_MS)
, read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS)
, wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC)
{}
{
if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled())
{
#if 0
// TODO depends on S3LockClient
S3::S3GCConfig gc_config;
gc_config.temp_path = context.getTemporaryPath();
s3gc_manager = std::make_unique<S3::S3GCManagerService>(context, s3_lock_client, gc_config);
#endif
}
}

TMTContext::~TMTContext() = default;

void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config)
{
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ struct TiFlashRaftConfig;
// `share` the resource of cluster.
using KVClusterPtr = std::shared_ptr<pingcap::kv::Cluster>;

namespace S3
{
class IS3LockClient;
using S3LockClientPtr = std::shared_ptr<IS3LockClient>;
class S3GCManagerService;
using S3GCManagerServicePtr = std::unique_ptr<S3GCManagerService>;
} // namespace S3

class TMTContext : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -84,6 +92,7 @@ class TMTContext : private boost::noncopyable
explicit TMTContext(Context & context_,
const TiFlashRaftConfig & raft_config,
const pingcap::ClusterConfig & cluster_config_);
~TMTContext();

SchemaSyncerPtr getSchemaSyncer() const;

Expand Down Expand Up @@ -130,6 +139,8 @@ class TMTContext : private boost::noncopyable

KVClusterPtr cluster;

S3::S3GCManagerServicePtr s3gc_manager;

mutable std::mutex mutex;

std::atomic<StoreStatus> store_status{StoreStatus::Idle};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/MockS3Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MockS3Client final : public S3::TiFlashS3Client
Aws::S3::Model::DeleteObjectOutcome DeleteObject(const Aws::S3::Model::DeleteObjectRequest & r) const override;
mutable Strings delete_keys;

Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request &r) const override;
Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request & r) const override;
mutable Strings list_result;

std::optional<Aws::Utils::DateTime> head_result_mtime;
Expand Down

0 comments on commit 7dbf01a

Please sign in to comment.