[SPARK-21375][PYSPARK][SQL] Add Date and Timestamp support to ArrowConverters for toPandas() Conversion #18664
Conversation
val d4 = new Date(sdf.parse("2016-05-09 12:01:01.000 UTC").getTime)

// Date is created unaware of timezone, but DateTimeUtils force defaultTimeZone()
assert(DateTimeUtils.toJavaDate(DateTimeUtils.fromJavaDate(d2)).getTime == d2.getTime)
@holdenk @cloud-fan I'm trying out the DateType conversion and ran into this problem. The Dataset encoder uses `DateTimeUtils.toJavaDate` and `fromJavaDate` similar to above (which fails), and this forces a `defaultTimeZone()` when working with the data. So a value `new Date(0)` should be the epoch, but in my timezone it is forced to be the day before, and the test here will not pass.
What are your thoughts on this: should the conversion to Arrow assume the `defaultTimeZone()`? Is this something that should be fixed first in Spark? Thanks!
cc @ueshin
We handle `DateType` values as the number of days from 1970-01-01 internally.
When converting between `Date` and the internal value, we assume the `Date` instance contains the timestamp of 00:00:00 of that day in the `TimeZone.getDefault()` timezone, so its millisecond value carries the timezone offset. e.g. in JST (GMT+09:00):

scala> TimeZone.setDefault(TimeZone.getTimeZone("JST"))
scala> Date.valueOf("1970-01-01").getTime()
res6: Long = -32400000

whereas in PST (GMT-08:00):

scala> TimeZone.setDefault(TimeZone.getTimeZone("PST"))
scala> Date.valueOf("1970-01-01").getTime()
res8: Long = 28800000

We use `DateTimeUtils.defaultTimeZone()` to adjust the offset.
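To make that adjustment concrete, here is a minimal plain-Python sketch (not Spark code; the helper name and the offsets passed in are illustrative) of mapping a local-midnight epoch-millis value back to Spark's internal day count:

```python
# Hypothetical helper: a java.sql.Date holds local midnight as epoch millis, and the
# default-timezone offset (what DateTimeUtils.defaultTimeZone() supplies) is added
# back to recover Spark's internal "days since 1970-01-01".
MILLIS_PER_DAY = 86400 * 1000

def local_midnight_millis_to_days(millis, utc_offset_millis):
    return (millis + utc_offset_millis) // MILLIS_PER_DAY

# JST (GMT+09:00): Date.valueOf("1970-01-01").getTime() == -32400000
assert local_midnight_millis_to_days(-32400000, 9 * 3600 * 1000) == 0
# PST (GMT-08:00): Date.valueOf("1970-01-01").getTime() == 28800000
assert local_midnight_millis_to_days(28800000, -8 * 3600 * 1000) == 0
```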
Thanks @ueshin! I think I got it
Test build #79693 has finished for PR 18664 at commit
@ueshin @holdenk I think I'm seeing an issue with transferring timestamp data to Pandas with Arrow, so I'll try to explain. Spark will assume the timestamp is in local time, so when converting to internal data, it will adjust with an offset to UTC from local timezone. Currently, the internal data is converted to Arrow without a timezone, which Arrow takes as timezone naive. When a Pandas DataFrame is created from that data, it does not adjust to local time, so a different timestamp is shown. For my case below, using PST as local time, it will add 8 hours.
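To illustrate the shift with a pandas-only sketch (the 8-hour PST offset and the values below are just for illustration, not code from this PR):

```python
import pandas as pd

# Spark stores "1970-01-01 00:00:01" local (PST) as its UTC instant: 8h + 1s after the epoch.
utc_micros = (8 * 3600 + 1) * 1_000_000

# Arrow data written without a timezone is read as timezone-naive, so pandas shows the
# UTC wall clock instead of converting back to local time -- the 8-hour shift I'm seeing.
naive = pd.Timestamp(utc_micros, unit='us')
print(naive)  # 1970-01-01 08:00:01

# Attaching UTC and converting to the local zone recovers the value the user wrote.
local = pd.Timestamp(utc_micros, unit='us', tz='UTC').tz_convert('America/Los_Angeles')
print(local)  # 1970-01-01 00:00:01-08:00
```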
It wasn't a problem before Arrow because the data gets converted before going into Pandas. I believe there are a few different ways to handle this.
I'm not sure what the best solution is because there could be issues with them all; any thoughts?
Test build #79781 has finished for PR 18664 at commit
In my opinion, we should definitely specify the timezone to keep the correct timestamp.
Hopefully we might specify the timezone when
Excited to see this being worked on.
I like this the best. This presents timestamps in local time, which is compatible with the existing
Thanks @ueshin and @icexelloss. I like In order to pass the python tests when comparing
I'm not sure if this would have an unintended effect, so I would vote to leave it as-is when Arrow is not enabled.
Test build #79922 has finished for PR 18664 at commit
Test build #79939 has finished for PR 18664 at commit
jenkins retest this please
Test build #79945 has finished for PR 18664 at commit
I'm not sure why this is failing, ping @holdenk @wesm. This is after introducing a timezone in the Arrow data. The error is
And I think it's originating from here, but I can't reproduce this locally, even when I try to reproduce the env on Jenkins (although Jenkins seems to be using python 3.4, numpy 1.11.3, pandas 0.19.2, and pyarrow 0.4.1 - but I can only get these versions installed with python 3.5). Any ideas?
cc @jreback, he may know more quickly. Possible it's a bug in 0.19.x, but it should be fixable
I cannot repro this; can you show what
Test build #79972 has finished for PR 18664 at commit
Test build #79975 has finished for PR 18664 at commit
So this is currently failing due to this issue:
Which is strange, because there is a 'pandas.types' module in Pandas 0.19.2. @holdenk or @shaneknapp is it possible to verify the version installed on these? It failed on both worker7 and worker8, thanks!
python/pyspark/serializers.py
Outdated
    if t is None or s.dtype == t.to_pandas_dtype():
        if type(t) == pa.TimestampType:
            # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
            return _series_convert_timestamps_internal(s).values.astype('datetime64[us]')
It seems we need `s.fillna(0)` for null values.
Why is that? We did that for integers that were promoted to floats to get rid of NaN, but here we are converting datetime64[ns] to datetime64[us] and both support missing values
In [28]: s = pd.Series([pd.datetime.now(), None])
In [29]: s
Out[29]:
0 2017-10-24 10:44:51.483694
1 NaT
dtype: datetime64[ns]
In [33]: s.values.astype('datetime64[us]')
Out[33]: array(['2017-10-24T10:44:51.483694', 'NaT'], dtype='datetime64[us]')
I'm not exactly sure of the reason, but it seems `s.dt.tz_localize('tzlocal()')` in `_series_convert_timestamps_internal` doesn't work properly when null values are included.
hmmm, that's strange. `s.dt.tz_localize('tzlocal()')` gets an `OverflowError: Python int too large to convert to C long` error when printing, but `s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')` works and comes up with a bogus time where the NaT was. I agree that `fillna(0)` is safer to avoid overflow.
In [44]: s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
Out[44]:
0 2017-10-24 17:44:51.483694+00:00
1 1677-09-21 08:12:43.145224192+00:00
dtype: datetime64[ns, UTC]
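A minimal sketch of the workaround being discussed, filling nulls before localizing (I use `pd.Timestamp(0)` explicitly as the fill value here; the sample values are illustrative):

```python
import pandas as pd

s = pd.Series([pd.Timestamp('2017-10-24 10:44:51.483694'), None])

# Fill NaT before localizing so the tz conversion never touches the NaT sentinel,
# which is what produces the bogus 1677-09-21 value / overflow seen above.
filled = s.fillna(pd.Timestamp(0))  # epoch as a placeholder
converted = filled.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
print(converted)
# 0   2017-10-24 17:44:51.483694+00:00   (with PST as the local zone)
# 1   1970-01-01 08:00:00+00:00          (placeholder for the original NaT)
```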
I fixed the date/time-related casting bugs in pyarrow and added new cast implementations -- conversions from one timestamp unit to another in Arrow-land fail silently right now. This will all be in the 0.8.0 release, landing hopefully the week of 11/6 or thereabouts: apache/arrow#1245
great, thanks @wesm!
@BryanCutler It seems `s.apply(lambda ts: ts.tz_localize('tzlocal()'))` works without `s.fillna(0)`. Do you know the difference between this and `s.dt.tz_localize('tzlocal()')`?
`apply()` will invoke the given function on each individual value of the series. I think this iterates over the series, whereas `s.dt.tz_localize()` would do a vectorized operation and should be faster.
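A quick sketch contrasting the two forms (illustrative values; both assume a tz-naive series in local time):

```python
import pandas as pd

s = pd.Series(pd.to_datetime(['2017-10-24 10:44:51', '2017-10-25 11:00:00']))

# apply() calls the lambda once per element at the Python level...
per_value = s.apply(lambda ts: ts.tz_localize('tzlocal()'))
# ...while the .dt accessor localizes the whole block of values in one vectorized call.
vectorized = s.dt.tz_localize('tzlocal()')

# Both represent the same instants; the vectorized form avoids per-element overhead.
assert list(per_value) == list(vectorized)
```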
python/pyspark/sql/types.py
Outdated
def _series_convert_timestamps_internal(s):
    """ Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
    """
    return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
I'd prefer the previous implementation, which checks the type in case the series is already tz-aware.
Yeah, you're right. I figured we are already checking that it is a timestamp type, but it's true the user could have created tz-aware timestamps so we need to check.
…rsion, set timestamp cast flag for copy to false
@ueshin I have the requested changes done, just wondering if you could clarify #18664 (comment) before I push them, thanks!
@gatorsmile I started a WIP at #19575 for adding documentation, nothing there yet but I'll work on it
Test build #83050 has finished for PR 18664 at commit
python/pyspark/sql/types.py
Outdated
    if is_datetime64_dtype(s.dtype):
        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
    else:
        return s
We need `s.dt.tz_convert('UTC')` for the case where the timezone is something unusual like `tzlocal()`?
This is to ensure that the values will be from the unix epoch, which is what Spark expects to store internally. Just like `TimestampType.toInternal` here - only this conversion is vectorized with Pandas.
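A small illustration of that point (the printed value assumes PST as the local zone; not code from this PR):

```python
import pandas as pd

s = pd.Series(pd.to_datetime(['1970-01-01 00:00:01']))  # tz-naive, local wall clock

# After localizing to the system zone and converting to UTC, the underlying values
# are plain offsets from the Unix epoch -- the form Spark stores internally.
utc = s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
print(utc.values.astype('int64'))
# e.g. [28801000000000] in PST: (8 hours + 1 second) after the epoch, in nanoseconds
```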
I meant that if `is_datetime64tz_dtype(s.dtype)` is true but the series has an unusual timezone like `tzlocal()`, I thought we would still need `s.dt.tz_convert('UTC')`.
Oh, got it. Yeah, I think you're right. I'll check on that a little later.
Here is what I found: for the actual internal data it doesn't matter. Changing the timezone on a series is just a metadata operation, so the same data will be transferred back to Spark via Arrow regardless:

In [101]: ts = pd.Timestamp(1, unit='D', tz='America/New_York')
In [102]: ts.value
Out[102]: 86400000000000
In [103]: ts.tz_convert('UTC').value
Out[103]: 86400000000000

However, to be consistent we should make sure the tz is UTC, so I'll add this along with a test to make sure.
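And a Series-level version of the same check (a sketch, not the PR's test): the underlying values that Arrow would carry back are unchanged by `tz_convert`.

```python
import pandas as pd

s = pd.Series(pd.to_datetime(['1970-01-02'])).dt.tz_localize('UTC')
ny = s.dt.tz_convert('America/New_York')

# tz_convert only rewrites metadata; the stored instants are identical.
assert (s.values == ny.values).all()
print(s.values.astype('int64'))  # [86400000000000]
```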
python/pyspark/sql/types.py
Outdated
    else:
        raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
    return arrow_type


def _check_dataframe_localize_timestamps(df):
tiny nit: `df` -> `pdf`
python/pyspark/sql/dataframe.py
Outdated
        import pyarrow
        tables = self._collectAsArrow()
        if tables:
            table = pyarrow.concat_tables(tables)
-           return table.to_pandas()
+           df = table.to_pandas()
+           return _check_dataframe_localize_timestamps(df)
ditto (`df` -> `pdf`).
python/pyspark/sql/types.py
Outdated
    else:
        raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
    return arrow_type


def _check_dataframe_localize_timestamps(df):
    """ Convert timezone aware timestamps to timezone-naive in local time
Let's add a comment that says the expected input is `pd.DataFrame`.
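For reference, a minimal sketch of what such a helper could look like (not necessarily the exact code merged in this PR): take a `pd.DataFrame`, and for each tz-aware timestamp column convert to local time and drop the timezone so users see naive values.

```python
import pandas as pd
from pandas.api.types import is_datetime64tz_dtype

def _check_dataframe_localize_timestamps(pdf):
    """Convert timezone-aware timestamps to timezone-naive in local time.
    Expected input is a pandas.DataFrame; tz-aware timestamp columns are
    rewritten in place and the frame is returned."""
    for column in pdf.columns:
        series = pdf[column]
        if is_datetime64tz_dtype(series.dtype):
            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
    return pdf
```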
…nts on conversion function input and output
Test build #83087 has finished for PR 18664 at commit
@ueshin do you think this is ready to merge? cc @gatorsmile @cloud-fan for another look
LGTM
@BryanCutler Thank you for all your hard work. We really appreciate it! Merged to master.
No problem, thanks @gatorsmile!
…andas to respect session timezone

## What changes were proposed in this pull request?

When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave as if they respect the Python system timezone instead of the session timezone. For example, let's say we use `"America/Los_Angeles"` as the session timezone and have a timestamp value `"1970-01-01 00:00:01"` in that timezone. Btw, I'm in Japan so the Python timezone would be `"Asia/Tokyo"`.

The timestamp value from the current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects the Python timezone. As we discussed in apache#18664, we consider this behavior a bug and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#19607 from ueshin/issues/SPARK-22395.
…rting Spark DataFrame to Pandas DataFrame.

## What changes were proposed in this pull request?

In apache#18664, there was a change in how `DateType` is being returned to users ([line 1968 in dataframe.py](https://github.com/apache/spark/pull/18664/files#diff-6fc344560230bf0ef711bb9b5573f1faR1968)). This can cause client code which works in Spark 2.2 to fail. See [SPARK-23290](https://issues.apache.org/jira/browse/SPARK-23290?focusedCommentId=16350917&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16350917) for an example.

This PR modifies it to use `datetime.date` for the date type, as Spark 2.2 does.

## How was this patch tested?

Tests modified to fit the new behavior, plus existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#20506 from ueshin/issues/SPARK-23290.
…ype when converting Spark DataFrame to Pandas DataFrame.

## What changes were proposed in this pull request?

This is a backport of #20506.

In #18664, there was a change in how `DateType` is being returned to users ([line 1968 in dataframe.py](https://github.com/apache/spark/pull/18664/files#diff-6fc344560230bf0ef711bb9b5573f1faR1968)). This can cause client code which works in Spark 2.2 to fail. See [SPARK-23290](https://issues.apache.org/jira/browse/SPARK-23290?focusedCommentId=16350917&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16350917) for an example.

This PR modifies it to use `datetime.date` for the date type, as Spark 2.2 does.

## How was this patch tested?

Tests modified to fit the new behavior, plus existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20515 from ueshin/issues/SPARK-23290_2.3.
What changes were proposed in this pull request?
Adding date and timestamp support with Arrow for `toPandas()` and `pandas_udf`s. Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive, localized to the Python system timezone.

How was this patch tested?
Added Scala tests for date and timestamp types under the ArrowConverters, ArrowUtils, and ArrowWriter suites. Added Python tests for `toPandas()` and `pandas_udf`s with date and timestamp types.
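A short usage sketch of the resulting behavior (assumes a running SparkSession; `spark.sql.execution.arrow.enabled` is the Arrow flag as of this era of Spark):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# 28801 seconds after the epoch, cast to a timestamp and a date in Spark SQL.
df = spark.createDataFrame([(28801,)], ["value"]).selectExpr(
    "timestamp(value) as ts", "to_date(timestamp(value)) as d")

pdf = df.toPandas()
print(pdf.dtypes)  # ts comes back as datetime64[ns], timezone-naive, shown in local time
print(pdf)
```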