
Commit 3db46bf

anishshri-db authored and HeartSaVioR committed
[SPARK-50526][SS] Add store encoding format conf into offset log and block non supported stateful operators from using avro
### What changes were proposed in this pull request?

Add the state store encoding format conf to the offset log, and block stateful operators that do not yet support the Avro encoding format from using it.

### Why are the changes needed?

These changes ensure that the encoding format is recorded in the offset log, and that using Avro in a query with stateful operators that do not yet support this encoding format fails the query at analysis time.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

Added unit tests

```
[info] Run completed in 2 seconds, 547 milliseconds.
[info] Total number of tests run: 6
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 13 s, completed Dec 9, 2024, 6:15:25 PM
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49121 from anishshri-db/task/SPARK-50526.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 5538d85 · commit 3db46bf
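To illustrate the user-facing behavior described above, here is a minimal sketch (not part of the commit). The local session, rate source, and checkpoint path are illustrative assumptions; the conf key and error text come from the diffs below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

// Sketch only: a local session opting into Avro state encoding via the new conf.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("avro-encoding-guard-sketch") // hypothetical app name
  .config("spark.sql.streaming.stateStore.encodingFormat", "avro")
  .getOrCreate()
import spark.implicits._

// Streaming aggregation is still on the unsupported-operator list for Avro...
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy($"value" % 10)
  .agg(count("*"))

// ...so starting the query is expected to fail analysis with an error containing:
//   "State store encoding format as avro is not supported for operator=aggregation"
counts.writeStream
  .format("console")
  .outputMode("update")
  .option("checkpointLocation", "/tmp/avro-guard-checkpoint") // hypothetical path
  .start()
```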

File tree

96 files changed: +278 −89 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 39 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.Locale
+
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{ANALYSIS_ERROR, QUERY_PLAN}
 import org.apache.spark.sql.AnalysisException
@@ -140,6 +142,38 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def checkAvroSupportForStatefulOperator(p: LogicalPlan): Option[String] = p match {
+    // TODO: remove operators from this list as support for avro encoding is added
+    case s: Aggregate if s.isStreaming => Some("aggregation")
+    // Since the Distinct node will be replaced with Aggregate by the optimizer rule
+    // [[ReplaceDistinctWithAggregate]], we also need to check all Distinct nodes here,
+    // treating them as Aggregate.
+    case d @ Distinct(_: LogicalPlan) if d.isStreaming => Some("distinct")
+    case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => Some("join")
+    case f: FlatMapGroupsWithState if f.isStreaming => Some("flatMapGroupsWithState")
+    case f: FlatMapGroupsInPandasWithState if f.isStreaming =>
+      Some("applyInPandasWithState")
+    case d: Deduplicate if d.isStreaming => Some("dropDuplicates")
+    case d: DeduplicateWithinWatermark if d.isStreaming => Some("dropDuplicatesWithinWatermark")
+    case _ => None
+  }
+
+  // Rule to check that the avro encoding format is not used when any
+  // non-transformWithState stateful streaming operators are present in the query.
+  def checkSupportedStoreEncodingFormats(plan: LogicalPlan): Unit = {
+    val storeEncodingFormat = SQLConf.get.stateStoreEncodingFormat
+    if (storeEncodingFormat.toLowerCase(Locale.ROOT) == "avro") {
+      plan.foreach { subPlan =>
+        val operatorOpt = checkAvroSupportForStatefulOperator(subPlan)
+        if (operatorOpt.isDefined) {
+          val errorMsg = "State store encoding format as avro is not supported for " +
+            s"operator=${operatorOpt.get} used within the query"
+          throwError(errorMsg)(plan)
+        }
+      }
+    }
+  }
+
   def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
     if (!plan.isStreaming) {
       throwError(
@@ -199,6 +233,11 @@ object UnsupportedOperationChecker extends Logging {
         "DataFrames/Datasets")(plan)
     }
 
+    // Check that if the store encoding format is set to avro, then the query has no stateful
+    // operators, or only variants of operators that support avro encoding such as
+    // transformWithState.
+    checkSupportedStoreEncodingFormats(plan)
+
     val aggregates = collectStreamingAggregates(plan)
     // Disallow some output mode
     outputMode match {
```
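For unit-level use (mirroring what `checkForStreaming` does at query start), a hedged sketch of invoking the new check directly; `spark` and `df` are assumed to be an active session and a streaming Dataset, while the entry point, conf key, and error text are those added above.

```scala
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.internal.SQLConf

// Sketch only: `df` is assumed to be a streaming Dataset using one of the listed
// operators (e.g. dropDuplicates). The check reads the active session's conf via
// SQLConf.get, so the encoding format must be set before it runs.
spark.conf.set(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key, "avro")

// Throws an AnalysisException such as "State store encoding format as avro is not
// supported for operator=dropDuplicates used within the query"; it is a no-op when
// the format is "unsaferow" or the plan only uses transformWithState.
UnsupportedOperationChecker.checkSupportedStoreEncodingFormats(df.queryExecution.analyzed)
```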

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -102,7 +102,7 @@ object OffsetSeqMetadata extends Logging {
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
     STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
-    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN
+    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT
   )
 
   /**
@@ -125,7 +125,8 @@ object OffsetSeqMetadata extends Logging {
     SymmetricHashJoinStateManager.legacyVersion.toString,
     STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4,
     STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false",
-    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true"
+    PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
+    STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow"
   )
 
   def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
```
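To illustrate the backward-compatibility path the new default above enables (and which the 3.5.1 checkpoint test below verifies), a small sketch: an offset log entry written by an older Spark has no encoding format key, so restoring it falls back to the registered default. The bare `SQLConf` instance is an assumption for illustration.

```scala
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata
import org.apache.spark.sql.internal.SQLConf

// Sketch only: a pre-4.0 offset log entry carries no encodingFormat key.
val oldMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
  conf = Map.empty)

// Restoring it into a session conf picks up the default registered in
// relevantSQLConfDefaultValues above, i.e. "unsaferow".
val sessionConf = new SQLConf()
OffsetSeqMetadata.setSessionConf(oldMetadata, sessionConf)
assert(sessionConf.stateStoreEncodingFormat == "unsaferow")
```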
New checkpoint fixture files follow (their paths are hidden in this large-commit view); based on the offsets content and OffsetSeqLogSuite below, this first group belongs to the checkpoint-version-4.0.0-tws-avro resource directory.

Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
```
Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
```
Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+{"id":"f3f30619-9175-4329-97a7-f5629deaad89"}
```
Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1734074255407,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"avro","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+0
```
Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1734074257473,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"avro","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+1
```
Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v2
+{"operatorInfo":{"operatorId":0,"operatorName":"transformWithStateExec"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5,"stateSchemaFilePath":"file:/Users/anish.shrigondekar/spark/spark/target/tmp/spark-dcaeba6f-ff09-4f91-ba1b-4d14fe53cc9f/state/0/_stateSchema/default/0_6b12d3c5-57e6-4001-8321-3ae63d6be7a0"}],"operatorPropertiesJson":"{\"timeMode\":\"NoTime\",\"outputMode\":\"Update\",\"stateVariables\":[{\"stateName\":\"countState\",\"stateVariableType\":\"ValueState\",\"ttlEnabled\":false}]}"}
```
The second group of fixture files belongs to the checkpoint-version-4.0.0-tws-unsaferow resource directory.

Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
```
Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
```
Lines changed: 1 addition & 0 deletions
```diff
@@ -0,0 +1 @@
+{"id":"1341f9d1-5100-4426-876c-2754aeaca02b"}
```
Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1734074067729,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+0
```
Lines changed: 3 additions & 0 deletions
```diff
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1734074071551,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
+1
```
Lines changed: 2 additions & 0 deletions
```diff
@@ -0,0 +1,2 @@
+v2
+{"operatorInfo":{"operatorId":0,"operatorName":"transformWithStateExec"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5,"stateSchemaFilePath":"file:/Users/anish.shrigondekar/spark/spark/target/tmp/spark-ae28252a-e696-4653-a9a5-7a9a0766f4c1/state/0/_stateSchema/default/0_2e8e6b52-e3c3-4184-b8ef-8d391b75d751"}],"operatorPropertiesJson":"{\"timeMode\":\"NoTime\",\"outputMode\":\"Update\",\"stateVariables\":[{\"stateName\":\"countState\",\"stateVariableType\":\"ValueState\",\"ttlEnabled\":false}]}"}
```

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala

Lines changed: 67 additions & 0 deletions
```diff
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.File
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -129,4 +131,69 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     val log = new OffsetSeqLog(spark, input.toString)
     log.getLatest().get
   }
+
+  // SPARK-50526 - sanity tests to ensure that values are set correctly for state store
+  // encoding format within OffsetSeqMetadata
+  test("offset log records defaults to unsafeRow for store encoding format") {
+    val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
+      spark.conf)
+    assert(offsetSeqMetadata.conf.get(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key) ===
+      Some("unsaferow"))
+  }
+
+  test("offset log uses the store encoding format set in the conf") {
+    val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
+      Map(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro"))
+    assert(offsetSeqMetadata.conf.get(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key) ===
+      Some("avro"))
+  }
+
+  // Verify whether the entry exists within the offset log and has the right value, or that we
+  // pick up the correct default values when populating the session conf.
+  private def verifyOffsetLogEntry(
+      checkpointDir: String,
+      entryExists: Boolean,
+      encodingFormat: String): Unit = {
+    val log = new OffsetSeqLog(spark, s"$checkpointDir/offsets")
+    val latestBatchId = log.getLatestBatchId()
+    assert(latestBatchId.isDefined, "No offset log entries found in the checkpoint location")
+
+    // Read the latest offset log
+    val offsetSeq = log.get(latestBatchId.get).get
+    val offsetSeqMetadata = offsetSeq.metadata.get
+
+    if (entryExists) {
+      val encodingFormatOpt = offsetSeqMetadata.conf.get(
+        SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key)
+      assert(encodingFormatOpt.isDefined, "No store encoding format found in the offset log entry")
+      assert(encodingFormatOpt.get == encodingFormat)
+    }
+
+    val clonedSqlConf = spark.sessionState.conf.clone()
+    OffsetSeqMetadata.setSessionConf(offsetSeqMetadata, clonedSqlConf)
+    assert(clonedSqlConf.stateStoreEncodingFormat == encodingFormat)
+  }
+
+  // verify that checkpoints created with different store encoding formats are read correctly
+  Seq("unsaferow", "avro").foreach { storeEncodingFormat =>
+    test(s"verify format values from checkpoint loc - $storeEncodingFormat") {
+      withTempDir { checkpointDir =>
+        val resourceUri = this.getClass.getResource(
+          "/structured-streaming/checkpoint-version-4.0.0-tws-" + storeEncodingFormat + "/").toURI
+        FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
+        verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = true,
+          storeEncodingFormat)
+      }
+    }
+  }
+
+  test("verify format values from old checkpoint with Spark version 3.5.1") {
+    withTempDir { checkpointDir =>
+      val resourceUri = this.getClass.getResource(
+        "/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/").toURI
+      FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
+      verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = false,
+        "unsaferow")
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -874,6 +874,26 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
     )
   }
 
+  testWithAllStateVersions("test that avro encoding is not supported") {
+    val inputData = MemoryStream[Int]
+
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    val ex = intercept[Exception] {
+      withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+        testStream(aggregated, Update)(
+          AddData(inputData, 3),
+          ProcessAllAvailable()
+        )
+      }
+    }
+    assert(ex.getMessage.contains("State store encoding format as avro is not supported"))
+  }
+
   private def prepareTestForChangingSchemaOfState(
       tempDir: File): (MemoryStream[Int], DataFrame) = {
     val inputData = MemoryStream[Int]
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -574,6 +574,21 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       matchPVals = true
     )
   }
+
+  test("test that avro encoding is not supported") {
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS().dropDuplicates()
+
+    val ex = intercept[Exception] {
+      withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+        testStream(result, Append)(
+          AddData(inputData, "a"),
+          ProcessAllAvailable()
+        )
+      }
+    }
+    assert(ex.getMessage.contains("State store encoding format as avro is not supported"))
+  }
 }
 
 @SlowSQLTest
```
