Skip to content

Commit

Permalink
Add S3 config and utils. (#6892)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
JinheLin authored Feb 26, 2023
1 parent fbed3eb commit 6a56277
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 38 deletions.
29 changes: 27 additions & 2 deletions dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,17 +529,42 @@ void StorageS3Config::parse(const String & content, const LoggerPtr & log)

readConfig(table, "endpoint", endpoint);
readConfig(table, "bucket", bucket);
readConfig(table, "max_connections", max_connections);
RUNTIME_CHECK(max_connections > 0);
readConfig(table, "connection_timeout_ms", connection_timeout_ms);
RUNTIME_CHECK(connection_timeout_ms > 0);
readConfig(table, "request_timeout_ms", request_timeout_ms);
RUNTIME_CHECK(request_timeout_ms > 0);
readConfig(table, "cache_dir", cache_dir);
readConfig(table, "cache_capacity", cache_capacity);

access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", /*default*/ "");
secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", /*default*/ "");

LOG_INFO(log, "endpoint={} bucket={} isS3Enabled={}", endpoint, bucket, isS3Enabled());
LOG_INFO(
log,
"endpoint={} bucket={} max_connections={} connection_timeout_ms={} "
"request_timeout_ms={} cache_dir={} cache_capacity={} "
"access_key_id_size={} secret_access_key_size={}",
endpoint,
bucket,
max_connections,
connection_timeout_ms,
request_timeout_ms,
cache_dir,
cache_capacity,
access_key_id.size(),
secret_access_key.size());
}

/// Whether S3 storage is enabled for this deployment.
/// S3 is considered enabled as soon as a bucket is configured. Credentials are
/// intentionally NOT required here: `access_key_id`/`secret_access_key` may be
/// absent when ambient authorization is used (e.g. an EC2 instance profile),
/// so gating on them would wrongly disable S3.
bool StorageS3Config::isS3Enabled() const
{
    // Fixed: a stale pre-refactor `return` statement (checking endpoint and
    // credentials) preceded this one, making the intended check unreachable.
    return !bucket.empty();
}

/// Whether the local file cache for S3 objects is enabled.
/// The cache needs both a directory to store files in and a non-zero capacity.
bool StorageS3Config::isFileCacheEnabled() const
{
    // Guard-clause form: bail out early when no cache directory is configured.
    if (cache_dir.empty())
        return false;
    return cache_capacity != 0;
}

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ struct StorageS3Config
String bucket;
String access_key_id;
String secret_access_key;
UInt64 max_connections = 1024;
UInt64 connection_timeout_ms = 1000;
UInt64 request_timeout_ms = 3000;
String cache_dir;
UInt64 cache_capacity = 0;

void parse(const String & content, const LoggerPtr & log);
bool isS3Enabled() const;
bool isFileCacheEnabled() const;
};

struct TiFlashStorageConfig
Expand Down
75 changes: 49 additions & 26 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ void ClientFactory::init(const StorageS3Config & config_)
config = config_;
Aws::InitAPI(aws_options);
Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>());
shared_client = create();
}

// Tears down S3 support. The order of the statements below is significant and
// mirrors init() in reverse: the shared client must be released before the AWS
// SDK core is shut down, and SDK logging is stopped before Aws::ShutdownAPI.
void ClientFactory::shutdown()
{
shared_client.reset(); // Reset S3Client before Aws::ShutdownAPI.
// Stop AWS SDK logging, then shut down the SDK itself (pairs with
// Aws::InitAPI / InitializeAWSLogging performed in init()).
Aws::Utils::Logging::ShutdownAWSLogging();
Aws::ShutdownAPI(aws_options);
}
Expand All @@ -144,13 +146,53 @@ ClientFactory & ClientFactory::instance()

/// Creates a new S3 client from the factory's stored configuration.
/// @return an owning pointer to a freshly constructed client; the caller owns it.
std::unique_ptr<Aws::S3::S3Client> ClientFactory::create() const
{
    // Fixed: dead statements from the pre-refactor implementation (a local
    // `scheme` computation and an earlier `return create(endpoint, ...)`)
    // preceded this return, leaving the config-based overload unreachable.
    return create(config);
}

