Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): allow passing schema as a sequence of dicts #9550

Merged
merged 11 commits into from
Nov 3, 2019
27 changes: 25 additions & 2 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,44 @@ def _row_tuple_from_json(row, schema):

Args:
row (Dict): A JSON response row to be converted.
schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
plamut marked this conversation as resolved.
Show resolved Hide resolved
Sequence[Mapping[str, str]] \
plamut marked this conversation as resolved.
Show resolved Hide resolved
]]): Specification of the field types in ``row``.

Returns:
Tuple: A tuple of data converted to native types.
"""
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)

row_data = []
for field, cell in zip(schema, row["f"]):
row_data.append(_field_from_json(cell["v"], field))
return tuple(row_data)


def _rows_from_json(values, schema):
"""Convert JSON row data to rows with appropriate types."""
"""Convert JSON row data to rows with appropriate types.

Args:
values (Sequence[Dict]): The list of responses (JSON rows) to convert.
schema (Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]):
The table's schema. If given as a sequence of dicts, their content
must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
List[:class:`~google.cloud.bigquery.Row`]
"""
from google.cloud.bigquery import Row
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)
field_to_index = _field_to_index_mapping(schema)
return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]

Expand Down
54 changes: 44 additions & 10 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.

Expand All @@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
any column cannot be determined.
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
Expand Down Expand Up @@ -297,7 +301,10 @@ def dataframe_to_arrow(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.

Expand All @@ -310,6 +317,8 @@ def dataframe_to_arrow(dataframe, bq_schema):
column_and_index_names = set(
name for name, _ in list_columns_and_indexes(dataframe)
)

bq_schema = schema._to_schema_fields(bq_schema)
bq_field_names = set(field.name for field in bq_schema)

extra_fields = bq_field_names - column_and_index_names
Expand Down Expand Up @@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
filepath (str):
Expand All @@ -368,6 +380,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)

Expand All @@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, schema):
def download_arrow_tabledata_list(pages, bq_schema):
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""Use tabledata.list to construct an iterable of RecordBatches.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
A decription of the fields in result pages.
Yields:
:class:`pyarrow.RecordBatch`
The next page of records as a ``pyarrow`` record batch.
"""
column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
arrow_types = [bq_to_arrow_data_type(field) for field in schema]
bq_schema = schema._to_schema_fields(bq_schema)
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
Expand All @@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in schema]
def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""Use (slower, but free) tabledata.list to construct a DataFrame.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
bq_schema (Sequence[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
A decription of the fields in result pages.
dtypes(Mapping[str, numpy.dtype]):
The types of columns in result data to hint construction of the
resulting DataFrame. Not all column types have to be specified.
Yields:
:class:`pandas.DataFrame`
The next page of records as a ``pandas.DataFrame`` record batch.
"""
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)

Expand Down
11 changes: 7 additions & 4 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand Down Expand Up @@ -1225,8 +1226,10 @@ def range_partitioning(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Schema of the
destination table.
"""Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]: Schema of the destination table.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
Expand All @@ -1242,8 +1245,8 @@ def schema(self, value):
self._del_sub_prop("schema")
return

if not all(hasattr(field, "to_api_repr") for field in value):
raise ValueError("Schema items must be fields")
value = _to_schema_fields(value)

_helpers._set_sub_prop(
self._properties,
["load", "schema", "fields"],
Expand Down
40 changes: 40 additions & 0 deletions bigquery/google/cloud/bigquery/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Schemas for BigQuery tables / queries."""

import collections

from google.cloud.bigquery_v2 import types


Expand Down Expand Up @@ -256,3 +258,41 @@ def _build_schema_resource(fields):
Sequence[Dict]: Mappings describing the schema of the supplied fields.
"""
return [field.to_api_repr() for field in fields]


def _to_schema_fields(schema):
"""Coerce `schema` to a list of schema field instances.

Args:
schema(Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]):
Table schema to convert. If given as a sequence of
mappings, their content must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]

Raises:
TypeError: If `schema` is not a sequence.
ValueError:
If any item in the sequence is not a
:class:`~google.cloud.bigquery.schema.SchemaField` or a
compatible mapping representation of the field.
"""
for field in schema:
if not isinstance(field, (SchemaField, collections.Mapping)):
raise ValueError(
"Schema items must either be fields or compatible "
"mapping representations."
)

if schema and not isinstance(schema[0], SchemaField):
try:
schema = [SchemaField.from_api_repr(field) for field in schema]
except Exception as exc:
raise ValueError("Invalid field representation: {!r}".format(exc))

return schema
plamut marked this conversation as resolved.
Show resolved Hide resolved
37 changes: 27 additions & 10 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

Expand Down Expand Up @@ -305,8 +305,13 @@ class Table(object):
A pointer to a table. If ``table_ref`` is a string, it must
included a project ID, dataset ID, and table ID, each separated
by ``.``.
schema (List[google.cloud.bigquery.schema.SchemaField]):
The table's schema
schema (Optional[Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]]):
The table's schema. If given as a sequence of dicts, their content
must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
"""

_PROPERTY_TO_API_FIELD = {
Expand Down Expand Up @@ -369,13 +374,17 @@ def require_partition_filter(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
"""Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]: Table's schema.

