Commit e7e0161

Ngone51 authored and cloud-fan committed
[SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan
### What changes were proposed in this pull request?

Set the active SparkSession to `sparkSessionForStream` and disable AQE & CBO before initializing `StreamExecution.logicalPlan`.

### Why are the changes needed?

The active session should be `sparkSessionForStream`. Otherwise, settings like
https://github.com/apache/spark/blob/6b34745cb9b294c91cd126c2ea44c039ee83cb84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L332-L335
would not take effect if callers access them through the active SQLConf, e.g., the `InsertAdaptiveSparkPlan` rule. Besides, unlike `InsertAdaptiveSparkPlan` (which skips streaming plans), `CostBasedJoinReorder` could, at least in theory, take effect on a streaming plan.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested manually. Before the fix, `InsertAdaptiveSparkPlan` would try to apply AQE to the plan (though it would not take effect). After this fix, the rule returns directly.

Closes #31600 from Ngone51/active-session-for-stream.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 401e270 · commit e7e0161
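To see why the active session matters here, note that optimizer rules typically read settings through `SQLConf.get`, which resolves against the thread-local *active* session rather than the session a plan was built from. Below is a small, self-contained sketch of that behavior; the object and variable names are illustrative, not from the patch, and since `SparkSession.withActive` is `private[sql]`, the sketch uses the public `setActiveSession` to show the same effect:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object ActiveSessionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("active-session-demo")
      .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
      .getOrCreate()

    // Stand-in for `sparkSessionForStream`: an isolated session holding
    // the per-stream overrides (AQE and CBO turned off).
    val sessionForStream = spark.newSession()
    sessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
    sessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")

    // While `spark` is the active session, SQLConf.get does not see the
    // per-stream overrides -- this is the situation before the fix.
    SparkSession.setActiveSession(spark)
    println(SQLConf.get.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) // true

    // Installing the stream's session first (what `withActive` does around
    // logicalPlan initialization) makes the overrides visible to rules.
    SparkSession.setActiveSession(sessionForStream)
    try {
      println(SQLConf.get.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) // false
    } finally {
      SparkSession.setActiveSession(spark)
    }

    spark.stop()
  }
}
```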

File tree

2 files changed (+54, -21 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 22 additions & 20 deletions
```diff
@@ -323,26 +323,28 @@ abstract class StreamExecution(
       startLatch.countDown()
 
       // While active, repeatedly attempt to run batches.
-      SparkSession.setActiveSession(sparkSession)
-
-      updateStatusMessage("Initializing sources")
-      // force initialization of the logical plan so that the sources can be created
-      logicalPlan
-
-      // Adaptive execution can change num shuffle partitions, disallow
-      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-      // Disable cost-based join optimization as we do not want stateful operations to be rearranged
-      sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
-      offsetSeqMetadata = OffsetSeqMetadata(
-        batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
-
-      if (state.compareAndSet(INITIALIZING, ACTIVE)) {
-        // Unblock `awaitInitialization`
-        initializationLatch.countDown()
-        runActivatedStream(sparkSessionForStream)
-        updateStatusMessage("Stopped")
-      } else {
-        // `stop()` is already called. Let `finally` finish the cleanup.
+      sparkSessionForStream.withActive {
+        // Adaptive execution can change num shuffle partitions, disallow
+        sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+        // Disable cost-based join optimization as we do not want stateful operations
+        // to be rearranged
+        sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
+
+        updateStatusMessage("Initializing sources")
+        // force initialization of the logical plan so that the sources can be created
+        logicalPlan
+
+        offsetSeqMetadata = OffsetSeqMetadata(
+          batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
+
+        if (state.compareAndSet(INITIALIZING, ACTIVE)) {
+          // Unblock `awaitInitialization`
+          initializationLatch.countDown()
+          runActivatedStream(sparkSessionForStream)
+          updateStatusMessage("Stopped")
+        } else {
+          // `stop()` is already called. Let `finally` finish the cleanup.
+        }
       }
     } catch {
       case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
```
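For context, `SparkSession.withActive` is a small set-and-restore wrapper around the thread-local active session. A paraphrased, user-level equivalent is sketched below; the real helper is `private[sql]`, defined as an instance method on `SparkSession`, and reads the raw thread-local to avoid promoting the default session, so treat this as a shape sketch rather than the actual source:

```scala
import org.apache.spark.sql.SparkSession

object WithActiveSketch {
  // Install `session` as the thread-local active session, run `block`,
  // then restore whatever was active before, even if `block` throws.
  def withActive[T](session: SparkSession)(block: => T): T = {
    val previous = SparkSession.getActiveSession.orNull
    SparkSession.setActiveSession(session)
    try {
      block
    } finally {
      SparkSession.setActiveSession(previous)
    }
  }
}
```

Wrapping the whole initialization (conf overrides first, then `logicalPlan`) in this helper is what guarantees that any rule reading the active SQLConf during plan initialization observes the per-stream AQE/CBO settings.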

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 32 additions & 1 deletion
```diff
@@ -34,7 +34,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.plans.logical.{Range, RepartitionByExpression}
 import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRelationV2}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
@@ -1266,6 +1266,37 @@ class StreamSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-34482: correct active SparkSession for logicalPlan") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
+      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+      var query: StreamExecution = null
+      try {
+        query =
+          df.repartition($"a")
+            .writeStream
+            .format("memory")
+            .queryName("memory")
+            .start()
+            .asInstanceOf[StreamingQueryWrapper]
+            .streamingQuery
+        query.awaitInitialization(streamingTimeout.toMillis)
+        val plan = query.logicalPlan
+        val numPartition = plan
+          .find { _.isInstanceOf[RepartitionByExpression] }
+          .map(_.asInstanceOf[RepartitionByExpression].numPartitions)
+        // Before the fix of SPARK-34482, the numPartition is the value of
+        // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
+        assert(numPartition.get === spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
+      } finally {
+        if (query != null) {
+          query.stop()
+        }
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
```
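The test's assertion leans on how `repartition($"a")` picks a partition count when none is given explicitly: it reads the session's `numShufflePartitions`, and with AQE on that lookup prefers `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM` (set to 10 above). A simplified paraphrase of that lookup, reconstructed from memory rather than copied verbatim from SQLConf:

```scala
import org.apache.spark.sql.internal.SQLConf

object NumShufflePartitionsSketch {
  // With AQE and partition coalescing enabled, the optional initial
  // partition number takes precedence; otherwise the plain
  // SHUFFLE_PARTITIONS value is used. Disabling AQE for the stream is
  // therefore exactly what makes the test's assertion hold.
  def numShufflePartitions(conf: SQLConf): Int = {
    if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
      conf.getConf(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM)
        .getOrElse(conf.defaultNumShufflePartitions)
    } else {
      conf.defaultNumShufflePartitions
    }
  }
}
```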
