[SPARK-44854][PYTHON] Python timedelta to DayTimeIntervalType edge case bug

### What changes were proposed in this pull request?

This PR proposes to change the way Python `datetime.timedelta` objects are converted to `pyspark.sql.types.DayTimeIntervalType` values. Specifically, it modifies the logic inside `toInternal`, which converts the timedelta to a Python integer (equivalent to an int64 in other languages) representing the interval in microseconds. For certain timedelta objects, the current logic inadvertently adds an extra second during the conversion and therefore returns an incorrect value.

An illustrative example is as follows:

```
from datetime import timedelta
from pyspark.sql.types import DayTimeIntervalType, StructField, StructType

spark = ...spark session setup here...

td = timedelta(days=4498031, seconds=16054, microseconds=999981)
df = spark.createDataFrame([(td,)], StructType([StructField(name="timedelta_col", dataType=DayTimeIntervalType(), nullable=False)]))
df.show(truncate=False)

> +------------------------------------------------+
> |timedelta_col                                   |
> +------------------------------------------------+
> |INTERVAL '4498031 04:27:35.999981' DAY TO SECOND|
> +------------------------------------------------+

print(str(td))

>  '4498031 days, 4:27:34.999981'
```

In the above example, look at the seconds: the original Python timedelta object has 34 seconds, while the PySpark DayTimeIntervalType column shows 35 seconds.
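The extra second comes from floating-point rounding: the old `toInternal` went through `timedelta.total_seconds()`, which returns a float, and at this magnitude the float cannot represent the fractional part exactly, so the value rounds up to the next whole second before `math.floor` is applied. A minimal standalone sketch reproducing the effect with the same values (no Spark session needed; variable names are illustrative):

```python
import math
from datetime import timedelta

td = timedelta(days=4498031, seconds=16054, microseconds=999981)

# Exact value: 4498031 * 86400 + 16054 = 388629894454 whole seconds, plus 999981 microseconds.
# total_seconds() returns a float; 388629894454.999981 is not representable at this magnitude
# and rounds up to 388629894455.0, so flooring it gains a full second.
old_micros = (math.floor(td.total_seconds()) * 1000000) + td.microseconds

# Pure integer arithmetic, as in the fix, keeps the exact value.
new_micros = (((td.days * 86400) + td.seconds) * 1_000_000) + td.microseconds

print(old_micros - new_micros)  # 1000000: the old path is one second too large
```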

### Why are the changes needed?

To fix a bug: the conversion returns the wrong value. Adding the above timedelta entry to the existing unit tests causes them to fail.

### Does this PR introduce _any_ user-facing change?

Yes. Users will now see the correct timedelta values in PySpark DataFrames for such edge cases.

### How was this patch tested?

Illustrative edge cases were added to the `test_daytime_interval_type` test in `python/pyspark/sql/tests/test_types.py`. The existing code was verified to fail the updated test, the fix was applied, and the test now passes.
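For a quick local check of the roundtrip without a Spark session, the same edge cases can also be pushed through `DayTimeIntervalType.toInternal`/`fromInternal` directly (a sketch only; the test added in this PR goes through `createDataFrame` instead):

```python
import datetime
from pyspark.sql.types import DayTimeIntervalType

dt_type = DayTimeIntervalType()
edge_cases = [
    datetime.timedelta(microseconds=388629894454999981),
    datetime.timedelta(days=-1, seconds=86399, microseconds=999999),  # -1 microsecond
]
for td in edge_cases:
    # With the fix, converting to internal microseconds and back returns the original timedelta.
    assert dt_type.fromInternal(dt_type.toInternal(td)) == td
```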

### JIRA ticket link
This PR should close https://issues.apache.org/jira/browse/SPARK-44854

Closes apache#42541 from hdaly0/SPARK-44854.

Authored-by: Ocean <haghighidaly@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 9fdf65a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
hdaly0 authored and dongjoon-hyun committed Aug 23, 2023
1 parent 0d9049d commit ef18456
Showing 2 changed files with 3 additions and 1 deletion.
2 changes: 2 additions & 0 deletions python/pyspark/sql/tests/test_types.py
```diff
@@ -1155,6 +1155,8 @@ def test_daytime_interval_type(self):
             ),
             (datetime.timedelta(microseconds=-123),),
             (datetime.timedelta(days=-1),),
+            (datetime.timedelta(microseconds=388629894454999981),),
+            (datetime.timedelta(days=-1, seconds=86399, microseconds=999999),),  # -1 microsecond
         ]
         df = self.spark.createDataFrame(timedetlas, schema="td interval day to second")
         self.assertEqual(set(r.td for r in df.collect()), set(set(r[0] for r in timedetlas)))
```
2 changes: 1 addition & 1 deletion python/pyspark/sql/types.py
```diff
@@ -427,7 +427,7 @@ def needConversion(self) -> bool:

     def toInternal(self, dt: datetime.timedelta) -> Optional[int]:
         if dt is not None:
-            return (math.floor(dt.total_seconds()) * 1000000) + dt.microseconds
+            return (((dt.days * 86400) + dt.seconds) * 1_000_000) + dt.microseconds

     def fromInternal(self, micros: int) -> Optional[datetime.timedelta]:
         if micros is not None:
```
