Fallback to BQ API when there are problems reading from BQ Storage. (#7633)

The tabledata.list API works for more kinds of tables, including small
anonymous query result tables. By falling back to this API, we let
developers always pass a `bqstorage_client`, even when they aren't
writing their query results to a destination table and don't know in
advance how large the results will be.
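
As a minimal sketch of the usage this change enables (assuming application
default credentials and that both google-cloud-bigquery and
google-cloud-bigquery-storage are installed; the query itself is only an
illustration, not from the commit):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # No destination table is configured, so the results land in a small
    # anonymous table. With this change, passing bqstorage_client is always
    # safe: if the BQ Storage read fails, to_dataframe() falls back to the
    # tabledata.list API.
    query = "SELECT 1 AS x"
    df = client.query(query).result().to_dataframe(bqstorage_client)
    print(df)
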
tswast authored Apr 1, 2019
1 parent 682903b commit 8049e8a
Showing 3 changed files with 94 additions and 33 deletions.
21 changes: 15 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
@@ -35,6 +35,7 @@
except ImportError: # pragma: NO COVER
tqdm = None

import google.api_core.exceptions
from google.api_core.page_iterator import HTTPIterator

import google.cloud._helpers
@@ -1437,7 +1438,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
- **Alpha Feature** Optional. A BigQuery Storage API client. If
+ **Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
@@ -1448,8 +1449,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
currently supported by this method.
**Caution**: There is a known issue reading small anonymous
- query result tables with the BQ Storage API. Write your query
- results to a destination table to work around this issue.
+ query result tables with the BQ Storage API. When a problem
+ is encountered reading a table, the tabledata.list method
+ from the BigQuery API is used, instead.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
@@ -1496,9 +1498,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
progress_bar = self._get_progress_bar(progress_bar_type)

if bqstorage_client is not None:
-     return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
- else:
-     return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)
+     try:
+         return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
+     except google.api_core.exceptions.GoogleAPICallError:
+         # There is a known issue with reading from small anonymous
+         # query results tables, so some errors are expected. Rather
+         # than throw those errors, try reading the DataFrame again, but
+         # with the tabledata.list API.
+         pass
+
+ return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)


class _EmptyRowIterator(object):
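A side note on the new except clause above: API-call errors surfaced by
google-api-core clients, such as the InternalServerError used in the unit test
further below, derive from google.api_core.exceptions.GoogleAPICallError,
which is why a single except covers them. A small illustrative check,
standalone and not part of the commit:

    # Illustrative only: the server and client errors the fallback is meant
    # to absorb are subclasses of GoogleAPICallError, so the broad except in
    # RowIterator.to_dataframe() catches them.
    from google.api_core import exceptions

    for err_cls in (exceptions.InternalServerError, exceptions.Forbidden, exceptions.NotFound):
        assert issubclass(err_cls, exceptions.GoogleAPICallError), err_cls.__name__
    print("all of these derive from GoogleAPICallError")
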
61 changes: 34 additions & 27 deletions bigquery/tests/system.py
@@ -1540,35 +1540,42 @@ def test_query_results_to_dataframe_w_bqstorage(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
- df = (
-     Config.CLIENT.query(
-         query,
-         # There is a known issue reading small anonymous query result
-         # tables with the BQ Storage API. Writing to a destination
-         # table works around this issue.
-         job_config=bigquery.QueryJobConfig(
-             destination=dest_ref, write_disposition="WRITE_TRUNCATE"
-         ),
-     )
-     .result()
-     .to_dataframe(bqstorage_client)
- )
+ job_configs = (
+     # There is a known issue reading small anonymous query result
+     # tables with the BQ Storage API. Writing to a destination
+     # table works around this issue.
+     bigquery.QueryJobConfig(
+         destination=dest_ref, write_disposition="WRITE_TRUNCATE"
+     ),
+     # Check that the client is able to work around the issue with
+     # reading small anonymous query result tables by falling back to
+     # the tabledata.list API.
+     None,
+ )

- self.assertIsInstance(df, pandas.DataFrame)
- self.assertEqual(len(df), 10)  # verify the number of rows
- column_names = ["id", "author", "time_ts", "dead"]
- self.assertEqual(list(df), column_names)
- exp_datatypes = {
-     "id": int,
-     "author": six.text_type,
-     "time_ts": pandas.Timestamp,
-     "dead": bool,
- }
- for index, row in df.iterrows():
-     for col in column_names:
-         # all the schema fields are nullable, so None is acceptable
-         if not row[col] is None:
-             self.assertIsInstance(row[col], exp_datatypes[col])
+ for job_config in job_configs:
+     df = (
+         Config.CLIENT.query(query, job_config=job_config)
+         .result()
+         .to_dataframe(bqstorage_client)
+     )
+
+     self.assertIsInstance(df, pandas.DataFrame)
+     self.assertEqual(len(df), 10)  # verify the number of rows
+     column_names = ["id", "author", "time_ts", "dead"]
+     self.assertEqual(list(df), column_names)
+     exp_datatypes = {
+         "id": int,
+         "author": six.text_type,
+         "time_ts": pandas.Timestamp,
+         "dead": bool,
+     }
+     for index, row in df.iterrows():
+         for col in column_names:
+             # all the schema fields are nullable, so None is acceptable
+             if not row[col] is None:
+                 self.assertIsInstance(row[col], exp_datatypes[col])

def test_insert_rows_nested_nested(self):
# See #2951
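For reference, the destination-table workaround exercised by the first job
config in the system test above looks roughly like this in user code (the
dataset and table names below are placeholders, not from the commit):

    # Rough sketch of the pre-existing workaround: write query results to an
    # explicit destination table so the BQ Storage API can read them directly.
    from google.cloud import bigquery

    client = bigquery.Client()
    dest_ref = client.dataset("my_dataset").table("query_results")  # placeholder names
    job_config = bigquery.QueryJobConfig(
        destination=dest_ref, write_disposition="WRITE_TRUNCATE"
    )
    rows = client.query("SELECT 1 AS x", job_config=job_config).result()
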
45 changes: 45 additions & 0 deletions bigquery/tests/unit/test_table.py
@@ -21,10 +21,13 @@
import pytest
import six

import google.api_core.exceptions

try:
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

try:
import pandas
except (ImportError, AttributeError): # pragma: NO COVER
@@ -1748,6 +1751,48 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
bqstorage_client.create_read_session.side_effect = google.api_core.exceptions.InternalServerError(
"can't read with bqstorage_client"
)
iterator_schema = [
schema.SchemaField("name", "STRING", mode="REQUIRED"),
schema.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
rows = [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
row_iterator = mut.RowIterator(
_mock_client(),
api_request,
path,
iterator_schema,
table=mut.Table("proj.dset.tbl"),
)

df = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 4) # verify the number of rows
self.assertEqual(list(df), ["name", "age"]) # verify the column names
self.assertEqual(df.name.dtype.name, "object")
self.assertEqual(df.age.dtype.name, "int64")

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
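The rows fixture in the unit test above uses the JSON wire format returned by
tabledata.list, where each row is an object with an "f" (fields) array and
each cell a {"v": ...} value. An illustrative, standalone unpacking of that
payload (the real client does the equivalent, plus schema-driven type
conversion):

    # Illustrative only: convert tabledata.list-style rows into a DataFrame
    # by hand to show what the test fixture represents.
    import pandas

    rows = [
        {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
        {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
    ]
    columns = ["name", "age"]
    records = [
        {col: cell["v"] for col, cell in zip(columns, row["f"])} for row in rows
    ]
    df = pandas.DataFrame(records, columns=columns)
    df["age"] = df["age"].astype("int64")  # INTEGER values arrive as strings
    print(df.dtypes)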
