From 240f381a770d4378f8011658796da2c488e40467 Mon Sep 17 00:00:00 2001
From: HemangChothani <50404902+HemangChothani@users.noreply.github.com>
Date: Sat, 11 Jan 2020 02:54:40 +0530
Subject: [PATCH] feat(bigquery): add `RowIterator.to_dataframe_iterable`
 method to get pandas DataFrame per page (#10017)

* feat(bigquery): make RowIterator._to_dataframe_iterable public

* feat(bigquery): cosmetic changes and unittest change

* feat(bigquery): change as per comment
---
 bigquery/google/cloud/bigquery/table.py | 39 ++++++++++++++--
 bigquery/tests/unit/test_table.py       | 62 +++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index a71acf8ecc8a..585676490c38 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1554,11 +1554,44 @@ def to_arrow(
         arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
         return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)
 
-    def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
+    def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
 
-        See ``to_dataframe`` for argument descriptions.
+        Args:
+            bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
+                **Beta Feature** Optional. A BigQuery Storage API client. If
+                supplied, use the faster BigQuery Storage API to fetch rows
+                from BigQuery.
+
+                This method requires the ``pyarrow`` and
+                ``google-cloud-bigquery-storage`` libraries.
+
+                Reading from a specific partition or snapshot is not
+                currently supported by this method.
+
+                **Caution**: There is a known issue reading small anonymous
+                query result tables with the BQ Storage API. When a problem
+                is encountered reading a table, the tabledata.list method
+                from the BigQuery API is used instead.
+            dtypes (Map[str, Union[str, pandas.Series.dtype]]):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
+
+        Returns:
+            Iterable[pandas.DataFrame]:
+                A generator of :class:`~pandas.DataFrame`.
+
+        Raises:
+            ValueError:
+                If the :mod:`pandas` library cannot be imported.
""" + if pandas is None: + raise ValueError(_NO_PANDAS_ERROR) + if dtypes is None: + dtypes = {} + column_names = [field.name for field in self._schema] bqstorage_download = functools.partial( _pandas_helpers.download_dataframe_bqstorage, @@ -1683,7 +1716,7 @@ def to_dataframe( progress_bar = self._get_progress_bar(progress_bar_type) frames = [] - for frame in self._to_dataframe_iterable( + for frame in self.to_dataframe_iterable( bqstorage_client=bqstorage_client, dtypes=dtypes ): frames.append(frame) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 1043df45f9a3..73fe1c10d49b 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -2014,6 +2014,68 @@ def test_to_arrow_w_pyarrow_none(self): with self.assertRaises(ValueError): row_iterator.to_arrow() + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_iterable(self): + from google.cloud.bigquery.schema import SchemaField + import types + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + + path = "/foo" + api_request = mock.Mock( + side_effect=[ + { + "rows": [{"f": [{"v": "Bengt"}, {"v": "32"}]}], + "pageToken": "NEXTPAGE", + }, + {"rows": [{"f": [{"v": "Sven"}, {"v": "33"}]}]}, + ] + ) + + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, page_size=1, max_results=5 + ) + dfs = row_iterator.to_dataframe_iterable() + + self.assertIsInstance(dfs, types.GeneratorType) + + df_1 = next(dfs) + self.assertIsInstance(df_1, pandas.DataFrame) + self.assertEqual(df_1.name.dtype.name, "object") + self.assertEqual(df_1.age.dtype.name, "int64") + self.assertEqual(len(df_1), 1) # verify the number of rows + self.assertEqual( + df_1["name"][0], "Bengt" + ) # verify the first value of 'name' column + self.assertEqual(df_1["age"][0], 32) # verify the first value of 'age' column + + df_2 = next(dfs) + self.assertEqual(len(df_2), 1) # verify the number of rows + self.assertEqual(df_2["name"][0], "Sven") + self.assertEqual(df_2["age"][0], 33) + + @mock.patch("google.cloud.bigquery.table.pandas", new=None) + def test_to_dataframe_iterable_error_if_pandas_is_none(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + with pytest.raises(ValueError, match="pandas"): + row_iterator.to_dataframe_iterable() + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): from google.cloud.bigquery.schema import SchemaField