
perf: inline small Series and DataFrames in query text #45


Merged 6 commits on Sep 21, 2023
62 changes: 51 additions & 11 deletions bigframes/core/__init__.py
@@ -144,21 +144,56 @@ def mem_expr_from_pandas(
"""
Builds an in-memory only (SQL only) expr from a pandas dataframe.

Caution: If session is None, only a subset of expr functionality will be available (null Session is usually not supported).
Caution: If session is None, only a subset of expr functionality will
be available (null Session is usually not supported).
"""
# must set non-null column labels. these are not the user-facing labels
pd_df = pd_df.set_axis(
[column or bigframes.core.guid.generate_guid() for column in pd_df.columns],
axis="columns",
)
# We can't include any hidden columns in the ArrayValue constructor, so
# grab the column names before we add the hidden ordering column.
column_names = [str(column) for column in pd_df.columns]
# Make sure column names are all strings.
pd_df = pd_df.set_axis(column_names, axis="columns")
pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})

# ibis memtable cannot handle NA, must convert to None
pd_df = pd_df.astype("object") # type: ignore
pd_df = pd_df.where(pandas.notnull(pd_df), None)

# NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
keys_memtable = ibis.memtable(pd_df)
schema = keys_memtable.schema()
new_schema = []
for column_index, column in enumerate(schema):
if column == ORDER_ID_COLUMN:
new_type: ibis_dtypes.DataType = ibis_dtypes.int64
else:
column_type = schema[column]
# The autodetected type might not be one we can support, such
# as NULL type for empty rows, so convert to a type we do
# support.
new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
)
# TODO(swast): Ibis memtable doesn't use backticks in struct
# field names, so spaces and other characters aren't allowed in
# the memtable context. Blocked by
# https://github.com/ibis-project/ibis/issues/7187
column = f"col_{column_index}"
new_schema.append((column, new_type))

# must set non-null column labels. these are not the user-facing labels
pd_df = pd_df.set_axis(
[column for column, _ in new_schema],
axis="columns",
)
keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))

return cls(
session, # type: ignore # Session cannot normally be none, see "caution" above
keys_memtable,
columns=[
keys_memtable[f"col_{column_index}"].name(column)
for column_index, column in enumerate(column_names)
],
ordering=ExpressionOrdering(
ordering_value_columns=[OrderingColumnReference(ORDER_ID_COLUMN)],
total_ordering_columns=frozenset([ORDER_ID_COLUMN]),
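
To make the fallback above concrete: a minimal sketch, assuming ibis >= 6.2, of retrying ibis.memtable with an explicit schema when a column autodetects as the NULL type. The positional col_{index} names work around the backtick limitation tracked in ibis#7187.

import ibis
import ibis.expr.datatypes as ibis_dtypes
import pandas

pd_df = pandas.DataFrame({"empty column": [None, None]})
pd_df = pd_df.astype("object").where(pandas.notnull(pd_df), None)

schema = ibis.memtable(pd_df).schema()
new_schema = []
for column_index, column in enumerate(schema):
    column_type = schema[column]
    # An all-None column autodetects as NULL, which BigQuery rejects,
    # so fall back to string (mirroring ibis_dtype_to_bigframes_dtype).
    if isinstance(column_type, ibis_dtypes.Null):
        column_type = ibis_dtypes.string
    # Positional names avoid characters ibis memtables can't quote yet.
    new_schema.append((f"col_{column_index}", column_type))

pd_df = pd_df.set_axis([name for name, _ in new_schema], axis="columns")
keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))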
@@ -426,11 +461,16 @@ def shape(self) -> typing.Tuple[int, int]:
width = len(self.columns)
count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
sql = self._session.ibis_client.compile(count_expr)
row_iterator, _ = self._session._start_query(
sql=sql,
max_results=1,
)
length = next(row_iterator)[0]

# Support in-memory engines for hermetic unit tests.
if not isinstance(sql, str):
length = self._session.ibis_client.execute(count_expr)
else:
row_iterator, _ = self._session._start_query(
sql=sql,
max_results=1,
)
length = next(row_iterator)[0]
return (length, width)

def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
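The isinstance check above exists because only SQL-generating backends return a string from compile(). A hedged sketch of the same branch against ibis's in-memory pandas backend (the backend choice here is an assumption for illustration):

import ibis
import pandas

client = ibis.pandas.connect({"t": pandas.DataFrame({"a": [1, 2, 3]})})
count_expr = client.table("t").count()
sql = client.compile(count_expr)

if not isinstance(sql, str):
    # In-memory engines hand back an executable object, not SQL text.
    length = client.execute(count_expr)
else:
    ...  # the BigQuery row-iterator path shown above
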
55 changes: 27 additions & 28 deletions bigframes/core/blocks.py
@@ -44,6 +44,7 @@
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common

# Type constraint for wherever column labels are used
Label = typing.Hashable
@@ -1522,37 +1523,35 @@ def _is_monotonic(
return result


def block_from_local(data, session=None, use_index=True) -> Block:
# TODO(tbergeron): Handle duplicate column labels
def block_from_local(data, session=None) -> Block:
pd_data = pd.DataFrame(data)
columns = pd_data.columns

column_labels = list(pd_data.columns)
if not all((label is None) or isinstance(label, str) for label in column_labels):
raise NotImplementedError(
f"Only string column labels supported. {constants.FEEDBACK_LINK}"
)
# Make a flattened version to treat as a table.
if len(pd_data.columns.names) > 1:
pd_data.columns = columns.to_flat_index()

if use_index:
if pd_data.index.nlevels > 1:
raise NotImplementedError(
f"multi-indices not supported. {constants.FEEDBACK_LINK}"
)
index_label = pd_data.index.name

index_id = guid.generate_guid()
pd_data = pd_data.reset_index(names=index_id)
keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
return Block(
keys_expr,
column_labels=column_labels,
index_columns=[index_id],
index_labels=[index_label],
)
else:
keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
keys_expr, offsets_id = keys_expr.promote_offsets()
# Constructor will create default range index
return Block(keys_expr, index_columns=[offsets_id], column_labels=column_labels)
index_labels = list(pd_data.index.names)
# The ArrayValue layer doesn't know about indexes, so make sure indexes
# are real columns with unique IDs.
pd_data = pd_data.reset_index(
names=[f"level_{level}" for level in range(len(index_labels))]
)
pd_data = pd_data.set_axis(
vendored_pandas_io_common.dedup_names(
list(pd_data.columns), is_potential_multiindex=False
),
axis="columns",
)
index_ids = pd_data.columns[: len(index_labels)]

keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
return Block(
keys_expr,
column_labels=columns,
index_columns=index_ids,
index_labels=index_labels,
)


def _align_block_to_schema(
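For reference, the vendored dedup_names mirrors the pandas helper of the same name; a small sketch of the renaming it performs, assuming pandas 2.x where it is importable from pandas.io.common:

import pandas
from pandas.io.common import dedup_names

df = pandas.concat(
    [pandas.Series([1, 2], name="some col"), pandas.Series([3, 4], name="some col")],
    axis="columns",
)
print(dedup_names(list(df.columns), is_potential_multiindex=False))
# ['some col', 'some col.1'] -- duplicate labels get numeric suffixes
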
4 changes: 1 addition & 3 deletions bigframes/dataframe.py
@@ -65,9 +65,7 @@

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
# TODO(swast): Address issues with string escaping and empty tables before
# re-enabling inline data (ibis.memtable) feature.
MAX_INLINE_DF_SIZE = -1
MAX_INLINE_DF_SIZE = 5000

LevelType = typing.Union[str, int]
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
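Illustrative only: how a size gate like this is typically consulted when choosing between inlining and a load job. read_pandas and both helpers below are hypothetical names for this sketch, not code from the PR.

MAX_INLINE_DF_SIZE = 5000  # item count; see the bytes-based TODO above

def read_pandas(pd_df):
    # Small frames are embedded directly in the query text via ibis.memtable...
    if pd_df.size <= MAX_INLINE_DF_SIZE:
        return _read_pandas_inline(pd_df)
    # ...larger frames fall back to a BigQuery load job.
    return _read_pandas_load_job(pd_df)
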
21 changes: 13 additions & 8 deletions bigframes/dtypes.py
@@ -96,13 +96,13 @@
),
)

BIGFRAMES_TO_IBIS: Dict[Dtype, IbisDtype] = {
BIGFRAMES_TO_IBIS: Dict[Dtype, ibis_dtypes.DataType] = {
pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}

IBIS_TO_BIGFRAMES: Dict[
Union[IbisDtype, ReadOnlyIbisDtype], Union[Dtype, np.dtype[Any]]
] = {ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}
# Allow REQUIRED fields to map correctly.
IBIS_TO_BIGFRAMES.update(
{ibis.copy(nullable=False): pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS}
@@ -130,7 +130,7 @@


def ibis_dtype_to_bigframes_dtype(
ibis_dtype: Union[IbisDtype, ReadOnlyIbisDtype]
ibis_dtype: ibis_dtypes.DataType,
) -> Union[Dtype, np.dtype[Any]]:
"""Converts an Ibis dtype to a BigQuery DataFrames dtype

@@ -155,6 +155,9 @@ def ibis_dtype_to_bigframes_dtype(

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
elif isinstance(ibis_dtype, ibis_dtypes.Null):
# Fallback to STRING for NULL values for most flexibility in SQL.
return IBIS_TO_BIGFRAMES[ibis_dtypes.string]
else:
raise ValueError(
f"Unexpected Ibis data type {ibis_dtype}. {constants.FEEDBACK_LINK}"
@@ -185,8 +188,8 @@ def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:


def bigframes_dtype_to_ibis_dtype(
bigframes_dtype: Union[DtypeString, Dtype]
) -> IbisDtype:
bigframes_dtype: Union[DtypeString, Dtype, np.dtype[Any]]
) -> ibis_dtypes.DataType:
"""Converts a BigQuery DataFrames supported dtype to an Ibis dtype.

Args:
@@ -281,7 +284,9 @@ def literal_to_ibis_scalar(
return scalar_expr


def cast_ibis_value(value: ibis_types.Value, to_type: IbisDtype) -> ibis_types.Value:
def cast_ibis_value(
value: ibis_types.Value, to_type: ibis_dtypes.DataType
) -> ibis_types.Value:
"""Perform compatible type casts of ibis values

Args:
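A minimal sketch of the new NULL fallback above, assuming ibis 6.x and that bigframes maps STRING to the pyarrow-backed pandas string dtype:

import ibis.expr.datatypes as ibis_dtypes
import pandas

import bigframes.dtypes

# An all-NULL column autodetects as the NULL type; instead of raising,
# the converter now falls back to STRING for flexibility in SQL.
dtype = bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_dtypes.null)
assert dtype == pandas.StringDtype(storage="pyarrow")
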
4 changes: 1 addition & 3 deletions bigframes/operations/base.py
@@ -30,9 +30,7 @@

# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
# TODO(tbergeron): Convert to bytes-based limit
# TODO(swast): Address issues with string escaping and empty tables before
# re-enabling inline data (ibis.memtable) feature.
MAX_INLINE_SERIES_SIZE = -1
MAX_INLINE_SERIES_SIZE = 5000


class SeriesMethods:
4 changes: 2 additions & 2 deletions setup.py
@@ -44,12 +44,12 @@
"google-cloud-resource-manager >=1.10.3",
"google-cloud-storage >=2.0.0",
# TODO: Relax upper bound once we have fixed `system_prerelease` tests.
"ibis-framework[bigquery] >=6.0.0,<=6.1.0",
"ibis-framework[bigquery] >=6.2.0,<7.0.0dev",
"pandas >=1.5.0",
"pydata-google-auth >=1.8.2",
"requests >=2.27.1",
"scikit-learn >=1.2.2",
"sqlalchemy >=1.4,<3.0",
"sqlalchemy >=1.4,<3.0dev",
"ipywidgets >=7.7.1",
"humanize >= 4.6.0",
]
2 changes: 1 addition & 1 deletion testing/constraints-3.9.txt
@@ -45,7 +45,7 @@ greenlet==2.0.2
grpc-google-iam-v1==0.12.6
grpcio==1.53.0
grpcio-status==1.48.2
ibis-framework==6.0.0
ibis-framework==6.2.0
humanize==4.6.0
identify==2.5.22
idna==3.4
13 changes: 13 additions & 0 deletions tests/unit/core/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
85 changes: 85 additions & 0 deletions tests/unit/core/test_blocks.py
@@ -0,0 +1,85 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas
import pandas.testing
import pytest

import bigframes.core.blocks as blocks

from .. import resources


@pytest.mark.parametrize(
("data",),
(
pytest.param(
{"test 1": [1, 2, 3], "test 2": [0.25, 0.5, 0.75]},
id="dict_spaces_in_column_names",
),
pytest.param(
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]],
id="nested_list",
),
pytest.param(
pandas.concat(
[
pandas.Series([1, 2, 3], name="some col"),
pandas.Series([2, 3, 4], name="some col"),
],
axis="columns",
),
id="duplicate_column_names",
),
pytest.param(
pandas.DataFrame(
{"test": [1, 2, 3]},
index=pandas.Index(["a", "b", "c"], name="string index"),
),
id="string_index",
),
pytest.param(
pandas.DataFrame(
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]],
columns=pandas.MultiIndex.from_tuples(
[(1, 1), (1, 2), (0, 0), (0, 1)],
names=["some level", "another level"],
),
),
marks=[
pytest.mark.skipif(
tuple(pandas.__version__.split(".")) < ("2", "0", "0"),
reason="pandas 1.5.3 treats column MultiIndex as Index of tuples",
),
],
id="multiindex_columns",
),
pytest.param(
pandas.DataFrame(
{"test": [1, 2, 3]},
index=pandas.MultiIndex.from_tuples([(1, 1), (1, 2), (0, 0)]),
),
id="multiindex_rows",
),
),
)
def test_block_from_local(data):
expected = pandas.DataFrame(data)
session = resources.create_pandas_session({})

block = blocks.block_from_local(data, session=session)

pandas.testing.assert_index_equal(block.column_labels, expected.columns)
assert tuple(block.index_labels) == tuple(expected.index.names)
assert block.shape == expected.shape
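
These tests lean on a pandas-backed session helper so block logic runs hermetically. A hedged sketch of what resources.create_pandas_session might look like; the mock-based construction is an assumption, since the helper itself is not part of this diff:

import unittest.mock as mock

import ibis
import pandas

import bigframes.session

def create_pandas_session(tables: dict):
    # A Session double whose ibis client is ibis's in-memory pandas
    # backend, so shape() and friends never touch BigQuery.
    session = mock.create_autospec(bigframes.session.Session, instance=True)
    session.ibis_client = ibis.pandas.connect(tables)
    return session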