Skip to content

refactor: make to_pandas() call to_arrow() and use local dtypes in DataFrame construction #132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 26, 2023
Merged
41 changes: 7 additions & 34 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
from typing import Iterable, List, Optional, Sequence, Tuple
import warnings

import geopandas as gpd # type: ignore
import google.cloud.bigquery as bigquery
import numpy
import pandas as pd
import pyarrow as pa # type: ignore

import bigframes.constants as constants
import bigframes.core as core
Expand All @@ -46,6 +43,7 @@
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.session._io.pandas
import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common

# Type constraint for wherever column labels are used
Expand Down Expand Up @@ -372,34 +370,11 @@ def reorder_levels(self, ids: typing.Sequence[str]):
level_names = [self.col_id_to_index_name[index_id] for index_id in ids]
return Block(self.expr, ids, self.column_labels, level_names)

@classmethod
def _to_dataframe(
cls, result, schema: typing.Mapping[str, bigframes.dtypes.Dtype]
) -> pd.DataFrame:
def _to_dataframe(self, result) -> pd.DataFrame:
"""Convert BigQuery data to pandas DataFrame with specific dtypes."""
dtypes = bigframes.dtypes.to_pandas_dtypes_overrides(result.schema)
df = result.to_dataframe(
dtypes=dtypes,
bool_dtype=pd.BooleanDtype(),
int_dtype=pd.Int64Dtype(),
float_dtype=pd.Float64Dtype(),
string_dtype=pd.StringDtype(storage="pyarrow"),
date_dtype=pd.ArrowDtype(pa.date32()),
datetime_dtype=pd.ArrowDtype(pa.timestamp("us")),
time_dtype=pd.ArrowDtype(pa.time64("us")),
timestamp_dtype=pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
)

# Convert Geography column from StringDType to GeometryDtype.
for column_name, dtype in schema.items():
if dtype == gpd.array.GeometryDtype():
df[column_name] = gpd.GeoSeries.from_wkt(
# https://github.com/geopandas/geopandas/issues/1879
df[column_name].replace({numpy.nan: None}),
# BigQuery geography type is based on the WGS84 reference ellipsoid.
crs="EPSG:4326",
)
return df
dtypes = dict(zip(self.index_columns, self.index_dtypes))
dtypes.update(zip(self.value_columns, self.dtypes))
return self._expr._session._rows_to_dataframe(result, dtypes)

