Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): allow passing schema as a sequence of dicts #9550

Merged
merged 11 commits into from
Nov 3, 2019
27 changes: 25 additions & 2 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,44 @@ def _row_tuple_from_json(row, schema):

Args:
row (Dict): A JSON response row to be converted.
schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]): Specification of the field types in ``row``.

Returns:
Tuple: A tuple of data converted to native types.
"""
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)

row_data = []
for field, cell in zip(schema, row["f"]):
row_data.append(_field_from_json(cell["v"], field))
return tuple(row_data)


def _rows_from_json(values, schema):
"""Convert JSON row data to rows with appropriate types."""
"""Convert JSON row data to rows with appropriate types.

Args:
values (Sequence[Dict]): The list of responses (JSON rows) to convert.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
List[:class:`~google.cloud.bigquery.Row`]
"""
from google.cloud.bigquery import Row
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)
field_to_index = _field_to_index_mapping(schema)
return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]

Expand Down
58 changes: 46 additions & 12 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.

Expand All @@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
any column cannot be determined.
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
Expand Down Expand Up @@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. The number of columns must match the
number of columns in the DataFrame.

Returns:
pyarrow.Table:
Expand All @@ -310,6 +317,8 @@ def dataframe_to_arrow(dataframe, bq_schema):
column_and_index_names = set(
name for name, _ in list_columns_and_indexes(dataframe)
)

bq_schema = schema._to_schema_fields(bq_schema)
bq_field_names = set(field.name for field in bq_schema)

extra_fields = bq_field_names - column_and_index_names
Expand Down Expand Up @@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
filepath (str):
Expand All @@ -368,6 +380,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)

Expand All @@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, schema):
def download_arrow_tabledata_list(pages, bq_schema):
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""Use tabledata.list to construct an iterable of RecordBatches.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A decription of the fields in result pages.
Yields:
:class:`pyarrow.RecordBatch`
The next page of records as a ``pyarrow`` record batch.
"""
column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
arrow_types = [bq_to_arrow_data_type(field) for field in schema]
bq_schema = schema._to_schema_fields(bq_schema)
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
Expand All @@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in schema]
def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""Use (slower, but free) tabledata.list to construct a DataFrame.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A decription of the fields in result pages.
dtypes(Mapping[str, numpy.dtype]):
The types of columns in result data to hint construction of the
resulting DataFrame. Not all column types have to be specified.
Yields:
:class:`pandas.DataFrame`
The next page of records as a ``pandas.DataFrame`` record batch.
"""
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)

Expand Down
11 changes: 7 additions & 4 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand Down Expand Up @@ -1225,8 +1226,10 @@ def range_partitioning(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Schema of the
destination table.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]: Schema of the destination table.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
Expand All @@ -1242,8 +1245,8 @@ def schema(self, value):
self._del_sub_prop("schema")
return

if not all(hasattr(field, "to_api_repr") for field in value):
raise ValueError("Schema items must be fields")
value = _to_schema_fields(value)

_helpers._set_sub_prop(
self._properties,
["load", "schema", "fields"],
Expand Down
35 changes: 35 additions & 0 deletions bigquery/google/cloud/bigquery/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Schemas for BigQuery tables / queries."""

import collections

from google.cloud.bigquery_v2 import types


Expand Down Expand Up @@ -256,3 +258,36 @@ def _build_schema_resource(fields):
Sequence[Dict]: Mappings describing the schema of the supplied fields.
"""
return [field.to_api_repr() for field in fields]


def _to_schema_fields(schema):
"""Coerce `schema` to a list of schema field instances.

Args:
schema(Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Table schema to convert. If some items are passed as mappings,
their content must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

Returns:
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]

Raises:
Exception: If ``schema`` is not a sequence, or if any item in the
sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
for field in schema:
if not isinstance(field, (SchemaField, collections.Mapping)):
raise ValueError(
"Schema items must either be fields or compatible "
"mapping representations."
)

return [
field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
for field in schema
]
36 changes: 26 additions & 10 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

Expand Down Expand Up @@ -305,8 +305,13 @@ class Table(object):
A pointer to a table. If ``table_ref`` is a string, it must
included a project ID, dataset ID, and table ID, each separated
by ``.``.
schema (List[google.cloud.bigquery.schema.SchemaField]):
The table's schema
schema (Optional[Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
"""

_PROPERTY_TO_API_FIELD = {
Expand Down Expand Up @@ -369,13 +374,17 @@ def require_partition_filter(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]:
Table's schema.

Raises:
TypeError: If 'value' is not a sequence
ValueError:
If any item in the sequence is not a
:class:`~google.cloud.bigquery.schema.SchemaField`
Exception:
If ``schema`` is not a sequence, or if any item in the sequence
is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
prop = self._properties.get("schema")
if not prop:
Expand All @@ -387,9 +396,8 @@ def schema(self):
def schema(self, value):
if value is None:
self._properties["schema"] = None
elif not all(isinstance(field, SchemaField) for field in value):
raise ValueError("Schema items must be fields")
else:
value = _to_schema_fields(value)
self._properties["schema"] = {"fields": _build_schema_resource(value)}

@property
Expand Down Expand Up @@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator):
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
The function to use to make API requests.
path (str): The method path to query for the list of items.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
page_token (str): A token identifying a page in a result set to start
fetching results from.
max_results (int, optional): The maximum number of results to fetch.
Expand Down Expand Up @@ -1328,6 +1343,7 @@ def __init__(
page_start=_rows_page_start,
next_token="pageToken",
)
schema = _to_schema_fields(schema)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
Expand Down
4 changes: 2 additions & 2 deletions bigquery/samples/query_external_sheets_permanent_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def query_external_sheets_permanent_table(dataset_id):
external_config.source_uris = [sheet_url]
external_config.options.skip_leading_rows = 1 # Optionally skip header row.
external_config.options.range = (
"us-states!A20:B49"
) # Optionally set range of the sheet to query from.
"us-states!A20:B49" # Optionally set range of the sheet to query from.
)
table.external_data_configuration = external_config

# Create a permanent table linked to the Sheets file.
Expand Down
4 changes: 2 additions & 2 deletions bigquery/samples/query_external_sheets_temporary_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def query_external_sheets_temporary_table():
]
external_config.options.skip_leading_rows = 1 # Optionally skip header row.
external_config.options.range = (
"us-states!A20:B49"
) # Optionally set range of the sheet to query from.
"us-states!A20:B49" # Optionally set range of the sheet to query from.
)
table_id = "us_states"
job_config = bigquery.QueryJobConfig()
job_config.table_definitions = {table_id: external_config}
Expand Down
Loading