Skip to content

Commit

Permalink
Add mock s3 lock client
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang committed Feb 27, 2023
1 parent 0f8f3d8 commit 5a63cac
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 19 deletions.
72 changes: 72 additions & 0 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>

namespace DB::S3
{

class MockS3LockClient : public IS3LockClient
{
public:
explicit MockS3LockClient(std::shared_ptr<TiFlashS3Client> c)
: s3_client(std::move(c))
{
}

std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override
{
auto view = S3FilenameView::fromKey(data_file_key);
if (!objectExists(*s3_client, s3_client->bucket(), data_file_key))
{
return {false, ""};
}
auto delmark_key = view.getDelMarkKey();
if (objectExists(*s3_client, s3_client->bucket(), delmark_key))
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq));
return {true, ""};
}

std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64) override
{
auto view = S3FilenameView::fromKey(data_file_key);
auto lock_prefix = view.getLockPrefix();
bool any_lock_exist = false;
listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult {
if (!result.GetContents().empty())
any_lock_exist = true;
return S3::PageResult{.num_keys = result.GetContents().size(), .more = false};
});
if (any_lock_exist)
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey());
return {true, ""};
}

private:
std::shared_ptr<TiFlashS3Client> s3_client;
};

} // namespace DB::S3
11 changes: 6 additions & 5 deletions dbms/src/Flash/Disaggregated/S3LockClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ extern const int TIMEOUT_EXCEEDED;

namespace DB::S3
{
S3LockClient::S3LockClient(Context & context_)
: log(Logger::get())
S3LockClient::S3LockClient(
pingcap::kv::Cluster * kv_cluster_,
OwnerManagerPtr s3gc_owner_)
: kv_cluster(kv_cluster_)
, s3gc_owner(s3gc_owner_)
, log(Logger::get())
{
auto & tmt = context_.getTMTContext();
kv_cluster = tmt.getKVCluster();
s3gc_owner = tmt.getS3GCOwnerManager();
}

std::pair<bool, String>
Expand Down
37 changes: 33 additions & 4 deletions dbms/src/Flash/Disaggregated/S3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,39 @@ using LoggerPtr = std::shared_ptr<Logger>;
namespace DB::S3
{

class S3LockClient
class IS3LockClient;
using S3LockClientPtr = std::shared_ptr<IS3LockClient>;

class IS3LockClient
{
public:
virtual ~IS3LockClient() = default;

// Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq`
// If the file is locked successfully, return <true, "">
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0;

// Try mark the `data_file_key` as deleted
// If the file is marked as deleted, return <true, "">
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0;
};

class S3LockClient : public IS3LockClient
{
public:
explicit S3LockClient(Context & context_);
explicit S3LockClient(
pingcap::kv::Cluster * kv_cluster_,
OwnerManagerPtr s3gc_owner_);

// Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq`
// If the file is locked successfully, return <true, "">
Expand All @@ -42,7 +71,7 @@ class S3LockClient
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s);
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) override;

// Try mark the `data_file_key` as deleted
// If the file is marked as deleted, return <true, "">
Expand All @@ -51,7 +80,7 @@ class S3LockClient
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s);
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) override;

private:
template <typename Response, typename Request, typename SendRpc>
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Disaggregated/S3LockService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ S3LockService::S3LockService(Context & context_)
{
}

