Skip to content

Commit

Permalink
Add args from 'RowIterator.to_dataframe()' to 'QueryJob.to_dataframe(…
Browse files Browse the repository at this point in the history
…)'. (#7241)
  • Loading branch information
tswast authored and tseaver committed Feb 1, 2019
1 parent e35a67e commit 20c6d16
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 3 deletions.
31 changes: 29 additions & 2 deletions google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2778,9 +2778,34 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table = Table(dest_table_ref, schema=schema)
return self._client.list_rows(dest_table, retry=retry)

def to_dataframe(self):
def to_dataframe(self, bqstorage_client=None, dtypes=None):
"""Return a pandas DataFrame from a QueryJob
Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
**Alpha Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary mapping column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
Expand All @@ -2789,7 +2814,9 @@ def to_dataframe(self):
Raises:
ValueError: If the `pandas` library cannot be imported.
"""
return self.result().to_dataframe()
return self.result().to_dataframe(
bqstorage_client=bqstorage_client, dtypes=dtypes
)

def __iter__(self):
return iter(self.result())
Expand Down
14 changes: 13 additions & 1 deletion google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,19 @@ class _EmptyRowIterator(object):
pages = ()
total_rows = 0

def to_dataframe(self):
def to_dataframe(self, bqstorage_client=None, dtypes=None):
    """Build a :class:`~pandas.DataFrame` that contains no data.

    Args:
        bqstorage_client (Any):
            Not used. Accepted for compatibility with RowIterator.
        dtypes (Any):
            Not used. Accepted for compatibility with RowIterator.

    Returns:
        pandas.DataFrame:
            An empty :class:`~pandas.DataFrame`.

    Raises:
        ValueError: If the module-level ``pandas`` name is ``None``.
    """
    if pandas is not None:
        return pandas.DataFrame()
    raise ValueError(_NO_PANDAS_ERROR)
Expand Down
113 changes: 113 additions & 0 deletions tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import copy
import json
import unittest

import mock
Expand All @@ -22,6 +23,10 @@
import pandas
except (ImportError, AttributeError): # pragma: NO COVER
pandas = None
try:
from google.cloud import bigquery_storage_v1beta1
except (ImportError, AttributeError): # pragma: NO COVER
bigquery_storage_v1beta1 = None


def _make_credentials():
Expand Down Expand Up @@ -4543,6 +4548,114 @@ def test_to_dataframe(self):
self.assertEqual(len(df), 4) # verify the number of rows
self.assertEqual(list(df), ["name", "age"]) # verify the column names

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_ddl_query(self):
    """A query result with an empty schema converts to a zero-row DataFrame."""
    # Some DDL and DML queries leave the destination table without a schema,
    # so the fake query response below reports an empty field list.
    job_reference = {"projectId": self.PROJECT, "jobId": self.JOB_ID}
    empty_schema_response = {
        "jobComplete": True,
        "jobReference": job_reference,
        "schema": {"fields": []},
    }
    fake_connection = _make_connection(empty_schema_response)
    client = _make_client(self.PROJECT, connection=fake_connection)
    finished_resource = self._make_resource(ended=True)
    job = self._get_target_class().from_api_repr(finished_resource, client)

    frame = job.to_dataframe()

    self.assertEqual(len(frame), 0)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
    bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage(self):
    """Passing ``bqstorage_client`` results in exactly one read session request."""
    bq_schema_fields = [
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    query_resource = {
        "jobComplete": True,
        "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
        "totalRows": "4",
        "schema": {"fields": bq_schema_fields},
    }
    fake_connection = _make_connection(query_resource)
    client = _make_client(self.PROJECT, connection=fake_connection)
    finished_resource = self._make_resource(ended=True)
    job = self._get_target_class().from_api_repr(finished_resource, client)

    # Autospec'd storage client whose read session carries an Avro schema
    # with the same two columns as the query schema above.
    storage_client = mock.create_autospec(
        bigquery_storage_v1beta1.BigQueryStorageClient
    )
    read_session = bigquery_storage_v1beta1.types.ReadSession()
    avro_schema = {
        "type": "record",
        "name": "__root__",
        "fields": [
            {"name": "name", "type": ["null", "string"]},
            {"name": "age", "type": ["null", "long"]},
        ],
    }
    read_session.avro_schema.schema = json.dumps(avro_schema)
    storage_client.create_read_session.return_value = read_session

    job.to_dataframe(bqstorage_client=storage_client)

    expected_parent = "projects/{}".format(self.PROJECT)
    storage_client.create_read_session.assert_called_once_with(
        mock.ANY, expected_parent, read_options=mock.ANY
    )

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_column_dtypes(self):
    """``dtypes`` is forwarded: ``km`` is checked as float16, other columns as listed below."""
    begun_resource = self._make_resource()

    # (column name, BigQuery type) pairs, in result-column order.
    column_types = [
        ("start_timestamp", "TIMESTAMP"),
        ("seconds", "INT64"),
        ("miles", "FLOAT64"),
        ("km", "FLOAT64"),
        ("payment_type", "STRING"),
        ("complete", "BOOL"),
        ("date", "DATE"),
    ]
    row_data = [
        ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
        ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
        ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
    ]
    query_resource = {
        "jobComplete": True,
        "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
        "totalRows": "4",
        "schema": {
            "fields": [
                {"name": column, "type": bq_type}
                for column, bq_type in column_types
            ]
        },
        # Each row is encoded as {"f": [{"v": value}, ...]}.
        "rows": [{"f": [{"v": cell} for cell in record]} for record in row_data],
    }
    done_resource = copy.deepcopy(begun_resource)
    done_resource["status"] = {"state": "DONE"}
    # Canned responses handed to the fake connection, in this order.
    connection = _make_connection(
        begun_resource, query_resource, done_resource, query_resource
    )
    client = _make_client(project=self.PROJECT, connection=connection)
    job = self._make_one(self.JOB_ID, self.QUERY, client)

    df = job.to_dataframe(dtypes={"km": "float16"})

    self.assertIsInstance(df, pandas.DataFrame)
    self.assertEqual(len(df), 3)  # one DataFrame row per entry in row_data
    exp_columns = [field["name"] for field in query_resource["schema"]["fields"]]
    self.assertEqual(list(df), exp_columns)  # column names follow the schema

    # Only "km" was given an explicit dtype in the call above.
    expected_dtype_names = [
        ("start_timestamp", "datetime64[ns, UTC]"),
        ("seconds", "int64"),
        ("miles", "float64"),
        ("km", "float16"),
        ("payment_type", "object"),
        ("complete", "bool"),
        ("date", "object"),
    ]
    for column, dtype_name in expected_dtype_names:
        self.assertEqual(getattr(df, column).dtype.name, dtype_name)

def test_iter(self):
import types

Expand Down

0 comments on commit 20c6d16

Please sign in to comment.