
Commit

[SPARK-49594][SS] Adding check on whether columnFamilies were added or removed to write StateSchemaV3 file

### What changes were proposed in this pull request?

In the [PR](apache#47880) that enabled deleteIfExists, we changed the condition under which we throw an error. However, in doing so, we also stopped writing schema files whenever column families are added or removed, which is functionally incorrect.

Additionally, we were writing the newSchemaFilePath to the OperatorStateMetadata on every new query run, when we should only do so if the schema changes.
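
Below is a minimal, self-contained sketch of the new check, using stand-in types (`ColFamilySchema`, `shouldWriteSchemaFile` are hypothetical, not the real Spark classes); the actual change lives in `StateSchemaCompatibilityChecker` in the diff below.

```scala
// Sketch only: hypothetical stand-ins for the real state schema types.
object SchemaFileWriteCheck {
  val SCHEMA_FORMAT_V3: Int = 3

  // Stand-in for StateStoreColFamilySchema; only the column family name matters here.
  case class ColFamilySchema(colFamilyName: String)

  // For schema format v3, a new schema file should be written when the list of
  // column families differs from the previous run (i.e. one was added or removed).
  def shouldWriteSchemaFile(
      stateSchemaVersion: Int,
      oldSchemas: List[ColFamilySchema],
      newSchemas: List[ColFamilySchema]): Boolean = {
    val colFamiliesAddedOrRemoved =
      newSchemas.map(_.colFamilyName) != oldSchemas.map(_.colFamilyName)
    stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved
  }

  def main(args: Array[String]): Unit = {
    val previous = List(ColFamilySchema("countState"), ColFamilySchema("mostRecent"))
    val afterDeletion = List(ColFamilySchema("mostRecent"))
    println(shouldWriteSchemaFile(3, previous, previous))      // false: unchanged, reuse the old file
    println(shouldWriteSchemaFile(3, previous, afterDeletion)) // true: a column family was removed
  }
}
```
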
### Why are the changes needed?

These changes are needed because we want to write a schema file every time we add or remove column families. We also want the current metadata file to point to the old schema file when the schema has not changed between this run and the last one, rather than populating the metadata with a new schema file path on every run even when that file is never created.
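
For the metadata side, the following is a minimal sketch of how the recorded schema file location could be chosen, assuming a hypothetical helper `resolveSchemaFileLocation`; the real selection happens in the `StateSchemaCompatibilityChecker` companion object in the diff below.

```scala
// Sketch only: hypothetical helper mirroring the schemaFileLocation selection.
object SchemaFileLocation {
  val SCHEMA_FORMAT_V3: Int = 3

  def resolveSchemaFileLocation(
      evolvedSchema: Boolean,
      stateSchemaVersion: Int,
      newSchemaFilePath: Option[String],
      oldSchemaFilePath: Option[String],
      staticSchemaFileLocation: String): String = {
    if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
      // v3: point to the freshly written schema file only if the schema evolved;
      // otherwise keep pointing to the previous run's schema file.
      // (As in the real code, the relevant Option is expected to be defined here.)
      if (evolvedSchema) newSchemaFilePath.get else oldSchemaFilePath.get
    } else {
      // pre-v3: the schema is always written to a single static location.
      staticSchemaFileLocation
    }
  }
}
```

This keeps the metadata consistent with the files that actually exist on disk: a new path is recorded only when a new schema file was written.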

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Amended unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48067 from ericm-db/add-remove-cf.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
ericm-db authored and himadripal committed Oct 19, 2024
1 parent 8fe8ad3 commit 2867c5f
Showing 2 changed files with 250 additions and 9 deletions.
StateSchemaCompatibilityChecker.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.{DataType, StructType}

@@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker(
stateStoreColFamilySchema: List[StateStoreColFamilySchema],
stateSchemaVersion: Int): Unit = {
// Ensure that schema file path is passed explicitly for schema version 3
if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) {
throw new IllegalStateException("Schema file path is required for schema version 3")
}

@@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker(
check(existingStateSchema, newSchema, ignoreValueSchema)
}
}
val colFamiliesAddedOrRemoved =
newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
createSchemaFile(newStateSchemaList, stateSchemaVersion)
}
// TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
false
colFamiliesAddedOrRemoved
}
}

@@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker(
}

object StateSchemaCompatibilityChecker {

val SCHEMA_FORMAT_V3: Int = 3

private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
if (!UnsafeRowUtils.isBinaryStable(schema)) {
throw new SparkUnsupportedOperationException(
@@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker {
if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
throw result.get
}
val schemaFileLocation = newSchemaFilePath match {
case Some(path) => path.toString
case None => checker.schemaFileLocation.toString
val schemaFileLocation = if (evolvedSchema) {
// if we are using the state schema v3, and we have
// evolved schema, this newSchemaFilePath should be defined
// and we want to populate the metadata with this file
if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
newSchemaFilePath.get.toString
} else {
// if we are using any version less than v3, we have written
// the schema to this static location, which we will return
checker.schemaFileLocation.toString
}
} else {
// if we have not evolved schema (there has been a previous schema)
// and we are using state schema v3, this file path would be defined
// so we would just populate the next run's metadata file with this
// file path
if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
oldSchemaFilePath.get.toString
} else {
// if we are using any version less than v3, we have written
// the schema to this static location, which we will return
checker.schemaFileLocation.toString
}
}

StateSchemaValidationResult(evolvedSchema, schemaFileLocation)
}
}
TransformWithStateSuite.scala
@@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withTempDir { chkptDir =>
val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
val stateSchemaPath = getStateSchemaPath(stateOpIdPath)

val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
// in this test case, we are changing the state spec back and forth
// to trigger the writing of the schema and metadata files
val inputData = MemoryStream[(String, String)]
@@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends StateStoreMetricsTest
},
StopStream
)
// assert that a metadata and schema file has been written for each run
// as state variables have been deleted
assert(getFiles(metadataPath).length == 2)
assert(getFiles(stateSchemaPath).length == 2)

val result3 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
@@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
},
StopStream
)
val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
val stateSchemaPath = getStateSchemaPath(stateOpIdPath)

val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
// by the end of the test, there have been 4 batches,
// so the metadata and schema logs, and commitLog has been purged
// for batches 0 and 1 so metadata and schema files exist for batches 0, 1, 2, 3
@@ -1527,6 +1532,116 @@
}
}

test("transformWithState - verify that schema file is kept after metadata is purged") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withTempDir { chkptDir =>
val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
val stateSchemaPath = getStateSchemaPath(stateOpIdPath)

val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
// in this test case, we are changing the state spec back and forth
// to trigger the writing of the schema and metadata files
val inputData = MemoryStream[(String, String)]
val result1 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
TimeMode.None(),
OutputMode.Update())
testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("a", "str1")),
CheckNewAnswer(("a", "1", "")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("a", "str1")),
CheckNewAnswer(("a", "2", "str1")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
val result2 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new MostRecentStatefulProcessorWithDeletion(),
TimeMode.None(),
OutputMode.Update())
testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("a", "str2")),
CheckNewAnswer(("a", "str1")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
assert(getFiles(metadataPath).length == 3)
assert(getFiles(stateSchemaPath).length == 2)

val result3 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
TimeMode.None(),
OutputMode.Update())
testStream(result3, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("a", "str3")),
CheckNewAnswer(("a", "1", "str2")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
// metadata files should be kept for batches 1, 2, 3
// schema files should be kept for batches 0, 2, 3
assert(getFiles(metadataPath).length == 3)
assert(getFiles(stateSchemaPath).length == 3)
// we want to ensure that we can read batch 1 even though the
// metadata file for batch 0 was removed
val batch1Df = spark.read
.format("statestore")
.option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
.option(StateSourceOptions.STATE_VAR_NAME, "countState")
.option(StateSourceOptions.BATCH_ID, 1)
.load()

val batch1AnsDf = batch1Df.selectExpr(
"key.value AS groupingKey",
"single_value.value AS valueId")

checkAnswer(batch1AnsDf, Seq(Row("a", 2L)))

val batch3Df = spark.read
.format("statestore")
.option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
.option(StateSourceOptions.STATE_VAR_NAME, "countState")
.option(StateSourceOptions.BATCH_ID, 3)
.load()

val batch3AnsDf = batch3Df.selectExpr(
"key.value AS groupingKey",
"single_value.value AS valueId")
checkAnswer(batch3AnsDf, Seq(Row("a", 1L)))
}
}
}

test("state data source integration - value state supports time travel") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
@@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}

test("transformWithState - verify that no metadata and schema logs are purged after" +
" removing column family") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
withTempDir { chkptDir =>
val inputData = MemoryStream[(String, String)]
val result1 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
TimeMode.None(),
OutputMode.Update())
testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("a", "str1")),
CheckNewAnswer(("a", "1", "")),
AddData(inputData, ("a", "str1")),
CheckNewAnswer(("a", "2", "str1")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
testStream(result1, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "1", "")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "2", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "3", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "4", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "5", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "6", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "7", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "8", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "9", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "10", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "11", "str1")),
AddData(inputData, ("b", "str1")),
CheckNewAnswer(("b", "12", "str1")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)
val result2 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new MostRecentStatefulProcessorWithDeletion(),
TimeMode.None(),
OutputMode.Update())

testStream(result2, OutputMode.Update())(
StartStream(checkpointLocation = chkptDir.getCanonicalPath),
AddData(inputData, ("b", "str2")),
CheckNewAnswer(("b", "str1")),
AddData(inputData, ("b", "str3")),
CheckNewAnswer(("b", "str2")),
Execute { q =>
eventually(timeout(Span(5, Seconds))) {
q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
}
},
StopStream
)

val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
val stateSchemaPath = getStateSchemaPath(stateOpIdPath)

val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)

// Metadata files are written for batches 0, 2, and 14.
// Schema files are written for 0, 14
// At the beginning of the last query run, the thresholdBatchId is 11.
// However, we would need both schema files to be preserved, if we want to
// be able to read from batch 11 onwards.
assert(getFiles(metadataPath).length == 2)
assert(getFiles(stateSchemaPath).length == 2)
}
}
}
}

class TransformWithStateValidationSuite extends StateStoreMetricsTest {
