build: Add spark-4.0 profile and shims (#407)
This PR adds the spark-4.0 profile and shims.
This is an initial commit: tests with the spark-4.0 profile do not pass yet, but tests for spark-3.x should pass.
kazuyukitanimura authored May 28, 2024
1 parent 479a97a commit 9b3e87b
Showing 52 changed files with 1,125 additions and 162 deletions.
113 changes: 113 additions & 0 deletions .github/workflows/pr_build.yml
@@ -76,6 +76,44 @@ jobs:
# upload test reports only for java 17
upload-test-reports: ${{ matrix.java_version == '17' }}

linux-test-with-spark4_0:
strategy:
matrix:
os: [ubuntu-latest]
java_version: [17]
test-target: [java]
spark-version: ['4.0']
is_push_event:
- ${{ github.event_name == 'push' }}
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

linux-test-with-old-spark:
strategy:
matrix:
@@ -169,6 +207,81 @@ jobs:
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}

macos-test-with-spark4_0:
strategy:
matrix:
os: [macos-13]
java_version: [17]
test-target: [java]
spark-version: ['4.0']
fail-fast: false
if: github.event_name == 'push'
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-macos-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

macos-aarch64-test-with-spark4_0:
strategy:
matrix:
java_version: [17]
test-target: [java]
spark-version: ['4.0']
is_push_event:
- ${{ github.event_name == 'push' }}
exclude: # exclude java 11 for pull_request event
- java_version: 11
is_push_event: false
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: macos-14
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-macos-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
jdk-architecture: aarch64
protoc-architecture: aarch_64
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

macos-aarch64-test-with-old-spark:
strategy:
matrix:
@@ -20,10 +20,10 @@
package org.apache.comet.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

-object CometParquetUtils {
+object CometParquetUtils extends ShimCometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
@@ -39,61 +39,4 @@ object CometParquetUtils {

def ignoreMissingIds(conf: SQLConf): Boolean =
conf.getConfString(IGNORE_MISSING_PARQUET_FIELD_ID, "false").toBoolean

// The following is copied from QueryExecutionErrors
// TODO: remove after dropping Spark 3.2.0 support and directly use
// QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int,
matchedFields: String): Throwable = {
new RuntimeException(s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}

// The followings are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
// TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils
/**
* A StructField metadata key used to set the field id of a column in the Parquet schema.
*/
val FIELD_ID_METADATA_KEY = "parquet.field.id"

/**
* Whether there exists a field in the schema, whether inner or leaf, has the parquet field ID
* metadata.
*/
def hasFieldIds(schema: StructType): Boolean = {
def recursiveCheck(schema: DataType): Boolean = {
schema match {
case st: StructType =>
st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))

case at: ArrayType => recursiveCheck(at.elementType)

case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)

case _ =>
// No need to really check primitive types, just to terminate the recursion
false
}
}
if (schema.isEmpty) false else recursiveCheck(schema)
}

def hasFieldId(field: StructField): Boolean =
field.metadata.contains(FIELD_ID_METADATA_KEY)

def getFieldId(field: StructField): Int = {
require(
hasFieldId(field),
s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
try {
Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
} catch {
case _: ArithmeticException | _: ClassCastException =>
throw new IllegalArgumentException(
s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
}
}
}
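As a reading aid only (not part of this commit), here is a minimal sketch of how the retained config helper above might be exercised. It assumes nothing beyond the ignoreMissingIds method and the spark.sql.parquet.fieldId.read.ignoreMissing key shown in the diff.

import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.parquet.CometParquetUtils

object IgnoreMissingIdsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf()
    // Unset key falls back to the "false" default used by ignoreMissingIds.
    println(CometParquetUtils.ignoreMissingIds(conf)) // false
    conf.setConfString("spark.sql.parquet.fieldId.read.ignoreMissing", "true")
    println(CometParquetUtils.ignoreMissingIds(conf)) // true
  }
}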
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import org.apache.spark.sql.types._

trait ShimCometParquetUtils {
// The following is copied from QueryExecutionErrors
// TODO: remove after dropping Spark 3.2.0 support and directly use
// QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int,
matchedFields: String): Throwable = {
new RuntimeException(s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}

// The following are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
// TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils
/**
* A StructField metadata key used to set the field id of a column in the Parquet schema.
*/
val FIELD_ID_METADATA_KEY = "parquet.field.id"

/**
 * Whether there exists a field in the schema, whether inner or leaf, that has the parquet field ID
 * metadata.
*/
def hasFieldIds(schema: StructType): Boolean = {
def recursiveCheck(schema: DataType): Boolean = {
schema match {
case st: StructType =>
st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))

case at: ArrayType => recursiveCheck(at.elementType)

case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)

case _ =>
// No need to really check primitive types, just to terminate the recursion
false
}
}
if (schema.isEmpty) false else recursiveCheck(schema)
}

def hasFieldId(field: StructField): Boolean =
field.metadata.contains(FIELD_ID_METADATA_KEY)

def getFieldId(field: StructField): Int = {
require(
hasFieldId(field),
s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
try {
Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
} catch {
case _: ArithmeticException | _: ClassCastException =>
throw new IllegalArgumentException(
s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
}
}
}
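A minimal usage sketch (for illustration only, not part of this commit): a schema whose column carries the parquet.field.id metadata key defined above should be reported by hasFieldIds, and getFieldId should return the annotated id. It assumes the trait's members are reached through CometParquetUtils, which now mixes the trait in.

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}
import org.apache.comet.parquet.CometParquetUtils

object FieldIdSketch {
  def main(args: Array[String]): Unit = {
    // Annotate a single column with a Parquet field id of 1.
    val meta = new MetadataBuilder()
      .putLong(CometParquetUtils.FIELD_ID_METADATA_KEY, 1L)
      .build()
    val schema = StructType(Seq(StructField("a", IntegerType, nullable = true, metadata = meta)))

    println(CometParquetUtils.hasFieldIds(schema))     // true
    println(CometParquetUtils.getFieldId(schema.head)) // 1
  }
}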
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
PartitionedFile(
partitionValues,
SparkPath.fromUrlString(file),
-1, // -1 means we read the entire file
-1,
Array.empty[String],
0,
0
)
}
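For illustration only (not part of this commit), a hypothetical call to the shim; the path below is made up, and the -1 start/length values mean the whole file is scanned, per the comment above.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.comet.shims.ShimBatchReader

object PartitionedFileSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical file URL; no partition values.
    val pf = ShimBatchReader.newPartitionedFile(
      InternalRow.empty,
      "file:///tmp/part-00000.parquet")
    println(pf.filePath) // SparkPath wrapping the URL above
  }
}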
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

object ShimFileFormat {
// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME

val OPTION_RETURNING_BATCH = FileFormat.OPTION_RETURNING_BATCH
}
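A hedged sketch (not part of this commit) of how these constants are typically consumed: the options map asks the file format for columnar batches, and the temporary row-index column is stripped from the schema exposed downstream. Everything other than the two shim constants is hypothetical.

import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.comet.shims.ShimFileFormat

object ShimFileFormatSketch {
  def main(args: Array[String]): Unit = {
    // Request columnar (batch) output from the reader.
    val options = Map(ShimFileFormat.OPTION_RETURNING_BATCH -> "true")

    // Hide the temporary row-index column from downstream operators.
    val readSchema = StructType(Seq(
      StructField("id", LongType),
      StructField(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)))
    val visible = StructType(
      readSchema.filterNot(_.name == ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))

    println(options)
    println(visible.fieldNames.mkString(", ")) // id
  }
}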
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims


import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.types.{StructField, StructType}

object ShimResolveDefaultColumns {
def getExistenceDefaultValue(field: StructField): Any =
ResolveDefaultColumns.getExistenceDefaultValues(StructType(Seq(field))).head
}
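getExistenceDefaultValue delegates to Spark's ResolveDefaultColumns. A minimal sketch (not part of this commit), assuming that a field declared without a column default resolves to a null existence default:

import org.apache.spark.sql.types.{IntegerType, StructField}
import org.apache.comet.shims.ShimResolveDefaultColumns

object ExistenceDefaultSketch {
  def main(args: Array[String]): Unit = {
    // No default declared on the field, so the resolved existence default is expected to be null.
    val field = StructField("a", IntegerType)
    println(ShimResolveDefaultColumns.getExistenceDefaultValue(field)) // null (assumed)
  }
}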