Commit 80aca86: Merge branch 'master' into jakePR185
Jacob Ferriero authored Apr 5, 2019
2 parents: b5a0cc0 + 9364ec5
Showing 41 changed files with 74,310 additions and 90 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -56,6 +56,7 @@ The tools folder contains ready-made utilities which can simplify Google Cloud P
* [Maven Archetype Dataflow](tools/maven-archetype-dataflow) - A maven archetype which bootstraps a Dataflow project with common plugins pre-configured to help maintain high code quality.
* [Netblock Monitor](tools/netblock-monitor) - An Apps Script project that will automatically provide email notifications when changes are made to Google’s IP ranges.
* [Site Verification Group Sync](tools/site-verification-group-sync) - A tool to provision "verified owner" permissions (to create GCS buckets with custom dns) based on membership of a Google Group.
* [Agile Machine Learning API](tools/agile-machine-learning-api) - A web application which provides the ability to train and deploy ML models on Google Cloud Machine Learning Engine and visualize the predicted results using LIME through a simple POST request.

## Contributing
See the contributing [instructions](/CONTRIBUTING.md) to get started contributing.
13 changes: 12 additions & 1 deletion examples/dataflow-data-generator/README.md
@@ -125,7 +125,7 @@ based on whether you pass the `--csv_schema_order` or `--avro_schema_file` parameters

#### Output format

-Output format is specified by passing one of the `--csv_schema_order` or `--avro_schema_file` parameters.
+Output format is specified by passing one of the `--csv_schema_order`, `--avro_schema_file`, or `--write_to_parquet` parameters.

`--csv_schema_order` should be a comma-separated list specifying the order of the fieldnames for writing.
Note that `RECORD` fields are not supported when writing to CSV, because CSV is a flat file format.
@@ -140,6 +140,17 @@
--avro_schema_file=/path/to/linorders.avsc
```

`--write_to_parquet` is a flag that specifies the output should be parquet. For Beam to write parquet,
a pyarrow schema is needed, so when this flag is passed the tool automatically translates the schema in the
`--schema_file` to a pyarrow schema. However, pyarrow does not support all of the field types supported
by BigQuery: STRING, NUMERIC, INTEGER, FLOAT, BOOLEAN, TIMESTAMP, DATE, TIME, and DATETIME types are supported,
while BYTE, GEOGRAPHY, and RECORD fields are not supported and cannot be included in the `--schema_file` when
writing to parquet.

```
--write_to_parquet
```
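
For illustration, here is a minimal sketch of the translation this flag triggers, assuming a hypothetical two-field `--schema_file` (the names `order_id` and `created_at` are invented for the example):

```
# BigQuery-style fields as parsed from the JSON --schema_file.
string_schema = [
    {'name': 'order_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
]

# The tool translates this to roughly the following pyarrow schema:
import pyarrow as pa
pa_schema = pa.schema([
    pa.field('order_id', pa.int64()),
    pa.field('created_at', pa.timestamp('us')),
])
```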

Alternatively, you can write directly to a BigQuery table by specifying an `--output_bq_table`. However, if you are generating
more than 100K records, you may run into the limitation of the Python SDK where WriteToBigQuery does not orchestrate multiple
load jobs, and you hit one of the single-load-job limitations [BEAM-2801](https://issues.apache.org/jira/browse/BEAM-2801). If you
@@ -12,83 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import avro.schema
 import datetime
 import json

-def datetime_to_avro_timestamp(timestamp, micros=True):
-    """
-    This is a convienence function for converting datetime objects to
-    timestamps in either milliseconds or microseconds since the Unix
-    Epoch.
-    Args:
-        timestamp: (datetime.datetime) to be converted.
-        micros: (bool) should we use microsecond precision. Default behavior
-            is millisecond precision. This should be dictated by the avsc file.
-    """
-    _UNIX_EPOCH = datetime.datetime(1970, 1, 1)
-    _MILLISECONDS_PER_SECOND = 10 ** 3
-    _MICROSECONDS_PER_SECOND = 10 ** 6
-
-    if isinstance(timestamp, unicode):
-        try:
-            timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S')
-        except ValueError:
-            timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f')
-
-    seconds_since_epoch = (timestamp - _UNIX_EPOCH).total_seconds()
-
-    multiplier = _MICROSECONDS_PER_SECOND if micros else _MILLISECONDS_PER_SECOND
-
-    return long(seconds_since_epoch * multiplier)
-
-def date_to_avro_date(date):
-    """
-    This is a convienence function for converting datetime objects to
-    timestamps in either milliseconds or microseconds since the Unix
-    Epoch.
-    Args:
-        date: (datetime.datetime) to be converted.
-        micros: (bool) should we use microsecond precision. Default behavior
-            is millisecond precision. This should be dictated by the avsc file.
-    """
-    _UNIX_EPOCH = datetime.datetime(1970, 1, 1)
-
-    if isinstance(date, unicode):
-        date = datetime.datetime.strptime(date, '%Y-%m-%d')
-
-    days_since_epoch = (date - _UNIX_EPOCH).days
-
-    return int(days_since_epoch)
-
-def time_to_avro_time(time, micros=True):
-    """
-    This is a convienence function for converting datetime objects to
-    timestamps in either milliseconds or microseconds since the Unix
-    Epoch.
-    Args:
-        time: (datetime.datetime) to be converted.
-        micros: (bool) should we use microsecond precision. Default behavior
-            is millisecond precision. This should be dictated by the avsc file.
-    """
-    _MIDNIGHT = datetime.time(0, 0, 0)
-    _MILLISECONDS_PER_SECOND = 10 ** 3
-    _MICROSECONDS_PER_SECOND = 10 ** 6
-    if isinstance(time, unicode):
-        try:
-            time = datetime.datetime.strptime(time, '%H:%M:%S').time()
-        except:
-            time = datetime.datetime.strptime(time, '%H:%M:%S.%f').time()
-
-    _TODAY = datetime.date.today()
-
-    seconds_since_midnight = (datetime.datetime.combine(_TODAY, time)
-                              - datetime.datetime.combine(_TODAY,
-                                                          _MIDNIGHT)).total_seconds()
-
-    multiplier = _MICROSECONDS_PER_SECOND if micros else _MILLISECONDS_PER_SECOND
-
-    return long(seconds_since_midnight * multiplier)
+from TimeUtil import datetime_to_epoch_timestamp, date_to_epoch_date, \
+    time_to_epoch_time

@@ -108,13 +35,17 @@ def fix_record_for_avro(record, avro_schema):
     for field in avro_schema.fields:
         ...
             precision = None
         if logical_prefix == u'timestamp':
             is_micros = (precision == u'micros')
-            record[field_name] = datetime_to_avro_timestamp(record[field_name],
-                                                            micros=is_micros)
+            record[field_name] = datetime_to_epoch_timestamp(
+                record[field_name],
+                micros=is_micros
+            )
         elif logical_type == u'date':
-            record[field_name] = date_to_avro_date(record[field_name])
+            record[field_name] = date_to_epoch_date(record[field_name])
         elif logical_prefix == u'time':
             is_micros = (precision == u'micros')
-            record[field_name] = time_to_avro_time(record[field_name],
-                                                   micros=is_micros)
-        # Otherwise, this field was a primitive type and is left alone.
+            record[field_name] = time_to_epoch_time(
+                record[field_name],
+                micros=is_micros
+            )
+
     return [record]
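
As an editorial usage sketch (not part of the commit): with the Python 2 `avro` package, where `avro.schema.parse` accepts a JSON string, the updated `fix_record_for_avro` might be exercised like this, using an invented one-field avsc:

```
import avro.schema

# Hypothetical avsc with a single timestamp-micros field.
schema_json = '''
{"type": "record",
 "name": "Order",
 "fields": [{"name": "created_at",
             "type": {"type": "long", "logicalType": "timestamp-micros"}}]}
'''
avro_schema = avro.schema.parse(schema_json)  # `avro.schema.Parse` in avro-python3

record = {u'created_at': u'2019-04-05T12:00:00'}
# Expected result: [{u'created_at': 1554465600000000L}], i.e. microseconds
# since the Unix epoch.
print(fix_record_for_avro(record, avro_schema))
```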
@@ -0,0 +1,99 @@
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pyarrow as pa
import logging
import datetime
from TimeUtil import datetime_to_epoch_timestamp, date_to_epoch_date, \
    time_to_epoch_time


def get_pyarrow_translated_schema(string_schema):
    """
    Converts a BigQuery schema (a list of field dicts with 'name', 'type',
    and 'mode' keys) to a pyarrow schema for writing to parquet.
    :param string_schema: list of BigQuery field dicts.
    :return: pyarrow schema
    """
    type_conversions = {
        'STRING': pa.string(),
        'NUMERIC': pa.int64(),
        'INTEGER': pa.int64(),
        'FLOAT': pa.float64(),
        'BOOLEAN': pa.bool_(),
        'TIMESTAMP': pa.timestamp('us'),
        'DATE': pa.date32(),
        'TIME': pa.time64('us'),
        'DATETIME': pa.timestamp('us'),
        # These BigQuery types have no parquet mapping and are rejected below.
        'BYTE': None,
        'GEOGRAPHY': None,
        'RECORD': None
    }
    pa_schema_list = []
    for field in string_schema:
        field_type = field['type']
        field_name = field['name']
        field_mode = field['mode']
        converted_field_type = type_conversions[field_type]
        if converted_field_type is None:
            error_message = 'Error: json schema included a {0:s} field. ' \
                            'BYTE, GEOGRAPHY, and RECORD types cannot ' \
                            'currently be used when outputting to ' \
                            'parquet.'.format(field_type)
            logging.error(error_message)
            raise ValueError(error_message)
        else:
            # Computed for future use; nullable is not currently passed
            # to pa.field.
            nullable = field_mode != 'REQUIRED'
            pa_field = pa.field(
                name=field_name,
                type=converted_field_type
                # nullable=nullable
            )
            pa_schema_list.append(pa_field)

    return pa.schema(pa_schema_list)


def fix_record_for_parquet(record, schema):
    """
    Converts TIMESTAMP, DATETIME, DATE, and TIME fields to
    parquet-compatible values.
    :param record: record of data from beam pipeline.
    :param schema: string schema dict.
    :return: record with converted TIMESTAMP, DATETIME, DATE, and/or TIME
        fields.
    """
    for field in schema:
        field_name = field["name"]
        if field["type"] in ("TIMESTAMP", "DATETIME"):
            # Microseconds since the Unix epoch.
            record[field_name] = int(datetime_to_epoch_timestamp(
                record[field_name]
            ))
        elif field["type"] == "DATE":
            # Whole days since the Unix epoch.
            record[field_name] = int(date_to_epoch_date(
                record[field_name]
            ))
        elif field["type"] == "TIME":
            # Parsed to a datetime.time (with or without fractional
            # seconds) for pyarrow's time64 type.
            try:
                record[field_name] = datetime.datetime.strptime(
                    record[field_name],
                    '%H:%M:%S'
                ).time()
            except ValueError:
                record[field_name] = datetime.datetime.strptime(
                    record[field_name],
                    '%H:%M:%S.%f'
                ).time()

    return [record]
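
A brief usage sketch (editor's illustration; the field names are invented) tying the two helpers together:

```
string_schema = [
    {'name': 'order_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
    {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
]

pa_schema = get_pyarrow_translated_schema(string_schema)
# pa_schema: order_id: int64, created_at: timestamp[us]

record = {'order_id': 1, 'created_at': u'2019-04-05T12:00:00'}
rows = fix_record_for_parquet(record, string_schema)
# rows == [{'order_id': 1, 'created_at': 1554465600000000}]
```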
@@ -678,6 +678,10 @@ def parse_data_generator_args(argv):
                         help='This is an avro schema file to use for writing'
                              'data to avro on gcs.', default=None)

+    parser.add_argument('--write_to_parquet', dest='write_to_parquet',
+                        help='This is a flag for writing to parquet on gcs.',
+                        action="store_true")
+
     parser.add_argument('--gcs_output_prefix', dest='output_prefix',
                         help='GCS path for output', default=None)
@@ -0,0 +1,97 @@
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime


def datetime_to_epoch_timestamp(timestamp, micros=True):
    """
    This is a convenience function for converting datetime objects (or
    ISO-formatted unicode strings) to timestamps in either milliseconds
    or microseconds since the Unix Epoch.
    Args:
        timestamp: (datetime.datetime) to be converted.
        micros: (bool) should we use microsecond precision. Defaults to
            microsecond precision. This should be dictated by the avsc file.
    """
    _UNIX_EPOCH = datetime.datetime(1970, 1, 1)
    _MILLISECONDS_PER_SECOND = 10 ** 3
    _MICROSECONDS_PER_SECOND = 10 ** 6

    if isinstance(timestamp, unicode):
        try:
            timestamp = datetime.datetime.strptime(timestamp,
                                                   '%Y-%m-%dT%H:%M:%S')
        except ValueError:
            timestamp = datetime.datetime.strptime(timestamp,
                                                   '%Y-%m-%dT%H:%M:%S.%f')

    seconds_since_epoch = (timestamp - _UNIX_EPOCH).total_seconds()

    multiplier = _MICROSECONDS_PER_SECOND if micros \
        else _MILLISECONDS_PER_SECOND

    return long(seconds_since_epoch * multiplier)


def date_to_epoch_date(date):
    """
    This is a convenience function for converting datetime objects (or
    unicode 'YYYY-MM-DD' strings) to whole days since the Unix Epoch.
    Args:
        date: (datetime.datetime) to be converted.
    """
    _UNIX_EPOCH = datetime.datetime(1970, 1, 1)

    if isinstance(date, unicode):
        date = datetime.datetime.strptime(date, '%Y-%m-%d')

    days_since_epoch = (date - _UNIX_EPOCH).days

    return int(days_since_epoch)


def time_to_epoch_time(time, micros=True):
    """
    This is a convenience function for converting time objects (or unicode
    'HH:MM:SS' strings) to either milliseconds or microseconds since
    midnight.
    Args:
        time: (datetime.time) to be converted.
        micros: (bool) should we use microsecond precision. Defaults to
            microsecond precision. This should be dictated by the avsc file.
    """
    _MIDNIGHT = datetime.time(0, 0, 0)
    _MILLISECONDS_PER_SECOND = 10 ** 3
    _MICROSECONDS_PER_SECOND = 10 ** 6
    if isinstance(time, unicode):
        try:
            time = datetime.datetime.strptime(time, '%H:%M:%S').time()
        except ValueError:
            time = datetime.datetime.strptime(time, '%H:%M:%S.%f').time()

    _TODAY = datetime.date.today()

    seconds_since_midnight = (datetime.datetime.combine(_TODAY, time)
                              - datetime.datetime.combine(_TODAY,
                                                          _MIDNIGHT)
                              ).total_seconds()

    multiplier = _MICROSECONDS_PER_SECOND if micros \
        else _MILLISECONDS_PER_SECOND

    return long(seconds_since_midnight * multiplier)
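
A few worked values (editor's illustration) showing the units each helper returns:

```
import datetime

# One day after the epoch, at microsecond then millisecond precision:
datetime_to_epoch_timestamp(datetime.datetime(1970, 1, 2))                # 86400000000L
datetime_to_epoch_timestamp(datetime.datetime(1970, 1, 2), micros=False)  # 86400000L

# Dates become whole days since the epoch:
date_to_epoch_date(u'1970-01-11')                                         # 10

# Times become micro- or milliseconds since midnight:
time_to_epoch_time(u'00:00:01')                                           # 1000000L
time_to_epoch_time(u'00:00:01', micros=False)                             # 1000L
```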