Skip to content

[SPARK-51745] Enforce State Machine for RocksDBStateStore #50497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 93 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
248fa8e
impl and unit test
liviazhu Mar 25, 2025
52085cb
fix unit tests
liviazhu Mar 25, 2025
c294c9f
remove store from loadedProviders from task thread
liviazhu Mar 26, 2025
44ddf22
impl and unit test
liviazhu Mar 25, 2025
a5495ee
fix unit tests
liviazhu Mar 25, 2025
75f7aaa
remove store from loadedProviders from task thread
liviazhu Mar 26, 2025
c70af5f
state store impl
liviazhu Mar 25, 2025
1c0106a
[WIP] Lock Hardening
ericm-db Apr 2, 2025
cbd35a0
correcting tests
ericm-db Apr 8, 2025
f6a35a0
Merge remote-tracking branch 'upstream/master' into state-machine-har…
ericm-db Apr 8, 2025
28210e5
formatting and other stuff
ericm-db Apr 8, 2025
5ca6244
adding tests
ericm-db Apr 9, 2025
5fab410
removing state_store_lock_violation
ericm-db Apr 9, 2025
e524ed3
Provider state machine (#25)
ericm-db Apr 15, 2025
dc03440
removing tests
ericm-db Apr 15, 2025
9016424
tests
ericm-db Apr 15, 2025
8a2887e
removing import
ericm-db Apr 15, 2025
3548ca3
nits
ericm-db Apr 15, 2025
c8c0a4e
addressing shutdown
ericm-db Apr 15, 2025
817030d
adding condition
ericm-db Apr 15, 2025
e53eaba
fixing state data source read suite
ericm-db Apr 15, 2025
17813c8
closing store in suite
ericm-db Apr 15, 2025
7dcb927
fix
ericm-db Apr 16, 2025
0808d04
v2 tests pass
ericm-db Apr 16, 2025
a2665ae
comments
ericm-db Apr 16, 2025
d42cc50
resolving conflicts
ericm-db Apr 16, 2025
9efeefd
removing unnecessary logging
ericm-db Apr 16, 2025
7b008b3
adding more docs
ericm-db Apr 16, 2025
3482480
removing unnecessary comment
ericm-db Apr 16, 2025
ccdabc9
refactoring to use private helper func
ericm-db Apr 16, 2025
a0ea879
updating doc
ericm-db Apr 16, 2025
02e8a91
removing param
ericm-db Apr 16, 2025
d774869
moving test back
ericm-db Apr 16, 2025
ba130a8
indentation
ericm-db Apr 16, 2025
642f3b4
addressing comments
ericm-db Apr 24, 2025
7f26f40
imports
ericm-db Apr 24, 2025
78af7fe
addressing Livia's comment
ericm-db Apr 24, 2025
ca302f6
feedback
ericm-db Apr 24, 2025
86f7e64
replacing log line
ericm-db Apr 24, 2025
c12a6ae
provider -> provider_id
ericm-db Apr 24, 2025
1f7c3de
removing taskContext since it's null
ericm-db Apr 24, 2025
a74f935
Merge remote-tracking branch 'upstream/master' into maintenance-changes
ericm-db Apr 24, 2025
d17e476
removing taskContext since it's null
ericm-db Apr 24, 2025
c363483
adding back tc
ericm-db Apr 24, 2025
7079852
adding test
ericm-db Apr 24, 2025
40fc8fb
state_store_provider_id -> task_id
ericm-db Apr 25, 2025
eb8f3e1
initial feedback
ericm-db Apr 25, 2025
31198b1
with timeout
ericm-db Apr 25, 2025
b241e33
adding queue
ericm-db Apr 28, 2025
6823586
resolving merge conflicts
ericm-db Apr 28, 2025
08f6afa
imports
ericm-db Apr 28, 2025
c3c160c
adding test case
ericm-db Apr 28, 2025
e1ebd4a
queueing changes
ericm-db Apr 28, 2025
db2506f
seconds -> ms
ericm-db Apr 28, 2025
7d5f168
boolean to enum and other feedback
ericm-db Apr 29, 2025
dab9e8f
thanks livia
ericm-db Apr 29, 2025
172ee83
waiting for timeout
ericm-db Apr 29, 2025
b051d5a
wrapping in synch block
ericm-db Apr 29, 2025
7384a84
adding condition on submitMaintenance
ericm-db Apr 29, 2025
46b300a
adding comment
ericm-db Apr 29, 2025
b3c584b
using threadlocal instead
ericm-db Apr 30, 2025
63573db
unnecessary change
ericm-db Apr 30, 2025
f7d0e70
removing clera
ericm-db Apr 30, 2025
560c5c7
Added cleanup
ericm-db Apr 30, 2025
81c0eed
adding assertion
ericm-db May 1, 2025
0465454
setting usedForWriteStore in StateStoreRDD
ericm-db May 1, 2025
93f014a
case class
ericm-db May 1, 2025
226f99b
changes
ericm-db May 1, 2025
a892142
test
ericm-db May 1, 2025
768346a
upgradeable read store
ericm-db May 2, 2025
776ad6b
code feedback
ericm-db May 2, 2025
4946f9b
adding parens
ericm-db May 2, 2025
411cce2
removing test
ericm-db May 2, 2025
55664c4
adding state
ericm-db May 2, 2025
9faf0a4
adding test
ericm-db May 2, 2025
2c34e9b
merging
ericm-db May 5, 2025
bda4234
merging
ericm-db May 5, 2025
545f9b1
building after merge
ericm-db May 5, 2025
2b5b095
added logs
ericm-db May 5, 2025
ecf5e0f
removing acquire and release calls
ericm-db May 5, 2025
a81fa47
moving clearStore to only mapPartitionsWithReadStateStore
ericm-db May 5, 2025
25f623b
ignoring other test
ericm-db May 5, 2025
253d70b
rm acquirelock from rocksdb and move tests (#26)
liviazhu May 5, 2025
3f1f985
removing abort failure listener from mapPartitionsWithReadStateStore
ericm-db May 6, 2025
bbce1fd
adding tasklistener changes
ericm-db May 6, 2025
0dcdf0e
printing log only if transition != Update
ericm-db May 12, 2025
e2dbaf0
adding transition logkey
ericm-db May 12, 2025
5839a88
adding condition on failure listener
ericm-db May 13, 2025
d909bf0
Merge branch 'master' into state-machine-hardening
ericm-db May 14, 2025
5fa6ea4
validateAndTransitionState before rollback()
ericm-db May 14, 2025
6de4e29
Merge branch 'state-machine-hardening' of github.com:ericm-db/spark i…
ericm-db May 14, 2025
42c3adc
adding more logs
ericm-db May 15, 2025
f43c8e9
logs
ericm-db May 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5032,6 +5032,13 @@
],
"sqlState" : "42802"
},
"STATE_STORE_UPDATING_AFTER_TASK_COMPLETION" : {
"message" : [
"State store id=<stateStoreId> still in updating state after task completed. If using foreachBatch, ",
"verify that it consumes the entire dataframe and does not catch and suppress errors during dataframe iteration."
],
"sqlState" : "XXKST"
},
"STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
"message" : [
"The streaming query failed to validate written state for value row for stateStore=<stateStoreID>.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ private[spark] object LogKeys {
case object STATE_STORE_PROVIDER extends LogKey
case object STATE_STORE_PROVIDER_ID extends LogKey
case object STATE_STORE_PROVIDER_IDS extends LogKey
case object STATE_STORE_STATE extends LogKey
case object STATE_STORE_VERSION extends LogKey
case object STATS extends LogKey
case object STATUS extends LogKey
Expand Down Expand Up @@ -867,6 +868,7 @@ private[spark] object LogKeys {
case object TRAIN_VALIDATION_SPLIT_METRIC extends LogKey
case object TRAIN_VALIDATION_SPLIT_METRICS extends LogKey
case object TRANSFER_TYPE extends LogKey
case object TRANSITION extends LogKey
case object TREE_NODE extends LogKey
case object TRIGGER_INTERVAL extends LogKey
case object UI_ACLS extends LogKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2260,6 +2260,13 @@ object SQLConf {
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300L)

val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
.internal()
.doc("Timeout in seconds to wait for maintenance to process this partition.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(30L)

val STATE_SCHEMA_CHECK_ENABLED =
buildConf("spark.sql.streaming.stateStore.stateSchemaCheck")
.doc("When true, Spark will validate the state schema against schema on existing state and " +
Expand Down Expand Up @@ -6018,6 +6025,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def stateStoreMaintenanceShutdownTimeout: Long = getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)

def stateStoreMaintenanceProcessingTimeout: Long =
getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT)

def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ abstract class StatePartitionReaderBase(
stateStoreColFamilySchema.keyStateEncoderSpec.get,
useMultipleValuesPerKey = useMultipleValuesPerKey,
isInternal = isInternal)
store.abort()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we abort here? Can you leave a comment?

}
provider
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.{SparkConf, SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
Expand Down Expand Up @@ -89,6 +89,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

override def abort(): Unit = {}

override def release(): Unit = {}

override def toString(): String = {
s"HDFSReadStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
Expand All @@ -112,6 +114,19 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
case object UPDATING extends STATE
case object COMMITTED extends STATE
case object ABORTED extends STATE
case object RELEASED extends STATE

Option(TaskContext.get()).foreach { ctxt =>
ctxt.addTaskCompletionListener[Unit]( ctx => {
if (state == UPDATING) {
abort()
// Only throw error if the task is not already failed or interrupted
if (!ctx.isFailed() && !ctx.isInterrupted()) {
throw StateStoreErrors.stateStoreUpdatingAfterTaskCompletion(id)
}
}
})
}

private val newVersion = version + 1
@volatile private var state: STATE = UPDATING
Expand Down Expand Up @@ -194,6 +209,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)}")
}

override def release(): Unit = {
state = RELEASED
}

/**
* Get an iterator of all the store data.
* This can be called only after committing all the updates made in the current thread.
Expand Down Expand Up @@ -953,7 +972,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
override def replayStateFromSnapshot(
snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
Expand Down
Loading