[SPARK-53614][PYTHON] Add Iterator[pandas.DataFrame] support to applyInPandas
#52716
Conversation
zhengruifeng left a comment:
LGTM, only a few minor comments
python/pyspark/sql/connect/group.py (Outdated)

```python
@@ -294,14 +294,26 @@ def applyInPandas(
) -> "DataFrame":
    from pyspark.sql.connect.udf import UserDefinedFunction
    from pyspark.sql.connect.dataframe import DataFrame
    from pyspark.sql.pandas.typehints import infer_group_pandas_eval_type_from_func
    import warnings
```
Suggestion: remove the `import warnings` line.
removed
```python
        self.assertEqual(expected, result)

    def test_apply_in_pandas_iterator_with_keys_batch_slicing(self):
        from typing import Iterator, Tuple, Any
```
such imports should be moved to the head of the file
moved
```python
    def test_apply_in_pandas_iterator_process_multiple_input_batches(self):
        from typing import Iterator
        import builtins
```
Why do we need to import `builtins`?
I think there is no name conflict if we use sf.max/min/sum in this file.
Somehow when I use `sum` directly it would use `column.sum`. Do you know the reason? I changed it to use `builtins` to avoid this conflict.
moved typing import
We don't have `column.sum`; do you mean `sf.sum`?
In some test files, `sum` is imported, so the builtin `sum` is overridden.
```python
        )

        # Verify that all rows are present after concatenation
        self.assertEqual(len(result), 6)
```
let's directly compare the rows
self.assertEqual(result, [Row(...), Row(...), ...])
updated
```python
    if is_iterator_dataframe or is_iterator_dataframe_with_keys:
        return PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF

    # Default to non-iterator (standard grouped map)
    return PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
```
Suggested change:

```python
if is_iterator_dataframe or is_iterator_dataframe_with_keys:
    return PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
# Default to non-iterator (standard grouped map)
return PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
```
this part should match
spark/python/pyspark/sql/pandas/typehints.py
Lines 368 to 379 in 9e12201
```python
# pa.Table -> pa.Table
is_table = (
    len(parameters_sig) == 1 and parameters_sig[0] == pa.Table and return_annotation == pa.Table
)
# Tuple[pa.Scalar, ...], pa.Table -> pa.Table
is_table_with_keys = (
    len(parameters_sig) == 2 and parameters_sig[1] == pa.Table and return_annotation == pa.Table
)
if is_table or is_table_with_keys:
    return PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
return None
```
we can align it in a follow-up
let's discuss and do it in a follow-up if needed.
thanks, merged to master
…plyInPandas`
### What changes were proposed in this pull request?
This PR adds support for the `Iterator[pandas.DataFrame]` API in `groupBy().applyInPandas()`, enabling batch-by-batch processing of grouped data for improved memory efficiency and scalability.
#### Key Changes:
1. **New PythonEvalType**: Added `SQL_GROUPED_MAP_PANDAS_ITER_UDF` to distinguish iterator-based UDFs from standard grouped map UDFs
2. **Type Inference**: Implemented automatic detection of iterator signatures:
- `Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]`
- `Tuple[Any, ...], Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]`
3. **Streaming Serialization**: Created `GroupPandasIterUDFSerializer` that streams results without materializing all DataFrames in memory
4. **Configuration Change**: Updated `FlatMapGroupsInPandasExec` which was hardcoding `pythonEvalType = 201` instead of extracting it from the UDF expression (mirrored fix from `FlatMapGroupsInArrowExec`)
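The iterator signatures in item 2 can be detected from type hints alone. Below is a minimal, self-contained sketch of that detection; the real helper is `infer_group_pandas_eval_type_from_func` in `pyspark.sql.pandas.typehints`, and the name `matches_iterator_signature` and the exact checks here are simplified illustrations, not Spark's implementation:

```python
# Hypothetical sketch of signature-based detection for iterator UDFs.
import inspect
from typing import Any, Iterator, Tuple, get_type_hints

import pandas as pd

ITER_PDF = Iterator[pd.DataFrame]

def matches_iterator_signature(func) -> bool:
    """True if func is Iterator[pd.DataFrame] -> Iterator[pd.DataFrame],
    or Tuple[Any, ...], Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]."""
    hints = get_type_hints(func)
    params = list(inspect.signature(func).parameters)
    if hints.get("return") != ITER_PDF:
        return False
    if len(params) == 1:
        # Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]
        return hints.get(params[0]) == ITER_PDF
    if len(params) == 2:
        # Tuple[Any, ...], Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]
        return (
            hints.get(params[0]) == Tuple[Any, ...]
            and hints.get(params[1]) == ITER_PDF
        )
    return False
```

Functions matching neither shape fall back to the standard grouped-map path.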
### Why are the changes needed?
The existing `applyInPandas()` API loads entire groups into memory as single DataFrames. For large groups, this can cause OOM errors. The iterator API allows:
- **Memory Efficiency**: Process data batch-by-batch instead of materializing entire groups
- **Scalability**: Handle arbitrarily large groups that don't fit in memory
- **Consistency**: Mirrors the existing `applyInArrow()` iterator API design
### Does this PR introduce any user-facing changes?
Yes, this PR adds a new API variant for `applyInPandas()`:
#### Before (existing API, still supported):
```python
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())
df.groupBy("id").applyInPandas(normalize, schema="id long, v double")
```
#### After (new iterator API):
```python
from typing import Iterator
def normalize(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
# Process data batch-by-batch
for batch in batches:
yield batch.assign(v=(batch.v - batch.v.mean()) / batch.v.std())
df.groupBy("id").applyInPandas(normalize, schema="id long, v double")
```
#### With Grouping Keys:
```python
from typing import Iterator, Tuple, Any
def sum_by_key(key: Tuple[Any, ...], batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
total = 0
for batch in batches:
total += batch['v'].sum()
yield pd.DataFrame({"id": [key[0]], "total": [total]})
df.groupBy("id").applyInPandas(sum_by_key, schema="id long, total double")
```
**Backward Compatibility**: The existing DataFrame-to-DataFrame API is fully preserved and continues to work without changes.
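To make the contract concrete, here is a Spark-free sketch that simulates what the engine does for the keyed variant: slice each group into batches, feed them to the UDF as an iterator, and concatenate the yielded results. `run_grouped_iter_udf` and `batch_size` are illustrative names, not Spark APIs:

```python
# Local simulation of the keyed iterator UDF contract (illustrative only).
from typing import Any, Iterator, Tuple

import pandas as pd

def sum_by_key(key: Tuple[Any, ...],
               batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    total = 0.0
    for batch in batches:  # only one batch is held at a time
        total += float(batch["v"].sum())
    yield pd.DataFrame({"id": [key[0]], "total": [total]})

def run_grouped_iter_udf(df: pd.DataFrame, keys, func, batch_size: int = 2) -> pd.DataFrame:
    """Group df by `keys`, slice each group into batches, apply func."""
    out = []
    for key, group in df.groupby(keys):
        key = key if isinstance(key, tuple) else (key,)
        batches = (group.iloc[i:i + batch_size]
                   for i in range(0, len(group), batch_size))
        out.extend(func(key, batches))
    return pd.concat(out, ignore_index=True)
```

In Spark, the slicing is driven by the Arrow batch size rather than a `batch_size` argument, and batches arrive over the worker's Arrow stream instead of from `iloc`.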
### How was this patch tested?
- Added `test_apply_in_pandas_iterator_basic` - Basic functionality test
- Added `test_apply_in_pandas_iterator_with_keys` - Test with grouping keys
- Added `test_apply_in_pandas_iterator_batch_slicing` - Pressure test with 10M rows, 20 columns
- Added `test_apply_in_pandas_iterator_with_keys_batch_slicing` - Pressure test with keys
### Was this patch authored or co-authored using generative AI tooling?
Yes, tests generated by Cursor.
Closes apache#52716 from Yicong-Huang/SPARK-53614/feat/add-apply-in-pandas.
Lead-authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Co-authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…lizer` with `GroupPandasUDFSerializer`

### What changes were proposed in this pull request?
This PR consolidates `GroupPandasUDFSerializer` to support both `SQL_GROUPED_MAP_PANDAS_UDF` and `SQL_GROUPED_MAP_PANDAS_ITER_UDF`, aligning with the design pattern used by `GroupArrowUDFSerializer`.

### Why are the changes needed?
When the `Iterator[pandas.DataFrame]` API was added to `groupBy().applyInPandas()` in SPARK-53614 (#52716), a new `GroupPandasIterUDFSerializer` class was created. However, this class is nearly identical to `GroupPandasUDFSerializer`, differing only in whether batches are processed lazily (iterator mode) or all at once (regular mode).

### Does this PR introduce _any_ user-facing change?
No, this is an internal refactoring that maintains backward compatibility. The API behavior remains the same from the user's perspective.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
Co-Generated-by: Cursor with Claude 4.5 Sonnet

Closes #53043 from Yicong-Huang/SPARK-54316/refactor/consolidate-pandas-iter-serializer.

Lead-authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Co-authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
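The lazy-versus-eager distinction that motivates the consolidation can be sketched as a single code path with one flag. This is an illustrative toy, not Spark's actual `GroupPandasUDFSerializer`:

```python
# Toy sketch of one serializer handling both modes (hypothetical class name).
from typing import Iterator

import pandas as pd

class GroupPandasSerializerSketch:
    def __init__(self, lazy: bool):
        # lazy=True mimics the SQL_GROUPED_MAP_PANDAS_ITER_UDF path,
        # lazy=False mimics the regular grouped-map path.
        self.lazy = lazy

    def dump_stream(self, result_batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        if self.lazy:
            # Stream: at most one result batch is buffered at a time.
            for batch in result_batches:
                yield batch
        else:
            # Materialize all results first, then emit them.
            for batch in list(result_batches):
                yield batch
```

Since the two branches differ only in when results are materialized, a single class with a mode flag avoids maintaining two nearly identical serializers.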