Add page_size parameter to QueryJob.result. (#8206)
AzemaBaptiste authored and tseaver committed Jun 5, 2019
1 parent 8a68bb9 commit 6db6b4e
Showing 3 changed files with 89 additions and 20 deletions.
44 changes: 24 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
@@ -2832,29 +2832,33 @@ def _blocking_poll(self, timeout=None):
         self._done_timeout = timeout
         super(QueryJob, self)._blocking_poll(timeout=timeout)

-    def result(self, timeout=None, retry=DEFAULT_RETRY):
+    def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY):
         """Start the job and wait for it to complete and get the result.
-        :type timeout: float
-        :param timeout:
-            How long (in seconds) to wait for job to complete before raising
-            a :class:`concurrent.futures.TimeoutError`.
-        :type retry: :class:`google.api_core.retry.Retry`
-        :param retry: (Optional) How to retry the call that retrieves rows.
+        Args:
+            timeout (float):
+                How long (in seconds) to wait for job to complete before
+                raising a :class:`concurrent.futures.TimeoutError`.
+            page_size (int):
+                (Optional) The maximum number of rows in each page of results
+                from this request. Non-positive values are ignored.
+            retry (google.api_core.retry.Retry):
+                (Optional) How to retry the call that retrieves rows.
-        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
-        :returns:
-            Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
-            During each page, the iterator will have the ``total_rows``
-            attribute set, which counts the total number of rows **in the
-            result set** (this is distinct from the total number of rows in
-            the current page: ``iterator.page.num_items``).
+        Returns:
+            google.cloud.bigquery.table.RowIterator:
+                Iterator of row data
+                :class:`~google.cloud.bigquery.table.Row`-s. During each
+                page, the iterator will have the ``total_rows`` attribute
+                set, which counts the total number of rows **in the result
+                set** (this is distinct from the total number of rows in the
+                current page: ``iterator.page.num_items``).
-        :raises:
-            :class:`~google.cloud.exceptions.GoogleCloudError` if the job
-            failed or :class:`concurrent.futures.TimeoutError` if the job did
-            not complete in the given timeout.
+        Raises:
+            google.cloud.exceptions.GoogleCloudError:
+                If the job failed.
+            concurrent.futures.TimeoutError:
+                If the job did not complete in the given timeout.
         """
         super(QueryJob, self).result(timeout=timeout)
         # Return an iterator instead of returning the job.
@@ -2874,7 +2878,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         dest_table_ref = self.destination
         dest_table = Table(dest_table_ref, schema=schema)
         dest_table._properties["numRows"] = self._query_results.total_rows
-        rows = self._client.list_rows(dest_table, retry=retry)
+        rows = self._client.list_rows(dest_table, page_size=page_size, retry=retry)
         rows._preserve_order = _contains_order_by(self.query)
         return rows

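For context, here is a minimal usage sketch of the new parameter (the client setup, query text, and page size are illustrative and not part of this commit):

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        "SELECT word FROM `bigquery-public-data.samples.shakespeare`"
    )

    # page_size caps the number of rows fetched per underlying API request.
    rows = query_job.result(page_size=100)

    for page in rows.pages:
        # total_rows counts the whole result set; num_items counts only the
        # rows in the current page (the final page may hold fewer rows).
        print(page.num_items, rows.total_rows)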
9 changes: 9 additions & 0 deletions bigquery/tests/system.py
@@ -1202,6 +1202,15 @@ def test_query_w_timeout(self):
         # 1 second is much too short for this query.
         query_job.result(timeout=1)

+    def test_query_w_page_size(self):
+        page_size = 45
+        query_job = Config.CLIENT.query(
+            "SELECT word FROM `bigquery-public-data.samples.shakespeare`;",
+            job_id_prefix="test_query_w_page_size_",
+        )
+        iterator = query_job.result(page_size=page_size)
+        self.assertEqual(next(iterator.pages).num_items, page_size)
+
     def test_query_statistics(self):
         """
         A system test to exercise some of the extended query statistics.
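The system test above checks only the first page. A fuller (hypothetical) continuation of the same test might walk every page, where only the final page may hold fewer than page_size rows:

    iterator = query_job.result(page_size=page_size)
    total_seen = 0
    for page in iterator.pages:
        # Each page is capped at page_size; only the last page may be short.
        assert page.num_items <= page_size
        total_seen += page.num_items
    # Once all pages are consumed, the per-page counts add up to the total.
    assert total_seen == iterator.total_rows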
56 changes: 56 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -4168,6 +4168,62 @@ def test_result_w_timeout(self):
         self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900)
         self.assertEqual(reload_request[1]["method"], "GET")

+    def test_result_w_page_size(self):
+        # Arrange
+        query_results_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "totalRows": "4",
+        }
+        job_resource = self._make_resource(started=True, ended=True)
+        q_config = job_resource["configuration"]["query"]
+        q_config["destinationTable"] = {
+            "projectId": self.PROJECT,
+            "datasetId": self.DS_ID,
+            "tableId": self.TABLE_ID,
+        }
+        tabledata_resource = {
+            "totalRows": 4,
+            "pageToken": "some-page-token",
+            "rows": [
+                {"f": [{"v": "row1"}]},
+                {"f": [{"v": "row2"}]},
+                {"f": [{"v": "row3"}]},
+            ],
+        }
+        tabledata_resource_page_2 = {"totalRows": 4, "rows": [{"f": [{"v": "row4"}]}]}
+        conn = _make_connection(
+            query_results_resource, tabledata_resource, tabledata_resource_page_2
+        )
+        client = _make_client(self.PROJECT, connection=conn)
+        job = self._get_target_class().from_api_repr(job_resource, client)
+
+        # Act
+        result = job.result(page_size=3)
+
+        # Assert
+        actual_rows = list(result)
+        self.assertEqual(len(actual_rows), 4)
+
+        tabledata_path = "/projects/%s/datasets/%s/tables/%s/data" % (
+            self.PROJECT,
+            self.DS_ID,
+            self.TABLE_ID,
+        )
+        conn.api_request.assert_has_calls(
+            [
+                mock.call(
+                    method="GET", path=tabledata_path, query_params={"maxResults": 3}
+                ),
+                mock.call(
+                    method="GET",
+                    path=tabledata_path,
+                    query_params={"pageToken": "some-page-token", "maxResults": 3},
+                ),
+            ]
+        )
+
     def test_result_error(self):
         from google.cloud import exceptions

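The mock assertions above pin down the wire contract: page_size is forwarded as the maxResults query parameter of tabledata.list, and each follow-up request repeats maxResults alongside the pageToken returned by the previous response. A standalone sketch of that paging loop (fetch_all_rows and its api_request argument are hypothetical stand-ins for the mocked connection, not public API):

    def fetch_all_rows(api_request, tabledata_path, page_size):
        # Mirror the contract asserted above: cap each request with
        # maxResults and chain pages via pageToken until none is returned.
        params = {"maxResults": page_size}
        rows = []
        while True:
            response = api_request(
                method="GET", path=tabledata_path, query_params=params
            )
            rows.extend(response.get("rows", []))
            token = response.get("pageToken")
            if token is None:
                return rows
            params = {"maxResults": page_size, "pageToken": token}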