def to_pandas(
self,
Expand Down Expand Up @@ -480,8 +455,7 @@ def _compute_and_count(
if sampling_method == _HEAD:
total_rows = int(results_iterator.total_rows * fraction)
results_iterator.max_results = total_rows
schema = dict(zip(self.value_columns, self.dtypes))
df = self._to_dataframe(results_iterator, schema)
df = self._to_dataframe(results_iterator)

if self.index_columns:
df.set_index(list(self.index_columns), inplace=True)
Expand Down Expand Up @@ -510,8 +484,7 @@ def _compute_and_count(
)
else:
total_rows = results_iterator.total_rows
schema = dict(zip(self.value_columns, self.dtypes))
df = self._to_dataframe(results_iterator, schema)
df = self._to_dataframe(results_iterator)

if self.index_columns:
df.set_index(list(self.index_columns), inplace=True)
Expand Down
3 changes: 2 additions & 1 deletion bigframes/core/indexes/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,10 @@ def to_pandas(self) -> pandas.Index:
"""Executes deferred operations and downloads the results."""
# Project down to only the index column. So the query can be cached to visualize other data.
index_columns = list(self._block.index_columns)
dtypes = dict(zip(index_columns, self.dtypes))
expr = self._expr.select_columns(index_columns)
results, _ = expr.start_query()
df = expr._session._rows_to_dataframe(results)
df = expr._session._rows_to_dataframe(results, dtypes)
df = df.set_index(index_columns)
index = df.index
index.names = list(self._block._index_labels)
Expand Down
6 changes: 6 additions & 0 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ def ibis_dtype_to_bigframes_dtype(
if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

# BigQuery only supports integers of size 64 bits.
if isinstance(ibis_dtype, ibis_dtypes.Integer):
return pd.Int64Dtype()

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
elif isinstance(ibis_dtype, ibis_dtypes.Null):
Expand Down Expand Up @@ -372,6 +376,8 @@ def cast_ibis_value(
ibis_dtypes.float64: (ibis_dtypes.string, ibis_dtypes.int64),
ibis_dtypes.string: (ibis_dtypes.int64, ibis_dtypes.float64),
ibis_dtypes.date: (),
ibis_dtypes.Decimal(precision=38, scale=9): (ibis_dtypes.float64,),
ibis_dtypes.Decimal(precision=76, scale=38): (ibis_dtypes.float64,),
ibis_dtypes.time: (),
ibis_dtypes.timestamp: (ibis_dtypes.Timestamp(timezone="UTC"),),
ibis_dtypes.Timestamp(timezone="UTC"): (ibis_dtypes.timestamp,),
Expand Down
10 changes: 3 additions & 7 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,14 +1515,10 @@ def _get_table_size(self, destination_table):
return table.num_bytes

def _rows_to_dataframe(
self, row_iterator: bigquery.table.RowIterator
self, row_iterator: bigquery.table.RowIterator, dtypes: Dict
) -> pandas.DataFrame:
return row_iterator.to_dataframe(
bool_dtype=pandas.BooleanDtype(),
int_dtype=pandas.Int64Dtype(),
float_dtype=pandas.Float64Dtype(),
string_dtype=pandas.StringDtype(storage="pyarrow"),
)
arrow_table = row_iterator.to_arrow()
return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)

def _start_generic_job(self, job: formatting_helpers.GenericJob):
if bigframes.options.display.progress_bar is not None:
Expand Down
77 changes: 77 additions & 0 deletions bigframes/session/_io/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Union

import geopandas # type: ignore
import pandas
import pandas.arrays
import pyarrow # type: ignore
import pyarrow.compute # type: ignore

import bigframes.constants


def _to_numpy(values: Union[pyarrow.Array, pyarrow.ChunkedArray]):
    """Convert an arrow array to a NumPy array, copying when necessary.

    ``pyarrow.Array.to_numpy`` defaults to ``zero_copy_only=True``, which
    raises for boolean arrays and for arrays containing nulls. Columns of a
    ``pyarrow.RecordBatch`` are ``pyarrow.Array`` objects, so copying must be
    allowed explicitly. ``pyarrow.ChunkedArray.to_numpy`` (the column type of
    a ``pyarrow.Table``) is called without arguments, as before.
    """
    if isinstance(values, pyarrow.ChunkedArray):
        return values.to_numpy()
    return values.to_numpy(zero_copy_only=False)


def arrow_to_pandas(
    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch], dtypes: Dict
):
    """Build a pandas DataFrame from arrow data using the requested dtypes.

    Args:
        arrow_table:
            The arrow table or record batch to convert.
        dtypes:
            Mapping from column name to the pandas dtype that column should
            have in the result. It must contain one entry per column.

    Returns:
        A pandas DataFrame with one column per arrow column, in the same
        order as ``arrow_table``.

    Raises:
        ValueError:
            If the number of entries in ``dtypes`` differs from the number
            of columns in ``arrow_table``.
        KeyError:
            If a column name has no entry in ``dtypes``.
    """
    if len(dtypes) != arrow_table.num_columns:
        raise ValueError(
            f"Number of types {len(dtypes)} doesn't match number of columns "
            f"{arrow_table.num_columns}. {bigframes.constants.FEEDBACK_LINK}"
        )

    serieses = {}
    # ``columns`` is available on both pyarrow.Table and pyarrow.RecordBatch.
    for field, column in zip(arrow_table.schema, arrow_table.columns):
        dtype = dtypes[field.name]

        if dtype == geopandas.array.GeometryDtype():
            series = geopandas.GeoSeries.from_wkt(
                column,
                # BigQuery geography type is based on the WGS84 reference ellipsoid.
                crs="EPSG:4326",
            )
        elif dtype == pandas.Float64Dtype():
            # Preserve NA/NaN distinction. Note: This is currently needed, even if we use
            # nullable Float64Dtype in the types_mapper. See:
            # https://github.com/pandas-dev/pandas/issues/55668
            # Regarding type: ignore, this class has been public at this
            # location since pandas 1.2.0. See:
            # https://pandas.pydata.org/docs/dev/reference/api/pandas.arrays.FloatingArray.html
            pd_array = pandas.arrays.FloatingArray(  # type: ignore
                _to_numpy(column),
                _to_numpy(pyarrow.compute.is_null(column)),
            )
            series = pandas.Series(pd_array, dtype=dtype)
        elif dtype == pandas.Int64Dtype():
            # Avoid out-of-bounds errors in Pandas 1.5.x, which incorrectly
            # casts to float64 in an intermediate step.
            pd_array = pandas.arrays.IntegerArray(
                # Nulls are replaced by a placeholder 0; the mask below marks
                # those positions as missing.
                _to_numpy(pyarrow.compute.fill_null(column, 0)),
                _to_numpy(pyarrow.compute.is_null(column)),
            )
            series = pandas.Series(pd_array, dtype=dtype)
        elif isinstance(dtype, pandas.ArrowDtype):
            # Avoid conversion logic if we are backing the pandas Series by the
            # arrow array.
            series = pandas.Series(
                pandas.arrays.ArrowExtensionArray(column),  # type: ignore
                dtype=dtype,
            )
        else:
            series = column.to_pandas(types_mapper=lambda _: dtype)

        serieses[field.name] = series

    return pandas.DataFrame(serieses)
10 changes: 0 additions & 10 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2046,16 +2046,6 @@ def test__dir__with_rename(scalars_dfs):
def test_iloc_slice(scalars_df_index, scalars_pandas_df_index, start, stop, step):
bf_result = scalars_df_index.iloc[start:stop:step].to_pandas()
pd_result = scalars_pandas_df_index.iloc[start:stop:step]

# Pandas may assign non-object dtype to empty series and series index
# dtypes of empty columns are a known area of divergence from pandas
for column in pd_result.columns:
if (
pd_result[column].empty and column != "geography_col"
): # for empty geography_col, bigframes assigns non-object dtype
pd_result[column] = pd_result[column].astype("object")
pd_result.index = pd_result.index.astype("object")

pd.testing.assert_frame_equal(
bf_result,
pd_result,
Expand Down
48 changes: 37 additions & 11 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,15 @@ def test_series_int_int_operators_series(scalars_dfs, operator):
)
def test_mods(scalars_dfs, col_x, col_y, method):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = getattr(scalars_df[col_x], method)(scalars_df[col_y]).to_pandas()
x_bf = scalars_df[col_x]
y_bf = scalars_df[col_y]
bf_series = getattr(x_bf, method)(y_bf)
# BigQuery's mod functions return [BIG]NUMERIC values unless both arguments are integers.
# https://cloud.google.com/bigquery/docs/reference/standard-sql/mathematical_functions#mod
if x_bf.dtype == pd.Int64Dtype() and y_bf.dtype == pd.Int64Dtype():
bf_result = bf_series.to_pandas()
else:
bf_result = bf_series.astype("Float64").to_pandas()
pd_result = getattr(scalars_pandas_df[col_x], method)(scalars_pandas_df[col_y])
pd.testing.assert_series_equal(pd_result, bf_result)

Expand Down Expand Up @@ -620,8 +628,20 @@ def test_divmods_series(scalars_dfs, col_x, col_y, method):
pd_div_result, pd_mod_result = getattr(scalars_pandas_df[col_x], method)(
scalars_pandas_df[col_y]
)
pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
# BigQuery's mod functions return NUMERIC values for non-INT64 inputs.
if bf_div_result.dtype == pd.Int64Dtype():
pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
else:
pd.testing.assert_series_equal(
pd_div_result, bf_div_result.astype("Float64").to_pandas()
)

if bf_mod_result.dtype == pd.Int64Dtype():
pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
else:
pd.testing.assert_series_equal(
pd_mod_result, bf_mod_result.astype("Float64").to_pandas()
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -649,8 +669,20 @@ def test_divmods_scalars(scalars_dfs, col_x, other, method):
scalars_df, scalars_pandas_df = scalars_dfs
bf_div_result, bf_mod_result = getattr(scalars_df[col_x], method)(other)
pd_div_result, pd_mod_result = getattr(scalars_pandas_df[col_x], method)(other)
pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
# BigQuery's mod functions return NUMERIC values for non-INT64 inputs.
if bf_div_result.dtype == pd.Int64Dtype():
pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
else:
pd.testing.assert_series_equal(
pd_div_result, bf_div_result.astype("Float64").to_pandas()
)

if bf_mod_result.dtype == pd.Int64Dtype():
pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
else:
pd.testing.assert_series_equal(
pd_mod_result, bf_mod_result.astype("Float64").to_pandas()
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1941,12 +1973,6 @@ def test_iloc_nested(scalars_df_index, scalars_pandas_df_index):
def test_series_iloc(scalars_df_index, scalars_pandas_df_index, start, stop, step):
bf_result = scalars_df_index["string_col"].iloc[start:stop:step].to_pandas()
pd_result = scalars_pandas_df_index["string_col"].iloc[start:stop:step]

# Pandas may assign non-object dtype to empty series and series index
if pd_result.empty:
pd_result = pd_result.astype("object")
pd_result.index = pd_result.index.astype("object")

pd.testing.assert_series_equal(
bf_result,
pd_result,
Expand Down
Loading