Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support conversion from pyarrow RecordBatch to pandas DataFrame #39

Merged
merged 10 commits into from
Nov 8, 2021
109 changes: 93 additions & 16 deletions db_dtypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import datetime
import re
from typing import Union

import numpy
import packaging.version
Expand All @@ -29,13 +30,16 @@
import pandas.core.dtypes.generic
import pandas.core.nanops
import pyarrow
import pyarrow.compute

from db_dtypes.version import __version__
from db_dtypes import core


date_dtype_name = "dbdate"
time_dtype_name = "dbtime"
_EPOCH = datetime.datetime(1970, 1, 1)
_NPEPOCH = numpy.datetime64(_EPOCH)

pandas_release = packaging.version.parse(pandas.__version__).release

Expand All @@ -52,6 +56,33 @@ class TimeDtype(core.BaseDatetimeDtype):
def construct_array_type(self):
return TimeArray

@staticmethod
def __from_arrow__(
array: Union[pyarrow.Array, pyarrow.ChunkedArray]
) -> "TimeArray":
tswast marked this conversation as resolved.
Show resolved Hide resolved
"""Convert to dbtime data from an Arrow array.

See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
# We can't call combine_chunks on an empty array, so short-circuit the
# rest of the function logic for this special case.
if len(array) == 0:
return TimeArray(numpy.array([], dtype="datetime64[ns]"))

# We can't cast to timestamp("ns"), but time64("ns") has the same
# memory layout: 64-bit integers representing the number of nanoseconds
# since the datetime epoch (midnight 1970-01-01).
array = pyarrow.compute.cast(array, pyarrow.time64("ns"))

# ChunkedArray has no "view" method, so combine into an Array.
if isinstance(array, pyarrow.ChunkedArray):
array = array.combine_chunks()

array = array.view(pyarrow.timestamp("ns"))
np_array = array.to_numpy(zero_copy_only=False)
return TimeArray(np_array)


class TimeArray(core.BaseDatetimeArray):
"""
Expand All @@ -61,8 +92,6 @@ class TimeArray(core.BaseDatetimeArray):
# Data are stored as datetime64 values with a date of Jan 1, 1970

dtype = TimeDtype()
_epoch = datetime.datetime(1970, 1, 1)
_npepoch = numpy.datetime64(_epoch)

@classmethod
def _datetime(
Expand All @@ -75,8 +104,21 @@ def _datetime(
r"(?:\.(?P<fraction>\d*))?)?)?\s*$"
).match,
):
if isinstance(scalar, datetime.time):
return datetime.datetime.combine(cls._epoch, scalar)
# Convert pyarrow values to datetime.time.
if isinstance(scalar, (pyarrow.Time32Scalar, pyarrow.Time64Scalar)):
scalar = (
scalar.cast(pyarrow.time64("ns"))
.cast(pyarrow.int64())
.cast(pyarrow.timestamp("ns"))
.as_py()
)

if scalar is None:
return None
elif isinstance(scalar, datetime.time):
return datetime.datetime.combine(_EPOCH, scalar)
elif isinstance(scalar, pandas.Timestamp):
return scalar.to_datetime64()
elif isinstance(scalar, str):
# iso string
parsed = match_fn(scalar)
Expand Down Expand Up @@ -113,7 +155,7 @@ def _box_func(self, x):
__return_deltas = {"timedelta", "timedelta64", "timedelta64[ns]", "<m8", "<m8[ns]"}

def astype(self, dtype, copy=True):
deltas = self._ndarray - self._npepoch
deltas = self._ndarray - _NPEPOCH
stype = str(dtype)
if stype in self.__return_deltas:
return deltas
Expand All @@ -122,15 +164,25 @@ def astype(self, dtype, copy=True):
else:
return super().astype(dtype, copy=copy)