// Returns the configured S3 bucket name.
const String & ClientFactory::bucket() const
{
// `bucket` is read-only.
// NOTE(review): `config` is assigned once in init() and appears not to be
// mutated afterwards, so returning a reference without locking looks safe —
// confirm no concurrent re-init is possible.
return config.bucket;
}

// Returns the process-wide S3 client built in init().
std::shared_ptr<Aws::S3::S3Client> ClientFactory::sharedClient() const
{
// `shared_client` is created during initialization and destroyed when process exits
// which means it is read-only when processing requests. So, it is safe to read `shared_client`
// without acquiring lock.
return shared_client;
}

std::unique_ptr<Aws::S3::S3Client> ClientFactory::create(const StorageS3Config & config_)
{
Aws::Client::ClientConfiguration cfg;
cfg.maxConnections = config_.max_connections;
cfg.requestTimeoutMs = config_.request_timeout_ms;
cfg.connectTimeoutMs = config_.connection_timeout_ms;
if (!config_.endpoint.empty())
{
cfg.endpointOverride = config_.endpoint;
auto scheme = parseScheme(config_.endpoint);
cfg.scheme = scheme;
cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS;
}
if (config_.access_key_id.empty() && config_.secret_access_key.empty())
{
// Request that does not require authentication.
// Such as the EC2 access permission to the S3 bucket is configured.
// If the empty access_key_id and secret_access_key are passed to S3Client,
// an authentication error will be reported.
return std::make_unique<Aws::S3::S3Client>(cfg);
}
else
{
Aws::Auth::AWSCredentials cred(config_.access_key_id, config_.secret_access_key);
return std::make_unique<Aws::S3::S3Client>(
cred,
cfg,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/*useVirtualAddressing*/ true);
}
}

std::unique_ptr<TiFlashS3Client> ClientFactory::createWithBucket() const
Expand All @@ -169,25 +211,6 @@ std::unique_ptr<TiFlashS3Client> ClientFactory::createWithBucket() const
/*useVirtualAddressing*/ true);
}

std::unique_ptr<Aws::S3::S3Client> ClientFactory::create(
const String & endpoint,
Aws::Http::Scheme scheme,
bool verifySSL,
const String & access_key_id,
const String & secret_access_key)
{
Aws::Client::ClientConfiguration cfg;
cfg.endpointOverride = endpoint;
cfg.scheme = scheme;
cfg.verifySSL = verifySSL;
Aws::Auth::AWSCredentials cred(access_key_id, secret_access_key);
return std::make_unique<Aws::S3::S3Client>(
cred,
cfg,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/*useVirtualAddressing*/ true);
}

Aws::Http::Scheme ClientFactory::parseScheme(std::string_view endpoint)
{
return boost::algorithm::starts_with(endpoint, "https://") ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP;
Expand Down
18 changes: 8 additions & 10 deletions dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,23 @@ class ClientFactory

void init(const StorageS3Config & config_);
void shutdown();
std::unique_ptr<Aws::S3::S3Client> create() const;

std::unique_ptr<TiFlashS3Client> createWithBucket() const;

static std::unique_ptr<Aws::S3::S3Client> create(
const String & endpoint,
Aws::Http::Scheme scheme,
bool verifySSL,
const String & access_key_id,
const String & secret_access_key);
const String & bucket() const;
std::shared_ptr<Aws::S3::S3Client> sharedClient() const;

static Aws::Http::Scheme parseScheme(std::string_view endpoint);
std::unique_ptr<TiFlashS3Client> createWithBucket() const;

private:
ClientFactory() = default;
DISALLOW_COPY_AND_MOVE(ClientFactory);
std::unique_ptr<Aws::S3::S3Client> create() const;

static std::unique_ptr<Aws::S3::S3Client> create(const StorageS3Config & config_);
static Aws::Http::Scheme parseScheme(std::string_view endpoint);

Aws::SDKOptions aws_options;
StorageS3Config config;
std::shared_ptr<Aws::S3::S3Client> shared_client;
};

struct ObjectInfo
Expand Down

0 comments on commit 6a56277

Please sign in to comment.