From 06f469a28fd953939e25042eb1801cb2dbc86ba5 Mon Sep 17 00:00:00 2001
From: FANNG
Date: Wed, 29 May 2024 19:17:50 +0800
Subject: [PATCH] [#3555] feat(spark-connector): support multi scala version
 (#3608)

### What changes were proposed in this pull request?

Support building the Spark connector against multiple Scala versions, selected through the `scalaVersion` Gradle property. Because the Kyuubi Hive connector for Spark 3.3 does not support Scala 2.13, the Spark 3.3 modules are included only when building with Scala 2.12.

### Why are the changes needed?

Fix: #3555

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. Existing integration tests and unit tests.
2. On my local machine, ran Spark SQL statements against Spark 3.4 and Spark 3.5 built with Scala 2.13.
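As a usage sketch (hedged: `-PscalaVersion` and the module paths come from this patch; treating `build` as the standard Gradle lifecycle task and 2.12 as the default Scala version are assumptions):

```shell
# Build the Spark 3.5 connector runtime against Scala 2.13.
./gradlew :spark-connector:spark-runtime-3.5:build -PscalaVersion=2.13

# Omitting -PscalaVersion falls back to defaultScalaVersion (assumed to be 2.12).
./gradlew :spark-connector:spark-runtime-3.5:build
```

With `-PscalaVersion=2.13`, the Spark 3.3 modules are simply dropped from the project graph by `settings.gradle.kts`, so only the 3.4 and 3.5 connectors are built.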
"spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3", "spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5") -project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark") -project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime") +include("spark-connector:spark-common") +// kyuubi hive connector doesn't support 2.13 for Spark3.3 +if (scalaVersion == "2.12") { + include("spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3") + project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark") + project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime") +} +include("spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5") project(":spark-connector:spark-3.4").projectDir = file("spark-connector/v3.4/spark") project(":spark-connector:spark-runtime-3.4").projectDir = file("spark-connector/v3.4/spark-runtime") project(":spark-connector:spark-3.5").projectDir = file("spark-connector/v3.5/spark") diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts index 3b485942f28..804516b822b 100644 --- a/spark-connector/spark-common/build.gradle.kts +++ b/spark-connector/spark-common/build.gradle.kts @@ -17,7 +17,8 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark33.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() -val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() +// kyuubi hive connector for Spark 3.3 doesn't support scala 2.13 +val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() diff --git a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java index 033ac6d0f3a..8be80454745 100644 --- a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java @@ -239,6 +239,7 @@ public org.apache.spark.sql.connector.expressions.Transform[] toSparkTransform( return sparkTransforms.toArray(new org.apache.spark.sql.connector.expressions.Transform[0]); } + @SuppressWarnings("deprecation") private static Distribution toGravitinoDistribution(BucketTransform bucketTransform) { int bucketNum = (Integer) bucketTransform.numBuckets().value(); Expression[] expressions = @@ -249,6 +250,7 @@ private static Distribution toGravitinoDistribution(BucketTransform bucketTransf } // Spark datasourceV2 doesn't support specify sort order direction, use ASCENDING as default. 
+  @SuppressWarnings("deprecation")
   private static Pair<Distribution, SortOrder[]> toGravitinoDistributionAndSortOrders(
       SortedBucketTransform sortedBucketTransform) {
     int bucketNum = (Integer) sortedBucketTransform.numBuckets().value();
diff --git a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java
index c75dcf91d84..5618b3795ac 100644
--- a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java
+++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java
@@ -71,6 +71,7 @@ void testPartition(boolean supportsBucketPartition) {
         });
   }
 
+  @SuppressWarnings("deprecation")
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   void testGravitinoToSparkDistributionWithoutSortOrder(boolean supportsBucketPartition) {
@@ -180,6 +181,7 @@ void testSparkToGravitinoDistributionWithSortOrder(boolean supportsBucketPartiti
     }
   }
 
+  @SuppressWarnings("deprecation")
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   void testGravitinoToSparkDistributionWithSortOrder(boolean supportsBucketPartition) {
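For a local check of a non-default Scala build, a command sketch mirroring the CI job above (the embedded test mode and JDK 17 are example choices from the CI matrix, not the only supported values):

```shell
./gradlew --rerun-tasks -PskipTests -PtestMode=embedded -PjdkVersion=17 -PscalaVersion=2.13 \
  :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
```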