Skip to content

Commit 9a2aef7

Browse files
authored
ARROW-18397: [C++] Clear S3 region resolver client at S3 shutdown (#14718)
This should hopefully suppress a failed assertion on recent AWS SDK versions. See aws/aws-sdk-cpp#2204 for upstream issue report. Authored-by: Antoine Pitrou <antoine@python.org> Signed-off-by: Sutou Kouhei <kou@clear-code.com>
1 parent ade4266 commit 9a2aef7

File tree

3 files changed

+109
-76
lines changed

3 files changed

+109
-76
lines changed

ci/conda_env_cpp.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
aws-sdk-cpp=1.9.379
18+
aws-sdk-cpp=1.10.13
1919
benchmark>=1.6.0
2020
boost-cpp>=1.68.0
2121
brotli

cpp/src/arrow/filesystem/s3fs.cc

Lines changed: 100 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -122,74 +122,6 @@ using internal::ToURLEncodedAwsString;
122122

123123
static const char kSep = '/';
124124

125-
namespace {
126-
127-
std::mutex aws_init_lock;
128-
Aws::SDKOptions aws_options;
129-
std::atomic<bool> aws_initialized(false);
130-
131-
Status DoInitializeS3(const S3GlobalOptions& options) {
132-
Aws::Utils::Logging::LogLevel aws_log_level;
133-
134-
#define LOG_LEVEL_CASE(level_name) \
135-
case S3LogLevel::level_name: \
136-
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
137-
break;
138-
139-
switch (options.log_level) {
140-
LOG_LEVEL_CASE(Fatal)
141-
LOG_LEVEL_CASE(Error)
142-
LOG_LEVEL_CASE(Warn)
143-
LOG_LEVEL_CASE(Info)
144-
LOG_LEVEL_CASE(Debug)
145-
LOG_LEVEL_CASE(Trace)
146-
default:
147-
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
148-
}
149-
150-
#undef LOG_LEVEL_CASE
151-
152-
aws_options.loggingOptions.logLevel = aws_log_level;
153-
// By default the AWS SDK logs to files, log to console instead
154-
aws_options.loggingOptions.logger_create_fn = [] {
155-
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
156-
aws_options.loggingOptions.logLevel);
157-
};
158-
#if (defined(AWS_SDK_VERSION_MAJOR) && \
159-
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
160-
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
161-
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
162-
// This configuration option is only available with AWS SDK 1.9.272 and later.
163-
aws_options.httpOptions.compliantRfc3986Encoding = true;
164-
#endif
165-
Aws::InitAPI(aws_options);
166-
aws_initialized.store(true);
167-
return Status::OK();
168-
}
169-
170-
} // namespace
171-
172-
Status InitializeS3(const S3GlobalOptions& options) {
173-
std::lock_guard<std::mutex> lock(aws_init_lock);
174-
return DoInitializeS3(options);
175-
}
176-
177-
Status FinalizeS3() {
178-
std::lock_guard<std::mutex> lock(aws_init_lock);
179-
Aws::ShutdownAPI(aws_options);
180-
aws_initialized.store(false);
181-
return Status::OK();
182-
}
183-
184-
Status EnsureS3Initialized() {
185-
std::lock_guard<std::mutex> lock(aws_init_lock);
186-
if (!aws_initialized.load()) {
187-
S3GlobalOptions options{S3LogLevel::Fatal};
188-
return DoInitializeS3(options);
189-
}
190-
return Status::OK();
191-
}
192-
193125
// -----------------------------------------------------------------------
194126
// S3ProxyOptions implementation
195127

@@ -268,7 +200,7 @@ std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
268200
// S3Options implementation
269201

270202
S3Options::S3Options() {
271-
DCHECK(aws_initialized.load()) << "Must initialize S3 before using S3Options";
203+
DCHECK(IsS3Initialized()) << "Must initialize S3 before using S3Options";
272204
}
273205

