[SPARK-47363][SS] Initial State without state reader implementation for State API v2. #45467


Closed
jingz-db wants to merge 28 commits into master from jingz-db/initial-state-state-v2

Conversation

@jingz-db jingz-db commented Mar 11, 2024

What changes were proposed in this pull request?

This PR adds support for users to provide a DataFrame that can be used to instantiate state for the query in the first batch, for arbitrary state API v2.

Note that populating the initial state will only happen for the first batch of the new streaming query. Trying to re-initialize state for the same grouping key will result in an error.

Why are the changes needed?

These changes are needed to support initial state. They are part of the work on adding a new stateful streaming operator for arbitrary state management, which provides the new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

Does this PR introduce any user-facing change?

Yes.
This PR introduces a new function:

```
def transformWithState(
      statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
      timeoutMode: TimeoutMode,
      outputMode: OutputMode,
      initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```
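
For illustration, a minimal usage sketch; the processor class, input stream, and initial state values below are hypothetical, and the call assumes the PR-era TimeoutMode API:

```
// Sketch only. MyCountProcessor is a hypothetical user-defined
// StatefulProcessorWithInitialState[String, String, String, (String, Double)].
import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode}
import spark.implicits._

val initialState = spark
  .createDataset(Seq(("a", 1.0), ("b", 2.0)))
  .groupByKey(_._1)            // KeyValueGroupedDataset[String, (String, Double)]

val result = inputStream       // assumed: a streaming Dataset[String]
  .groupByKey(identity)
  .transformWithState(
    new MyCountProcessor(),
    TimeoutMode.NoTimeouts(),
    OutputMode.Update(),
    initialState)
```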

How was this patch tested?

Unit tests in `TransformWithStateWithInitialStateSuite`

Was this patch authored or co-authored using generative AI tooling?

No

@jingz-db jingz-db changed the title WIP [SS] Initial State without state reader implementation for State API v2. Mar 11, 2024
@jingz-db jingz-db force-pushed the initial-state-state-v2 branch from 8ec379a to 8aac855 on March 12, 2024 01:21
@jingz-db jingz-db marked this pull request as ready for review March 12, 2024 01:21
@jingz-db jingz-db changed the title [SS] Initial State without state reader implementation for State API v2. [SS][SPARK-47363] Initial State without state reader implementation for State API v2. Mar 12, 2024
@jingz-db jingz-db force-pushed the initial-state-state-v2 branch from cd8b827 to 8fbd501 on March 13, 2024 00:24
```
@@ -85,3 +85,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    statefulProcessorHandle
  }
}

/**
 * Similar usage as StatefulProcessor. Represents the arbitrary stateful logic that needs to
```
Contributor:
Maybe reword this: "Stateful processor with support for specifying initial state. Accepts a user-defined type as initial state to be initialized in the first batch. This can be used for starting a new streaming query with existing state from a previous streaming query."?

```
@@ -665,7 +665,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
      outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
    Dataset[U](
      sparkSession,
      TransformWithState[K, V, U](
        // The last K type is only to silence compiler error
```
Contributor:
Any way to avoid this?

@anishshri-db (Contributor):
Error seems relevant, on the MIMA checks:

```
problems with Sql module:
method transformWithState(org.apache.spark.sql.streaming.StatefulProcessorWithInitialState,org.apache.spark.sql.streaming.TimeoutMode,org.apache.spark.sql.streaming.OutputMode,org.apache.spark.sql.KeyValueGroupedDataset,org.apache.spark.sql.Encoder,org.apache.spark.sql.Encoder)org.apache.spark.sql.Dataset in class org.apache.spark.sql.KeyValueGroupedDataset does not have a correspondent in client version
```

We probably need to update the Connect variants as well.

```
      key: String,
      initialState: (String, Double)): Unit = {
    val initStateVal = initialState._2
    _valState.update(initStateVal)
```
@anishshri-db (Contributor) commented Mar 13, 2024:

Can we simulate an actual case class for initial state that stores a list/map, and/or an iterator for list values / an iterator for map key-values?
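
For example, a sketch of such a test fixture; the case class and the state variable handles (`_valState`, `_listState`, `_mapState`) are hypothetical names, and the method calls follow the State API v2 surface:

```
// Hypothetical initial-state type carrying scalar, list, and map values.
case class InitialStateRow(
    key: String,
    value: Double,
    listValue: List[Double],
    mapValue: Map[String, Double])

// Inside a StatefulProcessorWithInitialState implementation (sketch):
override def handleInitialState(key: String, initialState: InitialStateRow): Unit = {
  _valState.update(initialState.value)
  initialState.listValue.foreach(_listState.appendValue)
  initialState.mapValue.foreach { case (k, v) => _mapState.updateValue(k, v) }
}
```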

```
 * the query in the first batch.
 *
 */
def transformWithState[U: Encoder, S: Encoder](
```
Contributor:
`private[sql]`

We want to defer exposing the API to the public until we complete the work.
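
That is, the change amounts to narrowing the visibility (sketch, signature abbreviated from the diff above):

```
// Keep the new overload internal to the sql package until the feature is complete.
private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeoutMode: TimeoutMode,
    outputMode: OutputMode,
    initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
  // ... unchanged body ...
}
```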

@jingz-db jingz-db force-pushed the initial-state-state-v2 branch from 25d7bab to 9f18601 on March 14, 2024 19:33
@jingz-db jingz-db requested a review from anishshri-db March 14, 2024 19:51
@jingz-db jingz-db requested a review from anishshri-db March 20, 2024 22:58
```
child.execute().mapPartitionsWithStateStore[InternalRow](
if (hasInitialState) {
  val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
  val hadoopConfBroadcast = sparkContext.broadcast(
```
Contributor:
Why do we need to do this?

@jingz-db (Author):
I am not 100% sure, but this will distribute the read-only variable hadoopConf to all executors - similar to here:

```
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val hadoopConfBroadcast = session.sparkContext.broadcast(
  new SerializableConfiguration(session.sessionState.newHadoopConf()))
```

Contributor:
Yeah there is a code comment. The practice seems to be that it's better to use broadcast rather than task serialization as it could be huge.
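
A condensed sketch of the pattern, mirroring the snippet quoted above (`SerializableConfiguration` is Spark's internal wrapper that makes a Hadoop Configuration Java-serializable; `session` is the SparkSession):

```
import org.apache.spark.util.SerializableConfiguration

// Driver side: broadcast the ~10 KB Hadoop configuration once per query,
// instead of re-serializing it into every task closure.
val hadoopConfBroadcast = session.sparkContext.broadcast(
  new SerializableConfiguration(session.sessionState.newHadoopConf()))

// Executor side: unwrap the configuration where the state store needs it.
val hadoopConf = hadoopConfBroadcast.value.value
```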

@jingz-db jingz-db requested a review from anishshri-db March 21, 2024 17:40
```
child.execute().mapPartitionsWithStateStore[InternalRow](
if (hasInitialState) {
  val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
  val hadoopConfBroadcast =
```
Contributor:
I mean this was only needed for the batch support part, right?


@jingz-db jingz-db requested a review from HeartSaVioR March 26, 2024 21:11
@HeartSaVioR (Contributor) left a comment:
I have not yet reviewed the test suite, though I guess Anish has reviewed it in detail.

```
child = logicalPlan,
initialState.groupingAttributes,
initialState.dataAttributes,
initialState.queryExecution.logical
```
Contributor:
Shall we follow the practice we did in flatMapGroupsWithState, for safety's sake?

```
initialState.queryExecution.analyzed
```

```
@@ -268,11 +268,13 @@ class IncrementalExecution(
      )

    case t: TransformWithStateExec =>
      val hasInitialState = (isFirstBatch && t.hasInitialState)
```
Contributor:
I don't think we want to allow adding state in the middle of the query lifecycle. Here isFirstBatch does not mean batch ID = 0; it means this is the first batch in this query run.

This should follow the logic above for FlatMapGroupsWithStateExec: currentBatchId == 0L.
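
In other words, the suggested condition (a sketch against the IncrementalExecution rule shown above):

```
case t: TransformWithStateExec =>
  // Initialize state only in the very first batch of the query (batch ID 0),
  // not in the first batch of every restart of the query.
  val hasInitialState = (currentBatchId == 0L && t.hasInitialState)
```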

Contributor:
Please let me know if this is different functionality than what we had in flatMapGroupsWithState.


```
    processData(store, singleIterator)
  }
} else {
  // If the query is running in batch mode, we need to create a new StateStore and instantiate
```
Contributor:
nit: apply the same practice (broadcast) while we are here?

```
  useMultipleValuesPerKey = true)
val store = stateStoreProvider.getStore(0)

processDataWithInitialState(store, childDataIterator, initStateIterator)
```
Contributor:
We close the state store and state store provider in the batch codepath (see below). Shall we do that here as well?

Also, this is a good illustration that we have duplicated code: the two batch parts are similar in spinning up a state store provider and state store, and also in closing them. That could be extracted out.

@jingz-db (Author):
Good advice! Refactored the duplicated code into initNewStateStoreAndProcessData().
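
A minimal sketch of the shape such a helper can take; `FakeStateStore` is a self-contained stand-in for Spark's internal StateStore, and the real change lives in TransformWithStateExec:

```
// FakeStateStore stands in for Spark's internal StateStore.
trait FakeStateStore {
  def commit(): Unit
}

// Both batch codepaths share store setup and teardown; only `process` differs.
def initNewStateStoreAndProcessData[T](
    createStore: () => FakeStateStore)(
    process: FakeStateStore => Iterator[T]): Iterator[T] = {
  val store = createStore()
  try process(store).toVector.iterator  // materialize before teardown
  finally store.commit()                // shared close of store/provider
}
```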


```
// Check if is first batch
// Only process initial states for first batch
if (processorHandle.getQueryInfo().getBatchId == 0) {
```
Contributor:
OK, I see we have multiple checks. Though it's still better to change the condition in IncrementalExecution, as a reader could misunderstand that there is an inconsistency between flatMapGroupsWithState and transformWithState.

@beliefer beliefer changed the title [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [SPARK-47363][SS] Initial State without state reader implementation for State API v2. Mar 27, 2024
@github-actions github-actions bot added the BUILD label Mar 27, 2024
@jingz-db jingz-db force-pushed the initial-state-state-v2 branch from e528dfd to b3394d0 on March 27, 2024 20:59
@github-actions github-actions bot removed the BUILD label Mar 27, 2024
@jingz-db jingz-db requested a review from HeartSaVioR March 27, 2024 22:53
@HeartSaVioR (Contributor) left a comment:
+1 pending CI

@HeartSaVioR (Contributor):
CI failure isn't related - only pyspark-connect failed.

@HeartSaVioR (Contributor):
Thanks! Merging to master.

sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024
…or State API v2

Closes apache#45467 from jingz-db/initial-state-state-v2.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jing Zhan <135738831+jingz-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR pushed a commit that referenced this pull request Nov 6, 2024
…mWithStateInPandas

### What changes were proposed in this pull request?

This PR adds support for users to provide a DataFrame that can be used to instantiate state for the query in the first batch for arbitrary state API v2 in Python.
The Scala PR for supporting initial state is here: #45467

We propose to create a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. From the JVM, we cogroup input rows and initial state rows on the same grouping key. Then we create a new row that contains one row from the input rows iterator and one row from the initial state iterator, and send the grouped row to Py4J. Inside the Python worker, we deserialize the grouped row into input rows and initial state rows separately and feed them into `handleInitialState` and `handleInputRows`.
We launch a Python worker for each partition that has non-empty rows in either the input rows or the initial state. This guarantees that all keys in the initial state will be processed even if they do not appear in the first batch or do not lie in the same partition as keys in the first batch.

### Why are the changes needed?

We need to keep the API in sync now that we support initial state handling in Scala.

### Does this PR introduce _any_ user-facing change?

Yes.
This PR introduces a new API in the `StatefulProcessor` which allows users to define their own UDF for processing initial state:
```
def handleInitialState(
    self, key: Any, initialState: "PandasDataFrameLike"
) -> None:
```
The implementation of this function is optional. If not defined, it will act as a no-op.

### How was this patch tested?

Unit tests & integration tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48005 from jingz-db/python-init-state-impl.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR added a commit that referenced this pull request Nov 12, 2024
…eader

### What changes were proposed in this pull request?

This PR adds support for users to provide a DataFrame that can be used to instantiate state for the query in the first batch for arbitrary state API v2. More specifically, this DataFrame comes from the state data source reader.

Remove the restriction that the initialState DataFrame can only contain one value row per grouping key. This enables integration with the state data source reader: in the flattened state data source reader output for composite types, multiple value rows map to the same grouping key.

For example, we can take the DataFrames created by the state data source reader on single state variables, union them together, and get an output DataFrame to use as the initial state for a transformWithState operator, like this:
```
+-----------+-----+---------+----------+------------+
|groupingKey|value|listValue|userMapKey|userMapValue|
+-----------+-----+---------+----------+------------+
|a          |3    |NULL     |NULL      |NULL        |
|b          |2    |NULL     |NULL      |NULL        |
|a          |NULL |1        |NULL      |NULL        |
|a          |NULL |2        |NULL      |NULL        |
|a          |NULL |3        |NULL      |NULL        |
|b          |NULL |1        |NULL      |NULL        |
|b          |NULL |2        |NULL      |NULL        |
|a          |NULL |NULL     |a         |3           |
|b          |NULL |NULL     |b         |2           |
+-----------+-----+---------+----------+------------+
```
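
A sketch of how such a DataFrame might be assembled; the checkpoint path, state variable names, and column expressions below are illustrative assumptions, not taken from this PR:

```
// Read two state variables of the same query via the state data source reader.
val valueDf = spark.read.format("statestore")
  .option("path", "/checkpoint/dir")
  .option("stateVarName", "valueState")
  .load()

val listDf = spark.read.format("statestore")
  .option("path", "/checkpoint/dir")
  .option("stateVarName", "listState")
  .load()

// Pad each side with NULL columns so the schemas line up, then union,
// producing a flattened frame like the table above. Column expressions are
// illustrative; actual schemas depend on the state variable types.
val initialStateDf =
  valueDf.selectExpr("key AS groupingKey", "value",
      "CAST(NULL AS INT) AS listValue")
    .union(listDf.selectExpr("key AS groupingKey",
      "CAST(NULL AS INT) AS value", "list_element AS listValue"))
```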
### Why are the changes needed?

This change supports initial state handling for the integration with the state data source reader.

### Does this PR introduce _any_ user-facing change?

No. The user API is the same as the prior PR: #45467, for initial state support without the state data source reader.

### How was this patch tested?

Unit test cases added in `TransformWithStateWithInitialStateSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48686 from jingz-db/initial-state-reader-integration.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>