[SPARK-51187][SQL][SS][3.5] Implement the graceful deprecation of incorrect config introduced in SPARK-49699 #49985

@@ -3421,11 +3421,12 @@ object SQLConf {
       .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   val PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN =
-    buildConf("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
+    buildConf("spark.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
       .internal()
       .doc("Allow PruneFilters to remove streaming subplans when we encounter a false filter. " +
         "This flag is to restore prior buggy behavior for broken pipelines.")
       .version("4.0.0")
+      .withAlternative("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
       .booleanConf
       .createWithDefault(false)
 
@@ -4480,7 +4481,9 @@ object SQLConf {
       DeprecatedConfig(LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key, "3.2",
         """Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead."""),
       DeprecatedConfig(COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "3.2",
-        s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead.")
+        s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead."),
+      DeprecatedConfig(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.alternatives.head, "3.5.5",
+        s"Use '${PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key}' instead.")
     )
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
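
Taken together, the two hunks above keep the legacy key working while steering users to the new one: withAlternative makes reads of the new entry fall back to the legacy Databricks-prefixed key, and the DeprecatedConfig entry logs a deprecation warning when the legacy key is used. A minimal sketch of the expected behavior, assuming an active SparkSession named spark (illustrative only, not part of this diff):

    // Setting only the legacy key should still be visible through the new entry,
    // because the new entry declares the legacy key via .withAlternative(...).
    spark.conf.set("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan", "true")
    assert(spark.conf.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN))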
@@ -170,5 +170,16 @@ object OffsetSeqMetadata extends Logging {
         }
       }
     }
+
+    // SPARK-51187: This incorrect config is not included in relevantSQLConfs, but the
+    // metadata in the offset log may contain it if the batch ran on Spark 3.5.4.
+    // We need to pick up the value from the metadata and set it under the new config key.
+    // This also makes subsequent batches record the correct config in the offset log.
+    metadata.conf.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan") match {
+      case Some(value) =>
+        sessionConf.set(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)
+
+      case _ =>
+    }
   }
 }
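
The explicit match above mirrors the surrounding style; a more compact equivalent, shown only for illustration and not part of this diff, would be:

    metadata.conf
      .get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")
      .foreach(sessionConf.set(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, _))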
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
@@ -0,0 +1 @@
+{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+1
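
For reference, these checkpoint resources follow the v1 offset log layout: a version line ("v1"), an OffsetSeqMetadata JSON line whose conf map still carries the legacy Databricks-prefixed key, and one offset line per source. A minimal sketch of inspecting such an entry, assuming a SparkSession named spark and a checkpoint directory checkpointDir (illustrative, not part of this diff):

    // Read batch 0 from the offset log and look up which config key its metadata carries.
    val offsetLog = new OffsetSeqLog(spark, new File(checkpointDir, "offsets").getCanonicalPath)
    val confInBatch0 = offsetLog.get(0).flatMap(_.metadata).map(_.conf).getOrElse(Map.empty[String, String])
    // For the Spark 3.5.4 checkpoint above, this is expected to return Some("false").
    val legacyValue = confInBatch0.get("spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan")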
@@ -1363,6 +1363,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
+    "effect when restarting from Spark 3.5.4") {
+    // Spark 3.5.4 is the only release in which we accidentally introduced the incorrect config.
+    // We just need to confirm that the current Spark version applies the fix of SPARK-49699 when
+    // the streaming query was started from Spark 3.5.4. We should apply the fix consistently,
+    // rather than "on and off", because toggling it opens up more chances for breakage.
+
+    val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
+
+    withTempDir { dir =>
+      val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
+      assert(input != null, "cannot find test resource")
+      val inputDir = new File(input.toURI)
+
+      // Copy the test files to the temp dir so that we won't modify the original data.
+      FileUtils.copyDirectory(inputDir, dir)
+
+      // Below is the code with which we extracted the checkpoint from Spark 3.5.4. We need to
+      // make sure the offset advancement continues from the last run.
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(5, 6, 7, 8)
+
+      testStream(df)(
+        StartStream(checkpointLocation = dir.getCanonicalPath),
+        AddData(inputData, 9, 10, 11, 12),
+        ProcessAllAvailable(),
+        AssertOnQuery { q =>
+          val confValue = q.lastExecution.sparkSession.conf.get(
+            SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
+          assert(confValue === false,
+            "The value for the incorrect config in offset metadata should be respected as the " +
+              "value of the fixed config")
+
+          val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
+          def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
+            val offsetLogForBatch = offsetLog.get(batchId).get
+            val confInMetadata = offsetLogForBatch.metadata.get.conf
+            if (expectCorrectConfig) {
+              assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
+                Some("false"),
+                "The new offset log should have the fixed config instead of the incorrect one."
+              )
+              assert(!confInMetadata.contains(problematicConfName),
+                "The new offset log should not have the incorrect config.")
+            } else {
+              assert(
+                confInMetadata.get(problematicConfName) === Some("false"),
+                "The offset log in the test resource should have the incorrect config to test properly."
+              )
+              assert(
+                !confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
+                "The offset log in the test resource should not have the fixed config."
+              )
+            }
+          }
+
+          assert(offsetLog.getLatestBatchId() === Some(2))
+          checkConfigFromMetadata(0, expectCorrectConfig = false)
+          checkConfigFromMetadata(1, expectCorrectConfig = false)
+          checkConfigFromMetadata(2, expectCorrectConfig = true)
+          true
+        }
+      )
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>