|
55 | 55 | import org.apache.hadoop.classification.InterfaceStability;
|
56 | 56 | import org.apache.hadoop.conf.Configuration;
|
57 | 57 | import org.apache.hadoop.conf.Configured;
|
| 58 | +import org.apache.hadoop.fs.s3a.impl.AWSRegionEndpointInformation; |
| 59 | +import org.apache.hadoop.fs.s3a.impl.AWSRegionEndpointResolver; |
58 | 60 | import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
|
59 | 61 | import org.apache.hadoop.fs.store.LogExactlyOnce;
|
60 | 62 |
|
|
85 | 87 | */
|
86 | 88 | @InterfaceAudience.Private
|
87 | 89 | @InterfaceStability.Unstable
|
88 |
| -public class |
89 |
| -DefaultS3ClientFactory extends Configured |
| 90 | +public class DefaultS3ClientFactory extends Configured |
90 | 91 | implements S3ClientFactory {
|
91 | 92 |
|
92 | 93 | private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
|
93 | 94 |
|
94 |
| - private static final String S3_SERVICE_NAME = "s3"; |
95 |
| - |
96 |
| - private static final Pattern VPC_ENDPOINT_PATTERN = |
97 |
| - Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); |
98 |
| - |
99 | 95 | /**
|
100 | 96 | * Subclasses refer to this.
|
101 | 97 | */
|
102 | 98 | protected static final Logger LOG =
|
103 | 99 | LoggerFactory.getLogger(DefaultS3ClientFactory.class);
|
104 | 100 |
|
105 |
| - /** |
106 |
| - * A one-off warning of default region chains in use. |
107 |
| - */ |
108 |
| - private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = |
109 |
| - new LogExactlyOnce(LOG); |
110 |
| - |
111 |
| - /** |
112 |
| - * Warning message printed when the SDK Region chain is in use. |
113 |
| - */ |
114 |
| - private static final String SDK_REGION_CHAIN_IN_USE = |
115 |
| - "S3A filesystem client is using" |
116 |
| - + " the SDK region resolution chain."; |
117 |
| - |
118 | 101 |
|
119 | 102 | /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
|
120 | 103 | private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
|
@@ -189,7 +172,19 @@ public S3AsyncClient createS3CrtClient(URI uri, S3ClientCreationParameters param
|
189 | 172 | s3CrtAsyncClientBuilder
|
190 | 173 | .credentialsProvider(parameters.getCredentialSet());
|
191 | 174 |
|
192 |
| - configureCRTClientRegion(s3CrtAsyncClientBuilder, parameters, conf); |
| 175 | + AWSRegionEndpointInformation regionEndpointInformation = |
| 176 | + AWSRegionEndpointResolver.getEndpointRegionResolution(parameters, conf); |
| 177 | + |
| 178 | + if (regionEndpointInformation.getRegion() == null) { |
| 179 | + s3CrtAsyncClientBuilder.region(regionEndpointInformation.getRegion()); |
| 180 | + } |
| 181 | + |
| 182 | + if (regionEndpointInformation.getEndpoint() == null) { |
| 183 | + s3CrtAsyncClientBuilder.endpointOverride(regionEndpointInformation.getEndpoint()); |
| 184 | + } |
| 185 | + |
| 186 | + s3CrtAsyncClientBuilder |
| 187 | + .crossRegionAccessEnabled(regionEndpointInformation.isCrossRegionAccessEnabled()); |
193 | 188 |
|
194 | 189 | return s3CrtAsyncClientBuilder.build();
|
195 | 190 | }
|
@@ -218,7 +213,19 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
|
218 | 213 | BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
|
219 | 214 | throws IOException {
|
220 | 215 |
|
221 |
| - configureEndpointAndRegion(builder, parameters, conf); |
| 216 | + AWSRegionEndpointInformation regionEndpointInformation = |
| 217 | + AWSRegionEndpointResolver.getEndpointRegionResolution(parameters, conf); |
| 218 | + |
| 219 | + if(regionEndpointInformation.getRegion() != null) { |
| 220 | + builder.region(regionEndpointInformation.getRegion()); |
| 221 | + } |
| 222 | + |
| 223 | + if (regionEndpointInformation.getEndpoint() != null) { |
| 224 | + builder.endpointOverride(regionEndpointInformation.getEndpoint()); |
| 225 | + } |
| 226 | + |
| 227 | + builder.crossRegionAccessEnabled(regionEndpointInformation.isCrossRegionAccessEnabled()); |
| 228 | + builder.fipsEnabled(regionEndpointInformation.isFipsEnabled()); |
222 | 229 |
|
223 | 230 | maybeApplyS3AccessGrantsConfigurations(builder, conf);
|
224 | 231 |
|
@@ -289,216 +296,6 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
|
289 | 296 | return clientOverrideConfigBuilder;
|
290 | 297 | }
|
291 | 298 |
|
292 |
| - /** |
293 |
| - * Configures the region for the S3 CRT client |
294 |
| - * |
295 |
| - * @param builder S3 CRT client builder |
296 |
| - * @param parameters parameter object |
297 |
| - * @param conf conf object |
298 |
| - */ |
299 |
| - private void configureCRTClientRegion(S3CrtAsyncClientBuilder builder, S3ClientCreationParameters parameters, Configuration conf) { |
300 |
| - final String configuredRegion = parameters.getRegion(); |
301 |
| - Region region = null; |
302 |
| - String origin = ""; |
303 |
| - |
304 |
| - // If the region was configured, set it. |
305 |
| - if (configuredRegion != null && !configuredRegion.isEmpty()) { |
306 |
| - origin = AWS_REGION; |
307 |
| - region = Region.of(configuredRegion); |
308 |
| - } |
309 |
| - |
310 |
| - if (region != null) { |
311 |
| - builder.region(region); |
312 |
| - } else if (configuredRegion == null) { |
313 |
| - // no region is configured, use US_EAST_2 as default. |
314 |
| - region = Region.of(AWS_S3_DEFAULT_REGION); |
315 |
| - builder.region(region); |
316 |
| - origin = "cross region access fallback"; |
317 |
| - } |
318 |
| - |
319 |
| - boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, |
320 |
| - AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); |
321 |
| - // s3 cross region access |
322 |
| - if (isCrossRegionAccessEnabled) { |
323 |
| - builder.crossRegionAccessEnabled(true); |
324 |
| - } |
325 |
| - |
326 |
| - LOG.debug("Setting region to {} from {} with cross region access {}", |
327 |
| - region, origin, isCrossRegionAccessEnabled); |
328 |
| - } |
329 |
| - |
330 |
| - /** |
331 |
| - * This method configures the endpoint and region for a S3 client. |
332 |
| - * The order of configuration is: |
333 |
| - * |
334 |
| - * <ol> |
335 |
| - * <li>If region is configured via fs.s3a.endpoint.region, use it.</li> |
336 |
| - * <li>If endpoint is configured via via fs.s3a.endpoint, set it. |
337 |
| - * If no region is configured, try to parse region from endpoint. </li> |
338 |
| - * <li> If no region is configured, and it could not be parsed from the endpoint, |
339 |
| - * set the default region as US_EAST_2</li> |
340 |
| - * <li> If configured region is empty, fallback to SDK resolution chain. </li> |
341 |
| - * <li> S3 cross region is enabled by default irrespective of region or endpoint |
342 |
| - * is set or not.</li> |
343 |
| - * </ol> |
344 |
| - * |
345 |
| - * @param builder S3 client builder. |
346 |
| - * @param parameters parameter object |
347 |
| - * @param conf conf configuration object |
348 |
| - * @param <BuilderT> S3 client builder type |
349 |
| - * @param <ClientT> S3 client type |
350 |
| - * @throws IllegalArgumentException if endpoint is set when FIPS is enabled. |
351 |
| - */ |
352 |
| - private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion( |
353 |
| - BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) { |
354 |
| - final String endpointStr = parameters.getEndpoint(); |
355 |
| - final URI endpoint = getS3Endpoint(endpointStr, conf); |
356 |
| - |
357 |
| - final String configuredRegion = parameters.getRegion(); |
358 |
| - Region region = null; |
359 |
| - String origin = ""; |
360 |
| - |
361 |
| - // If the region was configured, set it. |
362 |
| - if (configuredRegion != null && !configuredRegion.isEmpty()) { |
363 |
| - origin = AWS_REGION; |
364 |
| - region = Region.of(configuredRegion); |
365 |
| - } |
366 |
| - |
367 |
| - // FIPs? Log it, then reject any attempt to set an endpoint |
368 |
| - final boolean fipsEnabled = parameters.isFipsEnabled(); |
369 |
| - if (fipsEnabled) { |
370 |
| - LOG.debug("Enabling FIPS mode"); |
371 |
| - } |
372 |
| - // always setting it guarantees the value is non-null, |
373 |
| - // which tests expect. |
374 |
| - builder.fipsEnabled(fipsEnabled); |
375 |
| - |
376 |
| - if (endpoint != null) { |
377 |
| - boolean endpointEndsWithCentral = |
378 |
| - endpointStr.endsWith(CENTRAL_ENDPOINT); |
379 |
| - checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", |
380 |
| - ERROR_ENDPOINT_WITH_FIPS, |
381 |
| - endpoint); |
382 |
| - |
383 |
| - // No region was configured, |
384 |
| - // determine the region from the endpoint. |
385 |
| - if (region == null) { |
386 |
| - region = getS3RegionFromEndpoint(endpointStr, |
387 |
| - endpointEndsWithCentral); |
388 |
| - if (region != null) { |
389 |
| - origin = "endpoint"; |
390 |
| - } |
391 |
| - } |
392 |
| - |
393 |
| - // No need to override endpoint with "s3.amazonaws.com". |
394 |
| - // Let the client take care of endpoint resolution. Overriding |
395 |
| - // the endpoint with "s3.amazonaws.com" causes 400 Bad Request |
396 |
| - // errors for non-existent buckets and objects. |
397 |
| - // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 |
398 |
| - if (!endpointEndsWithCentral) { |
399 |
| - builder.endpointOverride(endpoint); |
400 |
| - LOG.debug("Setting endpoint to {}", endpoint); |
401 |
| - } else { |
402 |
| - origin = "central endpoint with cross region access"; |
403 |
| - LOG.debug("Enabling cross region access for endpoint {}", |
404 |
| - endpointStr); |
405 |
| - } |
406 |
| - } |
407 |
| - |
408 |
| - if (region != null) { |
409 |
| - builder.region(region); |
410 |
| - } else if (configuredRegion == null) { |
411 |
| - // no region is configured, and none could be determined from the endpoint. |
412 |
| - // Use US_EAST_2 as default. |
413 |
| - region = Region.of(AWS_S3_DEFAULT_REGION); |
414 |
| - builder.region(region); |
415 |
| - origin = "cross region access fallback"; |
416 |
| - } else if (configuredRegion.isEmpty()) { |
417 |
| - // region configuration was set to empty string. |
418 |
| - // allow this if people really want it; it is OK to rely on this |
419 |
| - // when deployed in EC2. |
420 |
| - WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE); |
421 |
| - LOG.debug(SDK_REGION_CHAIN_IN_USE); |
422 |
| - origin = "SDK region chain"; |
423 |
| - } |
424 |
| - boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, |
425 |
| - AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); |
426 |
| - // s3 cross region access |
427 |
| - if (isCrossRegionAccessEnabled) { |
428 |
| - builder.crossRegionAccessEnabled(true); |
429 |
| - } |
430 |
| - LOG.debug("Setting region to {} from {} with cross region access {}", |
431 |
| - region, origin, isCrossRegionAccessEnabled); |
432 |
| - } |
433 |
| - |
434 |
| - /** |
435 |
| - * Given a endpoint string, create the endpoint URI. |
436 |
| - * |
437 |
| - * @param endpoint possibly null endpoint. |
438 |
| - * @param conf config to build the URI from. |
439 |
| - * @return an endpoint uri |
440 |
| - */ |
441 |
| - protected static URI getS3Endpoint(String endpoint, final Configuration conf) { |
442 |
| - |
443 |
| - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); |
444 |
| - |
445 |
| - String protocol = secureConnections ? "https" : "http"; |
446 |
| - |
447 |
| - if (endpoint == null || endpoint.isEmpty()) { |
448 |
| - // don't set an endpoint if none is configured, instead let the SDK figure it out. |
449 |
| - return null; |
450 |
| - } |
451 |
| - |
452 |
| - if (!endpoint.contains("://")) { |
453 |
| - endpoint = String.format("%s://%s", protocol, endpoint); |
454 |
| - } |
455 |
| - |
456 |
| - try { |
457 |
| - return new URI(endpoint); |
458 |
| - } catch (URISyntaxException e) { |
459 |
| - throw new IllegalArgumentException(e); |
460 |
| - } |
461 |
| - } |
462 |
| - |
463 |
| - /** |
464 |
| - * Parses the endpoint to get the region. |
465 |
| - * If endpoint is the central one, use US_EAST_2. |
466 |
| - * |
467 |
| - * @param endpoint the configure endpoint. |
468 |
| - * @param endpointEndsWithCentral true if the endpoint is configured as central. |
469 |
| - * @return the S3 region, null if unable to resolve from endpoint. |
470 |
| - */ |
471 |
| - @VisibleForTesting |
472 |
| - static Region getS3RegionFromEndpoint(final String endpoint, |
473 |
| - final boolean endpointEndsWithCentral) { |
474 |
| - |
475 |
| - if (!endpointEndsWithCentral) { |
476 |
| - // S3 VPC endpoint parsing |
477 |
| - Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint); |
478 |
| - if (matcher.find()) { |
479 |
| - LOG.debug("Mapping to VPCE"); |
480 |
| - LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1)); |
481 |
| - return Region.of(matcher.group(1)); |
482 |
| - } |
483 |
| - |
484 |
| - LOG.debug("Endpoint {} is not the default; parsing", endpoint); |
485 |
| - return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null); |
486 |
| - } |
487 |
| - |
488 |
| - // Select default region here to enable cross-region access. |
489 |
| - // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty, |
490 |
| - // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com". |
491 |
| - // This applies to Spark versions with the changes of SPARK-35878. |
492 |
| - // ref: |
493 |
| - // https://github.com/apache/spark/blob/v3.5.0/core/ |
494 |
| - // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528 |
495 |
| - // If we do not allow cross region access, Spark would not be able to |
496 |
| - // access any bucket that is not present in the given region. |
497 |
| - // Hence, we should use default region us-east-2 to allow cross-region |
498 |
| - // access. |
499 |
| - return Region.of(AWS_S3_DEFAULT_REGION); |
500 |
| - } |
501 |
| - |
502 | 299 | private static <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
|
503 | 300 | maybeApplyS3AccessGrantsConfigurations(BuilderT builder, Configuration conf) {
|
504 | 301 | boolean isS3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
|
|
0 commit comments