fix(bigquery): write pandas datetime[ns] columns to BigQuery TIMESTAMP columns (#10028)

* fix(bigquery): write pandas datetime[ns] columns to BigQuery TIMESTAMP columns

Also:

* Enable TIMESTAMP and DATETIME unit tests for `_pandas_helpers`.
* Add more data types to load dataframe sample.

* blacken

* lint

* update client tests

* doc: show timezone conversions for timestamp columns

Pandas doesn't automatically convert naive datetime objects to UTC, so the
code sample now shows how to do this (a short sketch follows these notes).

* doc: update comments to indicate desired use of TIMESTAMP

* fix: add missing client fixture
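
As context for the timezone note above, here is a minimal sketch (not part of the commit) of the pytz pattern the updated sample uses; the date is one of the sample's own values:

```python
import datetime

import pytz

# Localize a naive datetime to a zone, then convert to UTC.
# BigQuery TIMESTAMP columns store instants in UTC.
paris = pytz.timezone("Europe/Paris")
local_dt = paris.localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
print(local_dt.astimezone(pytz.utc))  # 1983-05-09 11:00:00+00:00 (CEST is UTC+2)
```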
tswast authored and plamut committed Jan 11, 2020
1 parent 70fe9b4 commit d897d56
Showing 6 changed files with 157 additions and 42 deletions.
6 changes: 4 additions & 2 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -52,7 +52,9 @@
 _PANDAS_DTYPE_TO_BQ = {
     "bool": "BOOLEAN",
     "datetime64[ns, UTC]": "TIMESTAMP",
-    "datetime64[ns]": "DATETIME",
+    # BigQuery does not support uploading DATETIME values from Parquet files.
+    # See: https://github.com/googleapis/google-cloud-python/issues/9996
+    "datetime64[ns]": "TIMESTAMP",
     "float32": "FLOAT",
     "float64": "FLOAT",
     "int8": "INTEGER",
@@ -218,7 +220,7 @@ def bq_to_arrow_array(series, bq_field):
         return pyarrow.ListArray.from_pandas(series, type=arrow_type)
     if field_type_upper in schema._STRUCT_TYPES:
         return pyarrow.StructArray.from_pandas(series, type=arrow_type)
-    return pyarrow.array(series, type=arrow_type)
+    return pyarrow.Array.from_pandas(series, type=arrow_type)
 
 
 def get_column_or_index(dataframe, name):
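Side note on the `bq_to_arrow_array` change above: unlike plain `pyarrow.array`, `pyarrow.Array.from_pandas` applies pandas null semantics, mapping `NaT` to an Arrow null. A minimal sketch (illustrative values, not from the commit):

```python
import pandas
import pyarrow

# A naive datetime64[ns] series with a missing value.
series = pandas.Series([pandas.Timestamp("1983-05-09T13:00:00"), pandas.NaT])

# from_pandas converts pandas' NaT sentinel to an Arrow null.
arr = pyarrow.Array.from_pandas(series, type=pyarrow.timestamp("us"))
print(arr.null_count)  # 1
```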
53 changes: 47 additions & 6 deletions bigquery/samples/load_table_dataframe.py
@@ -16,9 +16,11 @@
 def load_table_dataframe(table_id):
 
     # [START bigquery_load_table_dataframe]
-    from google.cloud import bigquery
+    import datetime
+
+    from google.cloud import bigquery
     import pandas
+    import pytz
 
     # Construct a BigQuery client object.
     client = bigquery.Client()
@@ -27,16 +29,55 @@ def load_table_dataframe(table_id):
     # table_id = "your-project.your_dataset.your_table_name"
 
     records = [
-        {"title": u"The Meaning of Life", "release_year": 1983},
-        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
-        {"title": u"Life of Brian", "release_year": 1979},
-        {"title": u"And Now for Something Completely Different", "release_year": 1971},
+        {
+            "title": u"The Meaning of Life",
+            "release_year": 1983,
+            "length_minutes": 112.5,
+            "release_date": pytz.timezone("Europe/Paris")
+            .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
+            .astimezone(pytz.utc),
+            # Assume UTC timezone when a datetime object contains no timezone.
+            "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
+        },
+        {
+            "title": u"Monty Python and the Holy Grail",
+            "release_year": 1975,
+            "length_minutes": 91.5,
+            "release_date": pytz.timezone("Europe/London")
+            .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
+        },
+        {
+            "title": u"Life of Brian",
+            "release_year": 1979,
+            "length_minutes": 94.25,
+            "release_date": pytz.timezone("America/New_York")
+            .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
+        },
+        {
+            "title": u"And Now for Something Completely Different",
+            "release_year": 1971,
+            "length_minutes": 88.0,
+            "release_date": pytz.timezone("Europe/London")
+            .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
+        },
     ]
     dataframe = pandas.DataFrame(
         records,
         # In the loaded table, the column order reflects the order of the
         # columns in the DataFrame.
-        columns=["title", "release_year"],
+        columns=[
+            "title",
+            "release_year",
+            "length_minutes",
+            "release_date",
+            "dvd_release",
+        ],
         # Optionally, set a named index, which can also be written to the
         # BigQuery table.
         index=pandas.Index(
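The diff above is truncated at the index definition. As a rough sketch of how the rest of the sample proceeds (hedged: the `job_config` contents and print format are assumptions inferred from the test below, not shown in this diff):

```python
    # Hypothetical continuation of load_table_dataframe (not shown in the diff).
    job_config = bigquery.LoadJobConfig(
        # A partial schema can pin types (e.g. the string index column);
        # the remaining columns are detected from the DataFrame dtypes.
        schema=[bigquery.SchemaField("wikidata_id", "STRING")],
    )
    job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the load job to complete.

    table = client.get_table(table_id)
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )
```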
47 changes: 44 additions & 3 deletions bigquery/samples/tests/test_load_table_dataframe.py
@@ -21,11 +21,52 @@
 pyarrow = pytest.importorskip("pyarrow")
 
 
-def test_load_table_dataframe(capsys, random_table_id):
+def test_load_table_dataframe(capsys, client, random_table_id):
 
     table = load_table_dataframe.load_table_dataframe(random_table_id)
     out, _ = capsys.readouterr()
-    assert "Loaded 4 rows and 3 columns" in out
+    expected_column_names = [
+        "wikidata_id",
+        "title",
+        "release_year",
+        "length_minutes",
+        "release_date",
+        "dvd_release",
+    ]
+    assert "Loaded 4 rows and {} columns".format(len(expected_column_names)) in out
 
     column_names = [field.name for field in table.schema]
-    assert column_names == ["wikidata_id", "title", "release_year"]
+    assert column_names == expected_column_names
+    column_types = [field.field_type for field in table.schema]
+    assert column_types == [
+        "STRING",
+        "STRING",
+        "INTEGER",
+        "FLOAT",
+        "TIMESTAMP",
+        "TIMESTAMP",
+    ]
+
+    df = client.list_rows(table).to_dataframe()
+    df.sort_values("release_year", inplace=True)
+    assert df["title"].tolist() == [
+        u"And Now for Something Completely Different",
+        u"Monty Python and the Holy Grail",
+        u"Life of Brian",
+        u"The Meaning of Life",
+    ]
+    assert df["release_year"].tolist() == [1971, 1975, 1979, 1983]
+    assert df["length_minutes"].tolist() == [88.0, 91.5, 94.25, 112.5]
+    assert df["release_date"].tolist() == [
+        pandas.Timestamp("1971-09-28T22:59:07+00:00"),
+        pandas.Timestamp("1975-04-09T22:59:02+00:00"),
+        pandas.Timestamp("1979-08-18T03:59:05+00:00"),
+        pandas.Timestamp("1983-05-09T11:00:00+00:00"),
+    ]
+    assert df["dvd_release"].tolist() == [
+        pandas.Timestamp("2003-10-22T10:00:00+00:00"),
+        pandas.Timestamp("2002-07-16T09:00:00+00:00"),
+        pandas.Timestamp("2008-01-14T08:00:00+00:00"),
+        pandas.Timestamp("2002-01-22T07:00:00+00:00"),
+    ]
+    assert df["wikidata_id"].tolist() == [u"Q16403", u"Q25043", u"Q24953", u"Q24980"]
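The expected `release_date` values are the sample's local times converted to UTC. A worked check for the third assertion (note the date rolls over in UTC):

```python
import datetime

import pytz

# America/New_York observes daylight saving time (UTC-4) in August 1979.
eastern = pytz.timezone("America/New_York")
local = eastern.localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
print(local.astimezone(pytz.utc).isoformat())  # 1979-08-18T03:59:05+00:00
```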
5 changes: 4 additions & 1 deletion bigquery/tests/system.py
@@ -719,7 +719,10 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
         (
             bigquery.SchemaField("bool_col", "BOOLEAN"),
             bigquery.SchemaField("ts_col", "TIMESTAMP"),
-            bigquery.SchemaField("dt_col", "DATETIME"),
+            # BigQuery does not support uploading DATETIME values from
+            # Parquet files. See:
+            # https://github.com/googleapis/google-cloud-python/issues/9996
+            bigquery.SchemaField("dt_col", "TIMESTAMP"),
             bigquery.SchemaField("float32_col", "FLOAT"),
             bigquery.SchemaField("float64_col", "FLOAT"),
             bigquery.SchemaField("int8_col", "INTEGER"),
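With the mapping change in `_pandas_helpers.py`, automatic schema detection now types both naive and UTC-aware datetime columns as TIMESTAMP. A quick dtype sketch (illustrative values):

```python
import pandas

naive = pandas.Series(pandas.to_datetime(["2020-01-11 12:00:00"]))
aware = naive.dt.tz_localize("UTC")

print(naive.dtype)  # datetime64[ns]      -> TIMESTAMP (values assumed to be UTC)
print(aware.dtype)  # datetime64[ns, UTC] -> TIMESTAMP
```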
84 changes: 56 additions & 28 deletions bigquery/tests/unit/test__pandas_helpers.py
@@ -92,6 +92,7 @@ def test_is_datetime():
     assert is_datetime(pyarrow.timestamp("us", tz=None))
     assert not is_datetime(pyarrow.timestamp("ms", tz=None))
     assert not is_datetime(pyarrow.timestamp("us", tz="UTC"))
+    assert not is_datetime(pyarrow.timestamp("ns", tz="UTC"))
     assert not is_datetime(pyarrow.string())
 
 
@@ -386,20 +387,15 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
         ),
         ("BOOLEAN", [True, None, False, None]),
         ("BOOL", [False, None, True, None]),
-        # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is
-        # resolved, test with TIMESTAMP column. Conversion from pyarrow
-        # TimestampArray to list of Python objects fails with OverflowError:
-        # Python int too large to convert to C long.
-        #
-        # (
-        #     "TIMESTAMP",
-        #     [
-        #         datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-        #         None,
-        #         datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc),
-        #         datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-        #     ],
-        # ),
+        (
+            "TIMESTAMP",
+            [
+                datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                None,
+                datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc),
+                datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+            ],
+        ),
         (
             "DATE",
             [
@@ -418,20 +414,16 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
                 datetime.time(12, 0, 0),
             ],
         ),
-        # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is
-        # resolved, test with DATETIME column. Conversion from pyarrow
-        # TimestampArray to list of Python objects fails with OverflowError:
-        # Python int too large to convert to C long.
-        #
-        # (
-        #     "DATETIME",
-        #     [
-        #         datetime.datetime(1, 1, 1, 0, 0, 0),
-        #         None,
-        #         datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
-        #         datetime.datetime(1970, 1, 1, 0, 0, 0),
-        #     ],
-        # ),
+        (
+            "DATETIME",
+            [
+                datetime.datetime(1, 1, 1, 0, 0, 0),
+                datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                None,
+                datetime.datetime(1970, 1, 1, 0, 0, 0),
+                datetime.datetime(1999, 3, 14, 15, 9, 26, 535898),
+            ],
+        ),
         (
             "GEOGRAPHY",
             [
@@ -453,6 +445,42 @@ def test_bq_to_arrow_array_w_nullable_scalars(module_under_test, bq_type, rows):
     assert rows == roundtrip
 
 
+@pytest.mark.parametrize(
+    "bq_type,rows",
+    [
+        (
+            "TIMESTAMP",
+            [
+                "1971-09-28T23:59:07+00:00",
+                "1975-04-09T23:59:02+00:00",
+                "1979-08-17T23:59:05+00:00",
+                "NaT",
+                "1983-05-09T13:00:00+00:00",
+            ],
+        ),
+        (
+            "DATETIME",
+            [
+                "1971-09-28T23:59:07",
+                "1975-04-09T23:59:02",
+                "1979-08-17T23:59:05",
+                "NaT",
+                "1983-05-09T13:00:00",
+            ],
+        ),
+    ],
+)
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+def test_bq_to_arrow_array_w_pandas_timestamp(module_under_test, bq_type, rows):
+    rows = [pandas.Timestamp(row) for row in rows]
+    series = pandas.Series(rows)
+    bq_field = schema.SchemaField("field_name", bq_type)
+    arrow_array = module_under_test.bq_to_arrow_array(series, bq_field)
+    roundtrip = arrow_array.to_pandas()
+    assert series.equals(roundtrip)
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
 def test_bq_to_arrow_array_w_arrays(module_under_test):
4 changes: 2 additions & 2 deletions bigquery/tests/unit/test_client.py
@@ -6425,7 +6425,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             SchemaField("int_col", "INTEGER"),
             SchemaField("float_col", "FLOAT"),
             SchemaField("bool_col", "BOOLEAN"),
-            SchemaField("dt_col", "DATETIME"),
+            SchemaField("dt_col", "TIMESTAMP"),
             SchemaField("ts_col", "TIMESTAMP"),

@@ -6671,7 +6671,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
             SchemaField("int_as_float_col", "INTEGER"),
             SchemaField("float_col", "FLOAT"),
             SchemaField("bool_col", "BOOLEAN"),
-            SchemaField("dt_col", "DATETIME"),
+            SchemaField("dt_col", "TIMESTAMP"),
             SchemaField("ts_col", "TIMESTAMP"),
             SchemaField("string_col", "STRING"),
             SchemaField("bytes_col", "BYTES"),
