Merge pull request #1836 from tseaver/bigquery-testable_sync_query_snippets

Make synchronous query snippets testable
tseaver authored Jul 14, 2016
2 parents aed487d + 9da9d82 commit 167a911
Showing 6 changed files with 148 additions and 113 deletions.
118 changes: 17 additions & 101 deletions docs/bigquery-usage.rst
@@ -25,6 +25,7 @@ Authentication / Configuration
>>> from gcloud import bigquery
>>> client = bigquery.Client()


Projects
--------

@@ -43,13 +44,15 @@ To override the project inferred from the environment, pass an explicit
>>> from gcloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')


Project ACLs
~~~~~~~~~~~~

Each project has an access control list granting reader / writer / owner
permission to one or more entities. This list cannot be queried or set
via the API: it must be managed using the Google Developer Console.


Datasets
--------

@@ -62,6 +65,7 @@ policies to tables as they are created:
- A default table expiration period. If set, tables created within the
  dataset will use that value as their expiration period (see the sketch
  below).

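For example, a default expiration might be set when the dataset is created
(a sketch, not part of this changeset; it assumes the dataset object exposes
a ``default_table_expiration_ms`` property):

.. doctest::

   >>> from gcloud import bigquery
   >>> client = bigquery.Client()
   >>> dataset = client.dataset('dataset_name')
   >>> dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000  # one day
   >>> dataset.create()  # API request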

Dataset operations
~~~~~~~~~~~~~~~~~~

@@ -191,104 +195,34 @@ Jobs describe actions performed on data in BigQuery tables:

List jobs for a project:

.. doctest::
.. literalinclude:: bigquery_snippets.py
   :start-after: [START client_list_jobs]
   :end-before: [END client_list_jobs]

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> jobs, token = client.list_jobs() # API request
>>> [(job.name, job.job_type, job.created, job.state) for job in jobs]
[('load-table-job', 'load', datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>), 'done')]
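
Job listings can also be narrowed (a sketch, not part of this changeset; it
assumes ``list_jobs`` accepts ``max_results`` and ``state_filter`` keyword
arguments):

.. doctest::

   >>> from gcloud import bigquery
   >>> client = bigquery.Client()
   >>> jobs, _ = client.list_jobs(max_results=10, state_filter='done')  # API request
   >>> [job.name for job in jobs]
   ['load-table-job']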

Querying data (synchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Run a query which can be expected to complete within bounded time:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT count(*) AS age_count FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
True
>>> len(query.schema)
1
>>> field = query.schema[0]
>>> field.name
u'count'
>>> field.field_type
u'INTEGER'
>>> field.mode
u'NULLABLE'
>>> query.rows
[(15,)]
>>> query.total_rows
1
.. literalinclude:: bigquery_snippets.py
   :start-after: [START client_run_sync_query]
   :end-before: [END client_run_sync_query]

If the rows returned by the query do not fit into the initial response,
then we need to fetch the remaining rows via ``fetch_data``:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT * FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
True
>>> query.total_rows
1234
>>> query.page_token
'8d6e452459238eb0fe87d8eb191dd526ee70a35e'
>>> do_something_with(query.schema, query.rows)
>>> token = query.page_token  # for initial request
>>> while True:
...     do_something_with(query.schema, rows)
...     if token is None:
...         break
...     rows, _, token = query.fetch_data(page_token=token)

.. literalinclude:: bigquery_snippets.py
   :start-after: [START client_run_sync_query_paged]
   :end-before: [END client_run_sync_query_paged]

If the query takes longer than the timeout allowed, ``query.complete``
will be ``False``. In that case, we need to poll the associated job until
it is done, and then fetch the results:

.. doctest::

>>> from gcloud import bigquery
>>> client = bigquery.Client()
>>> QUERY = """\
... SELECT * FROM dataset_name.person_ages
... """
>>> query = client.run_sync_query(QUERY)
>>> query.timeout_ms = 1000
>>> query.run() # API request
>>> query.complete
False
>>> job = query.job
>>> retry_count = 100
>>> while retry_count > 0 and job.state == 'running':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'done'
>>> token = None  # for initial request
>>> while True:
...     rows, _, token = query.fetch_data(page_token=token)
...     do_something_with(query.schema, rows)
...     if token is None:
...         break

.. literalinclude:: bigquery_snippets.py
   :start-after: [START client_run_sync_query_timeout]
   :end-before: [END client_run_sync_query_timeout]


Querying data (asynchronous)
@@ -350,25 +284,6 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Inserting data (synchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Load data synchronously from a local CSV file into a new table:

.. doctest::

>>> import csv
>>> from gcloud import bigquery
>>> from gcloud.bigquery import SchemaField
>>> client = bigquery.Client()
>>> table = dataset.table(name='person_ages')
>>> table.schema = [
...     SchemaField('full_name', 'STRING', mode='required'),
...     SchemaField('age', 'INTEGER', mode='required')]
>>> with open('/path/to/person_ages.csv', 'rb') as file_obj:
...     reader = csv.reader(file_obj)
...     rows = list(reader)
>>> table.insert_data(rows) # API request

Inserting data (asynchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -431,6 +346,7 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)


Exporting data (async)
~~~~~~~~~~~~~~~~~~~~~~

127 changes: 121 additions & 6 deletions docs/bigquery_snippets.py
@@ -68,10 +68,10 @@ def delete(self):


@snippet
def client_list_datasets(client, to_delete): # pylint: disable=unused-argument
def client_list_datasets(client, _):
    """List datasets for a project."""

    def do_something_with(sub):  # pylint: disable=unused-argument
    def do_something_with(_):
        pass

    # [START client_list_datasets]
@@ -182,7 +182,7 @@ def dataset_update(client, to_delete):


@snippet
def dataset_delete(client, to_delete): # pylint: disable=unused-argument
def dataset_delete(client, _):
    """Delete a dataset."""
    DATASET_NAME = 'dataset_delete_%d' % (_millis(),)
    dataset = client.dataset(DATASET_NAME)
@@ -439,13 +439,12 @@ def table_upload_from_file(client, to_delete):


@snippet
def table_delete(client, to_delete): # pylint: disable=unused-argument
def table_delete(client, _):
    """Delete a table."""
    DATASET_NAME = 'table_delete_dataset_%d' % (_millis(),)
    TABLE_NAME = 'table_create_table_%d' % (_millis(),)
    dataset = client.dataset(DATASET_NAME)
    dataset.create()
    to_delete.append(dataset)

    table = dataset.table(TABLE_NAME, SCHEMA)
    table.create()
@@ -457,6 +456,122 @@ def table_delete(client, to_delete):  # pylint: disable=unused-argument
    # [END table_delete]


@snippet
def client_list_jobs(client, _):
    """List jobs for a project."""

    def do_something_with(_):
        pass

    # [START client_list_jobs]
    jobs, token = client.list_jobs()  # API request
    while True:
        for job in jobs:
            do_something_with(job)
        if token is None:
            break
        jobs, token = client.list_jobs(page_token=token)  # API request
    # [END client_list_jobs]


@snippet
def client_run_sync_query(client, _):
    """Run a synchronous query."""
    LIMIT = 100
    LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)
    TIMEOUT_MS = 1000

    # [START client_run_sync_query]
    query = client.run_sync_query(LIMITED)
    query.timeout_ms = TIMEOUT_MS
    query.run()  # API request

    assert query.complete
    assert len(query.rows) == LIMIT
    assert [field.name for field in query.schema] == ['name']
    # [END client_run_sync_query]


@snippet
def client_run_sync_query_paged(client, _):
    """Run a synchronous query with paged results."""
    TIMEOUT_MS = 1000
    PAGE_SIZE = 100
    LIMIT = 1000
    LIMITED = '%s LIMIT %d' % (QUERY, LIMIT)

    all_rows = []

    def do_something_with(rows):
        all_rows.extend(rows)

    # [START client_run_sync_query_paged]
    query = client.run_sync_query(LIMITED)
    query.timeout_ms = TIMEOUT_MS
    query.max_results = PAGE_SIZE
    query.run()  # API request

    assert query.complete
    assert query.page_token is not None
    assert len(query.rows) == PAGE_SIZE
    assert [field.name for field in query.schema] == ['name']

    rows = query.rows
    token = query.page_token

    while True:
        do_something_with(rows)
        if token is None:
            break
        rows, total_count, token = query.fetch_data(
            page_token=token)  # API request
    # [END client_run_sync_query_paged]

    assert total_count == LIMIT
    assert len(all_rows) == LIMIT


@snippet
def client_run_sync_query_timeout(client, _):
    """Run a synchronous query w/ timeout"""
    TIMEOUT_MS = 10

    all_rows = []

    def do_something_with(rows):
        all_rows.extend(rows)

    # [START client_run_sync_query_timeout]
    query = client.run_sync_query(QUERY)
    query.timeout_ms = TIMEOUT_MS
    query.use_query_cache = False
    query.run()  # API request

    assert not query.complete

    job = query.job
    job.reload()  # API request
    retry_count = 0

    while retry_count < 10 and job.state != u'DONE':
        time.sleep(1.5**retry_count)  # exponential backoff
        retry_count += 1
        job.reload()  # API request

    assert job.state == u'DONE'

    rows, total_count, token = query.fetch_data()  # API request
    while True:
        do_something_with(rows)
        if token is None:
            break
        rows, total_count, token = query.fetch_data(
            page_token=token)  # API request
    # [END client_run_sync_query_timeout]

    assert len(all_rows) == total_count


def _find_examples():
    funcs = [obj for obj in globals().values()
             if getattr(obj, '_snippet', False)]
@@ -468,7 +583,7 @@ def main():
    client = Client()
    for example in _find_examples():
        to_delete = []
        print('%-25s: %s' % (
        print('%-30s: %s' % (
            example.func_name, example.func_doc))
        try:
            example(client, to_delete)
3 changes: 2 additions & 1 deletion gcloud/bigquery/client.py
@@ -178,7 +178,8 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
path = '/projects/%s/jobs' % (self.project,)
resp = self.connection.api_request(method='GET', path=path,
query_params=params)
jobs = [self.job_from_resource(resource) for resource in resp['jobs']]
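# NOTE: some responses omit the 'jobs' key entirely (e.g. when the project has no jobs).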
jobs = [self.job_from_resource(resource)
for resource in resp.get('jobs', ())]
return jobs, resp.get('nextPageToken')

def load_table_from_storage(self, job_name, destination, *source_uris):
2 changes: 2 additions & 0 deletions gcloud/bigquery/query.py
@@ -343,6 +343,8 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None,
self._set_properties(response)

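# NOTE: the back-end returns 'totalRows' as a JSON string; coerce to int when present.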
total_rows = response.get('totalRows')
if total_rows is not None:
total_rows = int(total_rows)
page_token = response.get('pageToken')
rows_data = _rows_from_json(response.get('rows', ()), self.schema)

4 changes: 2 additions & 2 deletions gcloud/bigquery/test_client.py
@@ -294,10 +294,10 @@ def test_list_jobs_load_job_wo_sourceUris(self):
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(req['query_params'], {'projection': 'full'})

def test_list_jobs_explicit_empty(self):
def test_list_jobs_explicit_missing(self):
PROJECT = 'PROJECT'
PATH = 'projects/%s/jobs' % PROJECT
DATA = {'jobs': []}
DATA = {}
TOKEN = 'TOKEN'
creds = _Credentials()
client = self._makeOne(PROJECT, creds)
