Skip to content

Commit

Permalink
feat(bigquery): add RowIterator.to_dataframe_iterable method to get pandas DataFrame per page (#10017)

Browse files Browse the repository at this point in the history

* feat(bigquery): make rowIterator._to_dataframe_iterable public

* feat(bigquery): cosmetic changes and unittest change

* feat(bigquery): change as per comment
  • Loading branch information
HemangChothani authored and tswast committed Jan 10, 2020
1 parent 3bb565b commit 240f381
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
39 changes: 36 additions & 3 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,11 +1554,44 @@ def to_arrow(
arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)

def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
"""Create an iterable of pandas DataFrames, to process the table as a stream.
See ``to_dataframe`` for argument descriptions.
Args:
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery.
This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. When a problem
is encountered reading a table, the tabledata.list method
from the BigQuery API is used, instead.
dtypes (Map[str, Union[str, pandas.Series.dtype]]):
Optional. A dictionary of column names pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.
Raises:
ValueError:
If the :mod:`pandas` library cannot be imported.
"""
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)
if dtypes is None:
dtypes = {}

column_names = [field.name for field in self._schema]
bqstorage_download = functools.partial(
_pandas_helpers.download_dataframe_bqstorage,
Expand Down Expand Up @@ -1683,7 +1716,7 @@ def to_dataframe(
progress_bar = self._get_progress_bar(progress_bar_type)

frames = []
for frame in self._to_dataframe_iterable(
for frame in self.to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
Expand Down
62 changes: 62 additions & 0 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,68 @@ def test_to_arrow_w_pyarrow_none(self):
with self.assertRaises(ValueError):
row_iterator.to_arrow()

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_iterable(self):
    """Each result page yields its own DataFrame with schema-driven dtypes."""
    import types

    from google.cloud.bigquery.schema import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]

    # Two pages of results: the first carries a page token so the
    # iterator fetches a second page.
    pages = [
        {
            "rows": [{"f": [{"v": "Bengt"}, {"v": "32"}]}],
            "pageToken": "NEXTPAGE",
        },
        {"rows": [{"f": [{"v": "Sven"}, {"v": "33"}]}]},
    ]
    api_request = mock.Mock(side_effect=pages)

    row_iterator = self._make_one(
        _mock_client(), api_request, "/foo", schema, page_size=1, max_results=5
    )
    frames = row_iterator.to_dataframe_iterable()

    # Pages must be streamed lazily, not materialized into a list.
    self.assertIsInstance(frames, types.GeneratorType)

    first = next(frames)
    self.assertIsInstance(first, pandas.DataFrame)
    # Column dtypes follow the BigQuery schema (STRING -> object, INTEGER -> int64).
    self.assertEqual(first.name.dtype.name, "object")
    self.assertEqual(first.age.dtype.name, "int64")
    # One row per page, with the values from the first mocked page.
    self.assertEqual(len(first), 1)
    self.assertEqual(first["name"][0], "Bengt")
    self.assertEqual(first["age"][0], 32)

    second = next(frames)
    self.assertEqual(len(second), 1)
    self.assertEqual(second["name"][0], "Sven")
    self.assertEqual(second["age"][0], 33)

@mock.patch("google.cloud.bigquery.table.pandas", new=None)
def test_to_dataframe_iterable_error_if_pandas_is_none(self):
    """With pandas patched out, the method must raise a ValueError naming it."""
    from google.cloud.bigquery.schema import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    rows = [
        {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
        {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
    ]
    api_request = mock.Mock(return_value={"rows": rows})
    row_iterator = self._make_one(_mock_client(), api_request, "/foo", schema)

    # The error message should point the user at the missing dependency.
    with pytest.raises(ValueError, match="pandas"):
        row_iterator.to_dataframe_iterable()

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe(self):
from google.cloud.bigquery.schema import SchemaField
Expand Down

0 comments on commit 240f381

Please sign in to comment.