274206
void S3Options::ConfigureDefaultCredentials() {
@@ -459,7 +391,7 @@ bool S3Options::Equals(const S3Options& other) const {
459391
namespace {
460392

461393
Status CheckS3Initialized() {
462-
if (!aws_initialized.load()) {
394+
if (!IsS3Initialized()) {
463395
return Status::Invalid(
464396
"S3 subsystem not initialized; please call InitializeS3() "
465397
"before carrying out any S3-related operation");
@@ -864,8 +796,7 @@ class RegionResolver {
864796
}
865797

866798
static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
867-
static std::shared_ptr<RegionResolver> instance;
868-
auto resolver = std::atomic_load(&instance);
799+
auto resolver = std::atomic_load(&instance_);
869800
if (resolver) {
870801
return resolver;
871802
}
@@ -876,19 +807,24 @@ class RegionResolver {
876807
// Make sure to always return the same instance even if several threads
877808
// call DefaultInstance at once.
878809
std::shared_ptr<RegionResolver> existing;
879-
if (std::atomic_compare_exchange_strong(&instance, &existing, *maybe_resolver)) {
810+
if (std::atomic_compare_exchange_strong(&instance_, &existing, *maybe_resolver)) {
880811
return *maybe_resolver;
881812
} else {
882813
return existing;
883814
}
884815
}
885816

817+
static void ResetDefaultInstance() {
818+
std::atomic_store(&instance_, std::shared_ptr<RegionResolver>());
819+
}
820+
886821
Result<std::string> ResolveRegion(const std::string& bucket) {
887822
std::unique_lock<std::mutex> lock(cache_mutex_);
888823
auto it = cache_.find(bucket);
889824
if (it != cache_.end()) {
890825
return it->second;
891826
}
827+
// Cache miss: do the actual region lookup
892828
lock.unlock();
893829
ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
894830
lock.lock();
@@ -911,6 +847,8 @@ class RegionResolver {
911847
return builder_.BuildClient().Value(&client_);
912848
}
913849

850+
static std::shared_ptr<RegionResolver> instance_;
851+
914852
ClientBuilder builder_;
915853
std::shared_ptr<S3Client> client_;
916854

@@ -920,6 +858,8 @@ class RegionResolver {
920858
std::unordered_map<std::string, std::string> cache_;
921859
};
922860

861+
std::shared_ptr<RegionResolver> RegionResolver::instance_;
862+
923863
// -----------------------------------------------------------------------
924864
// S3 file stream implementations
925865

@@ -2621,9 +2561,94 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
26212561
return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
26222562
}
26232563

2624-
//
2564+
// -----------------------------------------------------------------------
2565+
// Initialization and finalization
2566+
2567+
namespace {
2568+
2569+
std::mutex aws_init_lock;
2570+
Aws::SDKOptions aws_options;
2571+
std::atomic<bool> aws_initialized(false);
2572+
2573+
Status DoInitializeS3(const S3GlobalOptions& options) {
2574+
Aws::Utils::Logging::LogLevel aws_log_level;
2575+
2576+
#define LOG_LEVEL_CASE(level_name) \
2577+
case S3LogLevel::level_name: \
2578+
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
2579+
break;
2580+
2581+
switch (options.log_level) {
2582+
LOG_LEVEL_CASE(Fatal)
2583+
LOG_LEVEL_CASE(Error)
2584+
LOG_LEVEL_CASE(Warn)
2585+
LOG_LEVEL_CASE(Info)
2586+
LOG_LEVEL_CASE(Debug)
2587+
LOG_LEVEL_CASE(Trace)
2588+
default:
2589+
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
2590+
}
2591+
2592+
#undef LOG_LEVEL_CASE
2593+
2594+
aws_options.loggingOptions.logLevel = aws_log_level;
2595+
// By default the AWS SDK logs to files, log to console instead
2596+
aws_options.loggingOptions.logger_create_fn = [] {
2597+
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
2598+
aws_options.loggingOptions.logLevel);
2599+
};
2600+
#if (defined(AWS_SDK_VERSION_MAJOR) && \
2601+
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
2602+
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
2603+
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
2604+
// This configuration option is only available with AWS SDK 1.9.272 and later.
2605+
aws_options.httpOptions.compliantRfc3986Encoding = true;
2606+
#endif
2607+
Aws::InitAPI(aws_options);
2608+
aws_initialized.store(true);
2609+
return Status::OK();
2610+
}
2611+
2612+
Status DoFinalizeS3() {
2613+
RegionResolver::ResetDefaultInstance();
2614+
Aws::ShutdownAPI(aws_options);
2615+
aws_initialized.store(false);
2616+
return Status::OK();
2617+
}
2618+
2619+
} // namespace
2620+
2621+
Status InitializeS3(const S3GlobalOptions& options) {
2622+
std::lock_guard<std::mutex> lock(aws_init_lock);
2623+
return DoInitializeS3(options);
2624+
}
2625+
2626+
Status EnsureS3Initialized() {
2627+
std::lock_guard<std::mutex> lock(aws_init_lock);
2628+
if (!aws_initialized.load()) {
2629+
S3GlobalOptions options{S3LogLevel::Fatal};
2630+
return DoInitializeS3(options);
2631+
}
2632+
return Status::OK();
2633+
}
2634+
2635+
Status FinalizeS3() {
2636+
std::lock_guard<std::mutex> lock(aws_init_lock);
2637+
return DoFinalizeS3();
2638+
}
2639+
2640+
Status EnsureS3Finalized() {
2641+
std::lock_guard<std::mutex> lock(aws_init_lock);
2642+
if (aws_initialized.load()) {
2643+
return DoFinalizeS3();
2644+
}
2645+
return Status::OK();
2646+
}
2647+
2648+
bool IsS3Initialized() { return aws_initialized.load(); }
2649+
2650+
// -----------------------------------------------------------------------
26252651
// Top-level utility functions
2626-
//
26272652

26282653
Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
26292654
if (bucket.empty() || bucket.find_first_of(kSep) != bucket.npos ||

cpp/src/arrow/filesystem/s3fs.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,18 @@ Status InitializeS3(const S3GlobalOptions& options);
336336
ARROW_EXPORT
337337
Status EnsureS3Initialized();
338338

339+
/// Whether S3 was initialized, and not finalized.
340+
ARROW_EXPORT
341+
bool IsS3Initialized();
342+
339343
/// Shut down the S3 APIs.
340344
ARROW_EXPORT
341345
Status FinalizeS3();
342346

347+
/// Ensure the S3 APIs are shut down, but only if not already done.
348+
ARROW_EXPORT
349+
Status EnsureS3Finalized();
350+
343351
ARROW_EXPORT
344352
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);
345353

0 commit comments

Comments
 (0)