Skip to content

Commit

Permalink
resolve conflicts with master
Browse files Browse the repository at this point in the history
  • Loading branch information
jingz-db committed Sep 12, 2024
1 parent e037953 commit 099d827
Show file tree
Hide file tree
Showing 16 changed files with 2,775 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ Stateful Processor

StatefulProcessor.init
StatefulProcessor.handleInputRows
StatefulProcessor.close
StatefulProcessor.close
StatefulProcessor.handleInitialState
15 changes: 15 additions & 0 deletions python/pyspark/sql/pandas/group_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ def transformWithStateInPandas(
outputStructType: Union[StructType, str],
outputMode: str,
timeMode: str,
initialState: "GroupedData" = None
) -> DataFrame:
"""
Invokes methods defined in the stateful processor used in arbitrary state API v2. It
Expand Down Expand Up @@ -500,7 +501,15 @@ def transformWithStateUDF(
StatefulProcessorHandleState.INITIALIZED
)

# only process initial state if first batch
is_first_batch = statefulProcessorApiClient.is_first_batch()
statefulProcessorApiClient.set_implicit_key(key)
if is_first_batch:
initial_state = statefulProcessorApiClient.get_initial_state(key)
# if user did not provide initial state df, initial_state will be None
if initial_state is not None:
statefulProcessor.handleInitialState(key, initial_state)

result = statefulProcessor.handleInputRows(key, inputRows)

return result
Expand All @@ -516,11 +525,17 @@ def transformWithStateUDF(
df = self._df
udf_column = udf(*[df[col] for col in df.columns])

if initialState is None:
initial_state_java_obj = None
else:
initial_state_java_obj = initialState._jgd

jdf = self._jgd.transformWithStateInPandas(
udf_column._jc,
self.session._jsparkSession.parseDataType(outputStructType.json()),
outputMode,
timeMode,
initial_state_java_obj
)
return DataFrame(jdf, self.session)

Expand Down
102 changes: 45 additions & 57 deletions python/pyspark/sql/streaming/StateMessage_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 099d827

Please sign in to comment.