Skip to content

Commit

Permalink
Add S3 key utils (#6878)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
JaySon-Huang authored Feb 23, 2023
1 parent 4a4f5d8 commit 4b83471
Show file tree
Hide file tree
Showing 12 changed files with 891 additions and 58 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <Interpreters/Context.h>
#include <Debug/MockServerInfo.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>

Expand All @@ -35,6 +35,8 @@ class IServer;
class IAsyncCallData;
class EstablishCallData;
class MockStorage;
class Context;
using ContextPtr = std::shared_ptr<Context>;
using MockMPPServerInfo = tests::MockMPPServerInfo;

namespace Management
Expand Down Expand Up @@ -114,4 +116,4 @@ class AsyncFlashService final : public tikvpb::Tikv::WithAsyncMethod_EstablishMP
/// Return non-OK grpc::Status when the connection can not be established.
grpc::Status establishMPPConnectionAsync(grpc::ServerContext * context, const mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data);
};
} // namespace DB
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <Flash/Mpp/ReceiverChannelWriter.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <common/StringRef.h>
#include <common/defines.h>
#include <common/logger_useful.h>
#include <common/types.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTunnelSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Mpp/MPPTaskId.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/MppVersion.h>
#include <IO/CompressedStream.h>

namespace DB
{
Expand Down
226 changes: 175 additions & 51 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <common/logger_useful.h>

#include <boost/algorithm/string/predicate.hpp>
#include <fstream>
#include <magic_enum.hpp>

namespace ProfileEvents
{
Expand Down Expand Up @@ -103,10 +107,20 @@ class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface
} // namespace


namespace DB
namespace DB::S3
{
namespace S3

TiFlashS3Client::TiFlashS3Client(
const String & bucket_name_,
const Aws::Auth::AWSCredentials & credentials,
const Aws::Client::ClientConfiguration & clientConfiguration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads,
bool useVirtualAddressing)
: Aws::S3::S3Client(credentials, clientConfiguration, signPayloads, useVirtualAddressing)
, bucket_name(bucket_name_)
{
}

void ClientFactory::init(const StorageS3Config & config_)
{
config = config_;
Expand All @@ -130,15 +144,31 @@ ClientFactory & ClientFactory::instance()

std::unique_ptr<Aws::S3::S3Client> ClientFactory::create() const
{
auto schema = parseScheme(config.endpoint);
auto scheme = parseScheme(config.endpoint);
return create(
config.endpoint,
schema,
schema == Aws::Http::Scheme::HTTPS,
scheme,
scheme == Aws::Http::Scheme::HTTPS,
config.access_key_id,
config.secret_access_key);
}

std::unique_ptr<TiFlashS3Client> ClientFactory::createWithBucket() const
{
auto scheme = parseScheme(config.endpoint);
Aws::Client::ClientConfiguration cfg;
cfg.endpointOverride = config.endpoint;
cfg.scheme = scheme;
cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS;
Aws::Auth::AWSCredentials cred(config.access_key_id, config.secret_access_key);
return std::make_unique<TiFlashS3Client>(
config.bucket,
cred,
cfg,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/*useVirtualAddressing*/ true);
}

std::unique_ptr<Aws::S3::S3Client> ClientFactory::create(
const String & endpoint,
Aws::Http::Scheme scheme,
Expand Down Expand Up @@ -168,6 +198,18 @@ bool isNotFoundError(Aws::S3::S3Errors error)
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
}

namespace details
{
template <typename... Args>
Exception fromS3Error(const Aws::S3::S3Error & e, const std::string & fmt, Args &&... args)
{
return DB::Exception(
ErrorCodes::S3_ERROR,
fmt + fmt::format(" s3error={} s3msg={}", magic_enum::enum_name(e.GetErrorType()), e.GetMessage()),
args...);
}
} // namespace details

Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id)
{
ProfileEvents::increment(ProfileEvents::S3HeadObject);
Expand All @@ -192,11 +234,7 @@ S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bu
}
else if (throw_on_error)
{
const auto & error = outcome.GetError();
throw DB::Exception(ErrorCodes::S3_ERROR,
"Failed to HEAD object: {}. HTTP response code: {}",
error.GetMessage(),
static_cast<size_t>(error.GetResponseCode()));
throw details::fromS3Error(outcome.GetError(), "Failed to HEAD object, key={}", key);
}
return {};
}
Expand All @@ -218,30 +256,40 @@ bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const
{
return false;
}
throw Exception(ErrorCodes::S3_ERROR,
"Failed to check existence of key {} in bucket {}: {}",
key,
bucket,
error.GetMessage());
throw details::fromS3Error(outcome.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key);
}

