
fix: KeyboardInterrupt during to_dataframe (with BQ Storage API) no longer hangs

While manually testing `to_dataframe`, I noticed that pressing Ctrl-C stopped
the current cell, but data kept downloading in the background. When I then
tried to exit the Python shell, it would hang until I pressed Ctrl-C a few
more times.

Rather than getting the DataFrame for each stream in one big chunk, loop
through each block and exit early if the function needs to quit. This
follows the pattern at https://stackoverflow.com/a/29237343/101923 (a minimal
sketch of the pattern appears below).

Update the tests to ensure they loop through multiple progress intervals.
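
For context, here is a minimal, self-contained sketch of that early-exit pattern. This is not the library code itself; `download_all`, `fetch_pages`, and `download_stream` are hypothetical names used only to illustrate the shared `finished` flag and the polling loop around `concurrent.futures.wait`.

import concurrent.futures

_PROGRESS_INTERVAL = 1.0  # seconds between checks on the worker futures


def download_all(streams, fetch_pages):
    # ``streams`` is any iterable of stream handles and ``fetch_pages`` is a
    # callable returning an iterable of pages for one stream (both hypothetical).
    finished = False  # read by workers; assignment is atomic under the GIL

    def download_stream(stream):
        pages = []
        for page in fetch_pages(stream):
            if finished:  # the main thread asked us to quit early
                break
            pages.append(page)
        return pages

    with concurrent.futures.ThreadPoolExecutor() as pool:
        try:
            # Submit jobs manually instead of using pool.map, so that an
            # exception on the main thread (e.g. KeyboardInterrupt) is not
            # hidden while the map keeps downloading in the background.
            not_done = [pool.submit(download_stream, stream) for stream in streams]
            results = []
            while not_done:
                # Wake up periodically so Ctrl-C is handled promptly.
                done, not_done = concurrent.futures.wait(
                    not_done, timeout=_PROGRESS_INTERVAL
                )
                results.extend(future.result() for future in done)
            return results
        finally:
            # Tell the workers to stop; the executor's __exit__ then joins them.
            finished = True

In the actual change below, each worker builds a pandas DataFrame per page and the frames are concatenated in schema column order at the end.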
tswast committed Apr 18, 2019
1 parent 55d7dc6 commit 60cf3aa
Showing 3 changed files with 216 additions and 16 deletions.
53 changes: 47 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
@@ -66,6 +66,7 @@
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.


def _reference_getter(table):
@@ -1421,16 +1422,56 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
if not session.streams:
return pandas.DataFrame(columns=columns)

# Use finished to notify worker threads when to quit. See:
# https://stackoverflow.com/a/29237343/101923
finished = False

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position)
return rowstream.to_dataframe(session, dtypes=dtypes)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
return pandas.DataFrame(columns=columns)

# page.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def get_frames(pool):
frames = []

# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
# background even if there is an exception on the main thread.
not_done = [
pool.submit(get_dataframe, stream) for stream in session.streams
]

while not_done:
done, not_done = concurrent.futures.wait(
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])
return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
frames = pool.map(get_dataframe, session.streams)

# rowstream.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
try:
frames = get_frames(pool)
finally:
# No need for a lock because reading/replacing a variable is
# defined to be an atomic operation in the Python language
# definition (enforced by the global interpreter lock).
finished = True

# Use [columns] to ensure column order matches manually-parsed schema.
return pandas.concat(frames)[columns]

def _get_progress_bar(self, progress_bar_type):
2 changes: 1 addition & 1 deletion bigquery/setup.py
@@ -37,7 +37,7 @@
]
extras = {
"bqstorage": [
"google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
"google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
"fastavro>=0.21.2",
],
"pandas": ["pandas>=0.17.1"],
177 changes: 168 additions & 9 deletions bigquery/tests/unit/test_table.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import itertools
import json
import time
import unittest
import warnings

@@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_empty(self):
def test_to_dataframe_w_bqstorage_no_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut

@@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_nonempty(self):
def test_to_dataframe_w_bqstorage_empty_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
)
session.avro_schema.schema = json.dumps(
{
"fields": [
{"name": "colA"},
# Not alphabetical to test column order.
{"name": "colC"},
{"name": "colB"},
]
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_rowstream.to_dataframe.return_value = pandas.DataFrame(
[
{"colA": 1, "colB": "abc", "colC": 2.0},
{"colA": -1, "colB": "def", "colC": 4.0},
]
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
mock_pages = mock.PropertyMock(return_value=())
type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertTrue(got.empty)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_nonempty(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
@@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
Expand All @@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)
with mock.patch(
"concurrent.futures.wait", wraps=concurrent.futures.wait
) as mock_wait:
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 2)
self.assertEqual(len(got.index), 6)
# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[
# Use two streams because one will fail with a
# KeyboardInterrupt, and we want to check that the other stream
# ends early.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
)
session.avro_schema.schema = json.dumps(
{"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
)
bqstorage_client.create_read_session.return_value = session

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_rowstream.rows.return_value = mock_rows

mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
type(mock_cancelled_rows).pages = mock_cancelled_pages
mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows

bqstorage_client.read_rows.side_effect = (
mock_cancelled_rowstream,
mock_rowstream,
)

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

with pytest.raises(KeyboardInterrupt):
row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

# Should not have fetched the third page of results because exit_early
# should have been set.
self.assertLessEqual(mock_page.to_dataframe.call_count, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
