[SPARK-30640][PYTHON][SQL] Prevent unnecessary copies of data during Arrow to Pandas conversion #27358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
8 changes: 6 additions & 2 deletions python/pyspark/sql/pandas/conversion.py
@@ -100,7 +100,7 @@ def toPandas(self):
         # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
         if use_arrow:
             try:
-                from pyspark.sql.pandas.types import _check_dataframe_localize_timestamps
+                from pyspark.sql.pandas.types import _check_series_localize_timestamps
                 import pyarrow
                 batches = self._collect_as_arrow()
                 if len(batches) > 0:
@@ -109,7 +109,11 @@ def toPandas(self):
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pdf = table.to_pandas(date_as_object=True)
-                    return _check_dataframe_localize_timestamps(pdf, timezone)
+                    for field in self.schema:
+                        if isinstance(field.dataType, TimestampType):
+                            pdf[field.name] = \
+                                _check_series_localize_timestamps(pdf[field.name], timezone)
+                    return pdf
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
             except Exception as e:
Member:

Is it different? Doesn't this also assign the series back to the DataFrame?

Member Author:

Yeah, for the case of timestamps, making a copy is unavoidable. This is just to prevent copies of the non-timestamp columns, which were also being made when every series was assigned back to the DataFrame.

Member:

ok. looks good then. thanks!

Member Author:

Thanks @viirya !
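For context, a rough standalone sketch (my own illustration, not code from this PR) of what the per-column localization amounts to in plain pandas: tz-aware values are converted to the target timezone and made tz-naive, which necessarily materializes a new series, hence the copy being unavoidable for timestamp columns.

import pandas as pd

# A tz-aware series, similar to what Arrow produces for timestamp columns.
s = pd.Series(pd.to_datetime(["2020-01-28 10:00", "2020-01-28 11:00"]).tz_localize("UTC"))

# Convert to the session timezone and drop the tz info; this allocates
# new data, so a copy is unavoidable for timestamp columns.
local = s.dt.tz_convert("America/Los_Angeles").dt.tz_localize(None)
print(local.dtype)  # datetime64[ns], now tz-naive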
7 changes: 5 additions & 2 deletions python/pyspark/sql/pandas/serializers.py
@@ -120,14 +120,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name):
 
     def arrow_to_pandas(self, arrow_column):
         from pyspark.sql.pandas.types import _check_series_localize_timestamps
+        import pyarrow
 
         # If the given column is a date type column, creates a series of datetime.date directly
         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
         # datetime64[ns] type handling.
         s = arrow_column.to_pandas(date_as_object=True)
 
-        s = _check_series_localize_timestamps(s, self._timezone)
-        return s
+        if pyarrow.types.is_timestamp(arrow_column.type):
+            return _check_series_localize_timestamps(s, self._timezone)
+        else:
+            return s
 
     def _create_batch(self, series):
         """

Member Author:

I don't know if this was causing the same issue, but it's easy enough to just check the column type and only convert if necessary.
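Two small standalone checks (my own examples, not from the PR) of the pyarrow behaviors this hunk relies on: date_as_object=True keeps dates as datetime.date objects instead of overflowing datetime64[ns], and pyarrow.types.is_timestamp is the predicate that lets non-timestamp columns skip the localization pass.

import datetime
import pyarrow as pa

# date_as_object=True yields datetime.date objects, so dates outside the
# datetime64[ns] range (roughly years 1677-2262) convert without overflow.
dates = pa.array([datetime.date(1, 1, 1), datetime.date(9999, 12, 31)])
print(dates.to_pandas(date_as_object=True)[0])  # datetime.date(1, 1, 1)

# Only timestamp-typed columns pass the check and get localized.
ts = pa.array([0], type=pa.timestamp("us", tz="UTC"))
print(pa.types.is_timestamp(ts.type))     # True
print(pa.types.is_timestamp(dates.type))  # False: date32, returned as-is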
16 changes: 0 additions & 16 deletions python/pyspark/sql/pandas/types.py
@@ -165,22 +165,6 @@ def _check_series_localize_timestamps(s, timezone):
     return s
 
 
-def _check_dataframe_localize_timestamps(pdf, timezone):
-    """
-    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
-
-    :param pdf: pandas.DataFrame
-    :param timezone: the timezone to convert. if None then use local timezone
-    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
-    """
-    from pyspark.sql.pandas.utils import require_minimum_pandas_version
-    require_minimum_pandas_version()
-
-    for column, series in pdf.iteritems():
-        pdf[column] = _check_series_localize_timestamps(series, timezone)
-    return pdf
-
-
 def _check_series_convert_timestamps_internal(s, timezone):
     """
     Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for

Member Author:

Better to just remove this; it was only used in the one place.

Member Author:

The problem is pandas stores the DataFrame data in blocks internally, and assigning series back to the DataFrame would cause the blocks to be reallocated.
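A minimal sketch (my own demonstration, not from the PR; exact results can vary across pandas versions) of the block reallocation described above: a single-dtype DataFrame starts out sharing memory with its source array, but reassigning every series back, as the removed helper's loop did, forces pandas to build new blocks and copies even untouched columns.

import numpy as np
import pandas as pd

# A single-dtype DataFrame built from a 2D ndarray shares memory with it.
base = np.arange(6.0).reshape(3, 2)
pdf = pd.DataFrame(base, columns=["a", "b"], copy=False)
print(np.shares_memory(base, pdf["a"].to_numpy()))  # True: zero-copy so far

# Reassigning every series back, as the removed helper did for all columns,
# makes pandas allocate fresh blocks, copying even non-timestamp data.
for column, series in pdf.items():
    pdf[column] = series
print(np.shares_memory(base, pdf["a"].to_numpy()))  # False: data was copied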