
BigQuery: Allow specifying index data type in partial schema to load_table_from_dataframe. #9084

Merged · 10 commits · Aug 28, 2019
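In short, load_table_from_dataframe can now take a partial schema that names a DataFrame index, so the index's SQL type no longer has to be guessed from its pandas dtype. A minimal sketch of the behavior this PR enables — client and table_id are placeholders, and the data is illustrative:

import pandas
from google.cloud import bigquery

# Assumptions (placeholders, not part of this PR):
# client = bigquery.Client()
# table_id = "your-project.your_dataset.your_table_name"
dataframe = pandas.DataFrame(
    {"title": ["Life of Brian"]},
    index=pandas.Index(["Q24953"], name="wikidata_id"),
)
# The partial schema may now include the named index; its type is taken
# from the schema instead of being inferred from the pandas dtype.
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING)]
)
client.load_table_from_dataframe(dataframe, table_id, job_config=job_config).result()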
47 changes: 0 additions & 47 deletions bigquery/docs/snippets.py
@@ -2741,52 +2741,5 @@ def test_list_rows_as_dataframe(client):
assert len(df) == table.num_rows # verify the number of rows


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.parametrize("parquet_engine", ["pyarrow", "fastparquet"])
def test_load_table_from_dataframe(client, to_delete, parquet_engine):
if parquet_engine == "pyarrow" and pyarrow is None:
pytest.skip("Requires `pyarrow`")
if parquet_engine == "fastparquet" and fastparquet is None:
pytest.skip("Requires `fastparquet`")

pandas.set_option("io.parquet.engine", parquet_engine)

dataset_id = "load_table_from_dataframe_{}".format(_millis())
dataset = bigquery.Dataset(client.dataset(dataset_id))
client.create_dataset(dataset)
to_delete.append(dataset)

# [START bigquery_load_table_dataframe]
# from google.cloud import bigquery
# import pandas
# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table("monty_python")
records = [
{"title": u"The Meaning of Life", "release_year": 1983},
{"title": u"Monty Python and the Holy Grail", "release_year": 1975},
{"title": u"Life of Brian", "release_year": 1979},
{"title": u"And Now for Something Completely Different", "release_year": 1971},
]
# Optionally set explicit indices.
# If indices are not specified, a column will be created for the default
# indices created by pandas.
index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"]
dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id"))

job = client.load_table_from_dataframe(dataframe, table_ref, location="US")

job.result() # Waits for table load to complete.

assert job.state == "DONE"
table = client.get_table(table_ref)
assert table.num_rows == 4
# [END bigquery_load_table_dataframe]
column_names = [field.name for field in table.schema]
assert sorted(column_names) == ["release_year", "title", "wikidata_id"]


if __name__ == "__main__":
pytest.main()
2 changes: 2 additions & 0 deletions bigquery/google/cloud/bigquery/__init__.py
@@ -36,6 +36,7 @@
from google.cloud.bigquery.dataset import AccessEntry
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
@@ -124,6 +125,7 @@
"GoogleSheetsOptions",
"DEFAULT_RETRY",
# Enum Constants
"enums",
"Compression",
"CreateDisposition",
"DestinationFormat",
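The new enums export above is what makes the SQL type names used later in this PR reachable from the package root. A quick check, assuming the package is installed with this change:

from google.cloud import bigquery

# Valid once the enums module is re-exported:
field = bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING)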
48 changes: 45 additions & 3 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -187,6 +187,28 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def _columns_and_indexes(dataframe):
"""Return all index and column names with dtypes.

Returns:
Sequence[Tuple[dtype, str]]:
Returns a sorted list of indexes and column names with
corresponding dtypes.
"""
columns_and_indexes = []
if isinstance(dataframe.index, pandas.MultiIndex):
for name in dataframe.index.names:
if name:
values = dataframe.index.get_level_values(name)
columns_and_indexes.append((name, values.dtype))
else:
if dataframe.index.name:
columns_and_indexes.append((dataframe.index.name, dataframe.index.dtype))

columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
return columns_and_indexes
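For orientation, a sketch of what this helper yields for a small, hypothetical DataFrame — named index levels come first, then columns in frame order:

import pandas

df = pandas.DataFrame(
    {"release_year": [1979]},
    index=pandas.Index(["Q24953"], name="wikidata_id"),
)
_columns_and_indexes(df)
# -> [('wikidata_id', dtype('O')), ('release_year', dtype('int64'))]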


def dataframe_to_bq_schema(dataframe, bq_schema):
"""Convert a pandas DataFrame schema to a BigQuery schema.

@@ -217,7 +239,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
bq_schema_unused = set()

bq_schema_out = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
for column, dtype in _columns_and_indexes(dataframe):
# Use provided type from schema, if present.
bq_field = bq_schema_index.get(column)
if bq_field:
@@ -245,6 +267,21 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
return tuple(bq_schema_out)


def _column_or_index(dataframe, name):
"""Return a column or index as a pandas series."""
if name in dataframe.columns:
return dataframe[name]

if isinstance(dataframe.index, pandas.MultiIndex):
if name in dataframe.index.names:
return dataframe.index.get_level_values(name)
else:
if name == dataframe.index.name:
return dataframe.index.to_series()

raise ValueError("column or index '{}' not found.".format(name))
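Reusing the hypothetical df from the sketch above, lookups resolve against columns first, then index levels:

_column_or_index(df, "release_year")  # column, returned as a Series
_column_or_index(df, "wikidata_id")   # index values, via Index.to_series()
_column_or_index(df, "no_such_name")  # raises ValueError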


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.

@@ -261,16 +298,19 @@ def dataframe_to_arrow(dataframe, bq_schema):
BigQuery schema.
"""
column_names = set(dataframe.columns)
column_and_index_names = set(name for name, _ in _columns_and_indexes(dataframe))
bq_field_names = set(field.name for field in bq_schema)

extra_fields = bq_field_names - column_names
extra_fields = bq_field_names - column_and_index_names
if extra_fields:
raise ValueError(
"bq_schema contains fields not present in dataframe: {}".format(
extra_fields
)
)

# It's okay for indexes to be missing from bq_schema, but it's not okay to
# be missing columns.
missing_fields = column_names - bq_field_names
if missing_fields:
raise ValueError(
Expand All @@ -283,7 +323,9 @@ def dataframe_to_arrow(dataframe, bq_schema):
for bq_field in bq_schema:
arrow_fields.append(bq_to_arrow_field(bq_field))
arrow_names.append(bq_field.name)
arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))
arrow_arrays.append(
bq_to_arrow_array(_column_or_index(dataframe, bq_field.name), bq_field)
)

if all((field is not None for field in arrow_fields)):
return pyarrow.Table.from_arrays(
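End to end, a hedged sketch of the conversion path this hunk changes; dataframe_to_arrow and SchemaField are real names from this module and package, the data is illustrative:

import pandas
from google.cloud import bigquery
from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame(
    {"title": ["Life of Brian"]},
    index=pandas.Index(["Q24953"], name="wikidata_id"),
)
schema = (
    bigquery.SchemaField("title", "STRING"),
    # Before this change, naming the index here raised "bq_schema contains
    # fields not present in dataframe"; now it resolves via _column_or_index.
    bigquery.SchemaField("wikidata_id", "STRING"),
)
arrow_table = _pandas_helpers.dataframe_to_arrow(df, schema)  # pyarrow.Table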
63 changes: 63 additions & 0 deletions bigquery/samples/load_table_dataframe.py
@@ -0,0 +1,63 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def load_table_dataframe(client, table_id):
# [START bigquery_load_table_dataframe]
from google.cloud import bigquery
import pandas

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

records = [
{"title": u"The Meaning of Life", "release_year": 1983},
{"title": u"Monty Python and the Holy Grail", "release_year": 1975},
{"title": u"Life of Brian", "release_year": 1979},
{"title": u"And Now for Something Completely Different", "release_year": 1971},
]
# Optionally set explicit indices.
index = [u"Q24980", u"Q25043", u"Q24953", u"Q16403"]
dataframe = pandas.DataFrame(records, index=pandas.Index(index, name="wikidata_id"))
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[
# Specify the type of columns whose type cannot be auto-detected. For
# example, the "title" column uses pandas dtype "object", so its
# data type is ambiguous.
bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
# Indexes are written if included in the schema by name.
bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
]
)

job = client.load_table_from_dataframe(
dataframe, table_id, job_config=job_config, location="US"
)
job.result() # Waits for table load to complete.

table = client.get_table(table_id)
print("Wrote {} rows to {}".format(table.num_rows, table_id))
# [END bigquery_load_table_dataframe]


if __name__ == "__main__":
import sys
from google.cloud import bigquery

load_table_dataframe(bigquery.Client(), sys.argv[1])
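Invoked directly, the sample expects a fully qualified table ID as its only argument, for example: python load_table_dataframe.py your-project.your_dataset.your_table_name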
28 changes: 28 additions & 0 deletions bigquery/samples/tests/test_load_table_dataframe.py
@@ -0,0 +1,28 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from .. import load_table_dataframe


pytest.importorskip("pandas")
pytest.importorskip("pyarrow")


def test_load_table_dataframe(client, random_table_id):
load_table_dataframe.load_table_dataframe(client, random_table_id)

table = client.get_table(random_table_id)
column_names = [field.name for field in table.schema]
assert sorted(column_names) == ["release_year", "title", "wikidata_id"]