BigQuery Storage: Add more in-depth system tests (googleapis#8992)
* Add additional BQ storage system test fixtures

* Add reader column selection system test

* Add basic reader system test

* Add reader with row filter system test

* Add reading data with snapshot system test

* Add reading column partitioned table system test

* Add system test for column types data conversions

* Add ingestion time partitioned table system test

* Add system test for resuming a read at an offset

* Remove unnecessary protobuf install in noxfile

* Add TODO comment to replace a test helper method

A similar method is planned to be added to the library itself; once that lands, the _add_rows() helper will no longer be needed (a rough sketch of such a helper follows this list).

* Extract BQ client to session fixture in tests

Creating a client once per system test session avoids the overhead
of authenticating before each test case.

* Only create BQ storage client once per test run

Creating a client just once avoids the auth overhead on every system
test case.

* Add common credentials fixture for system tests
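
The _add_rows() helper mentioned above lives in the system test module, whose diff is not shown on this page. Purely as a hypothetical sketch (the real helper's name comes from the commit message; its signature and body are assumptions here), such a method could run a small JSON load job against the target table with the google-cloud-bigquery client:

import io
import json

from google.cloud import bigquery


def _add_rows(bq_client, table, rows):
    # Hypothetical sketch only: load ``rows`` (a list of dicts keyed by column
    # name) into ``table`` so the read-session tests have data to stream back.
    body = io.BytesIO("\n".join(json.dumps(row) for row in rows).encode("utf-8"))

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.schema = table.schema

    load_job = bq_client.load_table_from_file(body, table, job_config=job_config)
    load_job.result()  # wait for the load job to complete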
plamut authored and emar-kar committed Sep 18, 2019
1 parent f41f85c commit aad16d2
Showing 4 changed files with 553 additions and 4 deletions.
1 change: 1 addition & 0 deletions bigquery_storage/noxfile.py
@@ -118,6 +118,7 @@ def system(session):
session.install("-e", local_dep)
session.install("-e", "../test_utils/")
session.install("-e", ".[fastavro,pandas,pyarrow]")
session.install("-e", "../bigquery/")
session.install("-e", ".")

# Run py.test against the system tests.
6 changes: 6 additions & 0 deletions bigquery_storage/tests/system/assets/people_data.csv
@@ -0,0 +1,6 @@
first_name,last_name,age
John,Doe,42
Jack,Black,53
Nick,Sleek,24
Kevin,Powell,50
Johnny,Young,2
191 changes: 187 additions & 4 deletions bigquery_storage/tests/system/conftest.py
@@ -16,20 +16,203 @@
"""System tests for reading rows from tables."""

import os
import uuid

import pytest

from google.cloud import bigquery_storage_v1beta1


-@pytest.fixture()
_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")


@pytest.fixture(scope="session")
def project_id():
    return os.environ["PROJECT_ID"]


-@pytest.fixture()
-def client():
-    return bigquery_storage_v1beta1.BigQueryStorageClient()
@pytest.fixture(scope="session")
def credentials():
    from google.oauth2 import service_account

    # NOTE: the test config in noxfile checks that the env variable is indeed set
    filename = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
    return service_account.Credentials.from_service_account_file(filename)


@pytest.fixture(scope="session")
def bq_client(credentials):
    from google.cloud import bigquery

    return bigquery.Client(credentials=credentials)


@pytest.fixture(scope="session")
def dataset(project_id, bq_client):
    from google.cloud import bigquery

    unique_suffix = str(uuid.uuid4()).replace("-", "_")
    dataset_name = "bq_storage_system_tests_" + unique_suffix

    dataset_id = "{}.{}".format(project_id, dataset_name)
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    created_dataset = bq_client.create_dataset(dataset)

    yield created_dataset

    bq_client.delete_dataset(dataset, delete_contents=True)


@pytest.fixture(scope="session")
def table(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]

    table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users")
    bq_table = bigquery.Table(table_id, schema=schema)
    created_table = bq_client.create_table(bq_table)

    yield created_table

    bq_client.delete_table(created_table)


@pytest.fixture
def table_with_data_ref(dataset, table, bq_client):
    from google.cloud import bigquery

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.schema = table.schema

    filename = os.path.join(_ASSETS_DIR, "people_data.csv")

    with open(filename, "rb") as source_file:
        job = bq_client.load_table_from_file(source_file, table, job_config=job_config)

    job.result()  # wait for the load to complete

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = table.project
    table_ref.dataset_id = table.dataset_id
    table_ref.table_id = table.table_id
    yield table_ref

    # truncate table data
    query = "DELETE FROM {}.{} WHERE 1 = 1".format(dataset.dataset_id, table.table_id)
    query_job = bq_client.query(query, location="US")
    query_job.result()


@pytest.fixture
def col_partition_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("description", "STRING", mode="REQUIRED"),
    ]
    time_partitioning = bigquery.table.TimePartitioning(
        type_=bigquery.table.TimePartitioningType.DAY, field="occurred"
    )
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.notable_events".format(project_id, dataset.dataset_id),
        schema=schema,
    )
    bq_table.time_partitioning = time_partitioning

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)


@pytest.fixture
def ingest_partition_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("shape", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"),
    ]
    time_partitioning = bigquery.table.TimePartitioning(
        type_=bigquery.table.TimePartitioningType.DAY,
        field=None,  # use _PARTITIONTIME pseudo column
    )
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.ufo_sightings".format(project_id, dataset.dataset_id),
        schema=schema,
    )
    bq_table.time_partitioning = time_partitioning

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)


@pytest.fixture
def all_types_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("string_field", "STRING"),
        bigquery.SchemaField("bytes_field", "BYTES"),
        bigquery.SchemaField("int64_field", "INT64"),
        bigquery.SchemaField("float64_field", "FLOAT64"),
        bigquery.SchemaField("numeric_field", "NUMERIC"),
        bigquery.SchemaField("bool_field", "BOOL"),
        bigquery.SchemaField("geography_field", "GEOGRAPHY"),
        bigquery.SchemaField(
            "person_struct_field",
            "STRUCT",
            fields=(
                bigquery.SchemaField("name", "STRING"),
                bigquery.SchemaField("age", "INT64"),
            ),
        ),
        bigquery.SchemaField("timestamp_field", "TIMESTAMP"),
        bigquery.SchemaField("date_field", "DATE"),
        bigquery.SchemaField("time_field", "TIME"),
        bigquery.SchemaField("datetime_field", "DATETIME"),
        bigquery.SchemaField("string_array_field", "STRING", mode="REPEATED"),
    ]
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.complex_records".format(project_id, dataset.dataset_id),
        schema=schema,
    )

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)


@pytest.fixture(scope="session")
def client(credentials):
    return bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)


@pytest.fixture()
(The diff for the fourth changed file, presumably the system test module itself, did not load.)
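
Purely as an illustration of how the session-scoped clients and the table_with_data_ref fixture fit together (a hypothetical sketch, not the actual test code from this commit, whose diff is missing above), a basic reader test might look like this:

from google.cloud import bigquery_storage_v1beta1


def test_basic_read(client, project_id, table_with_data_ref):
    # Open a read session against the populated "users" table.
    session = client.create_read_session(
        table_with_data_ref,
        "projects/{}".format(project_id),
        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
        requested_streams=1,
    )

    # Read all rows from the single requested stream.
    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
    rows = list(client.read_rows(stream_pos).rows(session))

    # people_data.csv contains five data rows.
    assert len(rows) == 5
    assert {row["first_name"] for row in rows} == {"John", "Jack", "Nick", "Kevin", "Johnny"}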
