From 60cf3aad2aebc7d1d1f2398346aa0e14449b59f0 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Apr 2019 15:58:34 -0700 Subject: [PATCH] fix: `KeyboardInterrupt` during `to_dataframe` (with BQ Storage API) no longer hangs I noticed in manually testing `to_dataframe` that it would stop the current cell when I hit Ctrl-C, but data kept on downloading in the background. Trying to exit the Python shell, I'd notice that it would hang until I pressed Ctrl-C a few more times. Rather than get the DataFrame for each stream in one big chunk, loop through each block and exit if the function needs to quit early. This follows the pattern at https://stackoverflow.com/a/29237343/101923 Update tests to ensure multiple progress interval loops. --- bigquery/google/cloud/bigquery/table.py | 53 ++++++- bigquery/setup.py | 2 +- bigquery/tests/unit/test_table.py | 177 ++++++++++++++++++++++-- 3 files changed, 216 insertions(+), 16 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 92af19c43ce56..ec0ea813615c1 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -66,6 +66,7 @@ ) _TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"' _MARKER = object() +_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds. def _reference_getter(table): @@ -1421,16 +1422,56 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes): if not session.streams: return pandas.DataFrame(columns=columns) + # Use finished to notify worker threads when to quit. See: + # https://stackoverflow.com/a/29237343/101923 + finished = False + def get_dataframe(stream): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position) - return rowstream.to_dataframe(session, dtypes=dtypes) + rowstream = bqstorage_client.read_rows(position).rows(session) + + frames = [] + for page in rowstream.pages: + if finished: + return + frames.append(page.to_dataframe(dtypes=dtypes)) + + # Avoid errors on unlucky streams with no blocks. pandas.concat + # will fail on an empty list. + if not frames: + return pandas.DataFrame(columns=columns) + + # page.to_dataframe() does not preserve column order. Rearrange at + # the end using manually-parsed schema. + return pandas.concat(frames)[columns] + + def get_frames(pool): + frames = [] + + # Manually submit jobs and wait for download to complete rather + # than using pool.map because pool.map continues running in the + # background even if there is an exception on the main thread. + not_done = [ + pool.submit(get_dataframe, stream) for stream in session.streams + ] + + while not_done: + done, not_done = concurrent.futures.wait( + not_done, timeout=_PROGRESS_INTERVAL + ) + frames.extend([future.result() for future in done]) + return frames with concurrent.futures.ThreadPoolExecutor() as pool: - frames = pool.map(get_dataframe, session.streams) - - # rowstream.to_dataframe() does not preserve column order. Rearrange at - # the end using manually-parsed schema. + try: + frames = get_frames(pool) + finally: + # No need for a lock because reading/replacing a variable is + # defined to be an atomic operation in the Python language + # definition (enforced by the global interpreter lock). + finished = True + + # Use [columns] to ensure column order matches manually-parsed schema. return pandas.concat(frames)[columns] def _get_progress_bar(self, progress_bar_type): diff --git a/bigquery/setup.py b/bigquery/setup.py index 2c4d570d2c7e8..b51fa63a9a757 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ ] extras = { "bqstorage": [ - "google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev", + "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", "fastavro>=0.21.2", ], "pandas": ["pandas>=0.17.1"], diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 7ac982394c9d2..8bba2befccbc7 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import itertools import json +import time import unittest import warnings @@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) - def test_to_dataframe_w_bqstorage_empty(self): + def test_to_dataframe_w_bqstorage_no_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut @@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self): @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) - def test_to_dataframe_w_bqstorage_nonempty(self): + def test_to_dataframe_w_bqstorage_empty_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut from google.cloud.bigquery_storage_v1beta1 import reader + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + session = bigquery_storage_v1beta1.types.ReadSession( + streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}] + ) + session.avro_schema.schema = json.dumps( + { + "fields": [ + {"name": "colA"}, + # Not alphabetical to test column order. + {"name": "colC"}, + {"name": "colB"}, + ] + } + ) + bqstorage_client.create_read_session.return_value = session + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) - mock_rowstream.to_dataframe.return_value = pandas.DataFrame( - [ - {"colA": 1, "colB": "abc", "colC": 2.0}, - {"colA": -1, "colB": "def", "colC": 4.0}, - ] + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + mock_pages = mock.PropertyMock(return_value=()) + type(mock_rows).pages = mock_pages + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, ) + + got = row_iterator.to_dataframe(bqstorage_client) + + column_names = ["colA", "colC", "colB"] + self.assertEqual(list(got), column_names) + self.assertTrue(got.empty) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_nonempty(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self): } ) bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) + type(mock_rows).pages = mock_pages + schema = [ schema.SchemaField("colA", "IGNORED"), schema.SchemaField("colC", "IGNORED"), @@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self): selected_fields=schema, ) - got = row_iterator.to_dataframe(bqstorage_client) + with mock.patch( + "concurrent.futures.wait", wraps=concurrent.futures.wait + ) as mock_wait: + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + column_names = ["colA", "colC", "colB"] self.assertEqual(list(got), column_names) - self.assertEqual(len(got.index), 2) + self.assertEqual(len(got.index), 6) + # Make sure that this test looped through multiple progress intervals. + self.assertGreaterEqual(mock_wait.call_count, 2) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + # Speed up testing. + mut._PROGRESS_INTERVAL = 0.01 + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + session = bigquery_storage_v1beta1.types.ReadSession( + streams=[ + # Use two streams because one will fail with a + # KeyboardInterrupt, and we want to check that the other stream + # ends early. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + ) + session.avro_schema.schema = json.dumps( + {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]} + ) + bqstorage_client.create_read_session.return_value = session + + def blocking_to_dataframe(*args, **kwargs): + # Sleep for longer than the waiting interval so that we know we're + # only reading one page per loop at most. + time.sleep(2 * mut._PROGRESS_INTERVAL) + return pandas.DataFrame( + {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]}, + columns=["colA", "colB", "colC"], + ) + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_dataframe.side_effect = blocking_to_dataframe + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page)) + type(mock_rows).pages = mock_pages + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + mock_rowstream.rows.return_value = mock_rows + + mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt) + type(mock_cancelled_rows).pages = mock_cancelled_pages + mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream) + mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows + + bqstorage_client.read_rows.side_effect = ( + mock_cancelled_rowstream, + mock_rowstream, + ) + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + with pytest.raises(KeyboardInterrupt): + row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + + # Should not have fetched the third page of results because exit_early + # should have been set. + self.assertLessEqual(mock_page.to_dataframe.call_count, 2) @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(