[SPARK-47363][SS] Initial State without state reader implementation for State API v2. #45467
Conversation
Force-pushed from 8ec379a to 8aac855
Force-pushed from cd8b827 to 8fbd501
@@ -85,3 +85,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    statefulProcessorHandle
  }
}

/**
 * Similar usage as StatefulProcessor. Represents the arbitrary stateful logic that needs to
Maybe reword this - "Stateful processor with support for specifying initial state. Accepts a user-defined type as initial state to be initialized in the first batch. This can be used for starting a new streaming query with existing state from a previous streaming query"?
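For reference, a sketch of what the new trait looks like, pieced together from the diff hunk above and the PR description; the doc text uses the reviewer's suggested wording, and the exact modifiers may differ from the merged code:
```
/**
 * Stateful processor with support for specifying initial state. Accepts a
 * user-defined type as initial state to be initialized in the first batch.
 * This can be used for starting a new streaming query with existing state
 * from a previous streaming query.
 */
private[sql] trait StatefulProcessorWithInitialState[K, I, O, S]
  extends StatefulProcessor[K, I, O] {

  // Signature taken from the test diff later in this thread; the real
  // method may also take additional parameters.
  def handleInitialState(key: K, initialState: S): Unit
}
```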
@@ -665,7 +665,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
      outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
    Dataset[U](
      sparkSession,
      TransformWithState[K, V, U](
        // The last K type is only to silence compiler error
Any way to avoid this?
Error seems relevant on the MIMA checks -
we probably need to update the Connect variants as well.
      key: String,
      initialState: (String, Double)): Unit = {
    val initStateVal = initialState._2
    _valState.update(initStateVal)
Can we simulate an actual case class for initial state that stores a list/map, and/or an iterator for list values/an iterator for map key-values? Something along the lines of the sketch below.
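A hedged sketch of what such a test processor fragment might look like; the `InitState` case class and the `_valState`/`_listState`/`_mapState` handles are illustrative assumptions, not code from this PR:
```
// Hypothetical initial-state type carrying a scalar, a list, and a map.
case class InitState(
    value: Double,
    listValues: List[Double],
    mapValues: Map[String, Double])

// Inside a StatefulProcessorWithInitialState implementation, assuming the
// _valState/_listState/_mapState handles were created in init():
override def handleInitialState(key: String, initialState: InitState): Unit = {
  _valState.update(initialState.value)
  initialState.listValues.foreach(_listState.appendValue)
  initialState.mapValues.foreach { case (k, v) => _mapState.updateValue(k, v) }
}
```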
   * the query in the first batch.
   *
   */
  def transformWithState[U: Encoder, S: Encoder](
`private[sql]`
We want to defer exposing the API to the public until we complete the work.
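For reference, the full signature under discussion, combining the diff hunk above with the PR description; the placement of `private[sql]` reflects the suggestion, not necessarily the merged code:
```
private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeoutMode: TimeoutMode,
    outputMode: OutputMode,
    initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```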
Force-pushed from 25d7bab to 9f18601
    child.execute().mapPartitionsWithStateStore[InternalRow](
    if (hasInitialState) {
      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
      val hadoopConfBroadcast = sparkContext.broadcast(
Why do we need to do this?
I am not 100% sure, but this will distribute the read-only variable hadoopConf to all executors - similar to here (Lines 55 to 57 in 74a9c6c):
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val hadoopConfBroadcast = session.sparkContext.broadcast(
  new SerializableConfiguration(session.sessionState.newHadoopConf()))
Yeah, there is a code comment. The practice seems to be that it's better to use broadcast rather than task serialization, as the configuration could be huge.
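As a side note, a hedged sketch (not from this PR) of how the broadcast value is typically unwrapped on executors; `SerializableConfiguration` wraps the Hadoop `Configuration`, so two `.value` hops are needed:
```
import org.apache.hadoop.conf.Configuration

// Broadcast#value yields the SerializableConfiguration, whose .value
// field is the actual Hadoop Configuration.
val hadoopConf: Configuration = hadoopConfBroadcast.value.value
```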
    child.execute().mapPartitionsWithStateStore[InternalRow](
    if (hasInitialState) {
      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
      val hadoopConfBroadcast =
I mean, this was only needed for the batch support part, right?
We will also need this for StateStore.get
here: https://github.com/apache/spark/blob/40465b6760fb120c9cc3ac1a4ee42a82843f4bc5/sql/[…]ache/spark/sql/execution/streaming/TransformWithStateExec.scala
Not yet reviewed the test suite, though I guess Anish has reviewed it in detail.
      child = logicalPlan,
      initialState.groupingAttributes,
      initialState.dataAttributes,
      initialState.queryExecution.logical
Shall we follow the practice we did in flatMapGroupsWithState, for safety's sake?
`initialState.queryExecution.analyzed`
@@ -268,11 +268,13 @@ class IncrementalExecution(
        )

      case t: TransformWithStateExec =>
        val hasInitialState = (isFirstBatch && t.hasInitialState)
I don't think we want to allow adding state in the middle of the query lifecycle. Here `isFirstBatch` does not mean batch ID = 0; it means this is the first batch in this query run. This should follow the logic above that we did for FlatMapGroupsWithStateExec: `currentBatchId == 0L`.
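A sketch of the suggested condition, mirroring the FlatMapGroupsWithStateExec rule referenced above:
```
case t: TransformWithStateExec =>
  // Populate initial state only when the query starts from scratch
  // (batch 0), not merely on the first batch of a restarted run.
  val hasInitialState = currentBatchId == 0L && t.hasInitialState
```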
Please let me know if this is different functionality from what we had in flatMapGroupsWithState.
        processData(store, singleIterator)
      }
    } else {
      // If the query is running in batch mode, we need to create a new StateStore and instantiate
nit: apply the same practice (broadcast) while we are here?
        useMultipleValuesPerKey = true)
      val store = stateStoreProvider.getStore(0)

      processDataWithInitialState(store, childDataIterator, initStateIterator)
We close the state store and state store provider in the batch codepath (see below). Shall we do that here as well?
Also, this is a good illustration that we have duplicated code: the two batch parts are similar in spinning up the state store provider and state store, and in closing them. That could be extracted out.
Good advice! Refactored the duplicated code into `initNewStateStoreAndProcessData()` (roughly along the lines of the sketch below).
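A hedged sketch of the shape such a helper might take; the name comes from the comment above, but the signature and body here are assumptions rather than the actual code in this PR:
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProvider}
import org.apache.spark.util.CompletionIterator

// Shared skeleton for the two batch codepaths: create a provider, get a
// store, run the processing step, and close the provider only once the
// output iterator has been fully consumed.
private def initNewStateStoreAndProcessData(
    createProvider: () => StateStoreProvider)(
    processData: StateStore => Iterator[InternalRow]): Iterator[InternalRow] = {
  val provider = createProvider()
  val store = provider.getStore(0)
  CompletionIterator[InternalRow, Iterator[InternalRow]](
    processData(store), provider.close())
}
```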
      // Check if this is the first batch.
      // Only process initial state for the first batch.
      if (processorHandle.getQueryInfo().getBatchId == 0) {
OK, I see we have multiple checks. Still, it's better to change the condition in IncrementalExecution, as a reader could misread this as an inconsistency between flatMapGroupsWithState and transformWithState.
Force-pushed from e528dfd to b3394d0
+1 pending CI
CI failure isn't related - only pyspark-connect failed.
Thanks! Merging to master.
…or State API v2

### What changes were proposed in this pull request?
This PR adds support for users to provide a Dataframe that can be used to instantiate state for the query in the first batch for arbitrary state API v2.

Note that populating the initial state will only happen for the first batch of the new streaming query. Trying to re-initialize state for the same grouping key will result in an error.

### Why are the changes needed?
These changes are needed to support initial state. The changes are part of the work on adding a new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new function:
```
def transformWithState(
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeoutMode: TimeoutMode,
    outputMode: OutputMode,
    initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```

### How was this patch tested?
Unit tests in `TransformWithStateWithInitialStateSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45467 from jingz-db/initial-state-state-v2.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jing Zhan <135738831+jingz-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…mWithStateInPandas

### What changes were proposed in this pull request?
This PR adds support for users to provide a Dataframe that can be used to instantiate state for the query in the first batch for arbitrary state API v2 in Python. The Scala PR for supporting initial state is here: #45467

We propose to create a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. On the JVM side, we cogroup input rows and initial state rows on the same grouping key. We then create a new row that contains one row from the input rows iterator and one row from the initial state iterator, and send the grouped row to Py4J. Inside the Python worker, we deserialize the grouped row back into input rows and initial state rows separately and feed those into `handleInitialState` and `handleInputRows`.

We launch a Python worker for each partition that has non-empty rows in either the input rows or the initial states. This guarantees that all keys in the initial state will be processed, even if they do not appear in the first batch or do not lie in the same partition as keys in the first batch.

### Why are the changes needed?
We need to couple the API as we support initial state handling in Scala.

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new API in the `StatefulProcessor` which allows users to define their own udf for processing initial state:
```
def handleInitialState(
    self, key: Any, initialState: "PandasDataFrameLike"
) -> None:
```
The implementation of this function is optional. If not defined, it acts as a no-op.

### How was this patch tested?
Unit tests & integration tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48005 from jingz-db/python-init-state-impl.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…eader

### What changes were proposed in this pull request?
This PR adds support for users to provide a Dataframe that can be used to instantiate state for the query in the first batch for arbitrary state API v2. More specifically, this dataframe comes from the state data source reader.

Remove the restraint that the initialState dataframe may only contain one value row per grouping key. This enables the integration with the state data source reader: in the flattened state data source reader for composite types, we will have multiple value rows mapping to the same grouping key. For example, we can union dataframes created by the state data source reader on single state variables and get an output dataframe to use as the initial state for a transformWithState operator, like this:
```
+-----------+-----+---------+----------+------------+
|groupingKey|value|listValue|userMapKey|userMapValue|
+-----------+-----+---------+----------+------------+
|a          |3    |NULL     |NULL      |NULL        |
|b          |2    |NULL     |NULL      |NULL        |
|a          |NULL |1        |NULL      |NULL        |
|a          |NULL |2        |NULL      |NULL        |
|a          |NULL |3        |NULL      |NULL        |
|b          |NULL |1        |NULL      |NULL        |
|b          |NULL |2        |NULL      |NULL        |
|a          |NULL |NULL     |a         |3           |
|b          |NULL |NULL     |b         |2           |
+-----------+-----+---------+----------+------------+
```

### Why are the changes needed?
This change supports initial state handling for the integration with the state data source reader.

### Does this PR introduce _any_ user-facing change?
No. The user API is the same as in the prior PR #45467 for initial state support without the state data source reader.

### How was this patch tested?
Unit test cases added in `TransformWithStateWithInitialStateSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48686 from jingz-db/initial-state-reader-integration.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR adds support for users to provide a Dataframe that can be used to instantiate state for the query in the first batch for arbitrary state API v2.
Note that populating the initial state will only happen for the first batch of the new streaming query. Trying to re-initialize state for the same grouping key will result in an error.
Why are the changes needed?
These changes are needed to support initial state. The changes are part of the work on adding a new stateful streaming operator for arbitrary state management that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939
Does this PR introduce any user-facing change?
Yes.
This PR introduces a new function:
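```
def transformWithState(
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeoutMode: TimeoutMode,
    outputMode: OutputMode,
    initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```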
How was this patch tested?
Unit tests in `TransformWithStateWithInitialStateSuite`
Was this patch authored or co-authored using generative AI tooling?
No