[SPARK-45404][CORE] Support AWS_ENDPOINT_URL env variable
### What changes were proposed in this pull request?

This PR aims to support `AWS_ENDPOINT_URL` in addition to the existing `AWS_*` environment variables.

### Why are the changes needed?

To improve usability. This is useful when working with S3-compatible object stores; a usage sketch follows the link below.
- https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings
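
As an illustration, here is a minimal sketch of the intended usage against an S3-compatible store such as MinIO; the endpoint URL, credential values, and application name below are hypothetical placeholders, not values from this PR:

```scala
// Hypothetical environment, exported before running spark-submit:
//   export AWS_ENDPOINT_URL=http://minio.example.com:9000   # S3-compatible store
//   export AWS_ACCESS_KEY_ID=my-access-key
//   export AWS_SECRET_ACCESS_KEY=my-secret-key
import org.apache.spark.sql.SparkSession

object EndpointCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("endpoint-check").getOrCreate()
    // If the variables above were set, the endpoint propagated from
    // AWS_ENDPOINT_URL shows up in the Hadoop configuration:
    println(spark.sparkContext.hadoopConfiguration.get("fs.s3a.endpoint"))
    spark.stop()
  }
}
```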

### Does this PR introduce _any_ user-facing change?

This is a new feature. If a user has a misconfigured `AWS_ENDPOINT_URL`, requests will hang or, most likely, be denied during authentication.

### How was this patch tested?

Pass the CIs with the newly updated test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43205 from dongjoon-hyun/SPARK-45404.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun committed Oct 4, 2023
1 parent a99f310 commit d5c8dfc
Showing 4 changed files with 24 additions and 4 deletions.
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -387,6 +387,11 @@ private[spark] object SparkHadoopUtil extends Logging {
* `EnvironmentVariableCredentialsProvider`; those are not propagated.
*/

+/**
+ * AWS Endpoint URL.
+ */
+private[deploy] val ENV_VAR_AWS_ENDPOINT_URL = "AWS_ENDPOINT_URL"
+
/**
* AWS Access key.
*/
@@ -436,6 +441,7 @@ private[spark] object SparkHadoopUtil extends Logging {
// the behavior of the old implementation of this code, for backwards compatibility.
if (conf != null) {
appendS3CredentialsFromEnvironment(hadoopConf,
+System.getenv(ENV_VAR_AWS_ENDPOINT_URL),
System.getenv(ENV_VAR_AWS_ACCESS_KEY),
System.getenv(ENV_VAR_AWS_SECRET_KEY),
System.getenv(ENV_VAR_AWS_SESSION_TOKEN))
@@ -463,6 +469,7 @@ private[spark] object SparkHadoopUtil extends Logging {
// Exposed for testing
private[deploy] def appendS3CredentialsFromEnvironment(
hadoopConf: Configuration,
+endpointUrl: String,
keyId: String,
accessKey: String,
sessionToken: String): Unit = {
@@ -476,6 +483,10 @@ private[spark] object SparkHadoopUtil extends Logging {
hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey, source + ENV_VAR_AWS_SECRET_KEY)
hadoopConf.set("fs.s3a.secret.key", accessKey, source + ENV_VAR_AWS_SECRET_KEY)

+if (endpointUrl != null) {
+  hadoopConf.set("fs.s3a.endpoint", endpointUrl, source + ENV_VAR_AWS_ENDPOINT_URL)
+}
+
// look for session token if the other variables were set
if (sessionToken != null) {
hadoopConf.set("fs.s3a.session.token", sessionToken,
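For readers skimming the hunks above: the endpoint is set inside the same branch that requires the access and secret keys, so a lone `AWS_ENDPOINT_URL` is ignored (the new SPARK-45404 test below verifies this). A simplified standalone sketch of that rule, with a hypothetical object and helper name, not the Spark source itself:

```scala
import org.apache.hadoop.conf.Configuration

object S3EnvSketch {
  // Simplified sketch: the endpoint rides along with the credentials and is
  // applied only when both the access key and the secret key are present.
  def applyS3Env(conf: Configuration, endpoint: String, key: String, secret: String): Unit = {
    if (key != null && secret != null) {
      conf.set("fs.s3a.access.key", key)
      conf.set("fs.s3a.secret.key", secret)
      if (endpoint != null) {
        conf.set("fs.s3a.endpoint", endpoint)
      }
    }
  }
}
```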
13 changes: 11 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -110,19 +110,28 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
test("SPARK-40640: aws credentials from environment variables") {
val hadoopConf = new Configuration(false)
SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf,
"access-key", "secret-key", "session-token")
"endpoint", "access-key", "secret-key", "session-token")
val source = "Set by Spark on " + InetAddress.getLocalHost + " from "
assertConfigMatches(hadoopConf, "fs.s3a.endpoint", "endpoint", source)
assertConfigMatches(hadoopConf, "fs.s3a.access.key", "access-key", source)
assertConfigMatches(hadoopConf, "fs.s3a.secret.key", "secret-key", source)
assertConfigMatches(hadoopConf, "fs.s3a.session.token", "session-token", source)
}

test("SPARK-19739: S3 session token propagation requires access and secret keys") {
val hadoopConf = new Configuration(false)
-SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf, null, null, "session-token")
+SparkHadoopUtil.appendS3CredentialsFromEnvironment(
+  hadoopConf, null, null, null, "session-token")
assertConfigValue(hadoopConf, "fs.s3a.session.token", null)
}

test("SPARK-45404: aws endpoint propagation requires access and secret keys") {
val hadoopConf = new Configuration(false)
SparkHadoopUtil.appendS3CredentialsFromEnvironment(
hadoopConf, "endpoint", null, null, null)
assertConfigValue(hadoopConf, "fs.s3a.endpoint", null)
}

test("substituteHadoopVariables") {
val hadoopConf = new Configuration(false)
hadoopConf.set("xxx", "yyy")
2 changes: 1 addition & 1 deletion docs/cloud-integration.md
@@ -121,7 +121,7 @@ for talking to cloud infrastructures, in which case this module may not be needed
Spark jobs must authenticate with the object stores to access data within them.

1. When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
-1. `spark-submit` reads the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`
+1. `spark-submit` is able to read the `AWS_ENDPOINT_URL`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`
and `AWS_SESSION_TOKEN` environment variables and sets the associated authentication options
for the `s3n` and `s3a` connectors to Amazon S3.
1. In a Hadoop cluster, settings may be set in the `core-site.xml` file.
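For comparison with the environment-variable path documented above, the same S3A options can always be set explicitly; here is a sketch with placeholder endpoint and credential values, relying on Spark's standard `spark.hadoop.*` prefix for passing Hadoop options through a Spark config:

```scala
import org.apache.spark.sql.SparkSession

// Equivalent explicit configuration, bypassing the environment variables
// entirely; endpoint and credential values are placeholders.
val spark = SparkSession.builder()
  .appName("explicit-s3a-config")
  .config("spark.hadoop.fs.s3a.endpoint", "http://minio.example.com:9000")
  .config("spark.hadoop.fs.s3a.access.key", "my-access-key")
  .config("spark.hadoop.fs.s3a.secret.key", "my-secret-key")
  .getOrCreate()
```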
2 changes: 1 addition & 1 deletion hadoop-cloud/README.md
@@ -17,4 +17,4 @@ run the test against. Those configs are passed as environment variables and the
variables must be checked by the test.
Like for `AwsS3AbortableStreamBasedCheckpointFileManagerSuite` the S3 bucket used for testing
is passed in the `S3A_PATH` and the credentials to access AWS S3 are AWS_ACCESS_KEY_ID and
-AWS_SECRET_ACCESS_KEY (in addition you can define an optional AWS_SESSION_TOKEN too).
+AWS_SECRET_ACCESS_KEY (in addition you can define optional AWS_SESSION_TOKEN and AWS_ENDPOINT_URL too).
