BigQuery: Remove Client.query_rows() #4429

Merged: 1 commit, Nov 21, 2017

68 changes: 2 additions & 66 deletions bigquery/google/cloud/bigquery/client.py
@@ -17,7 +17,6 @@
from __future__ import absolute_import

import collections
-import concurrent.futures
import functools
import os
import uuid
@@ -29,8 +28,6 @@
from google.resumable_media.requests import ResumableUpload

from google.api_core import page_iterator
-from google.api_core.exceptions import GoogleAPICallError
-from google.api_core.exceptions import NotFound
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

@@ -1144,67 +1141,6 @@ def create_rows_json(self, table, json_rows, row_ids=None,

        return errors

-    def query_rows(
-            self, query, job_config=None, job_id=None, job_id_prefix=None,
-            timeout=None, retry=DEFAULT_RETRY):
-        """Start a query job and wait for the results.
-
-        See
-        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query
-
-        :type query: str
-        :param query:
-            SQL query to be executed. Defaults to the standard SQL dialect.
-            Use the ``job_config`` parameter to change dialects.
-
-        :type job_config: :class:`google.cloud.bigquery.job.QueryJobConfig`
-        :param job_config: (Optional) Extra configuration options for the job.
-
-        :type job_id: str
-        :param job_id: (Optional) ID to use for the query job.
-
-        :type job_id_prefix: str or ``NoneType``
-        :param job_id_prefix: (Optional) the user-provided prefix for a
-                              randomly generated job ID. This parameter will be
-                              ignored if a ``job_id`` is also given.
-
-        :type timeout: float
-        :param timeout:
-            (Optional) How long (in seconds) to wait for job to complete
-            before raising a :class:`concurrent.futures.TimeoutError`.
-
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
-        :returns:
-            Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
-            During each page, the iterator will have the ``total_rows``
-            attribute set, which counts the total number of rows **in the
-            result set** (this is distinct from the total number of rows in
-            the current page: ``iterator.page.num_items``).
-
-        :raises:
-            :class:`~google.api_core.exceptions.GoogleAPICallError` if the
-            job failed or :class:`concurrent.futures.TimeoutError` if the job
-            did not complete in the given timeout.
-
-            When an exception happens, the query job will be cancelled on a
-            best-effort basis.
-        """
-        job_id = _make_job_id(job_id, job_id_prefix)
-
-        try:
-            job = self.query(
-                query, job_config=job_config, job_id=job_id, retry=retry)
-            rows_iterator = job.result(timeout=timeout)
-        except (GoogleAPICallError, concurrent.futures.TimeoutError):
-            try:
-                self.cancel_job(job_id)
-            except NotFound:
-                # It's OK if couldn't cancel because job never got created.
-                pass
-            raise
-
-        return rows_iterator
-
    def list_rows(self, table, selected_fields=None, max_results=None,
                  page_token=None, start_index=None, retry=DEFAULT_RETRY):
        """List the rows of the table.
@@ -1303,12 +1239,12 @@ def list_partitions(self, table, retry=DEFAULT_RETRY):
        """
        config = QueryJobConfig()
        config.use_legacy_sql = True  # required for '$' syntax
-        rows = self.query_rows(
+        query_job = self.query(
            'SELECT partition_id from [%s:%s.%s$__PARTITIONS_SUMMARY__]' %
            (table.project, table.dataset_id, table.table_id),
            job_config=config,
            retry=retry)
-        return [row[0] for row in rows]
+        return [row[0] for row in query_job]


# pylint: disable=unused-argument
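For code that called the removed helper, a minimal migration sketch (illustrative only, not part of this diff; the client construction and the SQL are assumptions): query() starts the job and returns a QueryJob, and result() blocks the way query_rows() did, returning the same row iterator. The best-effort cancellation that query_rows() performed on failure or timeout now falls to the caller:

import concurrent.futures

from google.api_core.exceptions import GoogleAPICallError, NotFound
from google.cloud import bigquery

client = bigquery.Client()

# query() only starts the job; it does not wait for the results.
job = client.query('SELECT 1')

try:
    # result() blocks until the job finishes, as query_rows() did, and
    # returns the iterator of Row objects.
    rows = job.result(timeout=60)
except (GoogleAPICallError, concurrent.futures.TimeoutError):
    # Reproduce the cleanup the removed helper performed: cancel the
    # job on a best-effort basis before re-raising.
    try:
        client.cancel_job(job.job_id)
    except NotFound:
        pass  # The job was never created; nothing to cancel.
    raise

for row in rows:
    print(row)

Iterating the QueryJob itself also waits for and yields the same rows, which is what the updated list_partitions() relies on.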
32 changes: 17 additions & 15 deletions bigquery/tests/system.py
@@ -673,7 +673,7 @@ def test_job_cancel(self):
        # raise an error, and that the job completed (in the `retry()`
        # above).

-    def test_query_rows_w_legacy_sql_types(self):
+    def test_query_w_legacy_sql_types(self):
        naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
        stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat())
        zoned = naive.replace(tzinfo=UTC)
@@ -706,7 +706,7 @@ def test_query_rows_w_legacy_sql_types(self):
        for example in examples:
            job_config = bigquery.QueryJobConfig()
            job_config.use_legacy_sql = True
-            rows = list(Config.CLIENT.query_rows(
+            rows = list(Config.CLIENT.query(
                example['sql'], job_config=job_config))
            self.assertEqual(len(rows), 1)
            self.assertEqual(len(rows[0]), 1)
@@ -806,26 +806,28 @@ def _generate_standard_sql_types_examples(self):
            },
        ]

-    def test_query_rows_w_standard_sql_types(self):
+    def test_query_w_standard_sql_types(self):
        examples = self._generate_standard_sql_types_examples()
        for example in examples:
-            rows = list(Config.CLIENT.query_rows(example['sql']))
+            rows = list(Config.CLIENT.query(example['sql']))
            self.assertEqual(len(rows), 1)
            self.assertEqual(len(rows[0]), 1)
            self.assertEqual(rows[0][0], example['expected'])

-    def test_query_rows_w_failed_query(self):
+    def test_query_w_failed_query(self):
        from google.api_core.exceptions import BadRequest

        with self.assertRaises(BadRequest):
-            Config.CLIENT.query_rows('invalid syntax;')
+            Config.CLIENT.query('invalid syntax;').result()

-    def test_query_rows_w_timeout(self):
+    def test_query_w_timeout(self):
+        query_job = Config.CLIENT.query(
+            'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
+            job_id_prefix='test_query_w_timeout_')
+
        with self.assertRaises(concurrent.futures.TimeoutError):
-            Config.CLIENT.query_rows(
-                'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
-                job_id_prefix='test_query_rows_w_timeout_',
-                timeout=1) # 1 second is much too short for this query.
+            # 1 second is much too short for this query.
+            query_job.result(timeout=1)

    def test_dbapi_w_standard_sql_types(self):
        examples = self._generate_standard_sql_types_examples()
@@ -1224,9 +1226,9 @@ def test_large_query_w_public_data(self):
        SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
            PUBLIC, DATASET_ID, TABLE_NAME, LIMIT)

-        iterator = Config.CLIENT.query_rows(SQL)
+        query_job = Config.CLIENT.query(SQL)

-        rows = list(iterator)
+        rows = list(query_job)
        self.assertEqual(len(rows), LIMIT)

    def test_query_future(self):
@@ -1256,7 +1258,7 @@ def test_query_table_def(self):
        job_config.table_definitions = {table_id: ec}
        sql = 'SELECT * FROM %s' % table_id

-        got_rows = Config.CLIENT.query_rows(sql, job_config=job_config)
+        got_rows = Config.CLIENT.query(sql, job_config=job_config)

        row_tuples = [r.values() for r in got_rows]
        by_age = operator.itemgetter(1)
@@ -1280,7 +1282,7 @@ def test_query_external_table(self):

        sql = 'SELECT * FROM %s.%s' % (dataset_id, table_id)

-        got_rows = Config.CLIENT.query_rows(sql)
+        got_rows = Config.CLIENT.query(sql)

        row_tuples = [r.values() for r in got_rows]
        by_age = operator.itemgetter(1)
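As the updated system tests above show, failures and timeouts now surface when the job is waited on, not from a single blocking call. A short sketch of that shape (hedged example; the default Client(), the job_id_prefix, and the queries borrowed from the tests are illustrative assumptions):

import concurrent.futures

from google.api_core.exceptions import BadRequest
from google.cloud import bigquery

client = bigquery.Client()

# Invalid SQL: the BadRequest may surface when query() inserts the job
# or when result() waits on it, so guard both calls.
try:
    client.query('invalid syntax;').result()
except BadRequest as exc:
    print('query failed: %s' % exc)

# Long-running query: the timeout now belongs to result(), not query().
job = client.query(
    'SELECT * FROM `bigquery-public-data.github_repos.commits`;',
    job_id_prefix='demo_timeout_')
try:
    job.result(timeout=1)  # almost certainly too short for this query
except concurrent.futures.TimeoutError:
    print('gave up waiting after 1 second')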