Bigquery: Add page_size parameter to QueryJob.result. #8206

Merged · 4 commits · Jun 5, 2019
44 changes: 24 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
@@ -2832,29 +2832,33 @@ def _blocking_poll(self, timeout=None):
self._done_timeout = timeout
super(QueryJob, self)._blocking_poll(timeout=timeout)

-    def result(self, timeout=None, retry=DEFAULT_RETRY):
+    def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY):
"""Start the job and wait for it to complete and get the result.

-        :type timeout: float
-        :param timeout:
-            How long (in seconds) to wait for job to complete before raising
-            a :class:`concurrent.futures.TimeoutError`.
-
-        :type retry: :class:`google.api_core.retry.Retry`
-        :param retry: (Optional) How to retry the call that retrieves rows.
+        Args:
+            timeout (float):
+                How long (in seconds) to wait for job to complete before
+                raising a :class:`concurrent.futures.TimeoutError`.
+            page_size (int):
+                (Optional) The maximum number of rows in each page of results
+                from this request. Non-positive values are ignored.
+            retry (google.api_core.retry.Retry):
+                (Optional) How to retry the call that retrieves rows.

-        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
-        :returns:
-            Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
-            During each page, the iterator will have the ``total_rows``
-            attribute set, which counts the total number of rows **in the
-            result set** (this is distinct from the total number of rows in
-            the current page: ``iterator.page.num_items``).
+        Returns:
+            google.cloud.bigquery.table.RowIterator:
+                Iterator of row data
+                :class:`~google.cloud.bigquery.table.Row`-s. During each
+                page, the iterator will have the ``total_rows`` attribute
+                set, which counts the total number of rows **in the result
+                set** (this is distinct from the total number of rows in the
+                current page: ``iterator.page.num_items``).

-        :raises:
-            :class:`~google.cloud.exceptions.GoogleCloudError` if the job
-            failed or :class:`concurrent.futures.TimeoutError` if the job did
-            not complete in the given timeout.
+        Raises:
+            google.cloud.exceptions.GoogleCloudError:
+                If the job failed.
+            concurrent.futures.TimeoutError:
+                If the job did not complete in the given timeout.
"""
super(QueryJob, self).result(timeout=timeout)
# Return an iterator instead of returning the job.
@@ -2874,7 +2878,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
-        rows = self._client.list_rows(dest_table, retry=retry)
+        rows = self._client.list_rows(dest_table, page_size=page_size, retry=retry)
rows._preserve_order = _contains_order_by(self.query)
return rows

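For context, a minimal usage sketch of the new parameter. The project, dataset, and table names are illustrative placeholders, and `bigquery.Client()` assumes default credentials:

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
query_job = client.query(
    "SELECT name FROM `my-project.my_dataset.my_table`"  # hypothetical table
)

# Cap each underlying API response at 500 rows; iteration itself is unchanged.
rows = query_job.result(page_size=500)
print(rows.total_rows)  # rows in the whole result set, not in the current page
for row in rows:
    print(row["name"])
```

A smaller `page_size` trades more HTTP round trips for less data per response, which is the usual reason to set it.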
9 changes: 9 additions & 0 deletions bigquery/tests/system.py
@@ -1202,6 +1202,15 @@ def test_query_w_timeout(self):
# 1 second is much too short for this query.
query_job.result(timeout=1)

+    def test_query_w_page_size(self):
+        page_size = 45
+        query_job = Config.CLIENT.query(
+            "SELECT word FROM `bigquery-public-data.samples.shakespeare`;",
+            job_id_prefix="test_query_w_page_size_",
+        )
+        iterator = query_job.result(page_size=page_size)
+        self.assertEqual(next(iterator.pages).num_items, page_size)

def test_query_statistics(self):
"""
A system test to exercise some of the extended query statistics.
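As a standalone illustration of what this system test exercises (client construction assumed; the query targets the same public sample table as the test):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
query_job = client.query(
    "SELECT word FROM `bigquery-public-data.samples.shakespeare`;"
)
iterator = query_job.result(page_size=45)

# With more than 45 rows in the result set, the first page should be full.
first_page = next(iterator.pages)
assert first_page.num_items == 45
```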
56 changes: 56 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -4168,6 +4168,62 @@ def test_result_w_timeout(self):
self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900)
self.assertEqual(reload_request[1]["method"], "GET")

+    def test_result_w_page_size(self):
+        # Arrange
+        query_results_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "totalRows": "4",
+        }
+        job_resource = self._make_resource(started=True, ended=True)
+        q_config = job_resource["configuration"]["query"]
+        q_config["destinationTable"] = {
+            "projectId": self.PROJECT,
+            "datasetId": self.DS_ID,
+            "tableId": self.TABLE_ID,
+        }
+        tabledata_resource = {
+            "totalRows": 4,
+            "pageToken": "some-page-token",
+            "rows": [
+                {"f": [{"v": "row1"}]},
+                {"f": [{"v": "row2"}]},
+                {"f": [{"v": "row3"}]},
+            ],
+        }
+        tabledata_resource_page_2 = {"totalRows": 4, "rows": [{"f": [{"v": "row4"}]}]}
+        conn = _make_connection(
+            query_results_resource, tabledata_resource, tabledata_resource_page_2
+        )
+        client = _make_client(self.PROJECT, connection=conn)
+        job = self._get_target_class().from_api_repr(job_resource, client)
+
+        # Act
+        result = job.result(page_size=3)
+
+        # Assert
+        actual_rows = list(result)
+        self.assertEqual(len(actual_rows), 4)
+
+        tabledata_path = "/projects/%s/datasets/%s/tables/%s/data" % (
+            self.PROJECT,
+            self.DS_ID,
+            self.TABLE_ID,
+        )
+        conn.api_request.assert_has_calls(
+            [
+                mock.call(
+                    method="GET", path=tabledata_path, query_params={"maxResults": 3}
+                ),
+                mock.call(
+                    method="GET",
+                    path=tabledata_path,
+                    query_params={"pageToken": "some-page-token", "maxResults": 3},
+                ),
+            ]
+        )

def test_result_error(self):
from google.cloud import exceptions

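In short, the mock connection verifies that the iterator translates `page_size` into the `maxResults` query parameter of `tabledata.list` and threads the returned `pageToken` into each follow-up request. A sketch of observing the same behavior from the iterator side; the `next_page_token` attribute is assumed from `google-api-core`'s page iterator, which `RowIterator` extends:

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
job = client.query("SELECT word FROM `bigquery-public-data.samples.shakespeare`")
iterator = job.result(page_size=3)

pages = iterator.pages
first_page = next(pages)         # GET .../data?maxResults=3
print(first_page.num_items)      # 3 rows on a full page
print(iterator.next_page_token)  # sent as pageToken on the next request
second_page = next(pages)        # GET .../data?pageToken=...&maxResults=3
```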