Raises:
TypeError: If 'value' is not a sequence
TypeError: If `value` is not a sequence
ValueError:
If any item in the sequence is not a
:class:`~google.cloud.bigquery.schema.SchemaField`
:class:`~google.cloud.bigquery.schema.SchemaField` or a
compatible mapping representation.
"""
prop = self._properties.get("schema")
if not prop:
Expand All @@ -387,10 +396,10 @@ def schema(self):
def schema(self, value):
if value is None:
self._properties["schema"] = None
elif not all(isinstance(field, SchemaField) for field in value):
raise ValueError("Schema items must be fields")
else:
self._properties["schema"] = {"fields": _build_schema_resource(value)}
return
plamut marked this conversation as resolved.
Show resolved Hide resolved

value = _to_schema_fields(value)
self._properties["schema"] = {"fields": _build_schema_resource(value)}

@property
def labels(self):
Expand Down Expand Up @@ -1284,6 +1293,13 @@ class RowIterator(HTTPIterator):
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
The function to use to make API requests.
path (str): The method path to query for the list of items.
schema (Union[ \
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`], \
Sequence[Mapping[str, str]] \
]):
The table's schema. If given as a sequence of dicts, their content
must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
page_token (str): A token identifying a page in a result set to start
fetching results from.
max_results (int, optional): The maximum number of results to fetch.
Expand Down Expand Up @@ -1328,6 +1344,7 @@ def __init__(
page_start=_rows_page_start,
next_token="pageToken",
)
schema = _to_schema_fields(schema)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
Expand Down
28 changes: 26 additions & 2 deletions bigquery/tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import decimal
import unittest

import mock


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):
Expand Down Expand Up @@ -412,7 +414,8 @@ class Test_row_tuple_from_json(unittest.TestCase):
def _call_fut(self, row, schema):
from google.cloud.bigquery._helpers import _row_tuple_from_json

return _row_tuple_from_json(row, schema)
with _field_isinstance_patcher():
return _row_tuple_from_json(row, schema)

def test_w_single_scalar_column(self):
# SELECT 1 AS col
Expand Down Expand Up @@ -529,7 +532,8 @@ class Test_rows_from_json(unittest.TestCase):
def _call_fut(self, rows, schema):
from google.cloud.bigquery._helpers import _rows_from_json

return _rows_from_json(rows, schema)
with _field_isinstance_patcher():
return _rows_from_json(rows, schema)

def test_w_record_subfield(self):
from google.cloud.bigquery.table import Row
Expand Down Expand Up @@ -1023,3 +1027,23 @@ def __init__(self, mode, name="unknown", field_type="UNKNOWN", fields=()):
self.name = name
self.field_type = field_type
self.fields = fields


def _field_isinstance_patcher():
"""A patcher thank makes _Field instances seem like SchemaField instances.
"""
from google.cloud.bigquery.schema import SchemaField

def fake_isinstance(instance, target_class):
if instance.__class__.__name__ != "_Field":
return isinstance(instance, target_class) # pragma: NO COVER

# pretend that _Field() instances are actually instances of SchemaField
return target_class is SchemaField or (
isinstance(target_class, tuple) and SchemaField in target_class
)

patcher = mock.patch(
"google.cloud.bigquery.schema.isinstance", side_effect=fake_isinstance
)
return patcher
Loading