From 89eaedb4a40bd1268df134cc40a4063d6cf8b823 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Sun, 3 Nov 2019 23:08:37 +0200
Subject: [PATCH] feat(bigquery): allow passing schema as a sequence of dicts
 (#9550)

* feat(bigquery): add _to_schema_fields() schema helper

* Allow passing schema as dicts in _helpers

* Allow passing schema as dicts in table.py

* Allow passing schema as dicts in job.py

* Import SchemaField directly in several tests

SchemaField should not be imported from bigquery.table, but directly from
where it's defined, so that any changes to the imports in bigquery.table
do not cause unnecessary test failures.

* Allow passing schema as dicts in pandas helpers

* Replace return statement with an else block

* Alter the type spec of values in schema field dict

* Blacken a few files

* Simplify _to_schema_fields() schema helper

* Update docstrings for schema parameter
---
 bigquery/google/cloud/bigquery/_helpers.py    |  27 +++-
 .../google/cloud/bigquery/_pandas_helpers.py  |  58 ++++++--
 bigquery/google/cloud/bigquery/job.py         |  11 +-
 bigquery/google/cloud/bigquery/schema.py      |  35 +++++
 bigquery/google/cloud/bigquery/table.py       |  36 +++--
 bigquery/tests/unit/test__helpers.py          |  28 +++-
 bigquery/tests/unit/test__pandas_helpers.py   | 137 ++++++++++++++++++
 bigquery/tests/unit/test_client.py            |  47 +++---
 bigquery/tests/unit/test_job.py               |  38 ++++-
 bigquery/tests/unit/test_schema.py            |  66 +++++++++
 bigquery/tests/unit/test_table.py             | 108 +++++++++++---
 11 files changed, 516 insertions(+), 75 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py
index 266bfc2c666c..98eadb0a2f8e 100644
--- a/bigquery/google/cloud/bigquery/_helpers.py
+++ b/bigquery/google/cloud/bigquery/_helpers.py
@@ -224,11 +224,18 @@ def _row_tuple_from_json(row, schema):

     Args:
         row (Dict): A JSON response row to be converted.
-        schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
+        schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]): Specification of the field types in ``row``.

     Returns:
         Tuple: A tuple of data converted to native types.
     """
+    from google.cloud.bigquery.schema import _to_schema_fields
+
+    schema = _to_schema_fields(schema)
+
     row_data = []
     for field, cell in zip(schema, row["f"]):
         row_data.append(_field_from_json(cell["v"], field))
@@ -236,9 +243,25 @@ def _rows_from_json(values, schema):
-    """Convert JSON row data to rows with appropriate types."""
+    """Convert JSON row data to rows with appropriate types.
+
+    Args:
+        values (Sequence[Dict]): The list of responses (JSON rows) to convert.
+        schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
+            The table's schema. If any item is a mapping, its content must be
+            compatible with
+            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
+ + Returns: + List[:class:`~google.cloud.bigquery.Row`] + """ from google.cloud.bigquery import Row + from google.cloud.bigquery.schema import _to_schema_fields + schema = _to_schema_fields(schema) field_to_index = _field_to_index_mapping(schema) return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values] diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index c7edf2ae51f5..aeb18c2d213d 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema): Args: dataframe (pandas.DataFrame): DataFrame for which the client determines the BigQuery schema. - bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): + bq_schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): A BigQuery schema. Use this argument to override the autodetected type for some or all of the DataFrame columns. @@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema): any column cannot be determined. """ if bq_schema: + bq_schema = schema._to_schema_fields(bq_schema) for field in bq_schema: if field.field_type in schema._STRUCT_TYPES: raise ValueError( @@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema): Args: dataframe (pandas.DataFrame): DataFrame to convert to Arrow table. - bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): - Desired BigQuery schema. Number of columns must match number of - columns in the DataFrame. + bq_schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): + Desired BigQuery schema. The number of columns must match the + number of columns in the DataFrame. Returns: pyarrow.Table: @@ -310,6 +317,8 @@ def dataframe_to_arrow(dataframe, bq_schema): column_and_index_names = set( name for name, _ in list_columns_and_indexes(dataframe) ) + + bq_schema = schema._to_schema_fields(bq_schema) bq_field_names = set(field.name for field in bq_schema) extra_fields = bq_field_names - column_and_index_names @@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN Args: dataframe (pandas.DataFrame): DataFrame to convert to Parquet file. - bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): + bq_schema (Sequence[Union[ \ + :class:`~google.cloud.bigquery.schema.SchemaField`, \ + Mapping[str, Any] \ + ]]): Desired BigQuery schema. Number of columns must match number of columns in the DataFrame. filepath (str): @@ -368,6 +380,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN if pyarrow is None: raise ValueError("pyarrow is required for BigQuery schema conversion.") + bq_schema = schema._to_schema_fields(bq_schema) arrow_table = dataframe_to_arrow(dataframe, bq_schema) pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression) @@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types): return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) -def download_arrow_tabledata_list(pages, schema): +def download_arrow_tabledata_list(pages, bq_schema): """Use tabledata.list to construct an iterable of RecordBatches. Args: pages (Iterator[:class:`google.api_core.page_iterator.Page`]): An iterator over the result pages. 
-        schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+        bq_schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
             A description of the fields in result pages.

     Yields:
         :class:`pyarrow.RecordBatch`
             The next page of records as a ``pyarrow`` record batch.
     """
-    column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
-    arrow_types = [bq_to_arrow_data_type(field) for field in schema]
+    bq_schema = schema._to_schema_fields(bq_schema)
+    column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
+    arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]
     for page in pages:
         yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)

@@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
     return pandas.DataFrame(columns, columns=column_names)

-def download_dataframe_tabledata_list(pages, schema, dtypes):
-    """Use (slower, but free) tabledata.list to construct a DataFrame."""
-    column_names = [field.name for field in schema]
+def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
+    """Use (slower, but free) tabledata.list to construct a DataFrame.
+
+    Args:
+        pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
+            An iterator over the result pages.
+        bq_schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
+            A description of the fields in result pages.
+        dtypes (Mapping[str, numpy.dtype]):
+            The types of columns in the result data, used as a hint when
+            constructing the resulting DataFrame. Not all column types have
+            to be specified.
+
+    Yields:
+        :class:`pandas.DataFrame`
+            The next page of records as a ``pandas.DataFrame``.
+    """
+    bq_schema = schema._to_schema_fields(bq_schema)
+    column_names = [field.name for field in bq_schema]
     for page in pages:
         yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
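For orientation, here is a minimal sketch of how the reworked pandas helpers can be called once this change is in place. It mirrors the new unit tests further down; the DataFrame contents and field names are invented, and `dataframe_to_bq_schema` is a private helper, so the call illustrates the conversion rather than a supported public API:

```python
# Illustrative sketch with made-up data, exercising the private helper the
# same way the new unit tests below do.
import pandas

from google.cloud.bigquery import _pandas_helpers

dataframe = pandas.DataFrame({"full_name": [u"foo", u"bar"], "age": [23, 42]})

# Plain mappings compatible with SchemaField.from_api_repr() are now accepted
# wherever a sequence of SchemaField instances used to be required.
dict_schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]

# The helper normalizes the dicts via schema._to_schema_fields() internally
# and returns a tuple of SchemaField instances.
bq_schema = _pandas_helpers.dataframe_to_bq_schema(dataframe, dict_schema)
```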
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 96724c9f805b..a8d797f4bef5 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -38,6 +38,7 @@
 from google.cloud.bigquery.retry import DEFAULT_RETRY
 from google.cloud.bigquery.routine import RoutineReference
 from google.cloud.bigquery.schema import SchemaField
+from google.cloud.bigquery.schema import _to_schema_fields
 from google.cloud.bigquery.table import _EmptyRowIterator
 from google.cloud.bigquery.table import RangePartitioning
 from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1225,8 +1226,10 @@ def range_partitioning(self, value):
     @property
     def schema(self):
-        """List[google.cloud.bigquery.schema.SchemaField]: Schema of the
-        destination table.
+        """Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]: Schema of the destination table.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
@@ -1242,8 +1245,8 @@ def schema(self, value):
             self._del_sub_prop("schema")
             return

-        if not all(hasattr(field, "to_api_repr") for field in value):
-            raise ValueError("Schema items must be fields")
+        value = _to_schema_fields(value)
+
         _helpers._set_sub_prop(
             self._properties,
             ["load", "schema", "fields"],
diff --git a/bigquery/google/cloud/bigquery/schema.py b/bigquery/google/cloud/bigquery/schema.py
index cb94133abdad..d766cb542608 100644
--- a/bigquery/google/cloud/bigquery/schema.py
+++ b/bigquery/google/cloud/bigquery/schema.py
@@ -14,6 +14,8 @@

 """Schemas for BigQuery tables / queries."""

+import collections
+
 from google.cloud.bigquery_v2 import types
@@ -256,3 +258,36 @@ def _build_schema_resource(fields):
         Sequence[Dict]: Mappings describing the schema of the supplied fields.
     """
     return [field.to_api_repr() for field in fields]
+
+
+def _to_schema_fields(schema):
+    """Coerce `schema` to a list of schema field instances.
+
+    Args:
+        schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
+            Table schema to convert. If some items are passed as mappings,
+            their content must be compatible with
+            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
+
+    Returns:
+        Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]
+
+    Raises:
+        Exception: If ``schema`` is not a sequence, or if any item in the
+            sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
+            instance or a compatible mapping representation of the field.
+    """
+    for field in schema:
+        if not isinstance(field, (SchemaField, collections.Mapping)):
+            raise ValueError(
+                "Schema items must either be fields or compatible "
+                "mapping representations."
+            )
+
+    return [
+        field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
+        for field in schema
+    ]
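The conversion helper itself is small; this short sketch (with hypothetical field names) spells out the behaviour the code above implements:

```python
from google.cloud.bigquery.schema import SchemaField, _to_schema_fields

# SchemaField instances pass through unchanged; mappings must be compatible
# with SchemaField.from_api_repr().
fields = _to_schema_fields(
    [
        SchemaField("full_name", "STRING", mode="REQUIRED"),
        {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
    ]
)
# fields == [SchemaField("full_name", ...), SchemaField("age", ...)]

# An item that is neither a SchemaField nor a mapping raises:
# _to_schema_fields([("age", "INTEGER")])  # ValueError
```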
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 7e36c582c42b..2f2ee50cc89e 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -51,9 +51,9 @@
 import google.cloud._helpers
 from google.cloud.bigquery import _helpers
 from google.cloud.bigquery import _pandas_helpers
-from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.schema import _build_schema_resource
 from google.cloud.bigquery.schema import _parse_schema_resource
+from google.cloud.bigquery.schema import _to_schema_fields
 from google.cloud.bigquery.external_config import ExternalConfig
 from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
@@ -305,8 +305,13 @@ class Table(object):
         A pointer to a table. If ``table_ref`` is a string, it must include
         a project ID, dataset ID, and table ID, each separated by ``.``.
-        schema (List[google.cloud.bigquery.schema.SchemaField]):
-            The table's schema
+        schema (Optional[Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]]):
+            The table's schema. If any item is a mapping, its content must be
+            compatible with
+            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
     """

     _PROPERTY_TO_API_FIELD = {
@@ -369,13 +374,17 @@ def require_partition_filter(self, value):
     @property
     def schema(self):
-        """List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
+        """Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]:
+            Table's schema.

         Raises:
-            TypeError: If 'value' is not a sequence
-            ValueError:
-                If any item in the sequence is not a
-                :class:`~google.cloud.bigquery.schema.SchemaField`
+            Exception:
+                If ``schema`` is not a sequence, or if any item in the sequence
+                is not a :class:`~google.cloud.bigquery.schema.SchemaField`
+                instance or a compatible mapping representation of the field.
         """
         prop = self._properties.get("schema")
         if not prop:
@@ -387,9 +396,8 @@ def schema(self, value):
         if value is None:
             self._properties["schema"] = None
-        elif not all(isinstance(field, SchemaField) for field in value):
-            raise ValueError("Schema items must be fields")
         else:
+            value = _to_schema_fields(value)
             self._properties["schema"] = {"fields": _build_schema_resource(value)}

     @property
@@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator):
         api_request (Callable[google.cloud._http.JSONConnection.api_request]):
             The function to use to make API requests.
         path (str): The method path to query for the list of items.
+        schema (Sequence[Union[ \
+            :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            Mapping[str, Any] \
+        ]]):
+            The table's schema. If any item is a mapping, its content must be
+            compatible with
+            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
         page_token (str): A token identifying a page in a result set to start
             fetching results from.
         max_results (int, optional): The maximum number of results to fetch.
@@ -1328,6 +1343,7 @@ def __init__(
             page_start=_rows_page_start,
             next_token="pageToken",
         )
+        schema = _to_schema_fields(schema)
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
         self._preserve_order = False
diff --git a/bigquery/tests/unit/test__helpers.py b/bigquery/tests/unit/test__helpers.py
index 3884695d83af..6d92b4de73ba 100644
--- a/bigquery/tests/unit/test__helpers.py
+++ b/bigquery/tests/unit/test__helpers.py
@@ -17,6 +17,8 @@
 import decimal
 import unittest

+import mock
+

 class Test_not_null(unittest.TestCase):
     def _call_fut(self, value, field):
@@ -412,7 +414,8 @@ class Test_row_tuple_from_json(unittest.TestCase):
     def _call_fut(self, row, schema):
         from google.cloud.bigquery._helpers import _row_tuple_from_json

-        return _row_tuple_from_json(row, schema)
+        with _field_isinstance_patcher():
+            return _row_tuple_from_json(row, schema)

     def test_w_single_scalar_column(self):
         # SELECT 1 AS col
@@ -529,7 +532,8 @@ class Test_rows_from_json(unittest.TestCase):
     def _call_fut(self, rows, schema):
         from google.cloud.bigquery._helpers import _rows_from_json

-        return _rows_from_json(rows, schema)
+        with _field_isinstance_patcher():
+            return _rows_from_json(rows, schema)

     def test_w_record_subfield(self):
         from google.cloud.bigquery.table import Row
@@ -1023,3 +1027,23 @@ def __init__(self, mode, name="unknown", field_type="UNKNOWN", fields=()):
         self.name = name
         self.field_type = field_type
         self.fields = fields
+
+
+def _field_isinstance_patcher():
+    """A patcher that makes _Field instances seem like SchemaField instances.
+ """ + from google.cloud.bigquery.schema import SchemaField + + def fake_isinstance(instance, target_class): + if instance.__class__.__name__ != "_Field": + return isinstance(instance, target_class) # pragma: NO COVER + + # pretend that _Field() instances are actually instances of SchemaField + return target_class is SchemaField or ( + isinstance(target_class, tuple) and SchemaField in target_class + ) + + patcher = mock.patch( + "google.cloud.bigquery.schema.isinstance", side_effect=fake_isinstance + ) + return patcher diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py index 6f228fafcf8e..56ac62820841 100644 --- a/bigquery/tests/unit/test__pandas_helpers.py +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -701,6 +701,32 @@ def test_list_columns_and_indexes_with_multiindex(module_under_test): assert columns_and_indexes == expected +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_dataframe_to_bq_schema_dict_sequence(module_under_test): + df_data = collections.OrderedDict( + [ + ("str_column", [u"hello", u"world"]), + ("int_column", [42, 8]), + ("bool_column", [True, False]), + ] + ) + dataframe = pandas.DataFrame(df_data) + + dict_schema = [ + {"name": "str_column", "type": "STRING", "mode": "NULLABLE"}, + {"name": "bool_column", "type": "BOOL", "mode": "REQUIRED"}, + ] + + returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, dict_schema) + + expected_schema = ( + schema.SchemaField("str_column", "STRING", "NULLABLE"), + schema.SchemaField("int_column", "INTEGER", "NULLABLE"), + schema.SchemaField("bool_column", "BOOL", "REQUIRED"), + ) + assert returned_schema == expected_schema + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") def test_dataframe_to_arrow_with_multiindex(module_under_test): @@ -856,6 +882,28 @@ def test_dataframe_to_arrow_with_unknown_type(module_under_test): assert arrow_schema[3].name == "field03" +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_dataframe_to_arrow_dict_sequence_schema(module_under_test): + dict_schema = [ + {"name": "field01", "type": "STRING", "mode": "REQUIRED"}, + {"name": "field02", "type": "BOOL", "mode": "NULLABLE"}, + ] + + dataframe = pandas.DataFrame( + {"field01": [u"hello", u"world"], "field02": [True, False]} + ) + + arrow_table = module_under_test.dataframe_to_arrow(dataframe, dict_schema) + arrow_schema = arrow_table.schema + + expected_fields = [ + pyarrow.field("field01", "string", nullable=False), + pyarrow.field("field02", "bool", nullable=True), + ] + assert list(arrow_schema) == expected_fields + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_dataframe_to_parquet_without_pyarrow(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "pyarrow", None) @@ -908,6 +956,36 @@ def test_dataframe_to_parquet_compression_method(module_under_test): assert call_args.kwargs.get("compression") == "ZSTD" +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_dataframe_to_parquet_dict_sequence_schema(module_under_test): + dict_schema = [ + {"name": "field01", "type": "STRING", "mode": "REQUIRED"}, + {"name": "field02", "type": "BOOL", "mode": "NULLABLE"}, + ] + + dataframe = pandas.DataFrame( + {"field01": [u"hello", u"world"], "field02": [True, 
False]} + ) + + write_table_patch = mock.patch.object( + module_under_test.pyarrow.parquet, "write_table", autospec=True + ) + to_arrow_patch = mock.patch.object( + module_under_test, "dataframe_to_arrow", autospec=True + ) + + with write_table_patch, to_arrow_patch as fake_to_arrow: + module_under_test.dataframe_to_parquet(dataframe, dict_schema, None) + + expected_schema_arg = [ + schema.SchemaField("field01", "STRING", mode="REQUIRED"), + schema.SchemaField("field02", "BOOL", mode="NULLABLE"), + ] + schema_arg = fake_to_arrow.call_args.args[1] + assert schema_arg == expected_schema_arg + + @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") def test_download_arrow_tabledata_list_unknown_field_type(module_under_test): fake_page = api_core.page_iterator.Page( @@ -977,3 +1055,62 @@ def test_download_arrow_tabledata_list_known_field_type(module_under_test): col = result.columns[1] assert type(col) is pyarrow.lib.StringArray assert list(col) == ["2.2", "22.22", "222.222"] + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_arrow_tabledata_list_dict_sequence_schema(module_under_test): + fake_page = api_core.page_iterator.Page( + parent=mock.Mock(), + items=[{"page_data": "foo"}], + item_to_value=api_core.page_iterator._item_to_value_identity, + ) + fake_page._columns = [[1, 10, 100], ["2.2", "22.22", "222.222"]] + pages = [fake_page] + + dict_schema = [ + {"name": "population_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "non_alien_field", "type": "STRING", "mode": "NULLABLE"}, + ] + + results_gen = module_under_test.download_arrow_tabledata_list(pages, dict_schema) + result = next(results_gen) + + assert len(result.columns) == 2 + col = result.columns[0] + assert type(col) is pyarrow.lib.Int64Array + assert list(col) == [1, 10, 100] + col = result.columns[1] + assert type(col) is pyarrow.lib.StringArray + assert list(col) == ["2.2", "22.22", "222.222"] + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_download_dataframe_tabledata_list_dict_sequence_schema(module_under_test): + fake_page = api_core.page_iterator.Page( + parent=mock.Mock(), + items=[{"page_data": "foo"}], + item_to_value=api_core.page_iterator._item_to_value_identity, + ) + fake_page._columns = [[1, 10, 100], ["2.2", "22.22", "222.222"]] + pages = [fake_page] + + dict_schema = [ + {"name": "population_size", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "non_alien_field", "type": "STRING", "mode": "NULLABLE"}, + ] + + results_gen = module_under_test.download_dataframe_tabledata_list( + pages, dict_schema, dtypes={} + ) + result = next(results_gen) + + expected_result = pandas.DataFrame( + collections.OrderedDict( + [ + ("population_size", [1, 10, 100]), + ("non_alien_field", ["2.2", "22.22", "222.222"]), + ] + ) + ) + assert result.equals(expected_result) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index b4e5e96f1e8e..bc56fac34c6a 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -1138,7 +1138,8 @@ def test_create_table_w_day_partition_and_expire(self): self.assertEqual(got.table_id, self.TABLE_ID) def test_create_table_w_schema_and_query(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables" % (self.PROJECT, self.DS_ID) query = 
"SELECT * from %s:%s" % (self.DS_ID, self.TABLE_ID) @@ -1753,7 +1754,8 @@ def test_update_routine(self): self.assertEqual(req[1]["headers"]["If-Match"], "im-an-etag") def test_update_table(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables/%s" % ( self.PROJECT, @@ -1896,7 +1898,8 @@ def test_update_table_w_query(self): import datetime from google.cloud._helpers import UTC from google.cloud._helpers import _millis - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table path = "projects/%s/datasets/%s/tables/%s" % ( self.PROJECT, @@ -4173,7 +4176,7 @@ def test_insert_rows_w_schema(self): from google.cloud._helpers import UTC from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _microseconds_from_datetime - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField WHEN_TS = 1437767599.006 WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC) @@ -4229,7 +4232,8 @@ def test_insert_rows_w_list_of_dictionaries(self): from google.cloud._helpers import UTC from google.cloud._helpers import _datetime_to_rfc3339 from google.cloud._helpers import _microseconds_from_datetime - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table WHEN_TS = 1437767599.006 WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace(tzinfo=UTC) @@ -4290,8 +4294,8 @@ def _row_data(row): ) def test_insert_rows_w_list_of_Rows(self): + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table - from google.cloud.bigquery.table import SchemaField from google.cloud.bigquery.table import Row PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( @@ -4335,7 +4339,8 @@ def _row_data(row): ) def test_insert_rows_w_skip_invalid_and_ignore_unknown(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4411,7 +4416,8 @@ def _row_data(row): ) def test_insert_rows_w_repeated_fields(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4504,7 +4510,7 @@ def test_insert_rows_w_repeated_fields(self): ) def test_insert_rows_w_record_schema(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField PATH = "projects/%s/datasets/%s/tables/%s/insertAll" % ( self.PROJECT, @@ -4633,6 +4639,7 @@ def test_insert_rows_errors(self): def test_insert_rows_w_numeric(self): from google.cloud.bigquery import table + from google.cloud.bigquery.schema import SchemaField project = "PROJECT" ds_id = "DS_ID" @@ -4642,10 +4649,7 @@ def test_insert_rows_w_numeric(self): client = self._make_one(project=project, credentials=creds, _http=http) conn = client._connection = make_connection({}) table_ref = DatasetReference(project, ds_id).table(table_id) - schema = [ - table.SchemaField("account", "STRING"), - 
table.SchemaField("balance", "NUMERIC"), - ] + schema = [SchemaField("account", "STRING"), SchemaField("balance", "NUMERIC")] insert_table = table.Table(table_ref, schema=schema) rows = [ ("Savings", decimal.Decimal("23.47")), @@ -4677,7 +4681,7 @@ def test_insert_rows_w_numeric(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_insert_rows_from_dataframe(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format( @@ -4753,7 +4757,7 @@ def test_insert_rows_from_dataframe(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_insert_rows_from_dataframe_many_columns(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format( @@ -4849,8 +4853,9 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self): ) def test_insert_rows_json(self): - from google.cloud.bigquery.table import Table, SchemaField from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PROJECT = "PROJECT" DS_ID = "DS_ID" @@ -4982,8 +4987,8 @@ def test_list_partitions_with_string_id(self): def test_list_rows(self): import datetime from google.cloud._helpers import UTC + from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table - from google.cloud.bigquery.table import SchemaField from google.cloud.bigquery.table import Row PATH = "projects/%s/datasets/%s/tables/%s/data" % ( @@ -5083,7 +5088,8 @@ def test_list_rows_empty_table(self): self.assertEqual(rows.total_rows, 0) def test_list_rows_query_params(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table creds = _make_credentials() http = object() @@ -5105,7 +5111,7 @@ def test_list_rows_query_params(self): self.assertEqual(req[1]["query_params"], test[1], "for kwargs %s" % test[0]) def test_list_rows_repeated_fields(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField PATH = "projects/%s/datasets/%s/tables/%s/data" % ( self.PROJECT, @@ -5165,7 +5171,8 @@ def test_list_rows_repeated_fields(self): ) def test_list_rows_w_record_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table PATH = "projects/%s/datasets/%s/tables/%s/data" % ( self.PROJECT, diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 84f52627b7f3..a2aeb5efbc4a 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -1532,7 +1532,7 @@ def test_schema_hit(self): self.assertEqual(all_props, SchemaField.from_api_repr(all_props_repr)) self.assertEqual(minimal, SchemaField.from_api_repr(minimal_repr)) - def test_schema_setter(self): + def test_schema_setter_fields(self): from google.cloud.bigquery.schema import SchemaField config = self._get_target_class()() @@ -1555,6 +1555,42 @@ def test_schema_setter(self): config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]} ) + def test_schema_setter_valid_mappings_list(self): + 
config = self._get_target_class()() + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INTEGER", "mode": "REQUIRED"}, + ] + config.schema = schema + + full_name_repr = { + "name": "full_name", + "type": "STRING", + "mode": "REQUIRED", + "description": None, + } + age_repr = { + "name": "age", + "type": "INTEGER", + "mode": "REQUIRED", + "description": None, + } + self.assertEqual( + config._properties["load"]["schema"], {"fields": [full_name_repr, age_repr]} + ) + + def test_schema_setter_invalid_mappings_list(self): + config = self._get_target_class()() + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "typeoo": "INTEGER", "mode": "REQUIRED"}, + ] + + with self.assertRaises(Exception): + config.schema = schema + def test_schema_setter_unsetting_schema(self): from google.cloud.bigquery.schema import SchemaField diff --git a/bigquery/tests/unit/test_schema.py b/bigquery/tests/unit/test_schema.py index 862d8a823e62..fc8a41c68c46 100644 --- a/bigquery/tests/unit/test_schema.py +++ b/bigquery/tests/unit/test_schema.py @@ -568,3 +568,69 @@ def test_w_subfields(self): ], }, ) + + +class Test_to_schema_fields(unittest.TestCase): + @staticmethod + def _call_fut(schema): + from google.cloud.bigquery.schema import _to_schema_fields + + return _to_schema_fields(schema) + + def test_invalid_type(self): + schema = [ + ("full_name", "STRING", "REQUIRED"), + ("address", "STRING", "REQUIRED"), + ] + with self.assertRaises(ValueError): + self._call_fut(schema) + + def test_schema_fields_sequence(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INT64", mode="NULLABLE"), + ] + result = self._call_fut(schema) + self.assertEqual(result, schema) + + def test_invalid_mapping_representation(self): + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "address", "typeooo": "STRING", "mode": "REQUIRED"}, + ] + with self.assertRaises(Exception): + self._call_fut(schema) + + def test_valid_mapping_representation(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + { + "name": "residence", + "type": "STRUCT", + "mode": "NULLABLE", + "fields": [ + {"name": "foo", "type": "DATE", "mode": "NULLABLE"}, + {"name": "bar", "type": "BYTES", "mode": "REQUIRED"}, + ], + }, + ] + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField( + "residence", + "STRUCT", + mode="NULLABLE", + fields=[ + SchemaField("foo", "DATE", mode="NULLABLE"), + SchemaField("bar", "BYTES", mode="REQUIRED"), + ], + ), + ] + + result = self._call_fut(schema) + self.assertEqual(result, expected_schema) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index b04a4491e6ca..97a7b4ae745e 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -450,7 +450,7 @@ def test_ctor(self): self.assertIsNone(table.clustering_fields) def test_ctor_w_schema(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -556,7 +556,7 @@ def test_num_rows_getter(self): with self.assertRaises(ValueError): getattr(table, "num_rows") - def test_schema_setter_non_list(self): + def 
test_schema_setter_non_sequence(self): dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) table = self._make_one(table_ref) @@ -564,7 +564,7 @@ def test_schema_setter_non_list(self): table.schema = object() def test_schema_setter_invalid_field(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -573,8 +573,8 @@ def test_schema_setter_invalid_field(self): with self.assertRaises(ValueError): table.schema = [full_name, object()] - def test_schema_setter(self): - from google.cloud.bigquery.table import SchemaField + def test_schema_setter_valid_fields(self): + from google.cloud.bigquery.schema import SchemaField dataset = DatasetReference(self.PROJECT, self.DS_ID) table_ref = dataset.table(self.TABLE_NAME) @@ -584,6 +584,48 @@ def test_schema_setter(self): table.schema = [full_name, age] self.assertEqual(table.schema, [full_name, age]) + def test_schema_setter_invalid_mapping_representation(self): + dataset = DatasetReference(self.PROJECT, self.DS_ID) + table_ref = dataset.table(self.TABLE_NAME) + table = self._make_one(table_ref) + full_name = {"name": "full_name", "type": "STRING", "mode": "REQUIRED"} + invalid_field = {"name": "full_name", "typeooo": "STRING", "mode": "REQUIRED"} + with self.assertRaises(Exception): + table.schema = [full_name, invalid_field] + + def test_schema_setter_valid_mapping_representation(self): + from google.cloud.bigquery.schema import SchemaField + + dataset = DatasetReference(self.PROJECT, self.DS_ID) + table_ref = dataset.table(self.TABLE_NAME) + table = self._make_one(table_ref) + full_name = {"name": "full_name", "type": "STRING", "mode": "REQUIRED"} + job_status = { + "name": "is_employed", + "type": "STRUCT", + "mode": "NULLABLE", + "fields": [ + {"name": "foo", "type": "DATE", "mode": "NULLABLE"}, + {"name": "bar", "type": "BYTES", "mode": "REQUIRED"}, + ], + } + + table.schema = [full_name, job_status] + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField( + "is_employed", + "STRUCT", + mode="NULLABLE", + fields=[ + SchemaField("foo", "DATE", mode="NULLABLE"), + SchemaField("bar", "BYTES", mode="REQUIRED"), + ], + ), + ] + self.assertEqual(table.schema, expected_schema) + def test_props_set_by_server(self): import datetime from google.cloud._helpers import UTC @@ -1145,7 +1187,8 @@ def test__row_from_mapping_wo_schema(self): self.assertEqual(exc.exception.args, (_TABLE_HAS_NO_SCHEMA,)) def test__row_from_mapping_w_invalid_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table MAPPING = { "full_name": "Phred Phlyntstone", @@ -1167,7 +1210,8 @@ def test__row_from_mapping_w_invalid_schema(self): self.assertIn("Unknown field mode: BOGUS", str(exc.exception)) def test__row_from_mapping_w_schema(self): - from google.cloud.bigquery.table import Table, SchemaField + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table MAPPING = { "full_name": "Phred Phlyntstone", @@ -1497,8 +1541,24 @@ def test_constructor_with_table(self): self.assertIs(iterator._table, table) self.assertEqual(iterator.total_rows, 100) + def test_constructor_with_dict_schema(self): + from google.cloud.bigquery.schema import SchemaField + + schema = [ + {"name": "full_name", "type": 
"STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ] + + iterator = self._make_one(schema=schema) + + expected_schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INT64", mode="NULLABLE"), + ] + self.assertEqual(iterator.schema, expected_schema) + def test_iterate(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1529,7 +1589,7 @@ def test_iterate(self): api_request.assert_called_once_with(method="GET", path=path, query_params={}) def test_page_size(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1555,7 +1615,7 @@ def test_page_size(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1637,7 +1697,7 @@ def test_to_arrow(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_nulls(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] rows = [ @@ -1670,7 +1730,7 @@ def test_to_arrow_w_nulls(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_unknown_type(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1708,7 +1768,7 @@ def test_to_arrow_w_unknown_type(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow_w_empty_table(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1870,7 +1930,7 @@ def test_to_arrow_w_bqstorage_no_streams(self): @mock.patch("tqdm.tqdm_notebook") @mock.patch("tqdm.tqdm") def test_to_arrow_progress_bar(self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1913,7 +1973,7 @@ def test_to_arrow_w_pyarrow_none(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1945,7 +2005,7 @@ def test_to_dataframe(self): def test_to_dataframe_progress_bar( self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock ): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -1978,7 +2038,7 @@ def test_to_dataframe_progress_bar( @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm_no_progress_bar(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2003,7 +2063,7 @@ def 
test_to_dataframe_no_tqdm_no_progress_bar(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2036,7 +2096,7 @@ def test_to_dataframe_no_tqdm(self): @mock.patch("tqdm.tqdm_notebook", new=None) # will raise TypeError on call @mock.patch("tqdm.tqdm", new=None) # will raise TypeError on call def test_to_dataframe_tqdm_error(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2066,7 +2126,7 @@ def test_to_dataframe_tqdm_error(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_empty_results(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2101,7 +2161,7 @@ def test_to_dataframe_logs_tabledata_list(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_various_types_nullable(self): import datetime - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("start_timestamp", "TIMESTAMP"), @@ -2141,7 +2201,7 @@ def test_to_dataframe_w_various_types_nullable(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_column_dtypes(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("start_timestamp", "TIMESTAMP"), @@ -2179,7 +2239,7 @@ def test_to_dataframe_column_dtypes(self): @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"), @@ -2198,7 +2258,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_max_results_w_bqstorage_warning(self): - from google.cloud.bigquery.table import SchemaField + from google.cloud.bigquery.schema import SchemaField schema = [ SchemaField("name", "STRING", mode="REQUIRED"),
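To close, a brief sketch of the user-visible effect of the whole patch; the project, dataset, and table IDs are hypothetical, and the mappings follow the `SchemaField.from_api_repr()` format used throughout the tests above:

```python
from google.cloud.bigquery import LoadJobConfig, Table

dict_schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]

# Table schemas no longer require explicit SchemaField construction...
table = Table("my-project.my_dataset.my_table", schema=dict_schema)

# ...and neither do load job configurations.
job_config = LoadJobConfig()
job_config.schema = dict_schema
```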