Skip to content

Commit

Permalink
Fix CopyFile and add test (+ nits)
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Oct 17, 2024
1 parent 33962a8 commit 3fce2f5
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 21 deletions.
41 changes: 32 additions & 9 deletions cpp/src/arrow/filesystem/s3_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
#endif // !ARROW_AWS_SDK_VERSION_CHECK

#if ARROW_AWS_SDK_VERSION_CHECK(1, 9, 201)
# define ARROW_S3_HAS_SSE_C
# define ARROW_S3_HAS_SSE_CUSTOMER_KEY
#endif

namespace arrow {
Expand Down Expand Up @@ -339,19 +339,42 @@ inline Result<std::string> CalculateSSECustomerKeyMD5(
sse_customer_key_md5.GetLength()));
}

template <typename S3RequestType>
Status SetSSECustomerKey(S3RequestType* request, const std::string& sse_customer_key) {
struct SSECustomerKeyHeaders {
std::string sse_customer_key;
std::string sse_customer_key_md5;
std::string sse_customer_algorithm;
};

inline Result<std::optional<SSECustomerKeyHeaders>> GetSSECustomerKeyHeaders(
const std::string& sse_customer_key) {
if (sse_customer_key.empty()) {
return Status::OK(); // do nothing if the sse_customer_key is not configured
return std::nullopt;
}
#ifdef ARROW_S3_HAS_SSE_C
#ifdef ARROW_S3_HAS_SSE_CUSTOMER_KEY
ARROW_ASSIGN_OR_RAISE(auto md5, internal::CalculateSSECustomerKeyMD5(sse_customer_key));
request->SetSSECustomerKeyMD5(md5);
request->SetSSECustomerKey(arrow::util::base64_encode(sse_customer_key));
request->SetSSECustomerAlgorithm("AES256");
return SSECustomerKeyHeaders{arrow::util::base64_encode(sse_customer_key), md5,
"AES256"};
#else
return Status::NotImplemented(
"SSE customer key not supported by this version of the AWS SDK");
#endif
}

template <typename S3RequestType>
Status SetSSECustomerKey(S3RequestType* request, const std::string& sse_customer_key) {
ARROW_ASSIGN_OR_RAISE(auto maybe_headers, GetSSECustomerKeyHeaders(sse_customer_key));
if (!maybe_headers.has_value()) {
return Status::OK();
}
#ifdef ARROW_S3_HAS_SSE_CUSTOMER_KEY
auto headers = std::move(maybe_headers).value();
request->SetSSECustomerKey(headers.sse_customer_key);
request->SetSSECustomerKeyMD5(headers.sse_customer_key_md5);
request->SetSSECustomerAlgorithm(headers.sse_customer_algorithm);
return Status::OK();
#else
return Status::NotImplemented("SSE-C is not supported");
return Status::NotImplemented(
"SSE customer key not supported by this version of the AWS SDK");
#endif
}

Expand Down
12 changes: 11 additions & 1 deletion cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2367,8 +2367,18 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
RETURN_NOT_OK(SetSSECustomerKey(&req, options().sse_customer_key));
req.SetKey(ToAwsString(dest_path.key));
ARROW_ASSIGN_OR_RAISE(auto maybe_sse_headers,
internal::GetSSECustomerKeyHeaders(options().sse_customer_key));
if (maybe_sse_headers.has_value()) {
auto sse_headers = std::move(maybe_sse_headers).value();
req.SetSSECustomerKey(sse_headers.sse_customer_key);
req.SetSSECustomerKeyMD5(sse_headers.sse_customer_key_md5);
req.SetSSECustomerAlgorithm(sse_headers.sse_customer_algorithm);
req.SetCopySourceSSECustomerKey(sse_headers.sse_customer_key);
req.SetCopySourceSSECustomerKeyMD5(sse_headers.sse_customer_key_md5);
req.SetCopySourceSSECustomerAlgorithm(sse_headers.sse_customer_algorithm);
}
// ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs.
// However at least in 1.8 and 1.9 the SDK URL-encodes the path for you
req.SetCopySource(src_path.ToAwsString());
Expand Down
24 changes: 16 additions & 8 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,31 @@ struct ARROW_EXPORT S3Options {
/// delay between retries.
std::shared_ptr<S3RetryStrategy> retry_strategy;

/// The SSE-C customized key (raw 32 bytes key).
/// Optional customer-provided key for server-side encryption (SSE-C).
///
/// This should be the 32-byte AES-256 key, unencoded.
std::string sse_customer_key;

/// Path to a single PEM file holding all TLS CA certificates
/// Optional path to a single PEM file holding all TLS CA certificates
///
/// If empty, global filesystem options will be used, if the global filesystem options
/// is also empty, the underlying TLS library's defaults will be used.
/// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
/// if the corresponding global filesystem option is also empty, the underlying
/// TLS library's defaults will be used.
std::string tls_ca_file_path;

/// Path to a directory holding TLS CA certificates in individual PEM files
/// Optional path to a directory holding TLS CA
///
/// The given directory should contain CA certificates as individual PEM files
/// named along the OpenSSL "hashed" format.
///
/// If empty, global filesystem options will be used, if the global filesystem options
/// is also empty, the underlying TLS library's defaults will be used.
/// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
/// if the corresponding global filesystem option is also empty, the underlying
/// TLS library's defaults will be used.
std::string tls_ca_dir_path;

/// Whether to verify the S3 endpoint's TLS certificate, if the scheme is "https".
/// Whether to verify the S3 endpoint's TLS certificate
///
/// This option applies if the scheme is "https".
bool tls_verify_certificates = true;

S3Options();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class MinioFixture : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
minio_.reset(new MinioTestServer());
ASSERT_OK(minio_->Start(false));
ASSERT_OK(minio_->Start(/*enable_tls_if_supported=*/false));

const char* region_str = std::getenv(kEnvAwsRegion);
if (region_str) {
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ ::testing::Environment* s3_env = ::testing::AddGlobalTestEnvironment(new S3Envir
::testing::Environment* minio_env =
::testing::AddGlobalTestEnvironment(new MinioTestEnvironment);

::testing::Environment* minio_env_http =
::testing::AddGlobalTestEnvironment(new MinioTestEnvironment(false));
::testing::Environment* minio_env_http = ::testing::AddGlobalTestEnvironment(
new MinioTestEnvironment(/*enable_tls_if_supported=*/false));

MinioTestEnvironment* GetMinioEnv(bool enable_tls_if_supported) {
if (enable_tls_if_supported) {
Expand Down Expand Up @@ -1349,6 +1349,7 @@ TEST_F(TestS3FS, OpenInputFile) {
ASSERT_RAISES(IOError, file->Seek(10));
}

// Minio only allows Server Side Encryption on HTTPS client connections.
#ifdef MINIO_SERVER_WITH_TLS
TEST_F(TestS3FS, SSECustomerKeyMatch) {
// normal write/read with correct SSE-C key
Expand Down Expand Up @@ -1400,6 +1401,18 @@ TEST_F(TestS3FS, SSECustomerKeyMissing) {
ASSERT_OK(RestoreTestBucket());
}
}

TEST_F(TestS3FS, SSECustomerKeyCopyFile) {
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenOutputStream("bucket/newfile_with_sse_c"));
ASSERT_OK(stream->Write("some"));
ASSERT_OK(stream->Close());
ASSERT_OK(fs_->CopyFile("bucket/newfile_with_sse_c", "bucket/copied_with_sse_c"));

ASSERT_OK_AND_ASSIGN(auto file, fs_->OpenInputFile("bucket/copied_with_sse_c"));
ASSERT_OK_AND_ASSIGN(auto buf, file->Read(5));
AssertBufferEqual(*buf, "some");
ASSERT_OK(RestoreTestBucket());
}
#endif // MINIO_SERVER_WITH_TLS

struct S3OptionsTestParameters {
Expand Down Expand Up @@ -1697,5 +1710,6 @@ TEST(S3GlobalOptions, DefaultsLogLevel) {
ASSERT_EQ(S3LogLevel::Fatal, arrow::fs::S3GlobalOptions::Defaults().log_level);
}
}

} // namespace fs
} // namespace arrow

0 comments on commit 3fce2f5

Please sign in to comment.