S3LockService::S3LockService(OwnerManagerPtr owner_mgr_, std::unique_ptr<TiFlashS3Client> && s3_cli_)
S3LockService::S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr<TiFlashS3Client> && s3_cli_)
: gc_owner(std::move(owner_mgr_))
, s3_client(std::move(s3_cli_))
, log(Logger::get())
Expand Down Expand Up @@ -251,7 +251,6 @@ std::optional<String> S3LockService::anyLockExist(const String & lock_prefix) co
*s3_client,
s3_client->bucket(),
lock_prefix,
"",
[&lock_key](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult {
const auto & contents = result.GetContents();
if (!contents.empty())
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Disaggregated/S3LockService.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class S3LockService final : private boost::noncopyable
public:
explicit S3LockService(Context & context_);

S3LockService(OwnerManagerPtr owner_mgr_, std::unique_ptr<TiFlashS3Client> && s3_cli_);
S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr<TiFlashS3Client> && s3_cli_);

~S3LockService() = default;

Expand Down Expand Up @@ -107,7 +107,7 @@ class S3LockService final : private boost::noncopyable
std::mutex file_latch_map_mutex;

OwnerManagerPtr gc_owner;
const std::unique_ptr<TiFlashS3Client> s3_client;
const std::shared_ptr<TiFlashS3Client> s3_client;

LoggerPtr log;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class S3LockServiceTest
std::shared_ptr<MockOwnerManager> owner_manager;
std::unique_ptr<DB::S3::S3LockService> s3_lock_service;

std::unique_ptr<S3::TiFlashS3Client> s3_client;
std::shared_ptr<S3::TiFlashS3Client> s3_client;
const UInt64 store_id = 1;
const Int64 physical_table_id = 1;
UInt64 dm_file_id = 1;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ TiFlashS3Client::TiFlashS3Client(
{
}

bool ClientFactory::isEnabled() const
{
return config.isS3Enabled();
}

void ClientFactory::init(const StorageS3Config & config_)
{
config = config_;
Expand Down Expand Up @@ -195,15 +200,15 @@ std::unique_ptr<Aws::S3::S3Client> ClientFactory::create(const StorageS3Config &
}
}

std::unique_ptr<TiFlashS3Client> ClientFactory::createWithBucket() const
std::shared_ptr<TiFlashS3Client> ClientFactory::createWithBucket() const
{
auto scheme = parseScheme(config.endpoint);
Aws::Client::ClientConfiguration cfg;
cfg.endpointOverride = config.endpoint;
cfg.scheme = scheme;
cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS;
Aws::Auth::AWSCredentials cred(config.access_key_id, config.secret_access_key);
return std::make_unique<TiFlashS3Client>(
return std::make_shared<TiFlashS3Client>(
config.bucket,
cred,
cfg,
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TiFlashS3Client : public Aws::S3::S3Client
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads,
bool useVirtualAddressing);

const String & bucket() { return bucket_name; }
const String & bucket() const { return bucket_name; }

private:
const String bucket_name;
Expand All @@ -52,13 +52,15 @@ class ClientFactory

static ClientFactory & instance();

bool isEnabled() const;

void init(const StorageS3Config & config_);
void shutdown();

const String & bucket() const;
std::shared_ptr<Aws::S3::S3Client> sharedClient() const;

std::unique_ptr<TiFlashS3Client> createWithBucket() const;
std::shared_ptr<TiFlashS3Client> createWithBucket() const;

private:
ClientFactory() = default;
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
// limitations under the License.

#include <Common/DNSCache.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Flash/Mpp/MPPHandler.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Interpreters/Context.h>
#include <Server/RaftConfigParser.h>
#include <Storages/S3/S3Common.h>
#include <Storages/Transaction/BackgroundService.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/RegionExecutionResult.h>
Expand Down Expand Up @@ -90,13 +92,16 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
, read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS)
, wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC)
{
if (!raft_config.pd_addrs.empty())
if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled())
{
etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config);
s3gc_owner = OwnerManager::createS3GCOwner(context, /*id*/ raft_config.flash_server_addr, etcd_client);
s3_lock_client = std::make_shared<S3::S3LockClient>(cluster.get(), s3gc_owner);
}
}

TMTContext::~TMTContext() = default;

void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config)
{
if (!raft_config.pd_addrs.empty())
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ using ClientPtr = std::shared_ptr<Client>;
} // namespace Etcd
class OwnerManager;
using OwnerManagerPtr = std::shared_ptr<OwnerManager>;
namespace S3
{
class IS3LockClient;
using S3LockClientPtr = std::shared_ptr<IS3LockClient>;
class S3GCManagerService;
using S3GCManagerServicePtr = std::unique_ptr<S3GCManagerService>;
} // namespace S3

class TMTContext : private boost::noncopyable
{
Expand Down Expand Up @@ -92,6 +99,7 @@ class TMTContext : private boost::noncopyable
explicit TMTContext(Context & context_,
const TiFlashRaftConfig & raft_config,
const pingcap::ClusterConfig & cluster_config_);
~TMTContext();

SchemaSyncerPtr getSchemaSyncer() const;

Expand Down Expand Up @@ -142,6 +150,7 @@ class TMTContext : private boost::noncopyable
Etcd::ClientPtr etcd_client;

OwnerManagerPtr s3gc_owner;
S3::S3LockClientPtr s3_lock_client;

mutable std::mutex mutex;

Expand Down

0 comments on commit 5a63cac

Please sign in to comment.