void uploadEmptyFile(const Aws::S3::S3Client & client, const String & bucket, const String & key)
{
Stopwatch sw;
Aws::S3::Model::PutObjectRequest req;
req.WithBucket(bucket).WithKey(key);
req.SetContentType("binary/octet-stream");
auto istr = Aws::MakeShared<Aws::StringStream>("EmptyObjectInputStream", "", std::ios_base::in | std::ios_base::binary);
req.SetBody(istr);
auto result = client.PutObject(req);
if (!result.IsSuccess())
{
throw details::fromS3Error(result.GetError(), "S3 PutEmptyObject failed, bucket={} key={}", bucket, key);
}
static auto * log = &Poco::Logger::get("S3UploadFile");
LOG_DEBUG(log, "remote_fname={}, cost={}ms", key, sw.elapsedMilliseconds());
}

void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname)
{
Stopwatch sw;
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(remote_fname);
req.WithBucket(bucket).WithKey(remote_fname);
req.SetContentType("binary/octet-stream");
auto istr = Aws::MakeShared<Aws::FStream>("PutObjectInputStream", local_fname, std::ios_base::in | std::ios_base::binary);
req.SetBody(istr);
req.SetContentType("binary/octet-stream");
auto result = client.PutObject(req);
RUNTIME_CHECK_MSG(result.IsSuccess(),
"S3 PutObject failed, local_fname={}, remote_fname={}, exception={}, message={}",
local_fname,
remote_fname,
result.GetError().GetExceptionName(),
result.GetError().GetMessage());
static auto * log = &Poco::Logger::get("S3PutObject");
if (!result.IsSuccess())
{
throw details::fromS3Error(result.GetError(), "S3 PutObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname);
}
static auto log = Logger::get();
LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds());
}

Expand All @@ -252,42 +300,118 @@ void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const
req.SetBucket(bucket);
req.SetKey(remote_fname);
auto result = client.GetObject(req);
RUNTIME_CHECK_MSG(result.IsSuccess(),
"S3 GetObject failed, local_fname={}, remote_fname={}, exception={}, message={}",
local_fname,
remote_fname,
result.GetError().GetExceptionName(),
result.GetError().GetMessage());
if (!result.IsSuccess())
{
throw details::fromS3Error(result.GetError(), "S3 GetObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname);
}
Aws::OFStream ostr(local_fname, std::ios_base::out | std::ios_base::binary);
ostr << result.GetResult().GetBody().rdbuf();
static auto * log = &Poco::Logger::get("S3GetObject");
static auto log = Logger::get();
LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds());
}

std::unordered_map<String, size_t> listPrefix(const Aws::S3::S3Client & client, const String & bucket, const String & prefix)
void listPrefix(
const Aws::S3::S3Client & client,
const String & bucket,
const String & prefix,
std::function<PageResult(const Aws::S3::Model::ListObjectsV2Result & result)> pager)
{
// Usually we don't need to set delimiter.
// Check the docs here for Delimiter && CommonPrefixes when you really need it.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
listPrefix(client, bucket, prefix, /*delimiter*/ "", pager);
}

