
Commit 3ba36ec

HeartSaVioR authored and dongjoon-hyun committed
[SPARK-51187][SQL][SS][3.5] Implement the graceful deprecation of incorrect config introduced in SPARK-49699
### What changes were proposed in this pull request?

This PR implements the graceful deprecation of the incorrect config introduced in SPARK-49699.

SPARK-49699 shipped in Spark 3.5.4, so we can't simply rename the config to fix the issue. And because the incorrect config is recorded in the offset log of streaming queries, the fix is not as simple as adding `withAlternative`: we must explicitly handle the case where the offset log contains the incorrect config, and copy its value into the new config.

Once a single microbatch has been planned after the restart (i.e. once the logic above has run), the offset log will contain the "new" config and will no longer refer to the incorrect one. That means we can remove the incorrect config in whichever future Spark version we are confident no user will upgrade to directly from Spark 3.5.4.

### Why are the changes needed?

We released an incorrect config and want to rename it properly, without breaking any existing streaming query in the process.

### Does this PR introduce _any_ user-facing change?

No. That is exactly what this PR aims for.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49985 from HeartSaVioR/SPARK-51187-3.5.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
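To make the restart-time handling concrete, here is a minimal sketch of the idea (hypothetical helper names; the committed code is in the `OffsetSeq.scala` hunk below, and `setSessionConf` here is just a stand-in for however the session conf is updated):

```scala
// Minimal sketch, not the committed code: on restart, replay a value logged
// under the incorrect key into the correct key, so the next offset log entry
// is written with the new key only.
val incorrectKey = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
val correctKey = "spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

def remapIncorrectConf(
    offsetLogConf: Map[String, String],
    setSessionConf: (String, String) => Unit): Unit = {
  // The incorrect key is present only if the last batch was planned by Spark 3.5.4.
  offsetLogConf.get(incorrectKey).foreach { value =>
    setSessionConf(correctKey, value)
  }
}
```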
1 parent: c0bfae6 · commit: 3ba36ec

File tree

8 files changed: +96 −2 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 5 additions & 2 deletions
@@ -3421,11 +3421,12 @@ object SQLConf {
       .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

   val PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN =
-    buildConf("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
+    buildConf("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
       .internal()
       .doc("Allow PruneFilters to remove streaming subplans when we encounter a false filter. " +
         "This flag is to restore prior buggy behavior for broken pipelines.")
       .version("4.0.0")
+      .withAlternative("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
       .booleanConf
       .createWithDefault(false)

@@ -4480,7 +4481,9 @@ object SQLConf {
     DeprecatedConfig(LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key, "3.2",
       """Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead."""),
     DeprecatedConfig(COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "3.2",
-      s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead.")
+      s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead."),
+    DeprecatedConfig(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.alternatives.head, "3.5.5",
+      s"Use '${PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key}' instead.")
   )

   Map(configs.map { cfg => cfg.key -> cfg } : _*)
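As a usage-level illustration (a sketch, assuming a live `SparkSession` named `spark`), the `withAlternative` wiring means a value set under the old Databricks-prefixed key is still honored when the new key is read, while the `DeprecatedConfig` entry is what surfaces the deprecation message when the old key is set:

```scala
// Sketch: the old key keeps working through the alternative-key fallback.
spark.conf.set(
  "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan", "true")
// Expected to return "true", resolved via the alternative registered above.
spark.conf.get("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
```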

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 11 additions & 0 deletions
@@ -170,5 +170,16 @@ object OffsetSeqMetadata extends Logging {
         }
       }
     }
+
+    // SPARK-51187: This incorrect config is not added in the relevantSQLConfs, but the
+    // metadata in the offset log may have this if the batch ran from Spark 3.5.4.
+    // We need to pick the value from the metadata and set it in the new config.
+    // This also leads the further batches to have a correct config in the offset log.
+    metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan") match {
+      case Some(value) =>
+        sessionConf.set(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)
+
+      case _ =>
+    }
   }
 }
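The five unnamed files below are the new test resources: a checkpoint captured from a query run on Spark 3.5.4 (apparently its `commits/0`, `commits/1`, `metadata`, `offsets/0`, and `offsets/1` entries, given that the test reads `/structured-streaming/checkpoint-version-3.5.4`). Note that the `conf` maps in the two offset entries still carry the `spark.databricks.`-prefixed key, which is exactly what the block above remaps on restart.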
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+0
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+1

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 69 additions & 0 deletions
@@ -1363,6 +1363,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }

+  test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
+    "effect when restarting from Spark 3.5.4") {
+    // Spark 3.5.4 is the only release we accidentally introduced the incorrect config.
+    // We just need to confirm that current Spark version will apply the fix of SPARK-49699 when
+    // the streaming query started from Spark 3.5.4. We should consistently apply the fix, instead
+    // of "on and off", because that may expose more possibility to break.
+
+    val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
+
+    withTempDir { dir =>
+      val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
+      assert(input != null, "cannot find test resource")
+      val inputDir = new File(input.toURI)
+
+      // Copy test files to tempDir so that we won't modify the original data.
+      FileUtils.copyDirectory(inputDir, dir)
+
+      // Below is the code we extract checkpoint from Spark 3.5.4. We need to make sure the offset
+      // advancement continues from the last run.
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(5, 6, 7, 8)
+
+      testStream(df)(
+        StartStream(checkpointLocation = dir.getCanonicalPath),
+        AddData(inputData, 9, 10, 11, 12),
+        ProcessAllAvailable(),
+        AssertOnQuery { q =>
+          val confValue = q.lastExecution.sparkSession.conf.get(
+            SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
+          assert(confValue === false,
+            "The value for the incorrect config in offset metadata should be respected as the " +
+              "value of the fixed config")
+
+          val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
+          def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
+            val offsetLogForBatch = offsetLog.get(batchId).get
+            val confInMetadata = offsetLogForBatch.metadata.get.conf
+            if (expectCorrectConfig) {
+              assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
+                Some("false"),
+                "The new offset log should have the fixed config instead of the incorrect one."
+              )
+              assert(!confInMetadata.contains(problematicConfName),
+                "The new offset log should not have the incorrect config.")
+            } else {
+              assert(
+                confInMetadata.get(problematicConfName) === Some("false"),
+                "The offset log in test resource should have the incorrect config to test properly."
+              )
+              assert(
+                !confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
+                "The offset log in test resource should not have the fixed config."
+              )
+            }
+          }
+
+          assert(offsetLog.getLatestBatchId() === Some(2))
+          checkConfigFromMetadata(0, expectCorrectConfig = false)
+          checkConfigFromMetadata(1, expectCorrectConfig = false)
+          checkConfigFromMetadata(2, expectCorrectConfig = true)
+          true
+        }
+      )
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
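To run just this test locally, the standard Spark sbt invocation with a ScalaTest name filter should work (a sketch; module and filter syntax per Spark's usual developer-tools conventions):

```
build/sbt "sql/testOnly *StreamingQuerySuite -- -z SPARK-51187"
```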
