BigQuery: ensure that KeyboardInterrupt during to_dataframe no longer hangs. #7698

Merged (2 commits, Apr 22, 2019)
67 changes: 59 additions & 8 deletions bigquery/google/cloud/bigquery/table.py
@@ -66,6 +66,7 @@
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()
_PROGRESS_INTERVAL = 1.0 # Time between download status updates, in seconds.


def _reference_getter(table):
@@ -1386,6 +1387,27 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):

return pandas.concat(frames)

def _to_dataframe_bqstorage_stream(
self, bqstorage_client, dtypes, columns, session, stream
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if self._to_dataframe_finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

# Avoid errors on unlucky streams with no blocks. pandas.concat
# will fail on an empty list.
if not frames:
return pandas.DataFrame(columns=columns)

# page.to_dataframe() does not preserve column order. Rearrange at
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]
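
A quick aside on the guard above: pandas.concat raises a ValueError when handed an empty sequence, which is exactly what happens for a stream that yields no blocks. A standalone illustration (not part of the diff; the column names mirror the test fixtures):

    import pandas

    frames = []  # e.g. a stream that produced no pages
    try:
        pandas.concat(frames)
    except ValueError as exc:
        print(exc)  # "No objects to concatenate"

    # Returning an empty DataFrame built from the parsed schema instead keeps
    # the result well-formed and the column order predictable.
    print(list(pandas.DataFrame(columns=["colA", "colC", "colB"])))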

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
if bigquery_storage_v1beta1 is None:
@@ -1421,17 +1443,46 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
if not session.streams:
return pandas.DataFrame(columns=columns)

- def get_dataframe(stream):
- position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
- rowstream = bqstorage_client.read_rows(position)
- return rowstream.to_dataframe(session, dtypes=dtypes)
+ # Use _to_dataframe_finished to notify worker threads when to quit.
+ # See: https://stackoverflow.com/a/29237343/101923
+ self._to_dataframe_finished = False
Contributor:
Mostly I'm trying to reason about scope here. This is a worker pool, but is it possible we're generating multiple independent dataframes that would share the same access to _to_dataframe_finished? Do we need to key the workers to specific invocations, or is the nature of the access always blocking so that this isn't an issue?

Contributor Author:
There are a couple of reasons why I don't think this is an issue:

  • Yes, to_dataframe() is a blocking call, so you wouldn't really have multiple going at once.
  • RowIterator isn't really something you'd want to use across threads, or even more than once, anyway. Because of how the pagination state works, once you loop through all the rows, to_dataframe() returns an empty DataFrame, even when the BQ Storage API isn't used.
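
For readers following the scope discussion above, here is a minimal, self-contained sketch of the shared-flag pattern from the linked StackOverflow answer; the names are illustrative and not from the library. Worker threads poll a boolean between units of work, and the main thread flips it in a finally block, so an interrupt cannot leave threads running indefinitely:

    import concurrent.futures
    import time

    finished = False  # shared flag: written by the main thread, polled by workers

    def read_stream(stream_id):
        """Stand-in for downloading one stream, page by page."""
        pages = []
        for page in range(100):
            if finished:              # checked at each page boundary
                return pages
            time.sleep(0.01)          # stand-in for a network read
            pages.append((stream_id, page))
        return pages

    try:
        with concurrent.futures.ThreadPoolExecutor() as pool:
            futures = [pool.submit(read_stream, i) for i in range(2)]
            try:
                time.sleep(0.05)
                raise KeyboardInterrupt   # simulate Ctrl+C on the main thread
            finally:
                # Assigning a variable is atomic under CPython's GIL, so no
                # lock is needed to publish the stop request to the workers.
                finished = True
    except KeyboardInterrupt:
        # The executor's shutdown (in __exit__) returns quickly because every
        # worker notices the flag at its next page boundary.
        print("interrupted, but did not hang")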


def get_frames(pool):
frames = []

# Manually submit jobs and wait for download to complete rather
# than using pool.map because pool.map continues running in the
# background even if there is an exception on the main thread.
# See: https://github.com/googleapis/google-cloud-python/pull/7698
not_done = [
pool.submit(
self._to_dataframe_bqstorage_stream,
bqstorage_client,
dtypes,
columns,
session,
stream,
)
for stream in session.streams
]

while not_done:
done, not_done = concurrent.futures.wait(
not_done, timeout=_PROGRESS_INTERVAL
)
frames.extend([future.result() for future in done])
return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
- frames = pool.map(get_dataframe, session.streams)
+ try:
+ frames = get_frames(pool)
+ finally:
+ # No need for a lock because reading/replacing a variable is
+ # defined to be an atomic operation in the Python language
+ # definition (enforced by the global interpreter lock).
+ self._to_dataframe_finished = True

- # rowstream.to_dataframe() does not preserve column order. Rearrange at
- # the end using manually-parsed schema.
- return pandas.concat(frames)[columns]
+ return pandas.concat(frames)
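
To make the user-visible effect concrete, here is a hedged usage sketch (the project, dataset, and table names are invented; the calls shown are the public BigQuery client API):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    bq_client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # Hypothetical table; any large table makes the download long enough to interrupt.
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.large_table")
    rows = bq_client.list_rows(bq_client.get_table(table_ref))

    try:
        df = rows.to_dataframe(bqstorage_client=bqstorage_client)
    except KeyboardInterrupt:
        # Before this change, Ctrl+C here could leave worker threads downloading
        # in the background; now they stop at the next page boundary.
        print("download cancelled")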

def _get_progress_bar(self, progress_bar_type):
"""Construct a tqdm progress bar object, if tqdm is installed."""
2 changes: 1 addition & 1 deletion bigquery/setup.py
@@ -37,7 +37,7 @@
]
extras = {
"bqstorage": [
"google-cloud-bigquery-storage >= 0.2.0dev1, <2.0.0dev",
"google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
"fastavro>=0.21.2",
],
"pandas": ["pandas>=0.17.1"],
177 changes: 168 additions & 9 deletions bigquery/tests/unit/test_table.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import itertools
import json
import time
import unittest
import warnings

@@ -1705,7 +1707,7 @@ def test_to_dataframe_error_if_pandas_is_none(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
- def test_to_dataframe_w_bqstorage_empty(self):
+ def test_to_dataframe_w_bqstorage_no_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut

@@ -1746,18 +1748,70 @@ def test_to_dataframe_w_bqstorage_empty(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
- def test_to_dataframe_w_bqstorage_nonempty(self):
+ def test_to_dataframe_w_bqstorage_empty_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
)
session.avro_schema.schema = json.dumps(
{
"fields": [
{"name": "colA"},
# Not alphabetical to test column order.
{"name": "colC"},
{"name": "colB"},
]
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
- mock_rowstream.to_dataframe.return_value = pandas.DataFrame(
- [
- {"colA": 1, "colB": "abc", "colC": 2.0},
- {"colA": -1, "colB": "def", "colC": 4.0},
- ]
- )
bqstorage_client.read_rows.return_value = mock_rowstream

+ mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+ mock_rowstream.rows.return_value = mock_rows
+ mock_pages = mock.PropertyMock(return_value=())
+ type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertTrue(got.empty)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_nonempty(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
@@ -1775,7 +1829,27 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
}
)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
bqstorage_client.read_rows.return_value = mock_rowstream

mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
@@ -1791,10 +1865,95 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
selected_fields=schema,
)

- got = row_iterator.to_dataframe(bqstorage_client)
+ with mock.patch(
+ "concurrent.futures.wait", wraps=concurrent.futures.wait
+ ) as mock_wait:
+ got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
- self.assertEqual(len(got.index), 2)
+ self.assertEqual(len(got.index), 6)
# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)
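
A note on the mock.patch(..., wraps=...) trick used above: wrapping forwards calls to the real concurrent.futures.wait, so the download behaves normally, while the mock still records how often the wait loop ran. A tiny standalone version of the pattern (the wrapped function here is made up):

    from unittest import mock

    def add(a, b):
        return a + b

    # wraps= keeps the real behaviour but records every call for assertions.
    with mock.patch(__name__ + ".add", wraps=add) as spy:
        assert add(2, 3) == 5
        spy.assert_called_once_with(2, 3)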

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
streams=[
# Use two streams because one will fail with a
# KeyboardInterrupt, and we want to check that the other stream
# ends early.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
)
session.avro_schema.schema = json.dumps(
{"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
)
bqstorage_client.create_read_session.return_value = session

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_rowstream.rows.return_value = mock_rows

mock_cancelled_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_cancelled_pages = mock.PropertyMock(side_effect=KeyboardInterrupt)
type(mock_cancelled_rows).pages = mock_cancelled_pages
mock_cancelled_rowstream = mock.create_autospec(reader.ReadRowsStream)
mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows

bqstorage_client.read_rows.side_effect = (
mock_cancelled_rowstream,
mock_rowstream,
)

schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
]

row_iterator = mut.RowIterator(
_mock_client(),
None, # api_request: ignored
None, # path: ignored
schema,
table=mut.TableReference.from_string("proj.dset.tbl"),
selected_fields=schema,
)

with pytest.raises(KeyboardInterrupt):
row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

# Should not have fetched the third page of results because
# _to_dataframe_finished should have been set.
self.assertLessEqual(mock_page.to_dataframe.call_count, 2)
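
The cancelled stream above is simulated by making the pages property itself raise on access via mock.PropertyMock. A minimal standalone version of that pattern (a generic MagicMock rather than the reader classes):

    from unittest import mock

    # PropertyMock attaches to the mock's type, so attribute *access* raises,
    # which is how the test models Ctrl+C arriving as a worker starts reading.
    m = mock.MagicMock()
    type(m).pages = mock.PropertyMock(side_effect=KeyboardInterrupt)

    try:
        m.pages
    except KeyboardInterrupt:
        print("accessing .pages raised KeyboardInterrupt")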

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(