void listPrefix(
const Aws::S3::S3Client & client,
const String & bucket,
const String & prefix,
std::string_view delimiter,
std::function<PageResult(const Aws::S3::Model::ListObjectsV2Result & result)> pager)
{
Stopwatch sw;
Aws::S3::Model::ListObjectsRequest req;
req.SetBucket(bucket);
req.SetPrefix(prefix);
auto result = client.ListObjects(req);
RUNTIME_CHECK_MSG(result.IsSuccess(),
"S3 ListObjects failed, prefix={}, exception={}, message={}",
prefix,
result.GetError().GetExceptionName(),
result.GetError().GetMessage());
const auto & objects = result.GetResult().GetContents();
std::unordered_map<String, size_t> keys_with_size;
keys_with_size.reserve(objects.size());
for (const auto & object : objects)
Aws::S3::Model::ListObjectsV2Request req;
req.WithBucket(bucket).WithPrefix(prefix);
if (!delimiter.empty())
{
req.SetDelimiter(String(delimiter));
}

static auto log = Logger::get("S3ListPrefix");

bool done = false;
size_t num_keys = 0;
while (!done)
{
keys_with_size.emplace(object.GetKey().substr(prefix.size()), object.GetSize()); // Cut prefix
auto outcome = client.ListObjectsV2(req);
if (!outcome.IsSuccess())
{
throw details::fromS3Error(outcome.GetError(), "S3 ListObjects failed, bucket={} prefix={} delimiter={}", bucket, prefix, delimiter);
}

const auto & result = outcome.GetResult();
PageResult page_res = pager(result);
num_keys += page_res.num_keys;

// handle the result size over max size
done = !result.GetIsTruncated();
if (!done && page_res.more)
{
const auto & next_token = result.GetNextContinuationToken();
req.SetContinuationToken(next_token);
LOG_DEBUG(log, "prefix={}, keys={}, total_keys={}, next_token={}", prefix, page_res.num_keys, num_keys, next_token);
}
}
static auto * log = &Poco::Logger::get("S3ListObjects");
LOG_DEBUG(log, "prefix={}, keys={}, cost={}", prefix, keys_with_size, sw.elapsedMilliseconds());
LOG_DEBUG(log, "prefix={}, total_keys={}, cost={}", prefix, num_keys, sw.elapsedMilliseconds());
}

std::unordered_map<String, size_t> listPrefixWithSize(const Aws::S3::S3Client & client, const String & bucket, const String & prefix)
{
std::unordered_map<String, size_t> keys_with_size;
listPrefix(client, bucket, prefix, "", [&](const Aws::S3::Model::ListObjectsV2Result & result) {
const auto & objects = result.GetContents();
keys_with_size.reserve(keys_with_size.size() + objects.size());
for (const auto & object : objects)
{
keys_with_size.emplace(object.GetKey().substr(prefix.size()), object.GetSize()); // Cut prefix
}
return PageResult{.num_keys = objects.size(), .more = true};
});

return keys_with_size;
}

} // namespace S3
std::pair<bool, Aws::Utils::DateTime> tryGetObjectModifiedTime(
const Aws::S3::S3Client & client,
const String & bucket,
const String & key)
{
auto o = headObject(client, bucket, key);
if (!o.IsSuccess())
{
if (const auto & err = o.GetError(); isNotFoundError(err.GetErrorType()))
{
return {false, {}};
}
throw details::fromS3Error(o.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key);
}
// Else the object still exist
const auto & res = o.GetResult();
// "DeleteMark" of S3 service, don't know what will lead to this
RUNTIME_CHECK(!res.GetDeleteMarker(), bucket, key);
return {true, res.GetLastModified()};
}

void deleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key)
{
Aws::S3::Model::DeleteObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
auto o = client.DeleteObject(req);
RUNTIME_CHECK(o.IsSuccess(), o.GetError().GetMessage());
const auto & res = o.GetResult();
// "DeleteMark" of S3 service, don't know what will lead to this
RUNTIME_CHECK(!res.GetDeleteMarker(), bucket, key);
}

} // namespace DB
} // namespace DB::S3
Loading

0 comments on commit 4b83471

Please sign in to comment.