[SPARK-49601][SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas #48005

Open. Wants to merge 13 commits into base: master


@jingz-db jingz-db commented Sep 5, 2024

What changes were proposed in this pull request?

This PR lets users provide a DataFrame that is used to instantiate state for the query in the first batch, for the arbitrary state API v2 in Python.
The Scala PR that added initial state support is #45467.

We propose a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. On the JVM side, we coGroup input rows and initial state rows on the same grouping key. We then build a new row that contains one row from the input rows iterator and one row from the initial state iterator, and send the new grouped row to Py4j. Inside the Python worker, we deserialize the grouped row into input rows and initial state rows separately and pass them to handleInputRows and handleInitialState, respectively.
We launch a Python worker for each partition that has non-empty input rows or non-empty initial state rows. This guarantees that every key in the initial state is processed, even if it does not appear in the first batch or does not fall in the same partition as the keys in the first batch.
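
The co-grouping idea described above can be sketched in plain Python. This is only an illustration under stated assumptions: the helper name `cogroup_by_key`, the `id` key column, and the use of lists of dicts in place of Arrow batches are all hypothetical and are not taken from the PR.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def cogroup_by_key(
    input_rows: Iterable[Row],
    initial_state_rows: Iterable[Row],
    key_col: str = "id",  # hypothetical grouping column
) -> Iterator[Tuple[Any, List[Row], List[Row]]]:
    """Yield (key, input rows, initial state rows) for every key on either side."""
    inputs: Dict[Any, List[Row]] = defaultdict(list)
    states: Dict[Any, List[Row]] = defaultdict(list)
    for row in input_rows:
        inputs[row[key_col]].append(row)
    for row in initial_state_rows:
        states[row[key_col]].append(row)
    # Take the union of keys so that keys present only in the
    # initial state are still emitted (with an empty input list).
    for key in sorted(set(inputs) | set(states)):
        yield key, inputs.get(key, []), states.get(key, [])


input_rows = [{"id": "a", "value": 1}, {"id": "a", "value": 2}]
initial_state_rows = [{"id": "a", "initVal": 10}, {"id": "b", "initVal": 20}]
grouped = list(cogroup_by_key(input_rows, initial_state_rows))
```

Here key `b` has initial state but no input rows, and it is still emitted, which is the property the description relies on.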

Why are the changes needed?

The Python API should match the Scala API, which already supports initial state handling.

Does this PR introduce any user-facing change?

Yes.
This PR introduces a new API in the StatefulProcessor that lets users define their own UDF for processing initial state:

    def handleInitialState(
        self, key: Any, initialState: "PandasDataFrameLike"
    ) -> None:

Implementing this function is optional. If it is not defined, it acts as a no-op.
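
A minimal sketch of the optional-override behavior, assuming a stand-in base class rather than the real StatefulProcessor. `ProcessorBase`, `SeedingProcessor`, and `PassThroughProcessor` are hypothetical names, and a list of dicts stands in for the pandas DataFrame so the example runs without Spark or pandas.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class ProcessorBase:
    """Hypothetical stand-in for the StatefulProcessor base class."""

    def handleInitialState(self, key: Any, initialState: List[Row]) -> None:
        # Default implementation: do nothing.
        return None


class SeedingProcessor(ProcessorBase):
    """Overrides the hook to seed per-key state from the initial state rows."""

    def __init__(self) -> None:
        self.state: Dict[Any, int] = {}

    def handleInitialState(self, key: Any, initialState: List[Row]) -> None:
        # "initVal" mirrors the column name used in the PR's test processor.
        self.state[key] = initialState[0]["initVal"]


class PassThroughProcessor(ProcessorBase):
    """Does not override the hook, so initial state is ignored."""


seeding = SeedingProcessor()
seeding.handleInitialState("a", [{"initVal": 5}])
ignored = PassThroughProcessor().handleInitialState("a", [{"initVal": 5}])
```

A processor that does not care about initial state needs no extra code, which matches the "optional" wording above.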

How was this patch tested?

Unit tests & integration tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@jingz-db jingz-db changed the title [SS][PYTHON] Support Initial State for TransformWithStateInPandas [SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas Sep 5, 2024
@jingz-db jingz-db marked this pull request as ready for review September 9, 2024 16:48
@github-actions github-actions bot added the DOCS label Sep 9, 2024
@jingz-db jingz-db changed the title [SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas [SPARK-49601][SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas Sep 11, 2024
      case _ =>
        throw new IllegalArgumentException("Invalid method call")
    }
  }

  private def handleStatefulProcessorUtilRequest(message: UtilsCallCommand): Unit = {

Should we add some Scala unit tests for these two new APIs?

        yield pd.DataFrame({"id": key, "value": str(accumulated_value)})

    def handleInitialState(self, key, initialState) -> None:
        initVal = initialState.at[0, "initVal"]

Can we add a check that verifies initVal here?

@github-actions github-actions bot added the CORE label Oct 10, 2024
if isinstance(outputStructType, str):
outputStructType = cast(StructType, _parse_datatype_string(outputStructType))
def transformWithStateWithInitStateUDF(
statefulProcessorApiClient: StatefulProcessorApiClient,

Nit: fix the indent.

@@ -234,3 +234,12 @@ def close(self) -> None:
operations.
"""
...

def handleInitialState(
self, key: Any, initialState: "PandasDataFrameLike"

Nit: indent.

@@ -402,6 +404,9 @@ def transformWithStateInPandas(
        The output mode of the stateful processor.
    timeMode : str
        The time mode semantics of the stateful processor for timers and TTL.
    initialState: "GroupedData"

Let's use something like below to represent the actual type.

:class:`pyspark.sql.types.DataType`

StatefulProcessorHandleState.INITIALIZED
)

# if we don't have state for the given key but in initial state,

I guess you meant to say that if we only have initial state but no input rows for the given key, the inputRows iterator could be empty?

@@ -168,13 +169,10 @@ class TransformWithStateInPandasStateServer(
    val requestedState = message.getSetHandleState.getState
    requestedState match {
      case HandleState.CREATED =>
        logInfo(log"set handle state to Created")

Why do we need to remove these log lines?

            return False
        else:
            # TODO(SPARK-49233): Classify user facing errors.
            raise PySparkRuntimeError(f"Error getting batch id: " f"{response_message[1]}")

Should we have a better error message?

  with PythonArrowInput[GroupedInType] {

  override protected lazy val schema: StructType = new StructType()
    .add("state", dataSchema)

Maybe use data or inputData to differentiate it from initState?

    ) -> Iterator["PandasDataFrameLike"]:
        handle = StatefulProcessorHandle(statefulProcessorApiClient)

        if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:

Something is not clear to me here. Could you help me understand it better?

We only call handleInitialState when the handle state is CREATED, but after we process the initial state of the first grouping key, we update the handle state to INITIALIZED. Wouldn't that skip the initial state for the other grouping keys?

If my understanding is correct, we should move the handleInitialState call outside the handle state check and run it after the init call.
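
The concern can be modeled with a small simulation. This is a sketch of the reviewer's reasoning only, not the PR's code: `FakeClient`, `HandleState`, and both `udf_*` functions are hypothetical stand-ins, and appending to a list stands in for calling handleInitialState.

```python
from enum import Enum
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class HandleState(Enum):
    CREATED = "created"
    INITIALIZED = "initialized"


class FakeClient:
    """Hypothetical stand-in for the API client shared by all keys in a partition."""

    def __init__(self) -> None:
        self.handle_state = HandleState.CREATED


def udf_inside_check(client: FakeClient, key: Any, init_rows: List[Row], seeded: List[Any]) -> None:
    # Initial state is handled only while the handle state is CREATED.
    if client.handle_state == HandleState.CREATED:
        # The one-time init() call would run here.
        if init_rows:
            seeded.append(key)  # stands in for handleInitialState(key, ...)
        client.handle_state = HandleState.INITIALIZED


def udf_outside_check(client: FakeClient, key: Any, init_rows: List[Row], seeded: List[Any]) -> None:
    # One-time setup stays guarded by the handle state...
    if client.handle_state == HandleState.CREATED:
        client.handle_state = HandleState.INITIALIZED
    # ...but initial state is handled for every key.
    if init_rows:
        seeded.append(key)


def run(udf: Callable[[FakeClient, Any, List[Row], List[Any]], None]) -> List[Any]:
    client = FakeClient()
    seeded: List[Any] = []
    for key, init_rows in [("a", [{"initVal": 1}]), ("b", [{"initVal": 2}])]:
        udf(client, key, init_rows, seeded)
    return seeded


seeded_inside = run(udf_inside_check)
seeded_outside = run(udf_outside_check)
```

In this model, the first variant seeds only key `a`, because the state flips to INITIALIZED before key `b` is seen, while the second variant seeds both keys.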

        statefulProcessorApiClient: StatefulProcessorApiClient,
        key: Any,
        inputRows: Iterator["PandasDataFrameLike"],
        # for non first batch, initialStates will be None

For batches after the first, would initialStates be None or empty?

inputRows: Iterator["PandasDataFrameLike"],
# for non first batch, initialStates will be None
initialStates: Iterator["PandasDataFrameLike"] = None
) -> Iterator["PandasDataFrameLike"]:

Can we add some comments on the possible input combinations that this UDF needs to handle, so that it is easier for people to understand? IIUC there are 3 cases:

  • Both inputRows and initialStates contain data. This only happens in the first batch, when the grouping key has both input data and initial state.
  • Only inputRows contains data. This happens when the grouping key has no initial state to process, or when it is not the first batch.
  • Only initialStates contains data. This happens when the grouping key has no associated input data but does have initial state to process.
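
The three cases above could be made explicit with a small helper. This is a hedged sketch: `classify_inputs` and its return labels are hypothetical, and already-materialized lists stand in for the iterators of pandas DataFrames. Treating `None` and an empty list the same way also sidesteps the None-versus-empty question for later batches.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def classify_inputs(
    input_rows: Optional[List[Row]],
    initial_states: Optional[List[Row]],
) -> str:
    """Name the input combination for one grouping key."""
    # None and an empty list are both treated as "no data".
    has_input = bool(input_rows)
    has_initial = bool(initial_states)
    if has_input and has_initial:
        return "both"  # first batch, key has input and initial state
    if has_input:
        return "input_only"  # no initial state for the key, or a later batch
    if has_initial:
        return "initial_state_only"  # key appears only in the initial state
    return "neither"  # nothing to process for this key


case_both = classify_inputs([{"id": "a", "value": 1}], [{"id": "a", "initVal": 10}])
case_input = classify_inputs([{"id": "a", "value": 1}], None)
case_initial = classify_inputs([], [{"id": "b", "initVal": 20}])
```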
