3 changes: 3 additions & 0 deletions .gitignore
@@ -50,3 +50,6 @@ spark-extension/src/generated-sources

# exclude spark241kwaiae shim module
spark-extension-shims-spark241kwaiae

# exclude native engine build lib directory
native-engine/_build
10 changes: 9 additions & 1 deletion README.md
@@ -91,7 +91,15 @@ Blaze.
```shell
SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.4/spark-3.5
MODE=release # or pre
mvn package -P"${SHIM}" -P"${MODE}"
mvn clean package -P"${SHIM}" -P"${MODE}"
```

Skip building the native library if it has already been built (the prebuilt library can be found in `native-engine/_build/${MODE}`):

```shell
SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.4/spark-3.5
MODE=release # or pre
mvn clean package -P"${SHIM}" -P"${MODE}" -DskipBuildNative
```

After the build is finished, a fat Jar package that contains all the dependencies will be generated in the `target`
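Note: the `-DskipBuildNative` switch works by deactivating a Maven profile whose activation condition is `!skipBuildNative`; the profile definition is part of the `spark-extension/pom.xml` change further down in this diff.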
6 changes: 3 additions & 3 deletions build-native.sh
@@ -56,9 +56,9 @@ else
echo "native-engine source code and built libraries not modified, no need to rebuild"
fi

mkdir -p dev/mvn-build-helper/assembly/target/classes
rm -f dev/mvn-build-helper/assembly/target/classes/$libname.{dylib,so,dll}
cp target/$profile/$libname."$libsuffix" dev/mvn-build-helper/assembly/target/classes
mkdir -p native-engine/_build/$profile
rm -f native-engine/_build/$profile/*
cp target/$profile/$libname."$libsuffix" native-engine/_build/$profile

new_checksum="$(checksum)"
echo "build-checksum updated: $new_checksum"
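With this change, `build-native.sh` stages the freshly built library under `native-engine/_build/$profile` (one subdirectory per release mode) instead of copying it into the assembly helper's `target/classes`; the `spark-extension/pom.xml` change below then packages that directory as a regular Maven resource.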
25 changes: 0 additions & 25 deletions dev/mvn-build-helper/assembly/pom.xml
@@ -27,31 +27,6 @@

<build>
<plugins>
<!-- build native libs -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<executable>bash</executable>
<arguments>
<argument>./build-native.sh</argument>
<argument>${releaseMode}</argument>
</arguments>
<workingDirectory>../../../</workingDirectory>
<useMavenLogger>true</useMavenLogger>
</configuration>
<executions>
<execution>
<id>prepare-native-libs</id>
<goals>
<goal>exec</goal>
</goals>
<phase>compile</phase>
</execution>
</executions>
</plugin>

<!-- create uber jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
30 changes: 29 additions & 1 deletion pom.xml
@@ -73,6 +73,34 @@
<artifactId>scalatest_${scalaVersion}</artifactId>
<version>${scalaTestVersion}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

@@ -269,7 +297,7 @@
<javaVersion>1.8</javaVersion>
<scalaVersion>2.12</scalaVersion>
<scalaLongVersion>2.12.17</scalaLongVersion>
<scalaTestVersion>3.2.9</scalaTestVersion>
<scalaTestVersion>3.0.8</scalaTestVersion>
<scalafmtVersion>3.0.0</scalafmtVersion>
<sparkVersion>3.0.3</sparkVersion>
<celebornVersion>0.5.2</celebornVersion>
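Note that `scalaTestVersion` is downgraded from 3.2.9 to 3.0.8, most likely to match the ScalaTest version that the newly added Spark 3.0.3 test-jar dependencies were compiled against; the diff itself does not state the rationale.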
16 changes: 16 additions & 0 deletions spark-extension-shims-spark3/pom.xml
@@ -83,5 +83,21 @@
<artifactId>enableif_${scalaVersion}</artifactId>
<version>1.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scalaVersion}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scalaVersion}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scalaVersion}</artifactId>
<type>test-jar</type>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,33 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.blaze

import org.apache.spark.SparkConf
import org.apache.spark.sql.test.SharedSparkSession

trait BaseBlazeSQLSuite extends SharedSparkSession {

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.extensions", "org.apache.spark.sql.blaze.BlazeSparkSessionExtension")
.set(
"spark.shuffle.manager",
"org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager")
.set("spark.memory.offHeap.enabled", "false")
.set("spark.blaze.enable", "true")
}

}
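For reference, a concrete test suite only needs to mix this trait into a `QueryTest`; the following is a minimal sketch (the suite name and query are illustrative and not part of this PR):

```scala
package org.apache.spark.sql.blaze

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.functions.sum

// Hypothetical suite, shown only to illustrate how BaseBlazeSQLSuite is used.
class ExampleBlazeSQLSuite extends QueryTest with BaseBlazeSQLSuite {
  import testImplicits._

  test("simple aggregation with the Blaze extension enabled") {
    // The SparkSession provided by SharedSparkSession already carries the
    // Blaze extension and shuffle manager configured in BaseBlazeSQLSuite.
    val df = Seq(1, 2, 3).toDF("v").agg(sum("v"))
    checkAnswer(df, Seq(Row(6L)))
  }
}
```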
@@ -0,0 +1,138 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.blaze

import com.thoughtworks.enableMembersIf

@enableMembersIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
class BlazeAdaptiveQueryExecSuite
extends org.apache.spark.sql.QueryTest
with BaseBlazeSQLSuite
with org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper {

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, AdaptiveSparkPlanExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData

import testImplicits._

// Copy from spark/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
test("SPARK-35725: Support optimize skewed partitions in RebalancePartitions") {
[Comment from the Member Author] A test case for #662

withTempView("v") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {

spark.sparkContext
.parallelize((1 to 10).map(i => TestData(if (i > 4) 5 else i, i.toString)), 3)
.toDF("c1", "c2")
.createOrReplaceTempView("v")

def checkPartitionNumber(
query: String,
skewedPartitionNumber: Int,
totalNumber: Int): Unit = {
val (_, adaptive) = runAdaptiveAndVerifyResult(query)
val read = collect(adaptive) { case read: AQEShuffleReadExec =>
read
}
assert(read.size == 1)
assert(
read.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
skewedPartitionNumber)
assert(read.head.partitionSpecs.size == totalNumber)
}

withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100") {
// partition size [0, 120, 34, 34, 34]
checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
// partition size [216, 216, 72, 0, 216]
checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 9, 10)
}

// no skewed partition should be optimized
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10000") {
checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 0, 1)
}
}
}
}

private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = {
var finalPlanCnt = 0
var hasMetricsEvent = false
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
if (sparkPlanInfo.simpleString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) {
finalPlanCnt += 1
}
case _: SparkListenerSQLAdaptiveSQLMetricUpdates =>
hasMetricsEvent = true
case _ => // ignore other events
}
}
}
spark.sparkContext.addSparkListener(listener)

val dfAdaptive = sql(query)
val planBefore = dfAdaptive.queryExecution.executedPlan
assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
val result = dfAdaptive.collect()
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df = sql(query)
checkAnswer(df, result)
}
val planAfter = dfAdaptive.queryExecution.executedPlan
assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan

spark.sparkContext.listenerBus.waitUntilEmpty()
// AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that
// exist out of query stages.
val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1)
assert(finalPlanCnt == expectedFinalPlanCnt)
spark.sparkContext.removeSparkListener(listener)

val expectedMetrics = findInMemoryTable(planAfter).nonEmpty ||
subqueriesAll(planAfter).nonEmpty
assert(hasMetricsEvent == expectedMetrics)

val exchanges = adaptivePlan.collect { case e: Exchange =>
e
}
assert(exchanges.isEmpty, "The final plan should not contain any Exchange node.")
(dfAdaptive.queryExecution.sparkPlan, adaptivePlan)
}

private def findInMemoryTable(plan: SparkPlan): Seq[InMemoryTableScanExec] = {
collect(plan) {
case c: InMemoryTableScanExec
if c.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] =>
c
}
}
}
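Note that the body of this suite is compiled in only when the `blaze.shim` system property is `spark-3.5` (via the `@enableMembersIf` annotation at the top of the class), so the suite is effectively empty under the other shim profiles.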
53 changes: 53 additions & 0 deletions spark-extension/pom.xml
@@ -75,4 +75,57 @@
<version>1.2.0</version>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>${project.basedir}/../native-engine/_build/${releaseMode}</directory>
<includes>
<include>libblaze.so</include>
<include>libblaze.dylib</include>
<include>blaze.dll</include>
</includes>
</resource>
</resources>
</build>

<profiles>
<profile>
<id>build-native</id>
<activation>
<property>
<name>!skipBuildNative</name>
</property>
</activation>
<build>
<plugins>
<!-- build native libs -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<executable>bash</executable>
<arguments>
<argument>./build-native.sh</argument>
<argument>${releaseMode}</argument>
</arguments>
<workingDirectory>../</workingDirectory>
<useMavenLogger>true</useMavenLogger>
</configuration>
<executions>
<execution>
<id>prepare-native-libs</id>
<goals>
<goal>exec</goal>
</goals>
<phase>generate-resources</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
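With this profile in place, a plain `mvn clean package` still runs `build-native.sh` during the `generate-resources` phase, while adding `-DskipBuildNative` deactivates the profile and simply packages whatever library is already staged in `native-engine/_build/${releaseMode}`, matching the README instructions above.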