Commit 2009c86
fix(bigquerystorage): to_dataframe on an arrow stream uses 2x faster to_arrow + to_pandas, internally

Towards https://issuetracker.google.com/140579733
tswast committed Dec 18, 2019
1 parent 8bb4068 commit 2009c86
Showing 2 changed files with 96 additions and 8 deletions.
16 changes: 16 additions & 0 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -311,6 +311,21 @@ def to_dataframe(self, dtypes=None):
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
 
+        if dtypes is None:
+            dtypes = {}
+
+        # If it's an Arrow stream, calling to_arrow, then converting to a
+        # pandas dataframe is about 2x faster. This is because pandas.concat is
+        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+        # usually no-copy.
+        schema_type = self._read_session.WhichOneof("schema")
+        if schema_type == "arrow_schema":
+            record_batch = self.to_arrow()
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
+
         frames = []
         for page in self.pages:
             frames.append(page.to_dataframe(dtypes=dtypes))
@@ -403,6 +418,7 @@ def to_dataframe(self, dtypes=None):
         """
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
+
         return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)


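The comment in the new fast path carries the key claim: pandas.concat is rarely no-copy, while pyarrow.Table.from_batches + to_pandas usually is. Here is a standalone sketch of the two conversion strategies it compares — not code from this commit, just an illustration assuming recent pandas and pyarrow; the batch contents are made up and real speedups depend on data shape:

    import pandas
    import pyarrow

    # Illustrative record batches; RecordBatch.from_pydict and the column
    # name are not from the commit.
    batches = [
        pyarrow.RecordBatch.from_pydict({"int_col": list(range(i, i + 1000))})
        for i in range(0, 10000, 1000)
    ]

    # Old path: one DataFrame per batch, stitched with pandas.concat, which
    # usually copies each block into the result.
    df_concat = pandas.concat(
        [batch.to_pandas() for batch in batches], ignore_index=True
    )

    # New path (what this commit switches to): assemble a single Arrow table,
    # then convert once; from_batches + to_pandas can often skip those copies.
    df_arrow = pyarrow.Table.from_batches(batches).to_pandas()

    # Both paths produce the same frame; only the copying behavior differs.
    pandas.testing.assert_frame_equal(df_concat, df_arrow)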
88 changes: 80 additions & 8 deletions bigquery_storage/tests/unit/test_reader.py
@@ -173,9 +173,9 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
     return arrow_batches
 
 
-def _avro_blocks_w_unavailable(avro_blocks):
-    for block in avro_blocks:
-        yield block
+def _pages_w_unavailable(pages):
+    for page in pages:
+        yield page
     raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")


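The renamed helper simulates a server that drops the stream after its last page, which is what the reconnect tests below rely on. A toy, self-contained model of that pattern follows — read_with_reconnect is hypothetical; the real reader's retry loop lives in reader.py and differs in detail:

    import google.api_core.exceptions

    def _pages_w_unavailable(pages):
        # Same shape as the test helper: yield everything, then drop the stream.
        for page in pages:
            yield page
        raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")

    def read_with_reconnect(first_attempt, reconnect):
        # Toy model of the reader's behavior: consume until the service drops
        # the connection, then resume from a fresh stream.
        stream = first_attempt
        while True:
            try:
                for item in stream:
                    yield item
                return
            except google.api_core.exceptions.ServiceUnavailable:
                stream = reconnect()

    # The second source stands in for mock_client.read_rows picking up where
    # the first stream failed.
    resumed = read_with_reconnect(
        _pages_w_unavailable(iter([1, 2])), lambda: iter([3, 4])
    )
    assert list(resumed) == [1, 2, 3, 4]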
@@ -371,9 +371,7 @@ def test_rows_w_reconnect(class_under_test, mock_client):
         [{"int_col": 123}, {"int_col": 234}],
         [{"int_col": 345}, {"int_col": 456}],
     ]
-    avro_blocks_1 = _avro_blocks_w_unavailable(
-        _bq_to_avro_blocks(bq_blocks_1, avro_schema)
-    )
+    avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)

@@ -433,7 +431,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -680,7 +678,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -721,6 +719,80 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
 
+def test_to_dataframe_by_page_arrow(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    arrow_schema = _bq_to_arrow_schema(bq_columns)
+    read_session = _generate_arrow_read_session(arrow_schema)
+
+    bq_block_1 = [
+        {"int_col": 123, "bool_col": True},
+        {"int_col": 234, "bool_col": False},
+    ]
+    bq_block_2 = [
+        {"int_col": 345, "bool_col": True},
+        {"int_col": 456, "bool_col": False},
+    ]
+    bq_block_3 = [
+        {"int_col": 567, "bool_col": True},
+        {"int_col": 789, "bool_col": False},
+    ]
+    bq_block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [bq_block_1, bq_block_2]
+    bq_blocks_2 = [bq_block_3, bq_block_4]
+    batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema)
+    batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema)
+
+    mock_client.read_rows.return_value = batch_2
+
+    reader = class_under_test(
+        _pages_w_unavailable(batch_1),
+        mock_client,
+        bigquery_storage_v1beta1.types.StreamPosition(),
+        {},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe(
+            dtypes={"int_col": "int64", "bool_col": "bool"}
+        ).reset_index(drop=True),
+        pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+
 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41
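A side note on the dtypes argument the new test exercises: to_pandas() picks default dtypes, and the loop added in reader.py then coerces only the columns named in dtypes. A small sketch of that behavior, assuming recent pandas and pyarrow — the column names and the float64 target are illustrative, not from the commit:

    import pandas
    import pyarrow

    batch = pyarrow.RecordBatch.from_pydict(
        {"int_col": [123, 234], "bool_col": [True, False]}
    )
    df = pyarrow.Table.from_batches([batch]).to_pandas()

    # Same coercion loop as reader.py: only columns named in dtypes change.
    dtypes = {"int_col": "float64"}
    for column in dtypes:
        df[column] = pandas.Series(df[column], dtype=dtypes[column])

    print(df.dtypes)  # int_col becomes float64; bool_col keeps its default dtype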
