BigQuery: Replace avro with arrow schemas in test_table.py #9056

Merged
merged 1 commit into from
Aug 20, 2019
Merged
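For context on the change below: the mocked ReadSession fixtures previously set avro_schema to a JSON string of field names; they now carry a serialized Arrow schema, which encodes column names, types, and order. A minimal sketch (not part of the diff) of the serialize/read-back round trip the new fixtures rely on, assuming a reasonably recent pyarrow (older releases exposed read_schema at the top level rather than under pyarrow.ipc):

import pyarrow

# Deliberately non-alphabetical column order, mirroring the fixtures below.
arrow_schema = pyarrow.schema(
    [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colC", pyarrow.float64()),
        pyarrow.field("colB", pyarrow.utf8()),
    ]
)

# The bytes stored in the mocked ReadSession.arrow_schema.serialized_schema.
serialized = arrow_schema.serialize().to_pybytes()

# Reading the bytes back recovers the same schema: names, types, and order.
roundtrip = pyarrow.ipc.read_schema(pyarrow.py_buffer(serialized))
assert roundtrip.equals(arrow_schema)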
80 changes: 50 additions & 30 deletions bigquery/tests/unit/test_table.py
@@ -13,7 +13,6 @@
# limitations under the License.

import itertools
-import json
import logging
import time
import unittest
@@ -2271,26 +2270,26 @@ def test_to_dataframe_w_bqstorage_logs_session(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
+@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_dataframe_w_bqstorage_empty_streams(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

+arrow_fields = [
+pyarrow.field("colA", pyarrow.int64()),
+# Not alphabetical to test column order.
+pyarrow.field("colC", pyarrow.float64()),
+pyarrow.field("colB", pyarrow.utf8()),
+]
+arrow_schema = pyarrow.schema(arrow_fields)

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession(
-streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-)
-session.avro_schema.schema = json.dumps(
-{
-"fields": [
-{"name": "colA"},
-# Not alphabetical to test column order.
-{"name": "colC"},
-{"name": "colB"},
-]
-}
+streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}],
+arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
)
bqstorage_client.create_read_session.return_value = session

@@ -2327,11 +2326,20 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
+@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_dataframe_w_bqstorage_nonempty(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

+arrow_fields = [
+pyarrow.field("colA", pyarrow.int64()),
+# Not alphabetical to test column order.
+pyarrow.field("colC", pyarrow.float64()),
+pyarrow.field("colB", pyarrow.utf8()),
+]
+arrow_schema = pyarrow.schema(arrow_fields)

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
@@ -2340,16 +2348,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
-session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-session.avro_schema.schema = json.dumps(
-{
-"fields": [
-{"name": "colA"},
-# Not alphabetical to test column order.
-{"name": "colC"},
-{"name": "colB"},
-]
-}
+session = bigquery_storage_v1beta1.types.ReadSession(
+streams=streams,
+arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
)
bqstorage_client.create_read_session.return_value = session

@@ -2400,17 +2401,23 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
+@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
from google.cloud.bigquery_storage_v1beta1 import reader

+arrow_fields = [pyarrow.field("colA", pyarrow.int64())]
+arrow_schema = pyarrow.schema(arrow_fields)

streams = [
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
-session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-session.avro_schema.schema = json.dumps({"fields": [{"name": "colA"}]})
+session = bigquery_storage_v1beta1.types.ReadSession(
+streams=streams,
+arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+)

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
@@ -2448,6 +2455,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
+@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
@mock.patch("tqdm.tqdm")
def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
from google.cloud.bigquery import schema
@@ -2457,6 +2465,9 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

+arrow_fields = [pyarrow.field("testcol", pyarrow.int64())]
+arrow_schema = pyarrow.schema(arrow_fields)

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
@@ -2466,8 +2477,10 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
]
-session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-session.avro_schema.schema = json.dumps({"fields": [{"name": "testcol"}]})
+session = bigquery_storage_v1beta1.types.ReadSession(
+streams=streams,
+arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+)
bqstorage_client.create_read_session.return_value = session

mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
@@ -2521,6 +2534,7 @@ def blocking_to_dataframe(*args, **kwargs):
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
+@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
from google.cloud.bigquery import schema
from google.cloud.bigquery import table as mut
@@ -2529,6 +2543,14 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
# Speed up testing.
mut._PROGRESS_INTERVAL = 0.01

+arrow_fields = [
+pyarrow.field("colA", pyarrow.int64()),
+# Not alphabetical to test column order.
+pyarrow.field("colC", pyarrow.float64()),
+pyarrow.field("colB", pyarrow.utf8()),
+]
+arrow_schema = pyarrow.schema(arrow_fields)

bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
@@ -2539,10 +2561,8 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
# ends early.
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
-]
-)
-session.avro_schema.schema = json.dumps(
-{"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+],
+arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
)
bqstorage_client.create_read_session.return_value = session

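A usage note, not part of the diff: several fixtures above list their columns in a deliberately non-alphabetical order because the tests check that column order is preserved, and with Arrow that order (plus the dtypes) comes from the schema itself. A small illustrative sketch of that pyarrow behavior, assuming a recent pyarrow with pandas installed and made-up data values:

import pyarrow

arrow_schema = pyarrow.schema(
    [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colC", pyarrow.float64()),
        pyarrow.field("colB", pyarrow.utf8()),
    ]
)

# A record batch conforming to the schema above.
batch = pyarrow.RecordBatch.from_arrays(
    [
        pyarrow.array([1, 2], type=pyarrow.int64()),
        pyarrow.array([1.5, 2.5], type=pyarrow.float64()),
        pyarrow.array(["a", "b"], type=pyarrow.utf8()),
    ],
    schema=arrow_schema,
)

# Columns come out in schema order (colA, colC, colB), not alphabetical.
df = batch.to_pandas()
print(list(df.columns))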