Skip to content

Commit f72e64a

Browse files
committed
move CRT creation logic to client factory
1 parent 04f83c6 commit f72e64a

File tree

6 files changed

+23
-37
lines changed

6 files changed

+23
-37
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ public class DefaultS3ClientFactory extends Configured
8585
protected static final Logger LOG =
8686
LoggerFactory.getLogger(DefaultS3ClientFactory.class);
8787

88+
/**
89+
* A one-off log stating whether S3 CRT client is enabled
90+
*/
91+
private static final LogExactlyOnce LOG_S3_CRT_ENABLED = new LogExactlyOnce(LOG);
92+
8893

8994
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
9095
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
@@ -118,7 +123,17 @@ public S3Client createS3Client(
118123
}
119124

120125
@Override
121-
public S3AsyncClient createS3AsyncClient(
126+
public S3AsyncClient createS3AsyncClient(final URI uri,
127+
final S3ClientCreationParameters parameters) throws IOException {
128+
if (parameters.isCrtEnabled()) {
129+
LOG_S3_CRT_ENABLED.info("The S3 CRT client is enabled");
130+
return createS3CrtAsyncClient(uri, parameters);
131+
} else {
132+
return createJavaAsyncClient(uri, parameters);
133+
}
134+
}
135+
136+
public S3AsyncClient createJavaAsyncClient(
122137
final URI uri,
123138
final S3ClientCreationParameters parameters) throws IOException {
124139

@@ -148,8 +163,7 @@ public S3AsyncClient createS3AsyncClient(
148163
return s3AsyncClientBuilder.build();
149164
}
150165

151-
@Override
152-
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
166+
private S3AsyncClient createS3CrtAsyncClient(URI uri, S3ClientCreationParameters parameters)
153167
throws IOException {
154168
Configuration conf = getConf();
155169
String bucket = uri.getHost();

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ S3AsyncClient createS3AsyncClient(URI uri,
8282
S3ClientCreationParameters parameters) throws IOException;
8383

8484

85-
S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters) throws IOException;
86-
8785
/**
8886
* Creates a new {@link S3TransferManager}.
8987
*

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ public class ClientManagerImpl
6060

6161
public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
6262

63-
/**
64-
* A one-off log stating whether S3 CRT client is enabled
65-
*/
66-
private static final LogExactlyOnce LOG_S3_CRT_ENABLED = new LogExactlyOnce(LOG);
6763

6864
/**
6965
* Client factory to invoke.
@@ -148,15 +144,9 @@ private CallableRaisingIOE<S3Client> createS3Client() {
148144
* @return a callable which will create the client.
149145
*/
150146
private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
151-
return trackDurationOfOperation(durationTrackerFactory, STORE_CLIENT_CREATION.getSymbol(),
152-
() -> {
153-
if (clientCreationParameters.isCrtEnabled()) {
154-
LOG_S3_CRT_ENABLED.info("S3 CRT client is enabled");
155-
return clientFactory.createS3CrtClient(getUri(), clientCreationParameters);
156-
} else {
157-
return clientFactory.createS3AsyncClient(getUri(), clientCreationParameters);
158-
}
159-
});
147+
return trackDurationOfOperation(durationTrackerFactory,
148+
STORE_CLIENT_CREATION.getSymbol(),
149+
() -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
160150
}
161151

162152

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,10 +1775,9 @@ enhanced connection pool management, and can provide higher transfer from and to
17751775
ability to split PUT requests into MPU, and GETs into multiple byte-ranged GETs. More information
17761776
can be found [here](https://aws.amazon.com/blogs/developer/introducing-crt-based-s3-client-and-the-s3-transfer-manager-in-the-aws-sdk-for-java-2-x/).
17771777

1778-
While S3A does not benefit directly from all these features, since when writing files S3A
1779-
implements its own MPU is used, the enhanced connection pool management can help when making
1780-
many parallel GET requests to ensure load is evenly distributed across S3, as happens in the
1781-
Analytics input stream.
1778+
Using CRT ensures load is evenly distributed across S3, as happens in the
1779+
Analytics input stream when making multiple parallel GET requests, due to the enhanced connection
1780+
pool management.
17821781

17831782
The CRT client can be enabled as follows:
17841783

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,6 @@ public S3AsyncClient createS3AsyncClient(URI uri, final S3ClientCreationParamete
6262
return s3;
6363
}
6464

65-
@Override
66-
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
67-
throws IOException {
68-
S3AsyncClient s3 = mock(S3AsyncClient.class);
69-
return s3;
70-
}
71-
7265
@Override
7366
public S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) {
7467
S3TransferManager tm = mock(S3TransferManager.class);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/StubS3ClientFactory.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,6 @@ public S3AsyncClient createS3AsyncClient(final URI uri,
9090
return asyncClient;
9191
}
9292

93-
@Override
94-
public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters parameters)
95-
throws IOException {
96-
asyncClientCreationCount.incrementAndGet();
97-
launcher.apply();
98-
return asyncClient;
99-
}
100-
10193
@Override
10294
public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) {
10395
transferManagerCreationCount.incrementAndGet();

0 commit comments

Comments
 (0)