Merge pull request GoogleCloudPlatform#234 from GoogleCloudPlatform/feature/parquetRecordGenerator

Add RECORD type support for parquet.
ryanmcdowell authored Apr 15, 2019
2 parents ef408f9 + bb53d43 commit 64d2235
Showing 5 changed files with 147 additions and 34 deletions.
16 changes: 13 additions & 3 deletions examples/dataflow-data-generator/README.md
@@ -141,10 +141,10 @@ Note that `RECORD` are not supported when writing to CSV, because it is a flat f
```

`--write_to_parquet` is a flag that specifies the output should be parquet. In order for beam to write to parquet,
a pyarrow schema is needed. Therefore, this tool translates the schema in the --schema_file to
a pyarrow schema is needed. Therefore, this tool translates the schema in the `--schema_file` to
a pyarrow schema automatically if this flag is included, but pyarrow doesn't support all fields that are supported
by BigQuery. STRING, NUMERIC, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, and DATETIME types are supported.
However BYTE, GEOGRAPHY, and RECORD fields are not supported and cannot be included in the --schema_file when writing
by BigQuery. STRING, NUMERIC, INTEGER, FLOAT, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME, and RECORD types are supported.
However, BYTE and GEOGRAPHY fields are not supported and cannot be included in the `--schema_file` when writing
to parquet.
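As a rough sketch (the field names, import path, and output prefix below are assumptions for illustration, not part of this repository), the schema translation and record fix-up can be wired into a Beam pipeline along these lines:

```
# A minimal sketch, assuming get_pyarrow_translated_schema and
# fix_record_for_parquet are importable from the pipeline's ParquetUtil module.
import apache_beam as beam

from ParquetUtil import get_pyarrow_translated_schema, fix_record_for_parquet

# BigQuery-style field definitions, shaped like the contents of --schema_file.
schema_fields = [
    {'name': 'user_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'purchases', 'type': 'RECORD', 'mode': 'REPEATED',
     'fields': [
         {'name': 'sku', 'type': 'STRING', 'mode': 'REQUIRED'},
         {'name': 'purchased_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'}]}
]

# RECORD fields become pyarrow structs (wrapped in a list type when REPEATED).
pa_schema = get_pyarrow_translated_schema(schema_fields)

with beam.Pipeline() as pipeline:
    (pipeline
     | beam.Create([{'user_id': 1,
                     'purchases': [{'sku': 'ABC123',
                                    'purchased_at': '2019-03-15T20:22:28'}]}])
     # Convert TIMESTAMP/DATE/TIME values to parquet-friendly representations.
     | beam.FlatMap(fix_record_for_parquet, schema_fields)
     | beam.io.WriteToParquet('/tmp/example_output', pa_schema))
```

A `REPEATED` `RECORD` field is translated to `pa.list_(pa.struct(...))`, while a non-repeated `RECORD` becomes a plain `pa.struct(...)`.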

```
@@ -364,3 +364,13 @@ python bq_table_resizer.py \
--target_gb 15000 \
--location US
```

### Running the tests
Note that the tests for the BigQuery table resizer require that you have
`GOOGLE_APPLICATION_DEFAULT` set to credentials with access to a BigQuery
environment where you can create and destroy tables.

```
cd data-generator-pipeline
python -m unittest discover
```
@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import avro.schema
import json
from TimeUtil import datetime_to_epoch_timestamp, date_to_epoch_date, \
    time_to_epoch_time

@@ -1,13 +1,13 @@
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -25,30 +25,59 @@ def get_pyarrow_translated_schema(string_schema):
    :param string_schema: BigQuery schema as a list of field dicts (parsed JSON).
    :return: pyarrow schema
    """
    type_conversions = {
        'STRING': pa.string(),
        'NUMERIC': pa.int64(),
        'BYTE': None,
        'INTEGER': pa.int64(),
        'FLOAT': pa.float64(),
        'NUMERIC': pa.int64(),
        'BOOLEAN': pa.bool_(),
        'TIMESTAMP': pa.timestamp('us'),
        'DATE': pa.date32(),
        'TIME': pa.time64('us'),
        'DATETIME': pa.timestamp('us'),
        'GEOGRAPHY': None,
        'RECORD': None
    }
    def _bq_to_pa_type(field):
        """
        A function to convert BigQuery types to pyarrow types.
        :param field: dict describing a BigQuery schema field, with 'name',
            'type', 'mode' and, for RECORD fields, 'fields' keys.
        :return: pa.DataType
        """
        type_conversions = {
            'STRING': pa.string(),
            'NUMERIC': pa.int64(),
            'BYTE': None,
            'INTEGER': pa.int64(),
            'FLOAT': pa.float64(),
            'BOOLEAN': pa.bool_(),
            'TIMESTAMP': pa.timestamp('us'),
            'DATE': pa.date32(),
            'TIME': pa.time64('us'),
            'DATETIME': pa.timestamp('us'),
            'GEOGRAPHY': None,
        }

        try:
            if field['mode'] == 'REPEATED':
                if field['type'] == 'RECORD':
                    nested_fields = field['fields']
                    # Recursively call to convert the next nested layer.
                    return pa.list_(pa.struct(
                        [(fld['name'], _bq_to_pa_type(fld))
                         for fld in nested_fields]))
                else:
                    return pa.list_(type_conversions[field['type']])
            elif field['type'] == 'RECORD':
                nested_fields = field['fields']
                # Recursively call to convert the next nested layer.
                return pa.struct(
                    [(fld['name'], _bq_to_pa_type(fld))
                     for fld in nested_fields])
            else:
                return type_conversions[field['type']]
        except KeyError:
            raise KeyError(
                'Type {} is not a valid BigQuery type and is not supported '
                'by this utility.'.format(field.get('type')))

    pa_schema_list = []
    for field in string_schema:
        field_type = field['type']
        field_name = field['name']
        field_mode = field['mode']
        converted_field_type = type_conversions[field_type]
        if converted_field_type is None:
        converted_type = _bq_to_pa_type(field)
        if converted_type is None:
            error_message = 'Error: json schema included a {0:s} field. ' \
                            'BYTE, GEOGRAPHY, and RECORD types cannot ' \
                            'BYTE and GEOGRAPHY types cannot ' \
                            'currently be used when outputting to ' \
                            'parquet.'.format(field_type)
            logging.error(error_message)
Expand All @@ -57,7 +86,7 @@ def get_pyarrow_translated_schema(string_schema):
        nullable = False if field_mode == 'REQUIRED' else True
        pa_field = pa.field(
            name=field_name,
            type=converted_field_type
            type=converted_type
            #nullable=nullable
        )
        pa_schema_list.append(pa_field)
@@ -74,17 +103,23 @@ def fix_record_for_parquet(record, schema):
    :return: record with converted TIMESTAMP, DATETIME, DATE, and/or TIME
        fields.
    """
    for field in schema:
        field_name = field["name"]
        if field["type"] in ("TIMESTAMP", "DATETIME"):
    def _fix_primitive(record, field):
        """
        Converts a single value in the record's field for parquet
        compatibility. Factored out so repeated and non-repeated fields can
        be handled consistently.
        :param record: record dict produced by the beam pipeline.
        :param field: dict describing the BigQuery schema field to convert.
        :return: the converted value.
        """
        field_name = field['name']
        if field['type'] in ('TIMESTAMP', 'DATETIME'):
            record[field_name] = int(datetime_to_epoch_timestamp(
                record[field_name]
            ))
        elif field["type"] == "DATE":
        elif field['type'] == 'DATE':
            record[field_name] = int(date_to_epoch_date(
                record[field_name]
            ))
        elif field["type"] == "TIME":
        elif field['type'] == 'TIME':
            try:
                record[field_name] = datetime.datetime.strptime(
                    record[field_name],
@@ -95,5 +130,25 @@ def fix_record_for_parquet(record, schema):
                    record[field_name],
                    '%H:%M:%S.%f'
                ).time()
        return record[field_name]

    for field in schema:
        field_name = field['name']
        if field['mode'] == 'REPEATED':
            fixed_array = []
            for value in record[field_name]:
                if field['type'] == 'RECORD':
                    # fix_record_for_parquet returns a single-element list,
                    # so extend rather than overwrite the output array.
                    fixed_array.extend(
                        fix_record_for_parquet(value, field['fields']))
                else:
                    # Wrap the bare value so _fix_primitive can look it up
                    # by field name.
                    fixed_array.append(
                        _fix_primitive({field_name: value}, field))
            record[field_name] = fixed_array
        else:
            if field['type'] == 'RECORD':
                record[field_name] = fix_record_for_parquet(
                    record[field_name], field['fields']
                )[0]
            else:
                record[field_name] = _fix_primitive(record, field)
    return [record]
Empty file.
@@ -67,6 +67,22 @@ def test_get_pyarrow_translated_schema(self):
"type": "DATETIME",
"name": "datetime1",
"mode": "REQUIRED"
},
{
"type": "RECORD",
"name": "record1",
"mode": "REPEATED",
"fields": [
{
"type": "BOOLEAN",
"name": "boolean1",
"mode": "REQUIRED"
},
{
"type": "TIMESTAMP",
"name": "timestamp1",
"mode": "REQUIRED"
}]
}
]
expected_pa_schema = pa.schema(
@@ -115,12 +131,26 @@ def test_get_pyarrow_translated_schema(self):
                    name='datetime1',
                    type=pa.timestamp('us')
                    #nullable=False
                ),
                pa.field(
                    name='record1',
                    type=pa.list_(pa.struct([
                        pa.field(
                            name='boolean1',
                            type=pa.bool_()
                            #nullable=False
                        ),
                        pa.field(
                            name='timestamp1',
                            type=pa.timestamp('us')
                            #nullable=False
                        )])
                    )
                )
            ]
        )

        pyarrow_schema = get_pyarrow_translated_schema(string_input_schema)
        print(pyarrow_schema)
        self.assertEqual(pyarrow_schema, expected_pa_schema)

    def test_fix_record_for_parquet(self):
@@ -144,25 +174,42 @@ def test_fix_record_for_parquet(self):
"type": "TIME",
"name": "time1",
"mode": "REQUIRED"
},
{
"type": "RECORD",
"name": "record1",
"mode": "REPEATED",
"fields": [
{
"type": "TIMESTAMP",
"name": "timestamp1",
"mode": "REQUIRED"
}]
}
]

        record = {
            'timestamp1': u'2019-03-15T20:22:28',
            'datetime1': u'2019-03-15T20:24:58',
            'date1': u'2019-03-15',
            'time1': u'20:20:00.00'
            'time1': u'20:20:00.00',
            'record1': [{
                'timestamp1': u'2019-03-15T20:22:28',
            }]
        }

        expected_output = [{
            'timestamp1': 1552681348000000,
            'datetime1': 1552681498000000,
            'date1': 17970,
            'time1': datetime.time(20, 20)
            'time1': datetime.time(20, 20),
            'record1': [{
                'timestamp1': 1552681348000000
            }]
        }]

        output_record = fix_record_for_parquet(record, input_schema)
        self.assertEqual(expected_output, output_record)
        self.assertEqual(output_record, expected_output)


if __name__ == '__main__':
