fix(bigquery): write pandas datetime[ns] columns to BigQuery TIMESTAMP columns #10028

Merged
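
This PR changes schema autodetection in `load_table_from_dataframe` so that timezone-naive pandas `datetime64[ns]` columns map to BigQuery TIMESTAMP instead of DATETIME, because BigQuery does not support uploading DATETIME values from Parquet files (see https://github.com/googleapis/google-cloud-python/issues/9996). A minimal sketch of the user-visible effect, with a placeholder table ID; timezone-naive values are assumed to be UTC:

    import datetime

    import pandas
    from google.cloud import bigquery

    client = bigquery.Client()
    table_id = "your-project.your_dataset.naive_datetimes"  # placeholder

    # A timezone-naive datetime64[ns] column.
    dataframe = pandas.DataFrame({"dt_col": [datetime.datetime(2002, 1, 22, 7, 0, 0)]})

    # With this change, the autodetected schema uses TIMESTAMP for dt_col
    # (previously DATETIME), so the Parquet-based upload succeeds.
    client.load_table_from_dataframe(dataframe, table_id).result()

    table = client.get_table(table_id)
    print([(field.name, field.field_type) for field in table.schema])
    # Expected: [('dt_col', 'TIMESTAMP')]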
6 changes: 4 additions & 2 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -52,7 +52,9 @@
 _PANDAS_DTYPE_TO_BQ = {
     "bool": "BOOLEAN",
     "datetime64[ns, UTC]": "TIMESTAMP",
-    "datetime64[ns]": "DATETIME",
+    # BigQuery does not support uploading DATETIME values from Parquet files.
+    # See: https://github.com/googleapis/google-cloud-python/issues/9996
+    "datetime64[ns]": "TIMESTAMP",
     "float32": "FLOAT",
     "float64": "FLOAT",
     "int8": "INTEGER",
@@ -218,7 +220,7 @@ def bq_to_arrow_array(series, bq_field):
         return pyarrow.ListArray.from_pandas(series, type=arrow_type)
     if field_type_upper in schema._STRUCT_TYPES:
         return pyarrow.StructArray.from_pandas(series, type=arrow_type)
-    return pyarrow.array(series, type=arrow_type)
+    return pyarrow.Array.from_pandas(series, type=arrow_type)


 def get_column_or_index(dataframe, name):
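
The second hunk replaces `pyarrow.array` with `pyarrow.Array.from_pandas`, which converts with pandas null semantics, so sentinel values such as `NaT` become proper Arrow nulls. A minimal sketch of that behavior (standalone, not the library's internal call site):

    import pandas
    import pyarrow

    series = pandas.Series([pandas.Timestamp("1975-04-09T23:59:02"), pandas.NaT])

    # Pandas-aware conversion: NaT is treated as a null value.
    array = pyarrow.Array.from_pandas(series, type=pyarrow.timestamp("us"))
    print(array.null_count)  # 1
    print(array.to_pylist())  # [datetime.datetime(1975, 4, 9, 23, 59, 2), None]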
53 changes: 47 additions & 6 deletions bigquery/samples/load_table_dataframe.py
@@ -16,9 +16,11 @@
 def load_table_dataframe(table_id):

     # [START bigquery_load_table_dataframe]
-    from google.cloud import bigquery
+    import datetime
+
+    from google.cloud import bigquery
     import pandas
+    import pytz

     # Construct a BigQuery client object.
     client = bigquery.Client()
@@ -27,16 +29,55 @@ def load_table_dataframe(table_id):
     # table_id = "your-project.your_dataset.your_table_name"

     records = [
-        {"title": u"The Meaning of Life", "release_year": 1983},
-        {"title": u"Monty Python and the Holy Grail", "release_year": 1975},
-        {"title": u"Life of Brian", "release_year": 1979},
-        {"title": u"And Now for Something Completely Different", "release_year": 1971},
+        {
+            "title": u"The Meaning of Life",
+            "release_year": 1983,
+            "length_minutes": 112.5,
+            "release_date": pytz.timezone("Europe/Paris")
+            .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
+            .astimezone(pytz.utc),
+            # Assume UTC timezone when a datetime object contains no timezone.
+            "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
+        },
+        {
+            "title": u"Monty Python and the Holy Grail",
+            "release_year": 1975,
+            "length_minutes": 91.5,
+            "release_date": pytz.timezone("Europe/London")
+            .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
+        },
+        {
+            "title": u"Life of Brian",
+            "release_year": 1979,
+            "length_minutes": 94.25,
+            "release_date": pytz.timezone("America/New_York")
+            .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
+        },
+        {
+            "title": u"And Now for Something Completely Different",
+            "release_year": 1971,
+            "length_minutes": 88.0,
+            "release_date": pytz.timezone("Europe/London")
+            .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
+            .astimezone(pytz.utc),
+            "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
+        },
     ]
     dataframe = pandas.DataFrame(
         records,
         # In the loaded table, the column order reflects the order of the
         # columns in the DataFrame.
-        columns=["title", "release_year"],
+        columns=[
+            "title",
+            "release_year",
+            "length_minutes",
+            "release_date",
+            "dvd_release",
+        ],
         # Optionally, set a named index, which can also be written to the
         # BigQuery table.
         index=pandas.Index(
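
Note that the sample attaches timezones with `pytz.timezone(...).localize(...)` rather than passing `tzinfo=` to the `datetime` constructor; with pytz, `localize` is the documented way to apply the correct historical UTC offset. A small sketch of the pattern (the printed value follows from the Europe/Paris offset, CEST/UTC+2, in May 1983):

    import datetime

    import pytz

    paris = pytz.timezone("Europe/Paris")
    aware = paris.localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
    print(aware.astimezone(pytz.utc).isoformat())  # 1983-05-09T11:00:00+00:00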
47 changes: 44 additions & 3 deletions bigquery/samples/tests/test_load_table_dataframe.py
@@ -21,11 +21,52 @@
 pyarrow = pytest.importorskip("pyarrow")


-def test_load_table_dataframe(capsys, random_table_id):
+def test_load_table_dataframe(capsys, client, random_table_id):

     table = load_table_dataframe.load_table_dataframe(random_table_id)
     out, _ = capsys.readouterr()
-    assert "Loaded 4 rows and 3 columns" in out
+    expected_column_names = [
+        "wikidata_id",
+        "title",
+        "release_year",
+        "length_minutes",
+        "release_date",
+        "dvd_release",
+    ]
+    assert "Loaded 4 rows and {} columns".format(len(expected_column_names)) in out

     column_names = [field.name for field in table.schema]
-    assert column_names == ["wikidata_id", "title", "release_year"]
+    assert column_names == expected_column_names
+    column_types = [field.field_type for field in table.schema]
+    assert column_types == [
+        "STRING",
+        "STRING",
+        "INTEGER",
+        "FLOAT",
+        "TIMESTAMP",
+        "TIMESTAMP",
+    ]
+
+    df = client.list_rows(table).to_dataframe()
+    df.sort_values("release_year", inplace=True)
+    assert df["title"].tolist() == [
+        u"And Now for Something Completely Different",
+        u"Monty Python and the Holy Grail",
+        u"Life of Brian",
+        u"The Meaning of Life",
+    ]
+    assert df["release_year"].tolist() == [1971, 1975, 1979, 1983]
+    assert df["length_minutes"].tolist() == [88.0, 91.5, 94.25, 112.5]
+    assert df["release_date"].tolist() == [
+        pandas.Timestamp("1971-09-28T22:59:07+00:00"),
+        pandas.Timestamp("1975-04-09T22:59:02+00:00"),
+        pandas.Timestamp("1979-08-18T03:59:05+00:00"),
+        pandas.Timestamp("1983-05-09T11:00:00+00:00"),
+    ]
+    assert df["dvd_release"].tolist() == [
+        pandas.Timestamp("2003-10-22T10:00:00+00:00"),
+        pandas.Timestamp("2002-07-16T09:00:00+00:00"),
+        pandas.Timestamp("2008-01-14T08:00:00+00:00"),
+        pandas.Timestamp("2002-01-22T07:00:00+00:00"),
+    ]
+    assert df["wikidata_id"].tolist() == [u"Q16403", u"Q25043", u"Q24953", u"Q24980"]
5 changes: 4 additions & 1 deletion bigquery/tests/system.py
@@ -719,7 +719,10 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             (
                 bigquery.SchemaField("bool_col", "BOOLEAN"),
                 bigquery.SchemaField("ts_col", "TIMESTAMP"),
-                bigquery.SchemaField("dt_col", "DATETIME"),
+                # BigQuery does not support uploading DATETIME values from
+                # Parquet files. See:
+                # https://github.com/googleapis/google-cloud-python/issues/9996
+                bigquery.SchemaField("dt_col", "TIMESTAMP"),
                 bigquery.SchemaField("float32_col", "FLOAT"),
                 bigquery.SchemaField("float64_col", "FLOAT"),
                 bigquery.SchemaField("int8_col", "INTEGER"),
84 changes: 56 additions & 28 deletions bigquery/tests/unit/test__pandas_helpers.py
@@ -92,6 +92,7 @@ def test_is_datetime():
     assert is_datetime(pyarrow.timestamp("us", tz=None))
     assert not is_datetime(pyarrow.timestamp("ms", tz=None))
     assert not is_datetime(pyarrow.timestamp("us", tz="UTC"))
+    assert not is_datetime(pyarrow.timestamp("ns", tz="UTC"))
     assert not is_datetime(pyarrow.string())


@@ -386,20 +387,15 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
         ),
         ("BOOLEAN", [True, None, False, None]),
         ("BOOL", [False, None, True, None]),
-        # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is
-        # resolved, test with TIMESTAMP column. Conversion from pyarrow
-        # TimestampArray to list of Python objects fails with OverflowError:
-        # Python int too large to convert to C long.
-        #
-        # (
-        #     "TIMESTAMP",
-        #     [
-        #         datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-        #         None,
-        #         datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc),
-        #         datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
-        #     ],
-        # ),
+        (
+            "TIMESTAMP",
+            [
+                datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                None,
+                datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc),
+                datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+            ],
+        ),
         (
             "DATE",
             [
@@ -418,20 +414,16 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
                 datetime.time(12, 0, 0),
             ],
         ),
-        # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is
-        # resolved, test with DATETIME column. Conversion from pyarrow
-        # TimestampArray to list of Python objects fails with OverflowError:
-        # Python int too large to convert to C long.
-        #
-        # (
-        #     "DATETIME",
-        #     [
-        #         datetime.datetime(1, 1, 1, 0, 0, 0),
-        #         None,
-        #         datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
-        #         datetime.datetime(1970, 1, 1, 0, 0, 0),
-        #     ],
-        # ),
+        (
+            "DATETIME",
+            [
+                datetime.datetime(1, 1, 1, 0, 0, 0),
+                datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                None,
+                datetime.datetime(1970, 1, 1, 0, 0, 0),
+                datetime.datetime(1999, 3, 14, 15, 9, 26, 535898),
+            ],
+        ),
         (
             "GEOGRAPHY",
             [
@@ -453,6 +445,42 @@ def test_bq_to_arrow_array_w_nullable_scalars(module_under_test, bq_type, rows):
     assert rows == roundtrip


+@pytest.mark.parametrize(
+    "bq_type,rows",
+    [
+        (
+            "TIMESTAMP",
+            [
+                "1971-09-28T23:59:07+00:00",
+                "1975-04-09T23:59:02+00:00",
+                "1979-08-17T23:59:05+00:00",
+                "NaT",
+                "1983-05-09T13:00:00+00:00",
+            ],
+        ),
+        (
+            "DATETIME",
+            [
+                "1971-09-28T23:59:07",
+                "1975-04-09T23:59:02",
+                "1979-08-17T23:59:05",
+                "NaT",
+                "1983-05-09T13:00:00",
+            ],
+        ),
+    ],
+)
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+def test_bq_to_arrow_array_w_pandas_timestamp(module_under_test, bq_type, rows):
+    rows = [pandas.Timestamp(row) for row in rows]
+    series = pandas.Series(rows)
+    bq_field = schema.SchemaField("field_name", bq_type)
+    arrow_array = module_under_test.bq_to_arrow_array(series, bq_field)
+    roundtrip = arrow_array.to_pandas()
+    assert series.equals(roundtrip)
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
 def test_bq_to_arrow_array_w_arrays(module_under_test):
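
The deleted TODO blocks referenced https://issues.apache.org/jira/browse/ARROW-5450, where converting a pyarrow TimestampArray back to Python objects raised OverflowError for values near the edges of the supported range; with that resolved, the TIMESTAMP and DATETIME cases covering years 1 through 9999 can be re-enabled. A minimal reproduction of the round trip, assuming a pyarrow release that includes the fix:

    import datetime

    import pyarrow
    import pytz

    array = pyarrow.array(
        [
            datetime.datetime(1, 1, 1, tzinfo=pytz.utc),
            None,
            datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc),
        ],
        type=pyarrow.timestamp("us", tz="UTC"),
    )
    # Previously raised: OverflowError: Python int too large to convert to C long.
    print(array.to_pylist())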
4 changes: 2 additions & 2 deletions bigquery/tests/unit/test_client.py
@@ -6425,7 +6425,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
SchemaField("int_col", "INTEGER"),
SchemaField("float_col", "FLOAT"),
SchemaField("bool_col", "BOOLEAN"),
SchemaField("dt_col", "DATETIME"),
SchemaField("dt_col", "TIMESTAMP"),
SchemaField("ts_col", "TIMESTAMP"),
)

@@ -6671,7 +6671,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
SchemaField("int_as_float_col", "INTEGER"),
SchemaField("float_col", "FLOAT"),
SchemaField("bool_col", "BOOLEAN"),
SchemaField("dt_col", "DATETIME"),
SchemaField("dt_col", "TIMESTAMP"),
SchemaField("ts_col", "TIMESTAMP"),
SchemaField("string_col", "STRING"),
SchemaField("bytes_col", "BYTES"),
Expand Down