From cf09f9ab0a93bb75626daa8c098c47cdfb545e1e Mon Sep 17 00:00:00 2001 From: FANNG Date: Fri, 25 Oct 2024 14:16:51 +0800 Subject: [PATCH] [#5068] feat(core): support GCS token provider (#5224) ### What changes were proposed in this pull request? support GCS token provider ### Why are the changes needed? Fix: #5068 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add IT and run with real google acount --- LICENSE.bin | 3 + .../credential/GCSTokenCredential.java | 73 ++++++ bundles/gcp-bundle/build.gradle.kts | 11 +- .../gcs/credential/GCSTokenProvider.java | 218 ++++++++++++++++++ ...he.gravitino.credential.CredentialProvider | 19 ++ .../services/org.apache.hadoop.fs.FileSystem | 20 ++ .../credential/CredentialConstants.java | 2 + .../credential/CredentialPropertyUtils.java | 25 +- .../config/GCSCredentialConfig.java | 51 ++++ gradle/libs.versions.toml | 5 + iceberg/iceberg-rest-server/build.gradle.kts | 2 + .../integration/test/IcebergRESTGCSIT.java | 108 +++++++++ .../test/IcebergRESTJdbcCatalogIT.java | 24 +- .../test/IcebergRESTServiceBaseIT.java | 56 ++++- .../integration/test/util/BaseIT.java | 37 +-- .../integration/test/util/ITUtils.java | 16 ++ settings.gradle.kts | 3 +- 17 files changed, 634 insertions(+), 39 deletions(-) create mode 100644 api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java create mode 100644 bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java create mode 100644 bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider create mode 100644 bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem create mode 100644 core/src/main/java/org/apache/gravitino/credential/config/GCSCredentialConfig.java create mode 100644 iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTGCSIT.java diff --git a/LICENSE.bin b/LICENSE.bin index e922f936771..1bdb9864d2f 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -306,6 +306,7 @@ Apache Iceberg core Apache Iceberg Hive metastore Apache Iceberg GCP + Apache Iceberg GCP bundle Apache Ivy Apache Log4j 1.x Compatibility API Apache Log4j API @@ -398,6 +399,8 @@ RE2/J ZSTD JNI fsspec + Google auth HTTP + Google auth Credentials This product bundles various third-party components also under the MIT license diff --git a/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java b/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java new file mode 100644 index 00000000000..98186e2dea7 --- /dev/null +++ b/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.credential; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +/** The GCS token credential to access GCS. */ +public class GCSTokenCredential implements Credential { + + /** GCS credential type. */ + public static final String GCS_TOKEN_CREDENTIAL_TYPE = "gcs-token"; + + /** GCS credential property, token name. */ + public static final String GCS_TOKEN_NAME = "token"; + + private String token; + private long expireMs; + + /** + * @param token The GCS token. + * @param expireMs The GCS token expire time at ms. + */ + public GCSTokenCredential(String token, long expireMs) { + Preconditions.checkArgument( + StringUtils.isNotBlank(token), "GCS session token should not be null"); + this.token = token; + this.expireMs = expireMs; + } + + @Override + public String credentialType() { + return GCS_TOKEN_CREDENTIAL_TYPE; + } + + @Override + public long expireTimeInMs() { + return expireMs; + } + + @Override + public Map credentialInfo() { + return (new ImmutableMap.Builder()).put(GCS_TOKEN_NAME, token).build(); + } + + /** + * Get GCS token. + * + * @return The GCS token. + */ + public String token() { + return token; + } +} diff --git a/bundles/gcp-bundle/build.gradle.kts b/bundles/gcp-bundle/build.gradle.kts index 6b373578c9d..e69ff345ea8 100644 --- a/bundles/gcp-bundle/build.gradle.kts +++ b/bundles/gcp-bundle/build.gradle.kts @@ -25,9 +25,17 @@ plugins { } dependencies { + compileOnly(project(":api")) + compileOnly(project(":core")) + compileOnly(project(":catalogs:catalog-common")) compileOnly(project(":catalogs:catalog-hadoop")) + compileOnly(libs.hadoop3.common) + + implementation(libs.commons.lang3) implementation(libs.hadoop3.gcs) + implementation(libs.google.auth.http) + implementation(libs.google.auth.credentials) } tasks.withType(ShadowJar::class.java) { @@ -38,8 +46,7 @@ tasks.withType(ShadowJar::class.java) { // Relocate dependencies to avoid conflicts relocate("org.apache.httpcomponents", "org.apache.gravitino.shaded.org.apache.httpcomponents") relocate("org.apache.commons", "org.apache.gravitino.shaded.org.apache.commons") - relocate("com.google.guava", "org.apache.gravitino.shaded.com.google.guava") - relocate("com.google.code", "org.apache.gravitino.shaded.com.google.code") + relocate("com.google", "org.apache.gravitino.shaded.com.google") } tasks.jar { diff --git a/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java new file mode 100644 index 00000000000..94234b2d98e --- /dev/null +++ b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.gcs.credential; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.CredentialAccessBoundary; +import com.google.auth.oauth2.CredentialAccessBoundary.AccessBoundaryRule; +import com.google.auth.oauth2.DownscopedCredentials; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.credential.CredentialContext; +import org.apache.gravitino.credential.CredentialProvider; +import org.apache.gravitino.credential.GCSTokenCredential; +import org.apache.gravitino.credential.PathBasedCredentialContext; +import org.apache.gravitino.credential.config.GCSCredentialConfig; + +/** Generate GCS access token according to the read and write paths. */ +public class GCSTokenProvider implements CredentialProvider { + + private static final String INITIAL_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; + + private GoogleCredentials sourceCredentials; + + @Override + public void initialize(Map properties) { + GCSCredentialConfig gcsCredentialConfig = new GCSCredentialConfig(properties); + try { + this.sourceCredentials = + getSourceCredentials(gcsCredentialConfig).createScoped(INITIAL_SCOPE); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() {} + + @Override + public String credentialType() { + return CredentialConstants.GCS_TOKEN_CREDENTIAL_PROVIDER_TYPE; + } + + @Override + public Credential getCredential(CredentialContext context) { + if (!(context instanceof PathBasedCredentialContext)) { + return null; + } + PathBasedCredentialContext pathBasedCredentialContext = (PathBasedCredentialContext) context; + try { + AccessToken accessToken = + getToken( + pathBasedCredentialContext.getReadPaths(), + pathBasedCredentialContext.getWritePaths()); + String tokenValue = accessToken.getTokenValue(); + long expireTime = accessToken.getExpirationTime().toInstant().toEpochMilli(); + return new GCSTokenCredential(tokenValue, expireTime); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private AccessToken getToken(Set readLocations, Set writeLocations) + throws IOException { + DownscopedCredentials downscopedCredentials = + DownscopedCredentials.newBuilder() + .setSourceCredential(sourceCredentials) + .setCredentialAccessBoundary(getAccessBoundary(readLocations, writeLocations)) + .build(); + return downscopedCredentials.refreshAccessToken(); + } + + private CredentialAccessBoundary getAccessBoundary( + Set readLocations, Set writeLocations) { + // bucketName -> read resource expressions + Map> readExpressions = new HashMap<>(); + // bucketName -> write resource expressions + Map> writeExpressions = new HashMap<>(); + + // Construct read and write resource expressions + HashSet readBuckets = new HashSet<>(); + HashSet writeBuckets = new HashSet<>(); + Stream.concat(readLocations.stream(), writeLocations.stream()) + .distinct() + .forEach( + location -> { + URI uri = URI.create(location); + String bucketName = getBucketName(uri); + readBuckets.add(bucketName); + String resourcePath = uri.getPath().substring(1); + List resourceExpressions = + readExpressions.computeIfAbsent(bucketName, key -> new ArrayList<>()); + // add read privilege + resourceExpressions.add( + String.format( + "resource.name.startsWith('projects/_/buckets/%s/objects/%s')", + bucketName, resourcePath)); + // add list privilege + resourceExpressions.add( + String.format( + "api.getAttribute('storage.googleapis.com/objectListPrefix', '').startsWith('%s')", + resourcePath)); + if (writeLocations.contains(location)) { + writeBuckets.add(bucketName); + resourceExpressions = + writeExpressions.computeIfAbsent(bucketName, key -> new ArrayList<>()); + // add write privilege + resourceExpressions.add( + String.format( + "resource.name.startsWith('projects/_/buckets/%s/objects/%s')", + bucketName, resourcePath)); + } + }); + + // Construct policy according to the resource expression and privilege. + CredentialAccessBoundary.Builder credentialAccessBoundaryBuilder = + CredentialAccessBoundary.newBuilder(); + readBuckets.forEach( + bucket -> { + List readConditions = readExpressions.get(bucket); + AccessBoundaryRule rule = + getAccessBoundaryRule( + bucket, + readConditions, + Arrays.asList( + "inRole:roles/storage.legacyObjectReader", + "inRole:roles/storage.objectViewer")); + if (rule == null) { + return; + } + credentialAccessBoundaryBuilder.addRule(rule); + }); + + writeBuckets.forEach( + bucket -> { + List writeConditions = writeExpressions.get(bucket); + AccessBoundaryRule rule = + getAccessBoundaryRule( + bucket, + writeConditions, + Arrays.asList("inRole:roles/storage.legacyBucketWriter")); + if (rule == null) { + return; + } + credentialAccessBoundaryBuilder.addRule(rule); + }); + + return credentialAccessBoundaryBuilder.build(); + } + + private AccessBoundaryRule getAccessBoundaryRule( + String bucketName, List resourceExpression, List permissions) { + if (resourceExpression == null || resourceExpression.isEmpty()) { + return null; + } + CredentialAccessBoundary.AccessBoundaryRule.Builder builder = + CredentialAccessBoundary.AccessBoundaryRule.newBuilder(); + builder.setAvailableResource(toGCSBucketResource(bucketName)); + builder.setAvailabilityCondition( + CredentialAccessBoundary.AccessBoundaryRule.AvailabilityCondition.newBuilder() + .setExpression(String.join(" || ", resourceExpression)) + .build()); + builder.setAvailablePermissions(permissions); + return builder.build(); + } + + private static String toGCSBucketResource(String bucketName) { + return "//storage.googleapis.com/projects/_/buckets/" + bucketName; + } + + private static String getBucketName(URI uri) { + return uri.getHost(); + } + + private GoogleCredentials getSourceCredentials(GCSCredentialConfig gcsCredentialConfig) + throws IOException { + String gcsCredentialFilePath = gcsCredentialConfig.gcsCredentialFilePath(); + if (StringUtils.isBlank(gcsCredentialFilePath)) { + return GoogleCredentials.getApplicationDefault(); + } else { + File credentialsFile = new File(gcsCredentialFilePath); + if (!credentialsFile.exists()) { + throw new IOException("GCS credential file does not exist." + gcsCredentialFilePath); + } + return GoogleCredentials.fromStream(new FileInputStream(credentialsFile)); + } + } +} diff --git a/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider new file mode 100644 index 00000000000..69510490549 --- /dev/null +++ b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.gravitino.gcs.credential.GCSTokenProvider diff --git a/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 00000000000..e67410de7b3 --- /dev/null +++ b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +org.apache.gravitino.shaded.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java index 596268395e3..a141b637eba 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java @@ -22,5 +22,7 @@ public class CredentialConstants { public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type"; + public static final String GCS_TOKEN_CREDENTIAL_PROVIDER_TYPE = "gcs-token"; + private CredentialConstants() {} } diff --git a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java index 255e54fbf3d..e380cc5d44b 100644 --- a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java +++ b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java @@ -19,12 +19,17 @@ package org.apache.gravitino.credential; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; import java.util.Map; /** * Helper class to generate specific credential properties for different table format and engine. */ public class CredentialPropertyUtils { + private static Map icebergCredentialPropertyMap = + ImmutableMap.of(GCSTokenCredential.GCS_TOKEN_NAME, "gcs.oauth2.token"); + /** * Transforms a specific credential into a map of Iceberg properties. * @@ -32,7 +37,25 @@ public class CredentialPropertyUtils { * @return a map of Iceberg properties derived from the credential */ public static Map toIcebergProperties(Credential credential) { - // todo: transform specific credential to iceberg properties + if (credential instanceof GCSTokenCredential) { + Map icebergGCSCredentialProperties = + transformProperties(credential.credentialInfo(), icebergCredentialPropertyMap); + icebergGCSCredentialProperties.put( + "gcs.oauth2.token-expires-at", String.valueOf(credential.expireTimeInMs())); + return icebergGCSCredentialProperties; + } return credential.toProperties(); } + + private static Map transformProperties( + Map originProperties, Map transformMap) { + HashMap properties = new HashMap(); + originProperties.forEach( + (k, v) -> { + if (transformMap.containsKey(k)) { + properties.put(transformMap.get(k), v); + } + }); + return properties; + } } diff --git a/core/src/main/java/org/apache/gravitino/credential/config/GCSCredentialConfig.java b/core/src/main/java/org/apache/gravitino/credential/config/GCSCredentialConfig.java new file mode 100644 index 00000000000..1a2b38ef641 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/credential/config/GCSCredentialConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.credential.config; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.gravitino.Config; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; + +public class GCSCredentialConfig extends Config { + + @VisibleForTesting + public static final String GRAVITINO_GCS_CREDENTIAL_FILE_PATH = "gcs-credential-file-path"; + + public static final ConfigEntry GCS_CREDENTIAL_FILE_PATH = + new ConfigBuilder(GRAVITINO_GCS_CREDENTIAL_FILE_PATH) + .doc("The path of GCS credential file") + .version(ConfigConstants.VERSION_0_7_0) + .stringConf() + .create(); + + public GCSCredentialConfig(Map properties) { + super(false); + loadFromMap(properties, k -> true); + } + + @Nullable + public String gcsCredentialFilePath() { + return this.get(GCS_CREDENTIAL_FILE_PATH); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 472be136cd2..830fe5e747c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -107,6 +107,7 @@ datanucleus-api-jdo = "4.2.4" datanucleus-rdbms = "4.1.19" datanucleus-jdo = "3.2.0-m3" hudi = "0.15.0" +google-auth = "1.28.0" [libraries] protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", version.ref = "protoc" } @@ -180,6 +181,7 @@ iceberg-core = { group = "org.apache.iceberg", name = "iceberg-core", version.re iceberg-api = { group = "org.apache.iceberg", name = "iceberg-api", version.ref = "iceberg" } iceberg-hive-metastore = { group = "org.apache.iceberg", name = "iceberg-hive-metastore", version.ref = "iceberg" } iceberg-gcp = { group = "org.apache.iceberg", name = "iceberg-gcp", version.ref = "iceberg" } +iceberg-gcp-bundle = { group = "org.apache.iceberg", name = "iceberg-gcp-bundle", version.ref = "iceberg" } paimon-core = { group = "org.apache.paimon", name = "paimon-core", version.ref = "paimon" } paimon-format = { group = "org.apache.paimon", name = "paimon-format", version.ref = "paimon" } paimon-hive-catalog = { group = "org.apache.paimon", name = "paimon-hive-catalog", version.ref = "paimon" } @@ -246,6 +248,9 @@ mail = { group = "javax.mail", name = "mail", version.ref = "mail" } rome = { group = "rome", name = "rome", version.ref = "rome" } jettison = { group = "org.codehaus.jettison", name = "jettison", version.ref = "jettison" } +google-auth-http = { group = "com.google.auth", name = "google-auth-library-oauth2-http", version.ref = "google-auth" } +google-auth-credentials = { group = "com.google.auth", name = "google-auth-library-credentials", version.ref = "google-auth" } + [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"] jetty = ["jetty-server", "jetty-servlet", "jetty-webapp", "jetty-servlets"] diff --git a/iceberg/iceberg-rest-server/build.gradle.kts b/iceberg/iceberg-rest-server/build.gradle.kts index 594e6d04208..f088ce2926d 100644 --- a/iceberg/iceberg-rest-server/build.gradle.kts +++ b/iceberg/iceberg-rest-server/build.gradle.kts @@ -63,6 +63,7 @@ dependencies { compileOnly(libs.lombok) + testImplementation(project(":bundles:gcp-bundle", configuration = "shadow")) testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") @@ -75,6 +76,7 @@ dependencies { exclude("org.rocksdb") } + testImplementation(libs.iceberg.gcp.bundle) testImplementation(libs.jersey.test.framework.core) { exclude(group = "org.junit.jupiter") } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTGCSIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTGCSIT.java new file mode 100644 index 00000000000..89f56c51774 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTGCSIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.integration.test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.credential.config.GCSCredentialConfig; +import org.apache.gravitino.iceberg.common.IcebergConfig; +import org.apache.gravitino.integration.test.util.BaseIT; +import org.apache.gravitino.integration.test.util.DownloaderUtils; +import org.apache.gravitino.integration.test.util.ITUtils; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +// You should export GRAVITINO_GCS_BUCKET and GOOGLE_APPLICATION_CREDENTIALS to run the test +@EnabledIfEnvironmentVariable(named = "GRAVITINO_TEST_CLOUD_IT", matches = "true") +public class IcebergRESTGCSIT extends IcebergRESTJdbcCatalogIT { + private String gcsWarehouse; + private String gcsCredentialPath; + + @Override + void initEnv() { + this.gcsWarehouse = + String.format("gs://%s/test", getFromEnvOrDefault("GRAVITINO_GCS_BUCKET", "bucketName")); + this.gcsCredentialPath = + getFromEnvOrDefault("GOOGLE_APPLICATION_CREDENTIALS", "credential.json"); + if (ITUtils.isEmbedded()) { + return; + } + + try { + downloadIcebergBundleJar(); + } catch (IOException e) { + throw new RuntimeException(e); + } + copyGCSBundleJar(); + } + + @Override + public Map getCatalogConfig() { + HashMap m = new HashMap(); + m.putAll(getCatalogJdbcConfig()); + m.putAll(getGCSConfig()); + return m; + } + + public boolean supportsCredentialVending() { + return true; + } + + private Map getGCSConfig() { + Map configMap = new HashMap(); + + configMap.put( + IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE, + CredentialConstants.GCS_TOKEN_CREDENTIAL_PROVIDER_TYPE); + configMap.put( + IcebergConfig.ICEBERG_CONFIG_PREFIX + + GCSCredentialConfig.GRAVITINO_GCS_CREDENTIAL_FILE_PATH, + gcsCredentialPath); + configMap.put( + IcebergConfig.ICEBERG_CONFIG_PREFIX + IcebergConstants.IO_IMPL, + "org.apache.iceberg.gcp.gcs.GCSFileIO"); + configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + IcebergConstants.WAREHOUSE, gcsWarehouse); + return configMap; + } + + private void copyGCSBundleJar() { + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + String targetDir = String.format("%s/iceberg-rest-server/libs/", gravitinoHome); + BaseIT.copyBundleJarsToDirectory("gcp-bundle", targetDir); + } + + private void downloadIcebergBundleJar() throws IOException { + String icebergBundleJarName = "iceberg-gcp-bundle-1.5.2.jar"; + String icebergBundleJarUri = + "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-gcp-bundle/1.5.2/" + + icebergBundleJarName; + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + String targetDir = String.format("%s/iceberg-rest-server/libs/", gravitinoHome); + DownloaderUtils.downloadFile(icebergBundleJarUri, targetDir); + } + + private String getFromEnvOrDefault(String envVar, String defaultValue) { + String envValue = System.getenv(envVar); + return Optional.ofNullable(envValue).orElse(defaultValue); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTJdbcCatalogIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTJdbcCatalogIT.java index d53f8022091..c235451f2ff 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTJdbcCatalogIT.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTJdbcCatalogIT.java @@ -33,7 +33,9 @@ @Tag("gravitino-docker-test") @TestInstance(Lifecycle.PER_CLASS) public class IcebergRESTJdbcCatalogIT extends IcebergRESTServiceIT { + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + private boolean hiveStarted = false; public IcebergRESTJdbcCatalogIT() { catalogType = IcebergCatalogBackend.JDBC; @@ -42,9 +44,15 @@ public IcebergRESTJdbcCatalogIT() { @Override void initEnv() { containerSuite.startHiveContainer(); + hiveStarted = true; } + @Override public Map getCatalogConfig() { + return getCatalogJdbcConfig(); + } + + protected Map getCatalogJdbcConfig() { Map configMap = new HashMap<>(); configMap.put( @@ -70,13 +78,15 @@ public Map getCatalogConfig() { configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + "jdbc.schema-version", "V1"); - configMap.put( - IcebergConfig.ICEBERG_CONFIG_PREFIX + IcebergConfig.CATALOG_WAREHOUSE.getKey(), - GravitinoITUtils.genRandomName( - String.format( - "hdfs://%s:%d/user/hive/warehouse-jdbc-sqlite", - containerSuite.getHiveContainer().getContainerIpAddress(), - HiveContainer.HDFS_DEFAULTFS_PORT))); + if (hiveStarted) { + configMap.put( + IcebergConfig.ICEBERG_CONFIG_PREFIX + IcebergConfig.CATALOG_WAREHOUSE.getKey(), + GravitinoITUtils.genRandomName( + String.format( + "hdfs://%s:%d/user/hive/warehouse-jdbc-sqlite", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT))); + } return configMap; } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java index 0ba781cabd8..67e7a3b8fd8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java @@ -20,6 +20,8 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.FormatMethod; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -27,11 +29,14 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.iceberg.common.IcebergCatalogBackend; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.integration.test.util.IcebergRESTServerManager; +import org.apache.gravitino.integration.test.util.ITUtils; import org.apache.gravitino.server.web.JettyServerConfig; +import org.apache.spark.SparkConf; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; @@ -46,6 +51,7 @@ @SuppressWarnings("FormatStringAnnotation") public abstract class IcebergRESTServiceBaseIT { + public static final Logger LOG = LoggerFactory.getLogger(IcebergRESTServiceBaseIT.class); private SparkSession sparkSession; protected IcebergCatalogBackend catalogType = IcebergCatalogBackend.MEMORY; @@ -84,6 +90,31 @@ boolean isSupportsViewCatalog() { abstract Map getCatalogConfig(); + protected boolean supportsCredentialVending() { + return false; + } + + private void copyBundleJar(String bundleName) { + String bundleFileName = ITUtils.getBundleJarName(bundleName); + + String rootDir = System.getenv("GRAVITINO_ROOT_DIR"); + String sourceFile = + String.format("%s/bundles/gcp-bundle/build/libs/%s", rootDir, bundleFileName); + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + String targetDir = String.format("%s/iceberg-rest-server/libs/", gravitinoHome); + String targetFile = String.format("%s/%s", targetDir, bundleFileName); + LOG.info("Source file: {}, target directory: {}", sourceFile, targetDir); + try { + File target = new File(targetFile); + if (!target.exists()) { + LOG.info("Copy source file: {} to target directory: {}", sourceFile, targetDir); + FileUtils.copyFileToDirectory(new File(sourceFile), new File(targetDir)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private void registerIcebergCatalogConfig() { Map icebergConfigs = getCatalogConfig(); icebergRESTServerManager.registerCustomConfigs(icebergConfigs); @@ -100,19 +131,24 @@ private int getServerPort() { private void initSparkEnv() { int port = getServerPort(); LOG.info("Iceberg REST server port:{}", port); - String IcebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/", port); - sparkSession = - SparkSession.builder() - .master("local[1]") - .config( + String icebergRESTUri = String.format("http://127.0.0.1:%d/iceberg/", port); + SparkConf sparkConf = + new SparkConf() + .set( "spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.rest.type", "rest") - .config("spark.sql.catalog.rest.uri", IcebergRESTUri) + .set("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") + .set("spark.sql.catalog.rest.type", "rest") + .set("spark.sql.catalog.rest.uri", icebergRESTUri) // drop Iceberg table purge may hang in spark local mode - .config("spark.locality.wait.node", "0") - .getOrCreate(); + .set("spark.locality.wait.node", "0"); + + if (supportsCredentialVending()) { + sparkConf.set( + "spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation", "vended-credentials"); + } + + sparkSession = SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate(); } private void stopSparkEnv() { diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java index 8bbb5a3b23f..e7ed483f2f5 100644 --- a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java +++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java @@ -75,6 +75,7 @@ @ExtendWith({PrintFuncNameExtension.class, CloseContainerExtension.class}) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class BaseIT { + protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); private static final Logger LOG = LoggerFactory.getLogger(BaseIT.class); @@ -127,7 +128,9 @@ private void rewriteGravitinoServerConfig() throws IOException { originConfig = FileUtils.readFileToString(configPath.toFile(), StandardCharsets.UTF_8); } - if (customConfigs.isEmpty()) return; + if (customConfigs.isEmpty()) { + return; + } String tmpFileName = GravitinoServer.CONF_FILE + ".tmp"; Path tmpPath = Paths.get(gravitinoHome, "conf", tmpFileName); @@ -397,26 +400,26 @@ private static boolean isDeploy() { return Objects.equals(mode, ITUtils.DEPLOY_TEST_MODE); } - protected void copyBundleJarsToHadoop(String bundleName) { + public static void copyBundleJarsToDirectory(String bundleName, String directory) { + String bundleJarSourceFile = ITUtils.getBundleJarSourceFile(bundleName); + try { + DownloaderUtils.downloadFile(bundleJarSourceFile, directory); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to copy the %s dependency jars: %s to %s", + bundleName, bundleJarSourceFile, directory), + e); + } + } + + protected static void copyBundleJarsToHadoop(String bundleName) { if (!isDeploy()) { return; } String gravitinoHome = System.getenv("GRAVITINO_HOME"); - String jarName = - String.format("gravitino-%s-%s.jar", bundleName, System.getenv("PROJECT_VERSION")); - String gcsJars = - ITUtils.joinPath( - gravitinoHome, "..", "..", "bundles", bundleName, "build", "libs", jarName); - gcsJars = "file://" + gcsJars; - try { - if (!ITUtils.EMBEDDED_TEST_MODE.equals(testMode)) { - String hadoopLibDirs = ITUtils.joinPath(gravitinoHome, "catalogs", "hadoop", "libs"); - DownloaderUtils.downloadFile(gcsJars, hadoopLibDirs); - } - } catch (Exception e) { - throw new RuntimeException( - String.format("Failed to copy the %s dependency jars: %s", bundleName, gcsJars), e); - } + String hadoopLibDirs = ITUtils.joinPath(gravitinoHome, "catalogs", "hadoop", "libs"); + copyBundleJarsToDirectory(bundleName, hadoopLibDirs); } } diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java index 9a6d7b13010..d7c099dc7ac 100644 --- a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java +++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java @@ -48,6 +48,7 @@ import org.junit.jupiter.api.Assertions; public class ITUtils { + public static final String TEST_MODE = "testMode"; public static final String EMBEDDED_TEST_MODE = "embedded"; public static final String DEPLOY_TEST_MODE = "deploy"; @@ -186,5 +187,20 @@ public static boolean isEmbedded() { return Objects.equals(mode, ITUtils.EMBEDDED_TEST_MODE); } + public static String getBundleJarSourceFile(String bundleName) { + String jarName = ITUtils.getBundleJarName(bundleName); + String gcsJars = ITUtils.joinPath(ITUtils.getBundleJarDirectory(bundleName), jarName); + return "file://" + gcsJars; + } + + public static String getBundleJarName(String bundleName) { + return String.format("gravitino-%s-%s.jar", bundleName, System.getenv("PROJECT_VERSION")); + } + + public static String getBundleJarDirectory(String bundleName) { + return ITUtils.joinPath( + System.getenv("GRAVITINO_ROOT_DIR"), "bundles", bundleName, "build", "libs"); + } + private ITUtils() {} } diff --git a/settings.gradle.kts b/settings.gradle.kts index 2eb340baad3..1f3efb49544 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -73,5 +73,4 @@ include("docs") include("integration-test-common") include(":bundles:aws-bundle") include(":bundles:gcp-bundle") -include("bundles:aliyun-bundle") -findProject(":bundles:aliyun-bundle")?.name = "aliyun-bundle" +include(":bundles:aliyun-bundle")