[SPARK-50526][SS] Add store encoding format conf into offset log and block non supported stateful operators from using avro #49121


Closed. Wants to merge 7 commits.
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.Locale

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{ANALYSIS_ERROR, QUERY_PLAN}
import org.apache.spark.sql.AnalysisException
@@ -140,6 +142,38 @@ object UnsupportedOperationChecker extends Logging {
}
}

private def checkAvroSupportForStatefulOperator(p: LogicalPlan): Option[String] = p match {
// TODO: remove operators from this list as support for avro encoding is added
case s: Aggregate if s.isStreaming => Some("aggregation")
// Since a Distinct node will be replaced with an Aggregate node by the optimizer rule
// [[ReplaceDistinctWithAggregate]], we also need to check every Distinct node here,
// treating it as an Aggregate.
case d @ Distinct(_: LogicalPlan) if d.isStreaming => Some("distinct")
case _ @ Join(left, right, _, _, _) if left.isStreaming && right.isStreaming => Some("join")
case f: FlatMapGroupsWithState if f.isStreaming => Some("flatMapGroupsWithState")
case f: FlatMapGroupsInPandasWithState if f.isStreaming =>
Some("applyInPandasWithState")
case d: Deduplicate if d.isStreaming => Some("dropDuplicates")
case d: DeduplicateWithinWatermark if d.isStreaming => Some("dropDuplicatesWithinWatermark")
case _ => None
}

// Rule to check that the avro encoding format is not used when any stateful
// streaming operator other than transformWithState is present in the query.
def checkSupportedStoreEncodingFormats(plan: LogicalPlan): Unit = {
val storeEncodingFormat = SQLConf.get.stateStoreEncodingFormat
if (storeEncodingFormat.toLowerCase(Locale.ROOT) == "avro") {
plan.foreach { subPlan =>
val operatorOpt = checkAvroSupportForStatefulOperator(subPlan)
if (operatorOpt.isDefined) {
val errorMsg = "State store encoding format as avro is not supported for " +
s"operator=${operatorOpt.get} used within the query"
throwError(errorMsg)(plan)
}
}
}
}

def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
if (!plan.isStreaming) {
throwError(
@@ -199,6 +233,11 @@
"DataFrames/Datasets")(plan)
}

// Check that if the store encoding format is set to avro, the query contains no stateful
// operators, or only variants of operators that support avro encoding, such as
// transformWithState.
checkSupportedStoreEncodingFormats(plan)

val aggregates = collectStreamingAggregates(plan)
// Disallow some output mode
outputMode match {
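To make the user-facing effect of this check concrete, here is a minimal sketch (not part of the PR): a streaming aggregation failing analysis once the conf is set. The rate source and console sink are illustrative; only the conf key and the error text come from this change.

// Minimal sketch, assuming a SparkSession `spark`; the query itself is
// illustrative, not part of this PR.
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy(col("value") % 10)
  .count()

// Starting the query invokes UnsupportedOperationChecker.checkForStreaming,
// which now fails with:
//   State store encoding format as avro is not supported for
//   operator=aggregation used within the query
counts.writeStream
  .outputMode("update")
  .format("console")
  .start()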
@@ -102,7 +102,7 @@ object OffsetSeqMetadata extends Logging {
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
STATE_STORE_ROCKSDB_FORMAT_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT
)

/**
@@ -125,7 +125,8 @@
SymmetricHashJoinStateManager.legacyVersion.toString,
STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4,
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false",
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true"
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow"
)

def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
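Registering the conf in both relevantSQLConfs and relevantSQLConfDefaultValues is what makes the encoding format sticky across restarts: each batch records it in the offset log, recovery replays it into the session conf, and checkpoints written before this change fall back to the registered default. A rough sketch of that round trip, mirroring the new OffsetSeqLogSuite tests below (the cloned-conf handling is illustrative):

// Hedged sketch: the value recorded in the offset log wins on restart.
import org.apache.spark.sql.internal.SQLConf

val metadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
  Map(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro"))
val clonedConf = spark.sessionState.conf.clone()
OffsetSeqMetadata.setSessionConf(metadata, clonedConf)
assert(clonedConf.stateStoreEncodingFormat == "avro")

// An older offset log has no entry for this conf, so setSessionConf falls
// back to the default registered above and the query keeps using unsaferow.
val oldMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
  Map.empty[String, String])
OffsetSeqMetadata.setSessionConf(oldMetadata, clonedConf)
assert(clonedConf.stateStoreEncodingFormat == "unsaferow")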
2 binary files not shown.
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
@@ -0,0 +1 @@
{"id":"f3f30619-9175-4329-97a7-f5629deaad89"}
2 binary files not shown.
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1734074255407,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"avro","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1734074257473,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"avro","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
1
33 binary files not shown.
@@ -0,0 +1,2 @@
v2
{"operatorInfo":{"operatorId":0,"operatorName":"transformWithStateExec"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5,"stateSchemaFilePath":"file:/Users/anish.shrigondekar/spark/spark/target/tmp/spark-dcaeba6f-ff09-4f91-ba1b-4d14fe53cc9f/state/0/_stateSchema/default/0_6b12d3c5-57e6-4001-8321-3ae63d6be7a0"}],"operatorPropertiesJson":"{\"timeMode\":\"NoTime\",\"outputMode\":\"Update\",\"stateVariables\":[{\"stateName\":\"countState\",\"stateVariableType\":\"ValueState\",\"ttlEnabled\":false}]}"}
4 binary files not shown.
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0,"stateUniqueIds":{}}
@@ -0,0 +1 @@
{"id":"1341f9d1-5100-4426-876c-2754aeaca02b"}
2 binary files not shown.
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1734074067729,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1734074071551,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
1
33 binary files not shown.
@@ -0,0 +1,2 @@
v2
{"operatorInfo":{"operatorId":0,"operatorName":"transformWithStateExec"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5,"stateSchemaFilePath":"file:/Users/anish.shrigondekar/spark/spark/target/tmp/spark-ae28252a-e696-4653-a9a5-7a9a0766f4c1/state/0/_stateSchema/default/0_2e8e6b52-e3c3-4184-b8ef-8d391b75d751"}],"operatorPropertiesJson":"{\"timeMode\":\"NoTime\",\"outputMode\":\"Update\",\"stateVariables\":[{\"stateName\":\"countState\",\"stateVariableType\":\"ValueState\",\"ttlEnabled\":false}]}"}
2 binary files not shown.
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -129,4 +131,69 @@ class OffsetSeqLogSuite extends SharedSparkSession {
val log = new OffsetSeqLog(spark, input.toString)
log.getLatest().get
}

// SPARK-50526 - sanity tests to ensure that the state store encoding format value
// is set correctly within OffsetSeqMetadata
test("offset log records defaults to unsafeRow for store encoding format") {
val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
spark.conf)
assert(offsetSeqMetadata.conf.get(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key) ===
Some("unsaferow"))
}

test("offset log uses the store encoding format set in the conf") {
val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
Map(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro"))
assert(offsetSeqMetadata.conf.get(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key) ===
Some("avro"))
}

// Verify that the entry exists within the offset log with the right value, or that we
// pick up the correct default value when populating the session conf.
private def verifyOffsetLogEntry(
checkpointDir: String,
entryExists: Boolean,
encodingFormat: String): Unit = {
val log = new OffsetSeqLog(spark, s"$checkpointDir/offsets")
val latestBatchId = log.getLatestBatchId()
assert(latestBatchId.isDefined, "No offset log entries found in the checkpoint location")

// Read the latest offset log
val offsetSeq = log.get(latestBatchId.get).get
val offsetSeqMetadata = offsetSeq.metadata.get

if (entryExists) {
val encodingFormatOpt = offsetSeqMetadata.conf.get(
SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key)
assert(encodingFormatOpt.isDefined, "No store encoding format found in the offset log entry")
assert(encodingFormatOpt.get == encodingFormat)
}

val clonedSqlConf = spark.sessionState.conf.clone()
OffsetSeqMetadata.setSessionConf(offsetSeqMetadata, clonedSqlConf)
assert(clonedSqlConf.stateStoreEncodingFormat == encodingFormat)
}

// verify that checkpoints created with different store encoding formats are read correctly
Seq("unsaferow", "avro").foreach { storeEncodingFormat =>
test(s"verify format values from checkpoint loc - $storeEncodingFormat") {
withTempDir { checkpointDir =>
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-4.0.0-tws-" + storeEncodingFormat + "/").toURI
FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = true,
storeEncodingFormat)
}
}
}

test("verify format values from old checkpoint with Spark version 3.5.1") {
withTempDir { checkpointDir =>
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/").toURI
FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = false,
"unsaferow")
}
}
}
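For contrast with the blocked operators, the transformWithState checkpoints exercised above represent the path that remains allowed with avro, since that operator is deliberately absent from checkAvroSupportForStatefulOperator. A hedged sketch of that path; CountProcessor stands in for any StatefulProcessor and is not part of this PR:

// Hedged sketch, assuming a test session with implicits in scope as in the
// suites above; `CountProcessor` is an illustrative StatefulProcessor.
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
val inputData = MemoryStream[String]
val result = inputData.toDS()
  .groupByKey(identity)
  .transformWithState(new CountProcessor(), TimeMode.None(), OutputMode.Update())
// This passes checkSupportedStoreEncodingFormats because transformWithState
// supports the avro encoding format.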
@@ -874,6 +874,26 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
)
}

testWithAllStateVersions("test that avro encoding is not supported") {
val inputData = MemoryStream[Int]

val aggregated =
inputData.toDF()
.groupBy($"value")
.agg(count("*"))
.as[(Int, Long)]

val ex = intercept[Exception] {
withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
testStream(aggregated, Update)(
AddData(inputData, 3),
ProcessAllAvailable()
)
}
}
assert(ex.getMessage.contains("State store encoding format as avro is not supported"))
}

private def prepareTestForChangingSchemaOfState(
tempDir: File): (MemoryStream[Int], DataFrame) = {
val inputData = MemoryStream[Int]
@@ -574,6 +574,21 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
matchPVals = true
)
}

test("test that avro encoding is not supported") {
val inputData = MemoryStream[String]
val result = inputData.toDS().dropDuplicates()

val ex = intercept[Exception] {
withSQLConf(SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
testStream(result, Append)(
AddData(inputData, "a"),
ProcessAllAvailable()
)
}
}
assert(ex.getMessage.contains("State store encoding format as avro is not supported"))
}
}

@SlowSQLTest