BUG: Pagination in pandas.io.gbq #5262

Merged 1 commit on Oct 21, 2013
2 changes: 1 addition & 1 deletion ci/requirements-2.6.txt
@@ -4,4 +4,4 @@ python-dateutil==1.5
pytz==2013b
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
html5lib==1.0b2
bigquery==2.0.15
bigquery==2.0.17
2 changes: 1 addition & 1 deletion ci/requirements-2.7.txt
@@ -18,4 +18,4 @@ MySQL-python==1.2.4
scipy==0.10.0
beautifulsoup4==4.2.1
statsmodels==0.5.0
bigquery==2.0.15
bigquery==2.0.17
2 changes: 1 addition & 1 deletion ci/requirements-2.7_LOCALE.txt
@@ -16,4 +16,4 @@ lxml==3.2.1
scipy==0.10.0
beautifulsoup4==4.2.1
statsmodels==0.5.0
bigquery==2.0.15
bigquery==2.0.17
108 changes: 68 additions & 40 deletions pandas/io/gbq.py
@@ -7,6 +7,8 @@
import csv
import logging
from datetime import datetime
import pkg_resources
from distutils.version import LooseVersion

import pandas as pd
import numpy as np
@@ -19,6 +21,13 @@
import bigquery_client
import gflags as flags
_BQ_INSTALLED = True

_BQ_VERSION = pkg_resources.get_distribution('bigquery').version
if LooseVersion(_BQ_VERSION) >= '2.0.17':
_BQ_VALID_VERSION = True
else:
_BQ_VALID_VERSION = False

except ImportError:
_BQ_INSTALLED = False
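An aside for reviewers, not part of the patch: a quick sketch of why the check above goes through LooseVersion rather than comparing version strings directly. The example versions are made up for illustration.

from distutils.version import LooseVersion

# Plain string comparison sorts lexicographically, so multi-digit
# components compare incorrectly:
'2.0.9' >= '2.0.17'                              # True, which is wrong
# LooseVersion splits the string into numeric components first:
LooseVersion('2.0.9') >= LooseVersion('2.0.17')  # False, as intended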

@@ -102,6 +111,9 @@ def _parse_entry(field_value, field_type):
field_value = np.datetime64(timestamp)
elif field_type == 'BOOLEAN':
field_value = field_value == 'true'
# Note that results are unicode, so this will
# fail for non-ASCII characters... this probably
# functions differently in Python 3
else:
field_value = str(field_value)
return field_value
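For context (illustrative only, not part of the patch): under Python 2 the str() cast above encodes with the ASCII codec, so a non-ASCII unicode field value raises, which is what the comment warns about. A minimal sketch:

# A unicode value such as the API returns; the accented character is the
# problem case under Python 2.
value = u'caf\xe9'
try:
    as_str = str(value)              # Python 2: raises UnicodeEncodeError
except UnicodeEncodeError:
    as_str = value.encode('utf-8')   # explicit encoding succeeds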
@@ -228,68 +240,76 @@ def _parse_data(client, job, index_col=None, col_order=None):
# Iterate over the result rows.
# Since Google's API now requires pagination of results,
# we do that here. The following is repurposed from
# bigquery_client.py :: Client.ReadTableRows()
# bigquery_client.py :: Client._JobTableReader._ReadOnePage

# TODO: Enable Reading From Table, see Client._TableTableReader._ReadOnePage

# Initially, no page token is set
page_token = None

# Most of Google's client API's allow one to set total_rows in case
# the user only wants the first 'n' results from a query. Typically
# they set this to sys.maxint by default, but this caused problems
# during testing - specifically on OS X. It appears that at some
# point in bigquery_client.py, there is an attempt to cast this value
# to an unsigned integer. Depending on the python install,
# sys.maxint may exceed the limitations of unsigned integers.
#
# See:
# https://code.google.com/p/google-bigquery-tools/issues/detail?id=14

# This is hardcoded value for 32bit sys.maxint per
# the above note. Theoretically, we could simply use
# 100,000 (or whatever the current max page size is),
# but this is more flexible in the event of an API change
total_rows = 2147483647

# Keep track of rows read
row_count = 0
# This number is the current max results per page
max_rows = bigquery_client._MAX_ROWS_PER_REQUEST

# How many rows in result set? Initialize to max_rows
total_rows = max_rows

# This is the starting row for a particular page...
# is ignored if page_token is present, though
# it may be useful if we wish to implement SQL like LIMITs
# with minimums
start_row = 0

# Keep our page DataFrames until the end when we
# concatenate them
dataframe_list = list()

# Iterate over all rows
while row_count < total_rows:
data = client.apiclient.tabledata().list(maxResults=total_rows - row_count,
pageToken=page_token,
**table_dict).execute()
current_job = job['jobReference']

# If there are more results than will fit on a page,
# you will receive a token for the next page.
page_token = data.get('pageToken', None)
# Iterate over all rows
while start_row < total_rows:
# Setup the parameters for getQueryResults() API Call
kwds = dict(current_job)
kwds['maxResults'] = max_rows
# Sets the timeout to 0 because we assume the table is already ready.
# This is because our previous call to Query() is synchronous
# and will block until it's actually done
kwds['timeoutMs'] = 0
# Use start row if there's no page_token ... in other words, the
# user requested to start somewhere other than the beginning...
# presently this is not a parameter to read_gbq(), but it will be
# added eventually.
if page_token:
kwds['pageToken'] = page_token
else:
kwds['startIndex'] = start_row
data = client.apiclient.jobs().getQueryResults(**kwds).execute()
if not data['jobComplete']:
raise BigqueryError('Job was not completed, or was invalid')

# How many rows are there across all pages?
total_rows = min(total_rows, int(data['totalRows'])) # Changed to use get(data[rows],0)
# Note: This is presently the only reason we don't just use
# _ReadOnePage() directly
total_rows = int(data['totalRows'])

page_token = data.get('pageToken', None)
raw_page = data.get('rows', [])
page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)

row_count += len(page_array)
start_row += len(raw_page)
if total_rows > 0:
completed = (100 * row_count) / total_rows
logger.info('Remaining Rows: ' + str(total_rows - row_count) + '(' + str(completed) + '% Complete)')
completed = (100 * start_row) / total_rows
logger.info('Remaining Rows: ' + str(total_rows - start_row) + '(' + str(completed) + '% Complete)')
else:
logger.info('No Rows')

dataframe_list.append(DataFrame(page_array))

# Handle any exceptions that might have occurred
if not page_token and row_count != total_rows:
# Did we get enough rows? Note: gbq.py stopped checking for this
# but we felt it was still a good idea.
if not page_token and not raw_page and start_row != total_rows:
raise bigquery_client.BigqueryInterfaceError(
'PageToken missing for %r' % (
bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
if not raw_page and row_count != total_rows:
raise bigquery_client.BigqueryInterfaceError(
'Not enough rows returned by server for %r' % (
bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
("Not enough rows returned by server. Expected: {0}" + \
" Rows, But Recieved {1}").format(total_rows, start_row))

# Build final dataframe
final_df = concat(dataframe_list, ignore_index=True)
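For readers skimming the diff, the paging logic above reduces to roughly the sketch below. It is a simplification, not the patch itself: `service` stands in for client.apiclient and `job_ref` for job['jobReference'].

def iter_query_pages(service, job_ref, max_rows):
    """Yield one page of result rows at a time via jobs().getQueryResults()."""
    page_token = None
    start_row = 0
    total_rows = max_rows               # refined after the first response
    while start_row < total_rows:
        kwds = dict(job_ref, maxResults=max_rows, timeoutMs=0)
        if page_token:
            kwds['pageToken'] = page_token
        else:
            kwds['startIndex'] = start_row
        data = service.jobs().getQueryResults(**kwds).execute()
        if not data['jobComplete']:
            raise RuntimeError('Job was not completed, or was invalid')
        total_rows = int(data['totalRows'])
        page_token = data.get('pageToken', None)
        rows = data.get('rows', [])
        if not rows and not page_token:
            break                        # defensive: avoid an infinite loop
        start_row += len(rows)
        yield rows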
@@ -355,6 +375,10 @@ def to_gbq(dataframe, destination_table, schema=None, col_order=None, if_exists=
else:
raise ImportError('Could not import Google BigQuery Client.')

if not _BQ_VALID_VERSION:
raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
"support, current version " + _BQ_VERSION)

ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP', 'RECORD']

if if_exists == 'replace' and schema is None:
@@ -456,6 +480,10 @@ def read_gbq(query, project_id = None, destination_table = None, index_col=None,
else:
raise ImportError('Could not import Google BigQuery Client.')

if not _BQ_VALID_VERSION:
raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
"support, current version " + _BQ_VERSION)

query_args = kwargs
query_args['project_id'] = project_id
query_args['query'] = query
38 changes: 20 additions & 18 deletions pandas/io/tests/test_gbq.py
@@ -40,20 +40,21 @@ def GetTableSchema(self,table_dict):
# Fake Google BigQuery API Client
class FakeApiClient:
def __init__(self):
self._tabledata = FakeTableData()
self._fakejobs = FakeJobs()


def tabledata(self):
return self._tabledata
def jobs(self):
return self._fakejobs

class FakeTableData:
class FakeJobs:
def __init__(self):
self._list = FakeList()
self._fakequeryresults = FakeResults()

def list(self,maxResults = None, pageToken = None, **table_dict):
return self._list
def getQueryResults(self, job_id=None, project_id=None,
max_results=None, timeout_ms=None, **kwargs):
return self._fakequeryresults

class FakeList:
class FakeResults:
def execute(self):
return {'rows': [ {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]},
{'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'attended'}, {'v': '1'}]},
@@ -68,7 +69,8 @@ def execute(self):
],
'kind': 'bigquery#tableDataList',
'etag': '"4PTsVxg68bQkQs1RJ1Ndewqkgg4/hoRHzb4qfhJAIa2mEewC-jhs9Bg"',
'totalRows': '10'}
'totalRows': '10',
'jobComplete' : True}
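For reference (not part of the patch), each fake row above follows the BigQuery wire format: 'f' is the list of fields in a row and 'v' is each cell's value. A minimal sketch of unpacking one row; the column names are invented for illustration:

row = {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]}
col_names = ['corpus', 'corpus_date', 'word', 'word_count']  # hypothetical
record = dict(zip(col_names, (cell['v'] for cell in row['f'])))
# record == {'corpus': 'othello', 'corpus_date': '1603',
#            'word': 'brave', 'word_count': '3'}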

####################################################################################

@@ -225,16 +227,16 @@ def test_column_order_plus_index(self):
correct_frame_small = DataFrame(correct_frame_small)[col_order]
tm.assert_index_equal(result_frame.columns, correct_frame_small.columns)

# @with_connectivity_check
# def test_download_dataset_larger_than_100k_rows(self):
# # Test for known BigQuery bug in datasets larger than 100k rows
# # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
# if not os.path.exists(self.bq_token):
# raise nose.SkipTest('Skipped because authentication information is not available.')
@with_connectivity_check
def test_download_dataset_larger_than_100k_rows(self):
# Test for known BigQuery bug in datasets larger than 100k rows
# http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
if not os.path.exists(self.bq_token):
raise nose.SkipTest('Skipped because authentication information is not available.')

# client = gbq._authenticate()
# a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
# self.assertTrue(len(a) == 100005)
client = gbq._authenticate()
a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
self.assertTrue(len(a) == 100005)

@with_connectivity_check
def test_download_all_data_types(self):