Skip to content

Commit

Permalink
Refactor _to_dataframe_bqstorage_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tswast committed Apr 18, 2019
1 parent 60cf3aa commit 44186b6
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):

return pandas.concat(frames)

def _to_dataframe_bqstorage_stream(
self, bqstorage_client, dtypes, columns, session, stream
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if self._to_dataframe_finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
return pandas.DataFrame(columns=columns)

# page.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
if bigquery_storage_v1beta1 is None:
Expand Down Expand Up @@ -1422,37 +1443,27 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
if not session.streams:
return pandas.DataFrame(columns=columns)

# Use finished to notify worker threads when to quit. See:
# https://stackoverflow.com/a/29237343/101923
finished = False

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
return pandas.DataFrame(columns=columns)

# page.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]
# Use _to_dataframe_finished to notify worker threads when to quit.
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False

def get_frames(pool):
frames = []

# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
# background even if there is an exception on the main thread.
# See: https://github.com/googleapis/google-cloud-python/pull/7698
not_done = [
pool.submit(get_dataframe, stream) for stream in session.streams
pool.submit(
self._to_dataframe_bqstorage_stream,
bqstorage_client,
dtypes,
columns,
session,
stream,
)
for stream in session.streams
]

while not_done:
Expand All @@ -1469,10 +1480,9 @@ def get_frames(pool):
# No need for a lock because reading/replacing a variable is
# defined to be an atomic operation in the Python language
# definition (enforced by the global interpreter lock).
finished = True
self._to_dataframe_finished = True

# Use [columns] to ensure column order matches manually-parsed schema.
return pandas.concat(frames)[columns]
return pandas.concat(frames)

def _get_progress_bar(self, progress_bar_type):
"""Construct a tqdm progress bar object, if tqdm is installed."""
Expand Down

0 comments on commit 44186b6

Please sign in to comment.