
Commit

Merge branch 'master' into master
henryken authored Jun 26, 2020
2 parents 74a7947 + 5e11a3e commit 94695ae
Showing 141 changed files with 4,355 additions and 665 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -51,6 +51,7 @@ The examples folder contains example solutions across a variety of Google Cloud
* [QAOA](examples/qaoa) - Examples of parsing a max-SAT problem in a proprietary format.
* [Redis Cluster on GKE Example](examples/redis-cluster-gke) - Deploying Redis cluster on GKE.
* [Spinnaker](examples/spinnaker) - Example pipelines for a Canary / Production deployment process.
* [TensorFlow Unit Testing](examples/tensorflow-unit-testing) - Examples of how to write unit tests for TensorFlow ML models (see the sketch below this list).
* [Uploading files directly to Google Cloud Storage by using Signed URL](examples/direct-upload-to-gcs) - Example architecture to enable uploading files directly to GCS by using [Signed URL](https://cloud.google.com/storage/docs/access-control/signed-urls).
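
As a rough illustration of the TensorFlow Unit Testing entry above, a test can subclass tf.test.TestCase and assert on properties of a model's output. This is a minimal sketch, not taken from the linked example; create_model() is a hypothetical stand-in for the model under test.

```python
import numpy as np
import tensorflow as tf


def create_model():
    # Hypothetical model under test: a tiny binary classifier.
    return tf.keras.Sequential([
        tf.keras.layers.Dense(8, activation="relu", input_shape=(4,)),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ])


class ModelTest(tf.test.TestCase):

    def test_output_shape_and_range(self):
        model = create_model()
        batch = np.zeros((3, 4), dtype=np.float32)
        predictions = model.predict(batch)
        # One prediction per input row, each a probability in [0, 1].
        self.assertEqual(predictions.shape, (3, 1))
        self.assertAllInRange(predictions, 0.0, 1.0)


if __name__ == "__main__":
    tf.test.main()
```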

## Tools
@@ -91,6 +92,7 @@ The tools folder contains ready-made utilities which can simplify Google Cloud P
* [Site Verification Group Sync](tools/site-verification-group-sync) - A tool to provision "verified owner" permissions (to create GCS buckets with custom dns) based on membership of a Google Group.
* [SLO Generator](tools/slo-generator/) - A Python package that automates computation of Service Level Objectives, Error Budgets and Burn Rates on GCP, and exports the computation results to available exporters (e.g. PubSub, BigQuery, Stackdriver Monitoring), using policies written in JSON format.
* [Snowflake_to_BQ](tools/snowflake2bq/) - A shell script to transfer tables (schema & data) from Snowflake to BigQuery.
* [Webhook Ingestion Data Pipeline](tools/webhook-ingestion-pipeline) - A deployable app to accept and ingest unauthenticated webhook data to BigQuery.

## Contributing
See the contributing [instructions](/CONTRIBUTING.md) to get started contributing.
18 changes: 16 additions & 2 deletions cloudbuild.yaml
@@ -1,4 +1,18 @@
 steps:
-- name: 'gcr.io/$PROJECT_ID/make'
-  args: ['test']
+- id: lint
+  name: gcr.io/$PROJECT_ID/make
+  args: [test]
   waitFor: ['-']
+- id: tools/hive-bigquery
+  name: google/cloud-sdk
+  args: [gcloud, builds, submit, tools/hive-bigquery/, --config=tools/hive-bigquery/cloudbuild.yaml]
+  waitFor: ['-']
+- id: examples/cloud-composer-examples
+  name: google/cloud-sdk
+  args: [gcloud, builds, submit, examples/cloud-composer-examples/,
+         --config=examples/cloud-composer-examples/cloudbuild.yaml]
+  waitFor: ['-']
+- id: examples/dataflow-python-examples
+  name: google/cloud-sdk
+  args: [gcloud, builds, submit, examples/dataflow-python-examples, --config=examples/dataflow-python-examples/cloudbuild.yaml]
+  waitFor: ['-']
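
In the updated top-level config, every step sets waitFor: ['-'], which tells Cloud Build to start that step at the beginning of the build rather than after the previous step, so the lint job and the three per-directory sub-builds run concurrently. Each sub-build is a nested `gcloud builds submit <dir> --config=<dir>/cloudbuild.yaml`, so any of them can also be submitted on its own, e.g. `gcloud builds submit examples/cloud-composer-examples/ --config=examples/cloud-composer-examples/cloudbuild.yaml`.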
144 changes: 144 additions & 0 deletions examples/cloud-composer-examples/.gitignore
@@ -0,0 +1,144 @@
# Ignore files downloaded by the codelab
get_client_id.py
iap_requirements.txt
make_iap_request.py


# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/
7 changes: 7 additions & 0 deletions examples/cloud-composer-examples/Dockerfile
@@ -0,0 +1,7 @@
ARG BEAM_VERSION=2.19.0

FROM apache/beam_python3.7_sdk:${BEAM_VERSION}

COPY . ./

RUN pip3 install -r requirements.txt
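
The new Dockerfile layers the example's requirements.txt on top of the pinned Apache Beam Python 3.7 SDK image (BEAM_VERSION defaults to 2.19.0); the Cloud Build config added below builds this image and then runs the example's unit tests inside it.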
25 changes: 25 additions & 0 deletions examples/cloud-composer-examples/cloudbuild.yaml
@@ -0,0 +1,25 @@
steps:
###########################################################
# Step 0: Pull image if exists
###########################################################
- name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args:
  - '-c'
  - |
    docker pull gcr.io/$PROJECT_ID/cloud-composer-examples:latest || exit 0
###########################################################
# Step 1: Create a Docker image with Python installed
###########################################################
- name: 'gcr.io/cloud-builders/docker'
  args:
  - 'build'
  - '--tag=gcr.io/$PROJECT_ID/cloud-composer-examples:latest'
  - '--cache-from=gcr.io/$PROJECT_ID/cloud-composer-examples:latest'
  - '.'
###########################################################
# Step 2: Run unit tests
###########################################################
- name: 'gcr.io/$PROJECT_ID/cloud-composer-examples'
  entrypoint: 'bash'
  args: ["./run_tests.sh"]
@@ -47,14 +47,6 @@
 # path.)                                                        #
 ##################################################################

-# These are stored as a Variables in our Airflow Environment.
-BUCKET = Variable.get('gcs_bucket')  # GCS bucket with our data.
-OUTPUT_TABLE = Variable.get(
-    'bq_output_table')  # BigQuery table to which results will be written
-
-# Path to python script that does data manipulation
-PYSPARK_JOB = 'gs://' + BUCKET + '/spark-jobs/spark_avg_speed.py'
-
 # Airflow parameters, see https://airflow.incubator.apache.org/code.html
 DEFAULT_DAG_ARGS = {
     'owner': 'airflow',  # The owner of the task.
@@ -86,31 +78,33 @@
     # ds_nodash is an airflow macro for "[Execution] Date string no dashes"
     # in YYYYMMDD format. See docs https://airflow.apache.org/code.html?highlight=macros#macros
     cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
+    image_version='1.5-debian10',
     num_workers=2,
     storage_bucket=Variable.get('dataproc_bucket'),
     zone=Variable.get('gce_zone'))

 # Submit the PySpark job.
 submit_pyspark = DataProcPySparkOperator(
     task_id='run_dataproc_pyspark',
-    main=PYSPARK_JOB,
+    main='gs://' + Variable.get('gcs_bucket') +
+    '/spark-jobs/spark_avg_speed.py',
     # Obviously needs to match the name of cluster created in the prior Operator.
     cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
     # Let's template our arguments for the pyspark job from the POST payload.
     arguments=[
         "--gcs_path_raw={{ dag_run.conf['raw_path'] }}",
-        "--gcs_path_transformed=gs://" + BUCKET +
+        "--gcs_path_transformed=gs://{{ var.value.gcs_bucket}}" +
         "/{{ dag_run.conf['transformed_path'] }}"
     ])

 # Load the transformed files to a BigQuery table.
 bq_load = GoogleCloudStorageToBigQueryOperator(
     task_id='GCS_to_BigQuery',
-    bucket=BUCKET,
+    bucket='{{ var.value.gcs_bucket }}',
     # Wildcard for objects created by spark job to be written to BigQuery
     # Reads the relative path to the objects transformed by the spark job from the POST message.
     source_objects=["{{ dag_run.conf['transformed_path'] }}/part-*"],
-    destination_project_dataset_table=OUTPUT_TABLE,
+    destination_project_dataset_table='{{ var.value.bq_output_table }}',
     schema_fields=None,
     schema_object=
     'schemas/nyc-tlc-yellow.json',  # Relative gcs path to schema file.
@@ -132,17 +126,18 @@
 # Delete gcs files in the timestamped transformed folder.
 delete_transformed_files = BashOperator(
     task_id='delete_transformed_files',
-    bash_command="gsutil -m rm -r gs://" + BUCKET +
+    bash_command="gsutil -m rm -r gs://{{ var.value.gcs_bucket }}" +
     "/{{ dag_run.conf['transformed_path'] }}/")

 # If the spark job or BQ Load fails we rename the timestamped raw path to
 # a timestamped failed path.
-move_failed_files = BashOperator(task_id='move_failed_files',
-                                 bash_command="gsutil mv gs://" + BUCKET +
-                                 "/{{ dag_run.conf['raw_path'] }}/ " +
-                                 "gs://" + BUCKET +
-                                 "/{{ dag_run.conf['failed_path'] }}/",
-                                 trigger_rule=TriggerRule.ONE_FAILED)
+move_failed_files = BashOperator(
+    task_id='move_failed_files',
+    bash_command="gsutil mv gs://{{ var.value.gcs_bucket }}" +
+    "/{{ dag_run.conf['raw_path'] }}/ " +
+    "gs://{{ var.value.gcs_bucket}}" +
+    "/{{ dag_run.conf['failed_path'] }}/",
+    trigger_rule=TriggerRule.ONE_FAILED)
 # Set the dag property of the first Operators, this will be inherited by downstream Operators.

 create_cluster.dag = dag
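
The recurring change in this DAG is to stop reading Airflow Variables with Variable.get() at module level, which executes every time the scheduler parses the file, and to reference them instead through Jinja templating such as {{ var.value.gcs_bucket }}, which is resolved only when a templated field is rendered at task run time. A minimal standalone sketch of that pattern (the DAG id, schedule, and dates below are illustrative, not taken from this repository):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10-style import

dag = DAG('variable_templating_sketch',
          start_date=datetime(2020, 6, 1),
          schedule_interval=None)

# '{{ var.value.gcs_bucket }}' is rendered only when the task runs, so the
# Airflow Variable is not fetched while the scheduler parses this file.
list_bucket = BashOperator(
    task_id='list_bucket',
    bash_command='gsutil ls gs://{{ var.value.gcs_bucket }}/',
    dag=dag)
```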