[SPARK-49601][SS][PYTHON] Support Initial State Handling for TransformWithStateInPandas #48005
...main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
      case _ =>
        throw new IllegalArgumentException("Invalid method call")
    }
  }

  private def handleStatefulProcessorUtilRequest(message: UtilsCallCommand): Unit = {
Should we add some Scala unit tests for these two new APIs?
        yield pd.DataFrame({"id": key, "value": str(accumulated_value)})

    def handleInitialState(self, key, initialState) -> None:
        initVal = initialState.at[0, "initVal"]
Can we add verification of the initVal here?
    if isinstance(outputStructType, str):
        outputStructType = cast(StructType, _parse_datatype_string(outputStructType))

    def transformWithStateWithInitStateUDF(
        statefulProcessorApiClient: StatefulProcessorApiClient,
Nit: fix the indent.
@@ -234,3 +234,12 @@ def close(self) -> None:
        operations.
        """
        ...

    def handleInitialState(
        self, key: Any, initialState: "PandasDataFrameLike"
Nit: indent.
@@ -402,6 +404,9 @@ def transformWithStateInPandas(
            The output mode of the stateful processor.
        timeMode : str
            The time mode semantics of the stateful processor for timers and TTL.
        initialState: "GroupedData"
Let's use something like the following to represent the actual type: :class:`pyspark.sql.types.DataType`
            StatefulProcessorHandleState.INITIALIZED
        )

        # if we don't have state for the given key but in initial state,
I guess you wanted to say that if we only have initial state but don't have input rows for the given key, the inputRows iterator could be empty?
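The empty-iterator case the comment describes can be sketched without any Spark machinery: peek at the first element to decide whether the key has input rows at all, rebuilding the iterator if it does. The helper name is made up for illustration:

```python
from itertools import chain

def first_or_none(rows):
    """Peek the first element of an iterator.

    Returns (first, rebuilt_iterator); first is None when the iterator
    is empty, e.g. a key that only appears in the initial state.
    """
    try:
        first = next(rows)
    except StopIteration:
        return None, iter(())
    # re-attach the consumed element so callers see the full stream
    return first, chain([first], rows)

# A key present only in the initial state has an empty inputRows iterator:
first, rows = first_or_none(iter([]))
# first is None -> only the initial-state handling needs to run for this key
```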
@@ -168,13 +169,10 @@ class TransformWithStateInPandasStateServer(
    val requestedState = message.getSetHandleState.getState
    requestedState match {
      case HandleState.CREATED =>
        logInfo(log"set handle state to Created")
Why do we need to remove these log lines?
            return False
        else:
            # TODO(SPARK-49233): Classify user facing errors.
            raise PySparkRuntimeError(f"Error getting batch id: " f"{response_message[1]}")
Should we have a better error message?
  with PythonArrowInput[GroupedInType] {

  override protected lazy val schema: StructType = new StructType()
    .add("state", dataSchema)
Maybe use data or inputData to differentiate it from initState?
    ) -> Iterator["PandasDataFrameLike"]:
        handle = StatefulProcessorHandle(statefulProcessorApiClient)

        if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:
There's something not very clear to me here, could you help me understand? We only call handleInitialState when the handle state is CREATED, but after we process the initial state of the first grouping key, we update the state to INITIALIZED. Wouldn't that skip the initial state for all other grouping keys?
If my understanding is correct, we should move the handleInitialState call outside the handle-state check and do it after the init call.
        statefulProcessorApiClient: StatefulProcessorApiClient,
        key: Any,
        inputRows: Iterator["PandasDataFrameLike"],
        # for non first batch, initialStates will be None
For a non-first batch, would initialStates be None or empty?
        inputRows: Iterator["PandasDataFrameLike"],
        # for non first batch, initialStates will be None
        initialStates: Iterator["PandasDataFrameLike"] = None
    ) -> Iterator["PandasDataFrameLike"]:
Can we add some comments on the possible input combinations that we need to handle in this UDF, to make it easier for people to understand? IIUC there should be 3 cases:
- Both inputRows and initialStates contain data. This would only happen in the first batch, when the associated grouping key has both input data and initial state.
- Only inputRows contains data. This could happen when either the grouping key doesn't have any initial state to process, or it's a non-first batch.
- Only initialStates contains data. This could happen when the grouping key doesn't have any associated input data but it has initial state to process.
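The three cases above can be sketched as a plain-Python dispatch. The function and processor names below are illustrative stand-ins mirroring the PR's API, not the actual pyspark implementation:

```python
def process_group(key, input_rows, initial_states, processor):
    """Dispatch over the three input combinations for one grouping key.

    `processor` is any object exposing handleInitialState/handleInputRows
    (names mirror the PR's StatefulProcessor API; this is a sketch).
    """
    initial_states = list(initial_states or [])  # None for non-first batches
    input_rows = list(input_rows)
    out = []
    if initial_states:            # cases 1 and 3: initial state present
        for state in initial_states:
            processor.handleInitialState(key, state)
    if input_rows:                # cases 1 and 2: input data present
        out.extend(processor.handleInputRows(key, iter(input_rows)))
    return out

class DemoProcessor:
    """Toy processor recording which hooks fired (illustration only)."""
    def __init__(self):
        self.initialized = []
    def handleInitialState(self, key, state):
        self.initialized.append((key, state))
    def handleInputRows(self, key, rows):
        yield (key, sum(rows))
```

Case 3 (initial state only) produces no output here but still seeds state, which is why the worker must be launched even for keys absent from the first batch's input.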
What changes were proposed in this pull request?
This PR adds support for users to provide a DataFrame that can be used to instantiate state for the query in the first batch, for the arbitrary state API v2 in Python.
The Scala PR for supporting initial state is here: #45467
We propose to create a new PythonRunner that handles initial state specifically for TransformWithStateInPandas. On the JVM side, we cogroup input rows and initial state rows on the same grouping key. We then create a new row that contains one row from the input rows iterator and one row from the initial state iterator, and send the grouped row to the Python worker. Inside the Python worker, we deserialize the grouped row back into input rows and initial state rows and feed them into handleInitialState and handleInputRows respectively.
We will launch a Python worker for each partition that has non-empty rows in either the input rows or the initial states. This guarantees that all keys in the initial state will be processed even if they do not appear in the first batch, or do not lie in the same partition as keys in the first batch.
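The cogroup-by-key idea described above can be illustrated without Spark: pair the input rows and initial-state rows on the grouping key, emitting every key that appears on either side so that initial-state-only keys are not dropped. This is a simplified single-process sketch, not the JVM implementation:

```python
from collections import defaultdict

def cogroup(input_rows, init_rows):
    """Full-outer cogroup of two (key, row) iterables.

    Returns {key: (input_rows_for_key, init_rows_for_key)}; a key that
    appears on only one side still gets an entry, with the other list empty.
    """
    grouped = defaultdict(lambda: ([], []))
    for key, row in input_rows:
        grouped[key][0].append(row)
    for key, row in init_rows:
        grouped[key][1].append(row)
    return dict(grouped)
```

A key like "c" below, present only in the initial state, still surfaces in the result, which is the property the PR relies on to process every initial-state key.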
Why are the changes needed?
We need to keep the Python API in parity with the initial state handling already supported in Scala.
Does this PR introduce any user-facing change?
Yes.
This PR introduces a new API in the StatefulProcessor which allows users to define their own UDF for processing initial state. Implementing this function is optional; if it is not defined, it acts as a no-op.
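The optional-hook shape can be sketched as follows. These are simplified stand-in classes mirroring the StatefulProcessor API described above, not the actual pyspark definitions:

```python
class StatefulProcessorSketch:
    """Simplified base class: handleInitialState defaults to a no-op,
    so existing processors keep working unchanged."""

    def handleInputRows(self, key, rows):
        raise NotImplementedError

    def handleInitialState(self, key, initial_state):
        # optional hook; default is a no-op
        pass

class CountingProcessor(StatefulProcessorSketch):
    """Toy processor: seeds a per-key counter from the initial state,
    then accumulates input rows into it."""

    def __init__(self):
        self.state = {}

    def handleInitialState(self, key, initial_state):
        self.state[key] = initial_state

    def handleInputRows(self, key, rows):
        self.state[key] = self.state.get(key, 0) + sum(rows)
        yield (key, self.state[key])
```

With an initial state of 10 for a key, the first batch's rows accumulate on top of it; without initial state, the counter simply starts at zero.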
How was this patch tested?
Unit tests & integration tests.
Was this patch authored or co-authored using generative AI tooling?
No.