
[SPARK-21375][PYSPARK][SQL] Add Date and Timestamp support to ArrowConverters for toPandas() Conversion #18664


Closed

Conversation

BryanCutler
Member

@BryanCutler BryanCutler commented Jul 18, 2017

What changes were proposed in this pull request?

Adding date and timestamp support with Arrow for toPandas() and pandas_udfs. Timestamps are stored in Arrow as UTC and manifested to the user as timezone-naive values localized to the Python system timezone.

How was this patch tested?

Added Scala tests for date and timestamp types under ArrowConverters, ArrowUtils, and ArrowWriter suites. Added Python tests for toPandas() and pandas_udfs with date and timestamp types.

val d4 = new Date(sdf.parse("2016-05-09 12:01:01.000 UTC").getTime)

// Date is created unaware of timezone, but DateTimeUtils forces defaultTimeZone()
assert(DateTimeUtils.toJavaDate(DateTimeUtils.fromJavaDate(d2)).getTime == d2.getTime)
Member Author

@BryanCutler BryanCutler Jul 18, 2017

@holdenk @cloud-fan I'm trying out the DateType conversion and ran into this problem. The Dataset encoder uses DateTimeUtils.toJavaDate and fromJavaDate, similar to the assert above (which fails), and this forces a defaultTimeZone() when working with the data. So a value like new Date(0) should be the epoch, but in my timezone it is forced to the day before, and the test here will not pass.

What are your thoughts on this: should the conversion to Arrow assume the defaultTimeZone()? Is this something that should be fixed first in Spark? Thanks!

Member

cc @ueshin

Member

@ueshin ueshin Jul 18, 2017

We handle DateType value as number of days from 1970-01-01 internally.

When converting a Date to/from the internal value, we assume the Date instance holds the timestamp for 00:00:00 of that day in the TimeZone.getDefault() timezone, so its getTime() carries that timezone's offset. E.g. in JST (GMT+09:00):

scala> TimeZone.setDefault(TimeZone.getTimeZone("JST"))

scala> Date.valueOf("1970-01-01").getTime()
res6: Long = -32400000

whereas in PST (GMT-08:00):

scala> TimeZone.setDefault(TimeZone.getTimeZone("PST"))

scala> Date.valueOf("1970-01-01").getTime()
res8: Long = 28800000

We use DateTimeUtils.defaultTimeZone() to adjust the offset.
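To make the adjustment concrete, here is a minimal Python sketch (not Spark's actual code; the helper name and hard-coded offsets are illustrative) of how adding the default-timezone offset back to Date.getTime() recovers the number of days since the epoch:

```python
MILLIS_PER_DAY = 24 * 60 * 60 * 1000

def from_java_date_millis(millis, default_tz_offset_millis):
    # Hypothetical mirror of DateTimeUtils.fromJavaDate: java.sql.Date.getTime()
    # is local midnight expressed in UTC millis, so add the default timezone's
    # offset back before dividing by millis-per-day.
    return (millis + default_tz_offset_millis) // MILLIS_PER_DAY

# JST example above: Date.valueOf("1970-01-01").getTime() == -32400000, offset +9h
assert from_java_date_millis(-32400000, 9 * 3600 * 1000) == 0
# PST example above: getTime() == 28800000, offset -8h
assert from_java_date_millis(28800000, -8 * 3600 * 1000) == 0
```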

Member Author

Thanks @ueshin! I think I got it

@SparkQA

SparkQA commented Jul 18, 2017

Test build #79693 has finished for PR 18664 at commit 69e1e21.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

BryanCutler commented Jul 19, 2017

@ueshin @holdenk I think I'm seeing an issue with transferring timestamp data to Pandas with Arrow, so I'll try to explain. Spark assumes the timestamp is in local time, so when converting to internal data it adjusts the value from the local timezone to UTC. Currently, the internal data is converted to Arrow without a timezone, which Arrow treats as timezone-naive. When a Pandas DataFrame is created from that data, it does not adjust back to local time, so a different timestamp is shown. In my case below, using PST as local time, it adds 8 hours.


In [2]: dt = datetime.datetime(1970, 1, 1, 0, 0, 1)

In [5]: TimestampType().toInternal(dt)
Out[5]: 28801000000

In [8]: df = spark.createDataFrame([(dt,)], schema=StructType([StructField("ts", TimestampType(), True)]))

In [7]: df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

In [9]: spark.conf.set("spark.sql.execution.arrow.enable", "true")

In [10]: df.toPandas()
Out[10]: 
                   ts
0 1970-01-01 08:00:01

In [11]: spark.conf.set("spark.sql.execution.arrow.enable", "false")

In [12]: df.toPandas()
Out[12]: 
          ts
0 1970-01-01 00:00:01

It wasn't a problem before Arrow because the data was converted before going into Pandas. I believe there are a few different ways to handle this:

  1. Adjust the Spark internal data to represent local time, not UTC time, and create an Arrow field without specifying the timezone.

  2. Give the Arrow field the timezone from DateTimeUtils.defaultTimeZone() and adjust the internal data to represent local time, not UTC time.

  3. Give the Arrow field a "UTC" timezone, then no adjustments need to be done to the internal data but I think Pandas will still display as UTC and it would be up to the user to change timezone.

I'm not sure what the best solution is because there could be issues with them all, any thoughts?

@SparkQA

SparkQA commented Jul 20, 2017

Test build #79781 has finished for PR 18664 at commit b709d78.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Jul 24, 2017

In my opinion, we should definitely specify the timezone to keep the correct timestamp.
I'm not sure which is the most suitable one yet, but the candidates would be:

  1. "UTC"
    Spark SQL internally stores timestamp values as the number of microseconds since 1970-01-01 00:00:00.0 UTC.
  2. SQLConf.SESSION_LOCAL_TIMEZONE
    Spark SQL uses this timezone for timezone-related operations. If the config value isn't set, it falls back to DateTimeUtils.defaultTimeZone().
  3. DateTimeUtils.defaultTimeZone()
    The system timezone.

Ideally we might specify the timezone in the non-Arrow path (spark.sql.execution.arrow.enable set to "false"), too, but would that affect backward compatibility?

@icexelloss
Contributor

icexelloss commented Jul 24, 2017

Excited to see this being worked on.

SQLConf.SESSION_LOCAL_TIMEZONE

I like this the best. This presents timestamps in local time, which is compatible with the existing toPandas(). If we really want the result to be exactly the same as the non-Arrow version of toPandas, we can do something like df[col] = df[col].dt.tz_localize(None) after getting the pandas.DataFrame from Arrow.
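For illustration, a small pandas sketch of that suggestion (the column name and timezones are made up): a tz-aware column coming back from Arrow is stripped to tz-naive local wall-clock time so it matches the non-Arrow output.

```python
import pandas as pd

# Hypothetical tz-aware column, as Arrow would hand it back with a session timezone.
pdf = pd.DataFrame({
    "ts": pd.to_datetime(["1970-01-01 00:00:01"]).tz_localize("UTC").tz_convert("America/Los_Angeles")
})

# Drop the timezone but keep the local wall-clock time, as suggested above.
pdf["ts"] = pdf["ts"].dt.tz_localize(None)
print(pdf["ts"].dtype)  # datetime64[ns], timezone-naive
```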

@BryanCutler
Member Author

BryanCutler commented Jul 24, 2017

Thanks @ueshin and @icexelloss. I like SQLConf.SESSION_LOCAL_TIMEZONE also, but it ended up being pretty messy trying to fit the SQLConf into these static functions - if there is an easier way let me know. So I just used DateTimeUtils.defaultTimeZone() for now, maybe it could be changed at a later point if needed.

In order to pass the python tests when comparing pandas.DataFrames with/without Arrow, I needed to do something like @icexelloss suggested to remove the timezone on the conversion with Arrow, which seems ok to me.

we might specify the timezone when spark.conf.set("spark.sql.execution.arrow.enable", "false"), too, but it would affect backward-compatibility?

I'm not sure if this would have an unintended effect, so I would vote to leave it as-is when Arrow is not enabled.

@SparkQA

SparkQA commented Jul 25, 2017

Test build #79922 has finished for PR 18664 at commit 719e77c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 25, 2017

Test build #79939 has finished for PR 18664 at commit 3585520.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

jenkins retest this please

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79945 has finished for PR 18664 at commit 3585520.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

I'm not sure why this is failing, ping @holdenk @wesm. This is after introducing a timezone in the Arrow data. The error is:

Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests.py", line 3058, in test_pandas_round_trip
    pdf_arrow = df.toPandas()
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/dataframe.py", line 1734, in toPandas
    return table.to_pandas()
  File "pyarrow/table.pxi", line 766, in pyarrow.lib.Table.to_pandas (/arrow/python/build/temp.linux-x86_64-3.4/lib.cxx:30571)
  File "/home/anaconda/envs/py3k/lib/python3.4/site-packages/pyarrow/pandas_compat.py", line 157, in table_to_blockmanager
    dtype = DatetimeTZDtype('ns', tz=item['timezone'])
TypeError: object() takes no parameters

And I think it's originating from here, but I can't reproduce this locally, even when I try to reproduce the environment on Jenkins (although Jenkins seems to be using Python 3.4, numpy 1.11.3, pandas 0.19.2, and pyarrow 0.4.1 - I can only get these versions installed with Python 3.5).

Any ideas?

@wesm
Member

wesm commented Jul 26, 2017

cc @jreback, he may know more quickly. Possible it's a bug in 0.19.x, but it should be fixable

@jreback

jreback commented Jul 26, 2017

I cannot repro this; can you show what item['timezone'] is?

@BryanCutler
Member Author

Thanks @jreback and @wesm for looking. I'm going to try one more fix and if that doesn't work then I can add some printouts to see what is going on.

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79972 has finished for PR 18664 at commit f977d0b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 26, 2017

Test build #79975 has finished for PR 18664 at commit a0f912f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

So this is currently failing due to this issue:

ERROR: test_bbb_fail (pyspark.sql.tests.ArrowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests.py", line 3055, in test_bbb_fail
    from pandas.types.dtypes import DatetimeTZDtype
ImportError: No module named 'pandas.types'

Which is strange because there is a 'pandas.types' module in Pandas 0.19.2. @holdenk or @shaneknapp, is it possible to verify the versions installed on these workers? It failed on both worker7 and worker8, thanks!

if t is None or s.dtype == t.to_pandas_dtype():
if type(t) == pa.TimestampType:
# NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
return _series_convert_timestamps_internal(s).values.astype('datetime64[us]')
Member

It seems we need s.fillna(0) for null values.

Member Author

Why is that? We did that for integers that were promoted to floats to get rid of NaN, but here we are converting datetime64[ns] to datetime64[us], and both support missing values:

In [28]: s = pd.Series([pd.datetime.now(), None])

In [29]: s
Out[29]: 
0   2017-10-24 10:44:51.483694
1                          NaT
dtype: datetime64[ns]

In [33]: s.values.astype('datetime64[us]')
Out[33]: array(['2017-10-24T10:44:51.483694', 'NaT'], dtype='datetime64[us]')

Member

I'm not exactly sure of the reason, but it seems s.dt.tz_localize('tzlocal()') in _series_convert_timestamps_internal doesn't work properly when the series includes null values.

Member Author

Hmmm, that's strange: s.dt.tz_localize('tzlocal()') gets an OverflowError: Python int too large to convert to C long when printing, but s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC') works - it just comes up with a bogus time where the NaT was. I agree that fillna(0) is safer to avoid the overflow.

In [44]: s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
Out[44]: 
0      2017-10-24 17:44:51.483694+00:00
1   1677-09-21 08:12:43.145224192+00:00
dtype: datetime64[ns, UTC]
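A hedged sketch of the fillna workaround being discussed (spelled with an explicit pd.Timestamp(0) fill value for clarity; the PR's own code may differ): fill NaT with the epoch before localizing so tz_localize never has to handle the NaT sentinel.

```python
import pandas as pd

s = pd.Series([pd.Timestamp("2017-10-24 10:44:51.483694"), None])  # contains NaT

# Fill NaT with the epoch before localizing/converting, as suggested above.
s_filled = s.fillna(pd.Timestamp(0))
utc = s_filled.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
print(utc)
```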

Member

I fixed the date/time-related casting bugs in pyarrow and added new cast implementations -- conversions from one timestamp unit to another in Arrow-land fail silently right now. This will all be in the 0.8.0 release, landing hopefully the week of 11/6 or thereabouts: apache/arrow#1245

Member Author

great, thanks @wesm!

Member

@BryanCutler It seems s.apply(lambda ts: ts.tz_localize('tzlocal()')) works without s.fillna(0). Do you know the difference between this and s.dt.tz_localize('tzlocal()')?

Member Author

apply() will invoke the given function on each individual value of the series. I think this iterates over the series, where s.dt.tz_localize() would do a vectorized operation and should be faster.
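As a small illustration of the difference described above (sample timestamps made up; both calls rely on pandas accepting the 'tzlocal()' string, as used elsewhere in this PR):

```python
import pandas as pd

s = pd.Series(pd.to_datetime(["1970-01-01 00:00:01", "2017-10-24 10:44:51"]))

elementwise = s.apply(lambda ts: ts.tz_localize('tzlocal()'))  # Python-level loop over Timestamps
vectorized = s.dt.tz_localize('tzlocal()')                     # one vectorized operation on the series
```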

def _series_convert_timestamps_internal(s):
    """ Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
    """
    return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
Member

@ueshin ueshin Oct 24, 2017

I'd prefer the previous implementation which checks type in case the series is already tz-aware.

Member Author

Yeah, you're right. I figured we are already checking that it is a timestamp type, but it's true the user could have created tz-aware timestamps so we need to check.

…rsion, set timestamp cast flag for copy to false
@BryanCutler
Member Author

@ueshin I have the requested changes done, just wondering if you could clarify #18664 (comment) before I push them, thanks!

@BryanCutler
Member Author

Before merging this PR, could anybody submit a PR for documenting this issue? Then we can get more feedback from the others.

@gatorsmile I started a WIP at #19575 for adding documentation; nothing there yet but I'll work on it.

@SparkQA

SparkQA commented Oct 25, 2017

Test build #83050 has finished for PR 18664 at commit 4d40893.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if is_datetime64_dtype(s.dtype):
    return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
else:
    return s
Member

Don't we need s.dt.tz_convert('UTC') for the case where the timezone is something unusual like tzlocal()?

Member Author

This is to ensure that the values are offsets from the Unix epoch (UTC), which is what Spark expects to store internally - just like TimestampType.toInternal here, only this conversion is vectorized with Pandas.
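To make the analogy concrete, a small sketch (not the PR's code; it assumes pyspark and pandas are importable and that the input series is tz-naive local time) showing the scalar TimestampType.toInternal path and the vectorized pandas path arriving at the same micros-since-epoch value:

```python
import datetime
import pandas as pd
from pyspark.sql.types import TimestampType

dt = datetime.datetime(1970, 1, 1, 0, 0, 1)

# Scalar path: TimestampType.toInternal converts local time to micros since the epoch.
scalar_micros = TimestampType().toInternal(dt)

# Vectorized path: localize to the system timezone, convert to UTC, then read micros.
s = pd.Series([dt])
utc = s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
vector_micros = utc.values.astype('int64') // 1000  # nanoseconds -> microseconds

assert scalar_micros == vector_micros[0]
```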

Member

I meant that if is_datetime64tz_dtype(s.dtype) is true but the series has an unusual timezone like tzlocal(), I thought we would need s.dt.tz_convert('UTC').

Member Author

Oh, got it. Yeah, I think you're right. I'll check on that a little later.

Member Author

@BryanCutler BryanCutler Oct 26, 2017

Here is what I found: for the actual internal data it doesn't matter. Changing the timezone on a series is just a metadata operation, so the same data will be transferred back to Spark via Arrow regardless:

In [101]: ts = pd.Timestamp(1, unit='D', tz='America/New_York')

In [102]: ts.value
Out[102]: 86400000000000

In [103]: ts.tz_convert('UTC').value
Out[103]: 86400000000000

However, to be consistent we should make sure the tz is UTC, so I'll add this along with a test.
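A hedged sketch of what the helper might look like after that change (the branch structure here is an approximation, not a verbatim copy of the PR): tz-naive input is localized to the system timezone and converted to UTC, while already tz-aware input is just normalized to UTC.

```python
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype

def _series_convert_timestamps_internal(s):
    """Convert a pandas Series of timestamps to UTC for Spark internal storage."""
    if is_datetime64_dtype(s.dtype):
        # tz-naive: interpret as system-local time, then normalize to UTC
        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
    elif is_datetime64tz_dtype(s.dtype):
        # already tz-aware (possibly tzlocal() or similar): normalize to UTC
        return s.dt.tz_convert('UTC')
    else:
        return s
```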

    else:
        raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
    return arrow_type


def _check_dataframe_localize_timestamps(df):
Member

tiny nit: df -> pdf

import pyarrow
tables = self._collectAsArrow()
if tables:
    table = pyarrow.concat_tables(tables)
-   return table.to_pandas()
+   df = table.to_pandas()
+   return _check_dataframe_localize_timestamps(df)
Member

ditto (df -> pdf).

    else:
        raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
    return arrow_type


def _check_dataframe_localize_timestamps(df):
    """ Convert timezone aware timestamps to timezone-naive in local time
Member

Let's add a comment that says the expected input is pd.DataFrame.
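For illustration, a sketch of the helper with the requested docstring note (the body is an approximation of the code under review, not a verbatim copy):

```python
from pandas.api.types import is_datetime64tz_dtype

def _check_dataframe_localize_timestamps(pdf):
    """Convert timezone-aware timestamps to timezone-naive in local time.

    :param pdf: a pandas.DataFrame (expected input type)
    :return: the same pandas.DataFrame with timezone-naive timestamp columns
    """
    for column in pdf.columns:
        if is_datetime64tz_dtype(pdf[column].dtype):
            # convert to local wall-clock time, then drop the timezone info
            pdf[column] = pdf[column].dt.tz_convert('tzlocal()').dt.tz_localize(None)
    return pdf
```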

@SparkQA

SparkQA commented Oct 26, 2017

Test build #83087 has finished for PR 18664 at commit addd35f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

@ueshin do you think this is ready to merge? cc @gatorsmile @cloud-fan for another look

@gatorsmile
Member

LGTM

@gatorsmile
Member

@BryanCutler Thank you for all your hard work. We really appreciate it!

Merged to master.

@asfgit asfgit closed this in 17af727 Oct 27, 2017
@BryanCutler
Member Author

No problem, thanks @gatorsmile!

ghost pushed a commit to dbtsai/spark that referenced this pull request Nov 28, 2017
…andas to respect session timezone

## What changes were proposed in this pull request?

When converting a Pandas DataFrame/Series from/to a Spark DataFrame using `toPandas()` or pandas udfs, timestamp values respect the Python system timezone instead of the session timezone.

For example, let's say we use `"America/Los_Angeles"` as the session timezone and have a timestamp value `"1970-01-01 00:00:01"` in that timezone. Btw, I'm in Japan, so the Python timezone would be `"Asia/Tokyo"`.

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects the Python timezone.
As we discussed in apache#18664, we consider this behavior a bug and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#19607 from ueshin/issues/SPARK-22395.
ghost pushed a commit to dbtsai/spark that referenced this pull request Feb 6, 2018
…rting Spark DataFrame to Pandas DataFrame.

## What changes were proposed in this pull request?

In apache#18664, there was a change in how `DateType` is being returned to users ([line 1968 in dataframe.py](https://github.com/apache/spark/pull/18664/files#diff-6fc344560230bf0ef711bb9b5573f1faR1968)). This can cause client code which works in Spark 2.2 to fail.
See [SPARK-23290](https://issues.apache.org/jira/browse/SPARK-23290?focusedCommentId=16350917&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16350917) for an example.

This PR modifies the conversion to use `datetime.date` for the date type, as Spark 2.2 does.

## How was this patch tested?

Tests modified to fit the new behavior and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#20506 from ueshin/issues/SPARK-23290.
ueshin added a commit to ueshin/apache-spark that referenced this pull request Feb 6, 2018
…rting Spark DataFrame to Pandas DataFrame.

asfgit pushed a commit that referenced this pull request Feb 6, 2018
…ype when converting Spark DataFrame to Pandas DataFrame.

## What changes were proposed in this pull request?

This is a backport of #20506.

In #18664, there was a change in how `DateType` is being returned to users ([line 1968 in dataframe.py](https://github.com/apache/spark/pull/18664/files#diff-6fc344560230bf0ef711bb9b5573f1faR1968)). This can cause client code which works in Spark 2.2 to fail.
See [SPARK-23290](https://issues.apache.org/jira/browse/SPARK-23290?focusedCommentId=16350917&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16350917) for an example.

This PR modifies the conversion to use `datetime.date` for the date type, as Spark 2.2 does.

## How was this patch tested?

Tests modified to fit the new behavior and existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #20515 from ueshin/issues/SPARK-23290_2.3.
dansanduleac pushed a commit to palantir/spark that referenced this pull request Feb 6, 2018
…rting Spark DataFrame to Pandas DataFrame.

@BryanCutler BryanCutler deleted the arrow-date-timestamp-SPARK-21375 branch November 19, 2018 05:47