Skip to content

Commit 463a24d

Browse files
committed
[SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility
### What changes were proposed in this pull request? This PR proposes to fix the incorrect value schema in streaming deduplication. It stores the empty row having a single column with null (using NullType), but the value schema is specified as all columns, which leads incorrect behavior from state store schema compatibility checker. This PR proposes to set the schema of value as `StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly. To not break the existing streaming queries having incorrect value schema, this PR proposes to disable the check for value schema on streaming deduplication. Disabling the value check was there for the format validation (we have two different checkers for state store), but it has been missing for state store schema compatibility check. To avoid adding more config, this PR leverages the existing config "format validation" is using. ### Why are the changes needed? This is a bug fix. Suppose the streaming query below: ``` # df has the columns `a`, `b`, `c` val df = spark.readStream.format("...").load() val query = df.dropDuplicate("a").writeStream.format("...").start() ``` while the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows with column `a`, the change of schema should not matter for streaming deduplication, but state store schema checker throws error saying "value schema is not compatible" before this fix. ### Does this PR introduce _any_ user-facing change? No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug. ### How was this patch tested? New tests. Closes #37041 from HeartSaVioR/SPARK-39650. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> (cherry picked from commit fe53603) Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 27f78e6 commit 463a24d

File tree

37 files changed

+152
-22
lines changed

37 files changed

+152
-22
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,34 @@ class StateSchemaCompatibilityChecker(
4141
fm.mkdirs(schemaFileLocation.getParent)
4242

4343
def check(keySchema: StructType, valueSchema: StructType): Unit = {
44+
check(keySchema, valueSchema, ignoreValueSchema = false)
45+
}
46+
47+
def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
4448
if (fm.exists(schemaFileLocation)) {
4549
logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
4650
val (storedKeySchema, storedValueSchema) = readSchemaFile()
47-
if (storedKeySchema.equals(keySchema) && storedValueSchema.equals(valueSchema)) {
51+
if (storedKeySchema.equals(keySchema) &&
52+
(ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
4853
// schema is exactly same
4954
} else if (!schemasCompatible(storedKeySchema, keySchema) ||
50-
!schemasCompatible(storedValueSchema, valueSchema)) {
55+
(!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema))) {
56+
val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" +
57+
s"- Existing key schema: $storedKeySchema\n"
58+
59+
// If it is requested to skip checking the value schema, we also don't expose the value
60+
// schema information to the error message.
61+
val errorMsgForValueSchema = if (!ignoreValueSchema) {
62+
s"- Provided value schema: $valueSchema\n" +
63+
s"- Existing value schema: $storedValueSchema\n"
64+
} else {
65+
""
66+
}
5167
val errorMsg = "Provided schema doesn't match to the schema for existing state! " +
5268
"Please note that Spark allow difference of field name: check count of fields " +
5369
"and data type of each field.\n" +
54-
s"- Provided key schema: $keySchema\n" +
55-
s"- Provided value schema: $valueSchema\n" +
56-
s"- Existing key schema: $storedKeySchema\n" +
57-
s"- Existing value schema: $storedValueSchema\n" +
70+
errorMsgForKeySchema +
71+
errorMsgForValueSchema +
5872
s"If you want to force running query without schema validation, please set " +
5973
s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
6074
"Please note running query with incompatible schema could cause indeterministic" +

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,12 @@ object StateStore extends Logging {
511511
val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
512512
// regardless of configuration, we check compatibility to at least write schema file
513513
// if necessary
514-
val ret = Try(checker.check(keySchema, valueSchema)).toEither.fold(Some(_), _ => None)
514+
// if the format validation for value schema is disabled, we also disable the schema
515+
// compatibility checker for value schema as well.
516+
val ret = Try(
517+
checker.check(keySchema, valueSchema,
518+
ignoreValueSchema = !storeConf.formatValidationCheckValue)
519+
).toEither.fold(Some(_), _ => None)
515520
if (storeConf.stateSchemaCheckEnabled) {
516521
ret
517522
} else {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ class StateStoreConf(
4848
/** Whether validate the underlying format or not. */
4949
val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled
5050

51-
/** Whether validate the value format when the format invalidation enabled. */
51+
/**
52+
* Whether to validate the value side. This config is applied to both validators as below:
53+
*
54+
* - whether to validate the value format when the format validation is enabled.
55+
* - whether to validate the value schema when the state schema check is enabled.
56+
*/
5257
val formatValidationCheckValue: Boolean =
5358
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true"
5459

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,13 +783,15 @@ case class StreamingDeduplicateExec(
783783
keyExpressions, getStateInfo, conf) :: Nil
784784
}
785785

786+
private val schemaForEmptyRow: StructType = StructType(Array(StructField("__dummy__", NullType)))
787+
786788
override protected def doExecute(): RDD[InternalRow] = {
787789
metrics // force lazy init at driver
788790

789791
child.execute().mapPartitionsWithStateStore(
790792
getStateInfo,
791793
keyExpressions.toStructType,
792-
child.output.toStructType,
794+
schemaForEmptyRow,
793795
numColsPrefixKey = 0,
794796
session.sessionState,
795797
Some(session.streams.stateStoreCoordinator),
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
v1
2+
{"nextBatchWatermarkMs":0}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
v1
2+
{"nextBatchWatermarkMs":0}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"id":"33e8de33-00b8-4b60-8246-df2f433257ff"}

0 commit comments

Comments
 (0)