Merge branch 'master' into support-routing
michaelwsherman authored Jun 25, 2020
2 parents 1c1ce4d + 5e11a3e commit 7619311
Showing 14 changed files with 241 additions and 221 deletions.
21 changes: 15 additions & 6 deletions cloudbuild.yaml
@@ -1,9 +1,18 @@
steps:
- id: 'lint'
name: 'gcr.io/$PROJECT_ID/make'
args: ['test']
- id: lint
name: gcr.io/$PROJECT_ID/make
args: [test]
waitFor: ['-']
- id: 'tools/hive-bigquery'
name: 'google/cloud-sdk'
args: ['gcloud', 'builds', 'submit', 'tools/hive-bigquery/', '--config=tools/hive-bigquery/cloudbuild.yaml']
- id: tools/hive-bigquery
name: google/cloud-sdk
args: [gcloud, builds, submit, tools/hive-bigquery/, --config=tools/hive-bigquery/cloudbuild.yaml]
waitFor: ['-']
- id: examples/cloud-composer-examples
name: google/cloud-sdk
args: [gcloud, builds, submit, examples/cloud-composer-examples/,
--config=examples/cloud-composer-examples/cloudbuild.yaml]
waitFor: ['-']
- id: examples/dataflow-python-examples
name: google/cloud-sdk
args: [gcloud, builds, submit, examples/dataflow-python-examples, --config=examples/dataflow-python-examples/cloudbuild.yaml]
waitFor: ['-']
@@ -4,4 +4,5 @@ FROM apache/beam_python3.7_sdk:${BEAM_VERSION}

COPY . ./

ENV PIP_DISABLE_PIP_VERSION_CHECK=1
RUN pip3 install -r requirements.txt
@@ -11,14 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" data_enrichment.py demonstrates a Dataflow pipeline which reads a file and
writes its contents to a BigQuery table. Along the way, data from BigQuery
is read in as a side input and joined in with the primary data from the file.
"""


import argparse
import csv
import logging
@@ -40,7 +38,8 @@ def __init__(self):
dir_path = os.path.dirname(os.path.realpath(__file__))
self.schema_str = ''
# This is the schema of the destination table in BigQuery.
schema_file = os.path.join(dir_path, 'resources', 'usa_names_with_full_state_name.json')
schema_file = os.path.join(dir_path, 'resources',
'usa_names_with_full_state_name.json')
with open(schema_file) \
as f:
data = f.read()
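
The hunk above reads the destination table's JSON schema from resources/usa_names_with_full_state_name.json into a string; how that string becomes a schema object is collapsed out of this diff. A rough sketch of one common way to build a Beam TableSchema from such a JSON string follows; the key layout and the helper name are assumptions, not this file's own code.

```python
# Sketch only (assumes the JSON file holds a list of
# {"name": ..., "type": ..., "mode": ...} field definitions).
import json

from apache_beam.io.gcp.internal.clients.bigquery import (TableFieldSchema,
                                                           TableSchema)


def schema_from_json_string(schema_str):
    """Builds a TableSchema from a JSON list of BigQuery field definitions."""
    table_schema = TableSchema()
    for field in json.loads(schema_str):
        field_schema = TableFieldSchema()
        field_schema.name = field['name']
        field_schema.type = field['type']
        field_schema.mode = field.get('mode', 'NULLABLE')
        table_schema.fields.append(field_schema)
    return table_schema
```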
@@ -113,16 +112,20 @@ def run(argv=None):
# Here we add some specific command line arguments we expect. Specifically
# we have the input file to load and the output table to write to.
parser.add_argument(
'--input', dest='input', required=False,
'--input',
dest='input',
required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
'a file in a Google Storage Bucket.',
# This example file contains a total of only 10 lines.
# Useful for quickly debugging on a small set of data
default='gs://python-dataflow-example/data_files/head_usa_names.csv')
# The output defaults to the lake dataset in your BigQuery project. You'll have
# to create the lake dataset yourself using this command:
# bq mk lake
parser.add_argument('--output', dest='output', required=False,
parser.add_argument('--output',
dest='output',
required=False,
help='Output BQ table to write results to.',
default='lake.usa_names_enriched')
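
The argument handling above follows the usual Dataflow pattern: argparse consumes only the flags this script defines, and everything it does not recognize is passed through to Beam's PipelineOptions. A minimal sketch of that split, with placeholder defaults:

```python
# Minimal sketch of the parse_known_args pattern used above: known_args holds
# the script's own flags, pipeline_args keeps runner flags such as --runner
# or --project for Beam. Defaults below are placeholders.
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def build_pipeline(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default='gs://some-bucket/input.csv')
    parser.add_argument('--output', default='lake.usa_names_enriched')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Unrecognized flags go to Beam untouched.
    options = PipelineOptions(pipeline_args)
    return beam.Pipeline(options=options), known_args
```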

@@ -157,14 +160,13 @@ def add_full_state_name(row, short_to_long_name_map):
`python-dataflow-example.example_data.state_abbreviations`"""

state_abbreviations = (
p
| 'Read from BigQuery' >> beam.io.Read(
p | 'Read from BigQuery' >> beam.io.Read(
beam.io.BigQuerySource(query=read_query, use_standard_sql=True))
# We must create a python tuple of key to value pairs here in order to
# use the data as a side input. Dataflow will use the keys to distribute the
# work to the correct worker.
| 'Abbreviation to Full Name' >> beam.Map(
lambda row: (row['state_abbreviation'], row['state_name'])))
| 'Abbreviation to Full Name' >>
beam.Map(lambda row: (row['state_abbreviation'], row['state_name'])))

(p
# Read the file. This is the source of the pipeline. All further
@@ -176,26 +178,25 @@ def add_full_state_name(row, short_to_long_name_map):
# Translates from the raw string data in the CSV to a dictionary.
# The dictionary is a keyed by column names with the values being the values
# we want to store in BigQuery.
| 'String to BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'String to BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
# Here we pass in a side input, which is data that comes from outside our
# CSV source. The side input contains a map of states to their full name.
| 'Join Data' >> beam.Map(add_full_state_name, AsDict(
state_abbreviations))
| 'Join Data' >> beam.Map(add_full_state_name, AsDict(state_abbreviations))
# This is the final stage of the pipeline, where we define the destination
# of the data. In this case we are writing to BigQuery.
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
# The table name is a required argument for the BigQuery sink.
# In this case we use the value passed in from the command line.
known_args.output,
# Here we use the JSON schema read in from a JSON file.
# Specifying the schema allows the API to create the table correctly if it does not yet exist.
schema=schema,
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
beam.io.BigQuerySink(
# The table name is a required argument for the BigQuery sink.
# In this case we use the value passed in from the command line.
known_args.output,
# Here we use the JSON schema read in from a JSON file.
# Specifying the schema allows the API to create the table correctly if it does not yet exist.
schema=schema,
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()


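The pipeline above enriches each CSV row by handing the state-abbreviation map to beam.Map as an AsDict side input. A self-contained sketch of the same pattern on in-memory data, runnable with the DirectRunner; the sample rows and the state_full key are illustrative only:

```python
# Self-contained sketch of the AsDict side-input join used above, with
# in-memory data standing in for the BigQuery and GCS sources.
import apache_beam as beam
from apache_beam.pvalue import AsDict


def add_full_state_name(row, short_to_long):
    row = dict(row)
    row['state_full'] = short_to_long.get(row['state'], row['state'])
    return row


with beam.Pipeline() as p:
    abbreviations = p | 'Abbreviation Map' >> beam.Create([('KY', 'Kentucky'),
                                                           ('TX', 'Texas')])
    rows = p | 'Rows' >> beam.Create([{'state': 'KY', 'name': 'Sarah'},
                                      {'state': 'TX', 'name': 'James'}])
    # The runner materializes the side input and gives every worker the full
    # dict keyed by abbreviation.
    enriched = rows | 'Join Data' >> beam.Map(add_full_state_name,
                                              AsDict(abbreviations))
    enriched | 'Print' >> beam.Map(print)
```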
@@ -16,17 +16,18 @@
This example does not do any transformation on the data.
"""


import argparse
import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
"""A helper class which contains the logic to translate the file into
a format BigQuery will accept."""

def parse_method(self, string_input):
"""This method translates a single line of comma separated values to a
dictionary which can be loaded into BigQuery.
@@ -51,8 +52,8 @@ def parse_method(self, string_input):
}
"""
# Strip out carriage return, newline and quote characters.
values = re.split(",",
re.sub('\r\n', '', re.sub('"', '', string_input)))
values = re.split(",", re.sub('\r\n', '', re.sub('"', '',
string_input)))
row = dict(
zip(('state', 'gender', 'year', 'name', 'number', 'created_date'),
values))
@@ -112,8 +113,8 @@ def run(argv=None):
# be run in parallel on different workers using input from the
# previous stage of the pipeline.
| 'String To BigQuery Row' >>
beam.Map(lambda s: data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.Map(lambda s: data_ingestion.parse_method(s)) |
'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
# The table name is a required argument for the BigQuery sink.
# In this case we use the value passed in from the command line.
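
parse_method above turns one comma-separated line into a dictionary keyed by the destination table's column names. The same idea as a stand-alone snippet, with the column tuple taken from the diff and an illustrative input row:

```python
# Stand-alone version of the parse_method idea shown above: strip quotes and
# line endings, split on commas, and zip the values onto the expected columns.
import re

COLUMNS = ('state', 'gender', 'year', 'name', 'number', 'created_date')


def parse_line(line):
    values = re.split(',', re.sub(r'\r\n', '', re.sub('"', '', line)))
    return dict(zip(COLUMNS, values))


print(parse_line('KY,F,1910,Dorothy,5,11/28/2016'))
# {'state': 'KY', 'gender': 'F', 'year': '1910', 'name': 'Dorothy',
#  'number': '5', 'created_date': '11/28/2016'}
```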
@@ -30,20 +30,19 @@
import json
import logging
import os

import apache_beam as beam

from collections import OrderedDict

from apache_beam.io.gcp.internal.clients.bigquery import (TableSchema,
TableFieldSchema)
import apache_beam as beam
from apache_beam.io.gcp.internal.clients.bigquery import (TableFieldSchema,
TableSchema)
from google.api_core.exceptions import InvalidArgument
from google.auth.exceptions import GoogleAuthError
from google.cloud import datastore


class FileCoder:
"""Encode and decode CSV data coming from the files."""

def __init__(self, columns):
self._columns = columns
self._num_columns = len(columns)
@@ -53,24 +52,28 @@ def encode(self, value):
import csv
import io
st = io.StringIO()
cw = csv.DictWriter(st, self._columns,
delimiter=self._delimiter,
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
cw = csv.DictWriter(st,
self._columns,
delimiter=self._delimiter,
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
cw.writerow(value)
return st.getvalue().strip('\r\n')

def decode(self, value):
import csv
import io
st = io.StringIO(value)
cr = csv.DictWriter(st, self._columns,
delimiter=self._delimiter,
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
cr = csv.DictWriter(st,
self._columns,
delimiter=self._delimiter,
quotechar='"',
quoting=csv.QUOTE_MINIMAL)
return next(cr)


class PrepareFieldTypes(beam.DoFn):

def __init__(self, encoding='utf-8', time_format='%Y-%m-%d %H:%M:%S %Z'):
import importlib
self._encoding = encoding
@@ -124,26 +127,28 @@ def process(self, element, fields):
for fmt in (self._time_format):
try:
v = int(self._tm.mktime(self._tm.strptime(v, fmt)))
except ValueError as e:
except ValueError:
pass
else:
break
if not isinstance(v, int):
logging.warn('Cannot convert date %s. Error: %s' %
(v, e))
logging.warning(
'Cannot convert date %s. Expected value of type int'
% v)
v = self._return_default_value(ftype)
else:
logging.warn('Unknown field type %s' % ftype)
logging.warning('Unknown field type %s' % ftype)
v = self._return_default_value(ftype)
except (TypeError, ValueError) as e:
logging.warn('Cannot convert type %s for element %s: '
'%s. Returning default value.' % (ftype, v, e))
logging.warning('Cannot convert type %s for element %s: '
'%s. Returning default value.' % (ftype, v, e))
v = self._return_default_value(ftype)
element[k] = v
return [element]


class InjectTimestamp(beam.DoFn):

def process(self, element):
import time
element['_RAWTIMESTAMP'] = int(time.mktime(time.gmtime()))
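
The conversion loop above tries each configured time format in turn, keeps the first that parses, and falls back to a type default when none do. A compact version of that fallback on its own; the formats and the default value are chosen for illustration:

```python
# Compact sketch of the try-each-format conversion above. Formats and the
# default are illustrative, not the values the pipeline uses.
import logging
import time

TIME_FORMATS = ('%Y-%m-%d %H:%M:%S %Z', '%Y-%m-%d %H:%M:%S')
DEFAULT_TIMESTAMP = 0


def to_unix_timestamp(value):
    for fmt in TIME_FORMATS:
        try:
            return int(time.mktime(time.strptime(value, fmt)))
        except ValueError:
            continue
    logging.warning('Cannot convert date %s; using default', value)
    return DEFAULT_TIMESTAMP


print(to_unix_timestamp('2020-06-25 12:00:00'))
```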
@@ -175,12 +180,11 @@ def run(argv=None):
"""The main function which creates the pipeline and runs it"""

parser = argparse.ArgumentParser()
parser.add_argument(
'--input-bucket',
dest='input_bucket',
required=True,
default='data-daimlr',
help='GS bucket_name where the input files are present')
parser.add_argument('--input-bucket',
dest='input_bucket',
required=True,
default='data-daimlr',
help='GS bucket_name where the input files are present')
parser.add_argument(
'--input-path',
dest='input_path',
@@ -191,12 +195,11 @@ def run(argv=None):
dest='input_files',
required=True,
help='Comma delimited names of all input files to be imported')
parser.add_argument(
'--bq-dataset',
dest='bq_dataset',
required=True,
default='rawdata',
help='Output BQ dataset to write the results to')
parser.add_argument('--bq-dataset',
dest='bq_dataset',
required=True,
default='rawdata',
help='Output BQ dataset to write the results to')

# Parse arguments from the command line
known_args, pipeline_args = parser.parse_known_args(argv)
@@ -228,13 +231,12 @@ def run(argv=None):
])
logging.info('GS path being read from: %s' % (gs_path))

(p
| 'Read From Text - ' + input_file >> beam.io.ReadFromText(
gs_path, coder=FileCoder(list(fields.keys())), skip_header_lines=1)
(p | 'Read From Text - ' + input_file >> beam.io.ReadFromText(
gs_path, coder=FileCoder(list(fields.keys())), skip_header_lines=1)
| 'Prepare Field Types - ' + input_file >> beam.ParDo(
PrepareFieldTypes(), fields)
| 'Inject Timestamp - ' + input_file >> beam.ParDo(InjectTimestamp())
| 'Write to BigQuery - ' + input_file >> beam.io.Write(
PrepareFieldTypes(), fields) |
'Inject Timestamp - ' + input_file >> beam.ParDo(InjectTimestamp()) |
'Write to BigQuery - ' + input_file >> beam.io.Write(
beam.io.BigQuerySink(
# The table name passed in from the command line
known_args.bq_dataset + '.' + table_name,
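
The final hunk builds one read/transform/write branch per input file inside a single pipeline, appending the file name to every step label so step names stay unique. A minimal sketch of that looping pattern, with local text files standing in for the GCS-to-BigQuery steps shown above:

```python
# Sketch of the per-file branching used above: each input file gets its own
# chain of uniquely labelled steps within one pipeline. File names and the
# uppercase transform are placeholders for the real read/parse/write steps.
import apache_beam as beam

input_files = ['orders.csv', 'customers.csv']

p = beam.Pipeline()
for input_file in input_files:
    (p
     | 'Read From Text - ' + input_file >> beam.io.ReadFromText(
         input_file, skip_header_lines=1)
     | 'Uppercase - ' + input_file >> beam.Map(str.upper)
     | 'Write - ' + input_file >> beam.io.WriteToText(input_file + '.out'))
p.run().wait_until_finish()
```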