Skip to content

Commit

Permalink
Use finally so that 'finished' is always set at the end of the loop.
Browse files Browse the repository at this point in the history
Update tests to ensure the download loop runs through multiple progress-interval iterations.
  • Loading branch information
tswast committed Apr 12, 2019
1 parent ff0a26b commit 0971781
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
11 changes: 5 additions & 6 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,17 +1403,17 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
if not session.streams:
return pandas.DataFrame(columns=columns)

# Use exit_early to notify worker threads when to quit. See:
# Use finished to notify worker threads when to quit. See:
# https://stackoverflow.com/a/29237343/101923
exit_early = False
finished = False

def get_dataframe(stream):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

frames = []
for page in rowstream.pages:
if exit_early:
if finished:
return
frames.append(page.to_dataframe(dtypes=dtypes))

Expand Down Expand Up @@ -1446,12 +1446,11 @@ def get_frames(pool):
with concurrent.futures.ThreadPoolExecutor() as pool:
try:
frames = get_frames(pool)
except:
finally:
# No need for a lock because reading/replacing a variable is
# defined to be an atomic operation in the Python language
# definition (enforced by the global interpreter lock).
exit_early = True
raise
finished = True

# Use [columns] to ensure column order matches manually-parsed schema.
return pandas.concat(frames)[columns]
Expand Down
29 changes: 22 additions & 7 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import itertools
import json
import time
Expand Down Expand Up @@ -1784,6 +1785,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
Expand All @@ -1808,12 +1812,18 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.return_value = pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)
mock_pages = mock.PropertyMock(return_value=(mock_page,))
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages

schema = [
Expand All @@ -1831,11 +1841,16 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
selected_fields=schema,
)

got = row_iterator.to_dataframe(bqstorage_client)
with mock.patch(
"concurrent.futures.wait", wraps=concurrent.futures.wait
) as mock_wait:
got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)

column_names = ["colA", "colC", "colB"]
self.assertEqual(list(got), column_names)
self.assertEqual(len(got.index), 2)
self.assertEqual(len(got.index), 6)
# Make sure that this test looped through multiple progress intervals.
self.assertGreaterEqual(mock_wait.call_count, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
Expand Down

0 comments on commit 0971781

Please sign in to comment.