Skip to content

Commit

Permalink
Determine the schema in load_table_from_dataframe based on dtypes.
Browse files Browse the repository at this point in the history
This PR updates `load_table_from_dataframe` to automatically determine
the BigQuery schema based on the DataFrame's dtypes. If any field's type
cannot be determined, fallback to the logic in the pandas `to_parquet`
method.
  • Loading branch information
tswast committed Aug 16, 2019
1 parent 1a103d3 commit 16945d5
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 0 deletions.
40 changes: 40 additions & 0 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@

_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.

_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
"datetime64[ns]": "DATETIME",
"float32": "FLOAT",
"float64": "FLOAT",
"int8": "INTEGER",
"int16": "INTEGER",
"int32": "INTEGER",
"int64": "INTEGER",
"uint8": "INTEGER",
"uint16": "INTEGER",
"uint32": "INTEGER",
}


class _DownloadState(object):
"""Flag to indicate that a thread should exit early."""
Expand Down Expand Up @@ -172,6 +187,31 @@ def bq_to_arrow_array(series, bq_field):
return pyarrow.array(series, type=arrow_type)


def dataframe_to_bq_schema(dataframe):
"""Convert a pandas DataFrame schema to a BigQuery schema.
TODO(GH#8140): Add bq_schema argument to allow overriding autodetected
schema for a subset of columns.
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to convert to Parquet file.
Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]:
The automatically determined schema. Returns None if the type of
any column cannot be determined.
"""
bq_schema = []
for column, dtype in zip(dataframe.columns, dataframe.dtypes):
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
if not bq_type:
return None
bq_field = schema.SchemaField(column, bq_type)
bq_schema.append(bq_field)
return tuple(bq_schema)


def dataframe_to_arrow(dataframe, bq_schema):
"""Convert pandas dataframe to Arrow table, using BigQuery schema.
Expand Down
9 changes: 9 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
except ImportError: # Python 2.7
import collections as collections_abc

import copy
import functools
import gzip
import io
Expand Down Expand Up @@ -1520,11 +1521,19 @@ def load_table_from_dataframe(

if job_config is None:
job_config = job.LoadJobConfig()
else:
# Make a copy so that the job config isn't modified in-place.
job_config_properties = copy.deepcopy(job_config._properties)
job_config = job.LoadJobConfig()
job_config._properties = job_config_properties
job_config.source_format = job.SourceFormat.PARQUET

if location is None:
location = self.location

if not job_config.schema:
job_config.schema = _pandas_helpers.dataframe_to_bq_schema(dataframe)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
os.close(tmpfd)

Expand Down
94 changes: 94 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,100 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength)
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_automatic_schema(self):
"""Test that a DataFrame with dtypes that map well to BigQuery types
can be uploaded without specifying a schema.
https://github.com/googleapis/google-cloud-python/issues/9044
"""
bool_col = pandas.Series([True, False, True], dtype="bool")
ts_col = pandas.Series(
[
datetime.datetime(2010, 1, 2, 3, 44, 50),
datetime.datetime(2011, 2, 3, 14, 50, 59),
datetime.datetime(2012, 3, 14, 15, 16),
],
dtype="datetime64[ns]",
).dt.tz_localize(pytz.utc)
dt_col = pandas.Series(
[
datetime.datetime(2010, 1, 2, 3, 44, 50),
datetime.datetime(2011, 2, 3, 14, 50, 59),
datetime.datetime(2012, 3, 14, 15, 16),
],
dtype="datetime64[ns]",
)
float32_col = pandas.Series([1.0, 2.0, 3.0], dtype="float32")
float64_col = pandas.Series([4.0, 5.0, 6.0], dtype="float64")
int8_col = pandas.Series([-12, -11, -10], dtype="int8")
int16_col = pandas.Series([-9, -8, -7], dtype="int16")
int32_col = pandas.Series([-6, -5, -4], dtype="int32")
int64_col = pandas.Series([-3, -2, -1], dtype="int64")
uint8_col = pandas.Series([0, 1, 2], dtype="uint8")
uint16_col = pandas.Series([3, 4, 5], dtype="uint16")
uint32_col = pandas.Series([6, 7, 8], dtype="uint32")
dataframe = pandas.DataFrame(
{
"bool_col": bool_col,
"ts_col": ts_col,
"dt_col": dt_col,
"float32_col": float32_col,
"float64_col": float64_col,
"int8_col": int8_col,
"int16_col": int16_col,
"int32_col": int32_col,
"int64_col": int64_col,
"uint8_col": uint8_col,
"uint16_col": uint16_col,
"uint32_col": uint32_col,
},
columns=[
"bool_col",
"ts_col",
"dt_col",
"float32_col",
"float64_col",
"int8_col",
"int16_col",
"int32_col",
"int64_col",
"uint8_col",
"uint16_col",
"uint32_col",
],
)

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_automatic_schema".format(
Config.CLIENT.project, dataset_id
)

load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()

table = Config.CLIENT.get_table(table_id)
self.assertEqual(
tuple(table.schema),
(
bigquery.SchemaField("bool_col", "BOOLEAN"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
bigquery.SchemaField("dt_col", "DATETIME"),
bigquery.SchemaField("float32_col", "FLOAT"),
bigquery.SchemaField("float64_col", "FLOAT"),
bigquery.SchemaField("int8_col", "INTEGER"),
bigquery.SchemaField("int16_col", "INTEGER"),
bigquery.SchemaField("int32_col", "INTEGER"),
bigquery.SchemaField("int64_col", "INTEGER"),
bigquery.SchemaField("uint8_col", "INTEGER"),
bigquery.SchemaField("uint16_col", "INTEGER"),
bigquery.SchemaField("uint32_col", "INTEGER"),
),
)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
Expand Down

0 comments on commit 16945d5

Please sign in to comment.