Demo #45 (Merged)

8 changes: 4 additions & 4 deletions Makefile
@@ -24,17 +24,17 @@ help: ## Prints help for targets with comments

.PHONY: test
test: ## Test if all files are properly formatted
- @$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=80 && ./helpers/run_tests.sh
+ @$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=100 && ./helpers/run_tests.sh

.PHONY: precommit
precommit: ## Test if all files are properly formatted
- @$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=80 && ./helpers/run_relevant_cloudbuilds.sh precommit_cloudbuild.yaml
+ @$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=100 && ./helpers/run_relevant_cloudbuilds.sh precommit_cloudbuild.yaml

.PHONY: push_ci_image
push_ci_image:
- @cd ci && gcloud builds submit --project=datapipelines-ci --tag gcr.io/datapipelines-ci/make .
+ @cd ci && gcloud builds submit --project=datapipelines-ci-282719 --tag gcr.io/datapipelines-ci-282719/make .

.PHONY: push_deploydags_image
push_deploydags_image:
- @cd composer/cloudbuild/go/dagsdeployer && gcloud builds submit --project=datapipelines-ci --tag gcr.io/datapipelines-ci/deploydags .
+ @cd composer/cloudbuild/go/dagsdeployer && gcloud builds submit --project=datapipelines-ci-282719 --tag gcr.io/datapipelines-ci-282719/deploydags .

2 changes: 1 addition & 1 deletion cd/prod.yaml
@@ -139,7 +139,7 @@ steps:
'deploy-wordcount-jar'
]
args: [
- '-dagList=./config/ci_dags.txt',
+ '-dagList=./config/running_dags.txt',
'-dagsFolder=./dags',
'-project=${PROJECT_ID}',
'-region=${_COMPOSER_REGION}',
4 changes: 3 additions & 1 deletion ci/Dockerfile
@@ -5,11 +5,13 @@ FROM python:buster
# install core tools
RUN apt-get update && apt-get install -y build-essential

+ RUN curl -sSL https://sdk.cloud.google.com | bash

# install shellcheck
RUN apt-get install shellcheck

# install yapf
- RUN pip3 install yapf flake8 pytest
+ RUN pip3 install yapf flake8 pytest apache-airflow[gcp]==1.10.6

# install golang (+gofmt)
RUN apt-get install -y golang
4 changes: 2 additions & 2 deletions cloudbuild.yaml
@@ -187,7 +187,7 @@ steps:
dir: './composer/cloudbuild/go/dagsdeployer'
args: [
'build',
- '-t', 'gcr.io/${_ARTIFACTS_PROJECT_ID}/deploydags:latest',
+ '-t', 'gcr.io/${PROJECT_ID}/deploydags:latest',
'--cache-from', 'gcr.io/${_ARTIFACTS_PROJECT_ID}/deploydags:latest',
'.'
]
@@ -201,7 +201,7 @@ steps:
'clean-up-data-dir-dags',
]
args: [
- '-dagList=./config/ci_dags.txt',
+ '-dagList=./config/running_dags.txt',
'-dagsFolder=./dags',
'-project=${PROJECT_ID}',
'-region=${_COMPOSER_REGION}',
2 changes: 1 addition & 1 deletion composer/cloudbuild/bin/run_tests.sh
@@ -109,7 +109,7 @@ function install_airflow() {
# $1 relative path to directory containing bigquery sql.
# $2 relative path to JSON file containing Airflow Variables.
main() {
- setup_local_airflow "$1" "$2" "$3"
+ setup_local_airflow "$1" "$2" "$3"
run_tests
TEST_STATUS=$?
clean_up
@@ -195,7 +195,6 @@ func readCommentScrubbedLines(path string) ([]string, error) {
lines = append(lines, candidate)
}
}
log.Printf("scrubbed lines: %#v", lines)

return lines, scanner.Err()
}
@@ -212,15 +211,18 @@ func FindDagFilesInLocalTree(dagsRoot string, dagNames map[string]bool) (map[str
// This should map a dir to the ignore patterns in its .airflowignore if relevant
// this allows us to easily identify the patterns relevant to this dir and its parents, grandparents, etc.
airflowignoreTree := make(map[string][]string)
- files, err := ioutil.ReadDir(dagsRoot)
+ _, err := ioutil.ReadDir(dagsRoot)
if err != nil {
return matches, fmt.Errorf("error reading dagRoot: %v. %v", dagsRoot, err)
}
log.Printf("found files at dagRott: %v", files)
filepath.Walk(dagsRoot, func(path string, info os.FileInfo, err error) error {
dagID := strings.TrimSuffix(info.Name(), ".py")
relPath, err := filepath.Rel(dagsRoot, path)

+ if info == nil {
+     dur, _ := time.ParseDuration("5s")
+     time.Sleep(dur)
+ }
// respect .airflowignore
if info.Name() == ".airflowignore" {
log.Printf("found %v, adding to airflowignoreTree", path)
@@ -325,7 +327,6 @@ func FindDagFilesInLocalTree(dagsRoot string, dagNames map[string]bool) (map[str
}
}
if !alreadyMatched {
log.Printf("new match for %v: %v", dagID, relPath)
matches[dagID] = append(matches[dagID], relPath)
}
}
@@ -461,8 +462,6 @@ func (c *ComposerEnv) stopDag(dag string, relPath string, pauseOnly bool, wg *sy
if err != nil {
return fmt.Errorf("error pausing dag %v: %v", dag, string(out))
}
log.Printf("pauseOnly: %#v", pauseOnly)
log.Printf("!pauseOnly: %#v", !pauseOnly)
if !pauseOnly {
log.Printf("parsing gcs url %v", c.DagBucketPrefix)
gcs, err := url.Parse(c.DagBucketPrefix)
Expand Down
@@ -44,7 +44,6 @@ func parseGcsPath(gcsPath string) (bucket string, path string, err error) {
}
func gcsMD5(gcsPath string) ([]byte, error) {
bktName, path, err := parseGcsPath(gcsPath)
log.Printf("taking hash of bucket: %s, path: %s", bktName, path)
if err != nil {
log.Fatalf("%s", err)
}
6 changes: 5 additions & 1 deletion composer/config/AirflowVariables.json
@@ -6,5 +6,9 @@
"gcs_input_bucket": "${INPUT_BUCKET}",
"gcs_ref_bucket": "${REF_BUCKET}",
"gcs_output_bucket": "${RESULT_BUCKET}",
"dataflow_staging_bucket": "${DATAFLOW_STAGING_BUCKET}"
"dataflow_staging_bucket": "${DATAFLOW_STAGING_BUCKET}",
"dataproc_bucket": "${DATFLOW_STAGING_BUCKET}",
"gce_zone": "${COMPOSER_REGION}-a",
"gcs_bucket": "spark_bucket",
"bq_output_table": "${GCP_PROJECT_ID}.nyc_taxi.avg_speed"
}
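
The ${...} tokens above are placeholders that presumably get filled in from the build environment before the file is imported as Airflow Variables (the repo's helper scripts handle this). A minimal sketch of that idea, using a hypothetical helper that is not part of this change:

import json
import os

from airflow.models import Variable


def import_airflow_variables(path):
    """Expand ${ENV_VAR} placeholders in a JSON file and set each key as an Airflow Variable."""
    with open(path) as handle:
        rendered = os.path.expandvars(handle.read())  # replaces ${VAR} with os.environ values
    for key, value in json.loads(rendered).items():
        Variable.set(key, value)


import_airflow_variables("composer/config/AirflowVariables.json")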
2 changes: 0 additions & 2 deletions composer/config/ci_dags.txt

This file was deleted.

22 changes: 22 additions & 0 deletions composer/config/running_dags.txt
@@ -0,0 +1,22 @@
wordcount_dag
tutorial
ephemeral_dataproc_spark_dag
bash_operator
branch_operator
branch_python_dop_operator_3
complex
http_operator
kubernetes_executor_config
latest_only
latest_only_with_trigger
nested_branch_dag
passing_params_via_test_command
pig_operator
python_operator
short_circuit_operator
skip_dag
subdag_operator
trigger_controller_dag
trigger_target_dag
tutorial
xcom
160 changes: 160 additions & 0 deletions composer/dags/ephemeral_dataproc_spark_dag.py
@@ -0,0 +1,160 @@
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocClusterCreateOperator,
    DataProcPySparkOperator,
    DataprocClusterDeleteOperator)
from airflow.contrib.operators.gcs_to_bq import (
    GoogleCloudStorageToBigQueryOperator)
from airflow.operators import BashOperator
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule

##################################################################
# This file defines the DAG for the logic pictured below.
##################################################################
#
#          create_cluster
#                |
#                V
#          submit_pyspark.......
#               / \            .
#              /   \           V
#             /     \    move_failed_files
#            |       |        ^
#            V       V        .
#     delete_cluster bq_load...
#                       |
#                       V
#           delete_transformed_files
#
# (Note: Dotted lines indicate a conditional trigger rule on
# failure of the upstream tasks. In this case the files in the
# raw-{timestamp}/ GCS path will be moved to a failed-{timestamp}
# path.)
##################################################################

# Airflow parameters, see https://airflow.incubator.apache.org/code.html
DEFAULT_DAG_ARGS = {
    'owner': 'jferriero@google.com',  # The owner of the task.
    # Task instance should not rely on the previous task's schedule to succeed.
    'depends_on_past': False,
    # We use this in combination with schedule_interval=None to only trigger the
    # DAG with a POST to the REST API.
    # Alternatively, we could set this to yesterday and the dag will be
    # triggered upon upload to the dag folder.
    'start_date': datetime(2020, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Retry once before failing the task.
    'retry_delay': timedelta(minutes=5),  # Time between retries.
    'project_id': Variable.get('gcp_project'),  # Cloud Composer project ID.
    # We only want the DAG to run when we POST to the api.
    # Alternatively, this could be set to '@daily' to run the job once a day.
    # more options at https://airflow.apache.org/scheduler.html#dag-runs
}

# Create Directed Acyclic Graph for Airflow
with DAG('ephemeral_dataproc_spark_dag', default_args=DEFAULT_DAG_ARGS,
         schedule_interval=None) as dag:  # Here we are using dag as context.
    # Create the Cloud Dataproc cluster.
    # Note: this operator will be flagged a success if the cluster by this name
    # already exists.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # ds_nodash is an airflow macro for "[Execution] Date string no dashes"
        # in YYYYMMDD format.
        # See docs https://airflow.apache.org/code.html?highlight=macros#macros
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        image_version='1.5-debian10',
        num_workers=2,
        storage_bucket=Variable.get('dataproc_bucket'),
        zone=Variable.get('gce_zone'))

    # Submit the PySpark job.
    submit_pyspark = DataProcPySparkOperator(
        task_id='run_dataproc_pyspark',
        main='gs://' + Variable.get('gcs_bucket') +
        '/spark-jobs/spark_avg_speed.py',
        # Obviously needs to match the name of cluster created in the prior
        # Operator.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # Let's template our arguments for the pyspark job from the POST
        # payload.
        arguments=[
            "--gcs_path_raw={{ dag_run.conf['raw_path'] }}",
            "--gcs_path_transformed=gs://{{ var.value.gcs_bucket}}" +
            "/{{ dag_run.conf['transformed_path'] }}"
        ])

    # Load the transformed files to a BigQuery table.
    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='GCS_to_BigQuery',
        bucket='{{ var.value.gcs_bucket }}',
        # Wildcard for objects created by spark job to be written to BigQuery
        # Reads the relative path to the objects transformed by the spark job
        # from the POST message.
        source_objects=["{{ dag_run.conf['transformed_path'] }}/part-*"],
        destination_project_dataset_table='{{ var.value.bq_output_table }}',
        schema_fields=None,
        # Relative gcs path to schema file.
        schema_object='schemas/nyc-tlc-yellow.json',
        # Note that our spark job does json -> csv conversion.
        source_format='CSV',
        create_disposition='CREATE_IF_NEEDED',
        skip_leading_rows=0,
        write_disposition='WRITE_TRUNCATE',  # If the table exists, overwrite it
        max_bad_records=0)

    # Delete the Cloud Dataproc cluster.
    delete_cluster = DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        # Obviously needs to match the name of cluster created in the prior two
        # Operators.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # This will tear down the cluster even if there are failures in upstream
        # tasks.
        trigger_rule=TriggerRule.ALL_DONE)

    # Delete gcs files in the timestamped transformed folder.
    delete_transformed_files = BashOperator(
        task_id='delete_transformed_files',
        bash_command="gsutil -m rm -r gs://{{ var.value.gcs_bucket }}" +
        "/{{ dag_run.conf['transformed_path'] }}/")

    # If the spark job or BQ Load fails we rename the timestamped raw path to
    # a timestamped failed path.
    move_failed_files = BashOperator(
        task_id='move_failed_files',
        bash_command="gsutil mv gs://{{ var.value.gcs_bucket }}" +
        "/{{ dag_run.conf['raw_path'] }}/ " + "gs://{{ var.value.gcs_bucket}}" +
        "/{{ dag_run.conf['failed_path'] }}/",
        trigger_rule=TriggerRule.ONE_FAILED)
# Set the dag property on the first operator; it will be inherited by
# downstream operators.

create_cluster.dag = dag

create_cluster.set_downstream(submit_pyspark)

submit_pyspark.set_downstream([delete_cluster, bq_load])

bq_load.set_downstream(delete_transformed_files)

move_failed_files.set_upstream([bq_load, submit_pyspark])
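
Since the DAG above sets schedule_interval=None and reads its GCS paths from dag_run.conf, it is meant to be triggered externally, for example with a POST to the Airflow REST API. A minimal trigger sketch against the Airflow 1.10 experimental API, illustrative only: the webserver URL and the example paths are assumptions, and a Cloud Composer webserver would additionally require IAP authentication.

import json

import requests

AIRFLOW_URL = "http://localhost:8080"  # assumed Airflow webserver address
DAG_ID = "ephemeral_dataproc_spark_dag"

# Relative GCS paths consumed by the templated task arguments above
# (example values; supply your own timestamped folders).
conf = {
    "raw_path": "raw-20200101T000000",
    "transformed_path": "transformed-20200101T000000",
    "failed_path": "failed-20200101T000000",
}

response = requests.post(
    "%s/api/experimental/dags/%s/dag_runs" % (AIRFLOW_URL, DAG_ID),
    json={"conf": json.dumps(conf)},  # conf passed as a JSON string, parsed server-side
)
response.raise_for_status()
print(response.json())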
18 changes: 18 additions & 0 deletions composer/dags/example_dags/__init__.py
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.