if pandas_release < (1,):
def __arrow_array__(self, type=None):
tswast marked this conversation as resolved.
Show resolved Hide resolved
"""Convert to an Arrow array from dbtime data.

def to_numpy(self, dtype="object"):
return self.astype(dtype)
See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.array(self._ndarray, type=pyarrow.timestamp("ns"))

def __arrow_array__(self, type=None):
return pyarrow.array(
self.to_numpy(dtype="object"),
type=type if type is not None else pyarrow.time64("ns"),
# ChunkedArray has no "view" method, so combine into an Array.
array = (
array.combine_chunks() if isinstance(array, pyarrow.ChunkedArray) else array
)

# We can't cast to time64("ns"), but timestamp("ns") has the same
# memory layout: 64-bit integers representing the number of nanoseconds
# since the datetime epoch (midnight 1970-01-01).
array = array.view(pyarrow.time64("ns"))
return pyarrow.compute.cast(
array, type if type is not None else pyarrow.time64("ns"),
)


Expand All @@ -146,6 +198,19 @@ class DateDtype(core.BaseDatetimeDtype):
def construct_array_type(self):
return DateArray

@staticmethod
def __from_arrow__(
array: Union[pyarrow.Array, pyarrow.ChunkedArray]
) -> "DateArray":
"""Convert to dbdate data from an Arrow array.

See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.compute.cast(array, pyarrow.timestamp("ns"))
np_array = array.to_numpy()
return DateArray(np_array)


class DateArray(core.BaseDatetimeArray):
"""
Expand All @@ -161,7 +226,13 @@ def _datetime(
scalar,
match_fn=re.compile(r"\s*(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)\s*$").match,
):
if isinstance(scalar, datetime.date):
# Convert pyarrow values to datetime.date.
if isinstance(scalar, (pyarrow.Date32Scalar, pyarrow.Date64Scalar)):
scalar = scalar.as_py()

if scalar is None:
return None
elif isinstance(scalar, datetime.date):
return datetime.datetime(scalar.year, scalar.month, scalar.day)
elif isinstance(scalar, str):
match = match_fn(scalar)
Expand Down Expand Up @@ -197,16 +268,22 @@ def astype(self, dtype, copy=True):
return super().astype(dtype, copy=copy)

def __arrow_array__(self, type=None):
return pyarrow.array(
self._ndarray, type=type if type is not None else pyarrow.date32(),
"""Convert to an Arrow array from dbdate data.

See:
https://pandas.pydata.org/pandas-docs/stable/development/extending.html#compatibility-with-apache-arrow
"""
array = pyarrow.array(self._ndarray, type=pyarrow.timestamp("ns"))
return pyarrow.compute.cast(
array, type if type is not None else pyarrow.date32(),
)

def __add__(self, other):
if isinstance(other, pandas.DateOffset):
return self.astype("object") + other

if isinstance(other, TimeArray):
return (other._ndarray - other._npepoch) + self._ndarray
return (other._ndarray - _NPEPOCH) + self._ndarray

return super().__add__(other)

Expand Down
8 changes: 3 additions & 5 deletions db_dtypes/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import numpy
import pandas
from pandas._libs import NaT
import pandas.api.extensions
import pandas.compat.numpy.function
import pandas.core.algorithms
import pandas.core.arrays
Expand All @@ -32,7 +33,7 @@
pandas_release = pandas_backports.pandas_release


class BaseDatetimeDtype(pandas.core.dtypes.base.ExtensionDtype):
class BaseDatetimeDtype(pandas.api.extensions.ExtensionDtype):
tswast marked this conversation as resolved.
Show resolved Hide resolved
na_value = NaT
kind = "o"
names = None
Expand Down Expand Up @@ -60,10 +61,7 @@ def __init__(self, values, dtype=None, copy: bool = False):

@classmethod
def __ndarray(cls, scalars):
return numpy.array(
[None if scalar is None else cls._datetime(scalar) for scalar in scalars],
"M8[ns]",
)
return numpy.array([cls._datetime(scalar) for scalar in scalars], "M8[ns]",)

@classmethod
def _from_sequence(cls, scalars, *, dtype=None, copy=False):
Expand Down
Loading