Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: Raise helpful error when loading table from dataframe with STRUCT columns #9053

Merged
merged 5 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import _STRUCT_TYPES
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand Down Expand Up @@ -1529,6 +1530,14 @@ def load_table_from_dataframe(
os.close(tmpfd)

try:
if job_config.schema:
for field in job_config.schema:
if field.field_type in _STRUCT_TYPES:
raise ValueError(
"Pyarrow does not support serializing dataframes with "
"struct (record) column types."
)

if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
Expand All @@ -1548,6 +1557,14 @@ def load_table_from_dataframe(
PendingDeprecationWarning,
stacklevel=2,
)
else:
warnings.warn(
"Loading from a dataframe without a schema will be "
"deprecated in the future, please provide a schema.",
PendingDeprecationWarning,
stacklevel=2,
)

dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:
Expand Down
59 changes: 59 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5328,6 +5328,65 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
assert sent_config is job_config
assert sent_config.source_format == job.SourceFormat.PARQUET

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
    """Loading a dataframe whose job-config schema contains a STRUCT
    (RECORD) field must raise a helpful ``ValueError`` up front instead
    of failing later inside pyarrow serialization.
    """
    from google.cloud.bigquery import job
    from google.cloud.bigquery.schema import SchemaField

    client = self._make_client()

    rows = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}]
    df = pandas.DataFrame(data=rows)

    # One scalar field plus one RECORD (struct) field — the latter is
    # what should trigger the error.
    struct_field = SchemaField(
        "agg_col",
        "RECORD",
        fields=[SchemaField("foo", "INTEGER"), SchemaField("bar", "INTEGER")],
    )
    config = job.LoadJobConfig(
        schema=[SchemaField("float_column", "FLOAT"), struct_field]
    )

    # Stub out the actual upload; it must never be reached.
    patched_load = mock.patch(
        "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
    )

    with patched_load, pytest.raises(ValueError) as excinfo:
        client.load_table_from_dataframe(
            df, self.TABLE_REF, job_config=config, location=self.LOCATION
        )

    message = str(excinfo.value)
    assert "struct" in message
    assert "not support" in message

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_wo_schema_warning(self):
    """Calling ``load_table_from_dataframe`` without a schema (and with
    pyarrow unavailable) must emit a (Pending)DeprecationWarning asking
    the caller to provide a schema.
    """
    client = self._make_client()
    df = pandas.DataFrame(
        [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
    )

    # Stub out the upload; simulate pyarrow being absent so the
    # schema-less code path is exercised.
    patched_load = mock.patch(
        "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
    )
    patched_pyarrow = mock.patch("google.cloud.bigquery.client.pyarrow", None)

    with patched_load, patched_pyarrow, warnings.catch_warnings(
        record=True
    ) as warned:
        client.load_table_from_dataframe(
            df, self.TABLE_REF, location=self.LOCATION
        )

    # Keep only deprecation warnings mentioning the schema request.
    relevant = [
        w
        for w in warned
        if w.category in (DeprecationWarning, PendingDeprecationWarning)
        and "please provide a schema" in str(w)
    ]
    assert relevant, "A missing schema deprecation warning was not raised."

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
Expand Down