Skip to content

Commit 08c464a

Browse files
committed
Lazy loading for S3 environment variables
1 parent 653453a commit 08c464a

File tree

2 files changed

+27
-29
lines changed

2 files changed

+27
-29
lines changed

tensorflow_io/core/plugins/s3/s3_filesystem.cc

Lines changed: 23 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -135,8 +135,6 @@ static Aws::Client::ClientConfiguration& GetDefaultClientConfig() {
135135
absl::MutexLock l(&cfg_lock);
136136

137137
if (!init) {
138-
const char* endpoint = getenv("S3_ENDPOINT");
139-
if (endpoint) cfg.endpointOverride = Aws::String(endpoint);
140138
const char* region = getenv("AWS_REGION");
141139
// TODO (yongtang): `S3_REGION` should be deprecated after 2.0.
142140
if (!region) region = getenv("S3_REGION");
@@ -241,6 +239,11 @@ static void GetS3Client(tf_s3_filesystem::S3File* s3_file) {
241239
tf_s3_filesystem::AWSLogSystem::ShutdownAWSLogging();
242240
}
243241
});
242+
int temp_value;
243+
if (absl::SimpleAtoi(getenv("S3_DISABLE_MULTI_PART_DOWNLOAD"), &temp_value))
244+
s3_file->use_multi_part_download = (temp_value != 1);
245+
const char* endpoint = getenv("S3_ENDPOINT");
246+
if (endpoint) cfg.endpointOverride = Aws::String(endpoint);
244247
}
245248
}
246249

@@ -263,15 +266,26 @@ static void GetTransferManager(
263266

264267
absl::MutexLock l(&s3_file->initialization_lock);
265268

266-
if (s3_file->transfer_managers[direction].get() == nullptr) {
269+
if (s3_file->transfer_managers.count(direction) == 0) {
270+
uint64_t temp_value;
271+
if (direction == Aws::Transfer::TransferDirection::UPLOAD) {
272+
if (!absl::SimpleAtoi(getenv("S3_MULTI_PART_UPLOAD_CHUNK_SIZE"),
273+
&temp_value))
274+
temp_value = kS3MultiPartUploadChunkSize;
275+
} else if (direction == Aws::Transfer::TransferDirection::DOWNLOAD) {
276+
if (!absl::SimpleAtoi(getenv("S3_MULTI_PART_DOWNLOAD_CHUNK_SIZE"),
277+
&temp_value))
278+
temp_value = kS3MultiPartDownloadChunkSize;
279+
}
280+
s3_file->multi_part_chunk_sizes.emplace(direction, temp_value);
281+
267282
Aws::Transfer::TransferManagerConfiguration config(s3_file->executor.get());
268283
config.s3Client = s3_file->s3_client;
269-
config.bufferSize = s3_file->multi_part_chunk_sizes[direction];
284+
config.bufferSize = temp_value;
270285
// must be larger than pool size * multi part chunk size
271-
config.transferBufferMaxHeapSize =
272-
(kExecutorPoolSize + 1) * s3_file->multi_part_chunk_sizes[direction];
273-
s3_file->transfer_managers[direction] =
274-
Aws::Transfer::TransferManager::Create(config);
286+
config.transferBufferMaxHeapSize = (kExecutorPoolSize + 1) * temp_value;
287+
s3_file->transfer_managers.emplace(
288+
direction, Aws::Transfer::TransferManager::Create(config));
275289
}
276290
}
277291

@@ -529,24 +543,7 @@ S3File::S3File()
529543
transfer_managers(),
530544
multi_part_chunk_sizes(),
531545
use_multi_part_download(false), // TODO: change to true after fix
532-
initialization_lock() {
533-
uint64_t temp_value;
534-
multi_part_chunk_sizes[Aws::Transfer::TransferDirection::UPLOAD] =
535-
absl::SimpleAtoi(getenv("S3_MULTI_PART_UPLOAD_CHUNK_SIZE"), &temp_value)
536-
? temp_value
537-
: kS3MultiPartUploadChunkSize;
538-
multi_part_chunk_sizes[Aws::Transfer::TransferDirection::DOWNLOAD] =
539-
absl::SimpleAtoi(getenv("S3_MULTI_PART_DOWNLOAD_CHUNK_SIZE"), &temp_value)
540-
? temp_value
541-
: kS3MultiPartDownloadChunkSize;
542-
use_multi_part_download =
543-
absl::SimpleAtoi(getenv("S3_DISABLE_MULTI_PART_DOWNLOAD"), &temp_value)
544-
? (temp_value != 1)
545-
: use_multi_part_download;
546-
transfer_managers.emplace(Aws::Transfer::TransferDirection::UPLOAD, nullptr);
547-
transfer_managers.emplace(Aws::Transfer::TransferDirection::DOWNLOAD,
548-
nullptr);
549-
}
546+
initialization_lock() {}
550547
void Init(TF_Filesystem* filesystem, TF_Status* status) {
551548
filesystem->plugin_filesystem = new S3File();
552549
TF_SetStatus(status, TF_OK, "");

tensorflow_io/core/plugins/s3/s3_filesystem.h

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -60,11 +60,12 @@ typedef struct S3File {
6060
std::shared_ptr<Aws::S3::S3Client> s3_client;
6161
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor;
6262
// We need 2 `TransferManager`, for multipart upload/download.
63-
Aws::Map<Aws::Transfer::TransferDirection,
64-
std::shared_ptr<Aws::Transfer::TransferManager>>
63+
Aws::UnorderedMap<Aws::Transfer::TransferDirection,
64+
std::shared_ptr<Aws::Transfer::TransferManager>>
6565
transfer_managers;
6666
// Sizes to split objects during multipart upload/download.
67-
Aws::Map<Aws::Transfer::TransferDirection, uint64_t> multi_part_chunk_sizes;
67+
Aws::UnorderedMap<Aws::Transfer::TransferDirection, uint64_t>
68+
multi_part_chunk_sizes;
6869
bool use_multi_part_download;
6970
absl::Mutex initialization_lock;
7071
S3File();

0 commit comments

Comments
 (0)