cleanup: Upgrade Composer samples to Airflow 2.0 (#5782)
* modify dag_test_utils to point to branch; add airflow 2 changes

* upgrade airflow_db_cleanup to Airflow 2.0

* upgrade hadoop tutorial to Airflow 2.0

* upgrade kubernetespodoperator to Airflow 2.0

* upgrade dataproc workflow template to Airflow 2.0

* remove bashoperator python2 example

* upgrade DataflowTemplateOperator tutorial to 2.0

* WIP: upgrade bq_notify to Airflow 2

* upgrade bq_copy_across_locations to Airflow 2.0

* upgrade connections to Airflow 2

* more airflow 2.0 migration

* db reset update

* copy constraints from Airflow

* update noxfile config

* update constraints to py-3.8

* add pip version override

* upgrade simple to airflow 2

* fix lint

* bqnotify cleanup

* add blog/ conftest

* fix license header

* more license header fixes

* license again

* remove unused import from blog conftest

* remove unused requirements

* remove script used by dag deleted in this PR

* Add clarifying comment about email operator

* add newline back to csv

* fix presumed typo

Co-authored-by: Dina Graves Portman <dinagraves@google.com>
leahecole and dinagraves authored May 27, 2021
1 parent 5eecc5e commit 675e55f
Showing 25 changed files with 729 additions and 301 deletions.
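Most of the per-file changes below follow the same Airflow 1.10 to 2.0 migration pattern: operators move out of airflow.operators.*_operator and airflow.contrib.operators.* into airflow.operators.* and the apache-airflow-providers-google package, and airflow.utils.db.resetdb() no longer takes an rbac argument. As a minimal orientation sketch of the new-style imports in use (the DAG id and callable are placeholders, not code from this commit):

    # Illustrative Airflow 2.0 DAG; all names here are placeholders.
    from datetime import datetime

    from airflow import models
    from airflow.operators.python import PythonOperator  # was airflow.operators.python_operator


    def greet():
        print("hello from Airflow 2")


    with models.DAG(
        "airflow2_import_example",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        PythonOperator(task_id="greet", python_callable=greet)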
39 changes: 39 additions & 0 deletions composer/blog/gcp-tech-blog/unit-test-dags-cloud-build/conftest.py
@@ -0,0 +1,39 @@
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os

import pytest


# this fixture initializes the Airflow DB once per session
# it is used only by DAGs in this blog post's code
@pytest.fixture(scope="session")
def airflow_database():
    import airflow.utils.db

    # We use a separate directory for the local db path per session
    # by setting the AIRFLOW_HOME env var, which is done in noxfile_config.py.

    assert('AIRFLOW_HOME' in os.environ)

    airflow_home = os.environ["AIRFLOW_HOME"]
    airflow_db = f"{airflow_home}/airflow.db"

    # resetdb both resets and initializes a new database
    airflow.utils.db.resetdb(rbac=None)

    # Make sure the database file was created there.
    assert(os.path.isfile(airflow_db))
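For orientation, a minimal sketch of a test that consumes this session-scoped fixture; the DAG folder path and test name are assumptions, not part of this commit:

    # Hypothetical test module; dag_folder is a placeholder path.
    from airflow.models import DagBag


    def test_dags_import_cleanly(airflow_database):
        dag_bag = DagBag(dag_folder="dags/", include_examples=False)
        assert not dag_bag.import_errors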
25 changes: 23 additions & 2 deletions composer/conftest.py
@@ -1,4 +1,4 @@
-# Copyright 2019 Google Inc. All Rights Reserved.
+# Copyright 2019 Google LLC All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,27 @@
import pytest


+# this fixture initializes the Airflow DB once per session
+# it is used by DAGs in both the blogs and workflows directories,
+# unless there exists a conftest at a lower level
+@pytest.fixture(scope="session")
+def airflow_database():
+import airflow.utils.db
+
+# We use a separate directory for the local db path per session
+# by setting the AIRFLOW_HOME env var, which is done in noxfile_config.py.
+
+assert('AIRFLOW_HOME' in os.environ)
+
+airflow_home = os.environ["AIRFLOW_HOME"]
+airflow_db = f"{airflow_home}/airflow.db"
+
+# resetdb both resets and initializes a new database
+airflow.utils.db.resetdb()
+
+# Make sure the database file was created there.
+assert(os.path.isfile(airflow_db))

# this fixture initializes the Airflow DB once per session
# it is used by DAGs in both the blogs and workflows directories
@pytest.fixture(scope="session")
@@ -34,7 +55,7 @@ def airflow_database():
airflow_db = f"{airflow_home}/airflow.db"

# resetdb both resets and initializes a new database
-airflow.utils.db.resetdb(rbac=None) # this command will change in Airflow 2.0
+airflow.utils.db.resetdb()

# Make sure the database file was created there.
assert(os.path.isfile(airflow_db))
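Both fixtures assume AIRFLOW_HOME already points at a per-session scratch directory; in this repo that is handled by noxfile_config.py, which is not part of this diff. A rough sketch of the idea, with the temp-directory handling assumed rather than copied from that file:

    # Rough sketch only; the actual noxfile_config.py mechanism may differ.
    import os
    import tempfile

    # Give each test session its own Airflow metadata DB location.
    airflow_home = tempfile.mkdtemp(prefix="airflow-")
    os.environ["AIRFLOW_HOME"] = airflow_home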
15 changes: 0 additions & 15 deletions composer/data/python2_script.py

This file was deleted.

4 changes: 2 additions & 2 deletions composer/workflows/airflow_db_cleanup.py
@@ -54,10 +54,10 @@
import airflow
from airflow import settings
from airflow.configuration import conf
-from airflow.jobs import BaseJob
+from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
TaskInstance, Variable, XCom
-from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python import PythonOperator
import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.exc import ProgrammingError
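The relocated BaseJob and PythonOperator imports support the cleanup DAG's age-based purge of metadata tables. A rough sketch of that kind of query under the new import paths (the cutoff and session handling are illustrative, not values from this sample):

    # Illustrative only: find stale BaseJob rows older than a cutoff.
    from datetime import timedelta

    from airflow import settings
    from airflow.jobs.base_job import BaseJob
    from airflow.utils import timezone

    cutoff = timezone.utcnow() - timedelta(days=30)
    session = settings.Session()
    stale_jobs = (
        session.query(BaseJob)
        .filter(BaseJob.latest_heartbeat <= cutoff)
        .all()
    )
    session.close()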
44 changes: 0 additions & 44 deletions composer/workflows/bashoperator_python2.py

This file was deleted.

26 changes: 0 additions & 26 deletions composer/workflows/bashoperator_python2_test.py

This file was deleted.

18 changes: 9 additions & 9 deletions composer/workflows/bq_copy_across_locations.py
@@ -40,10 +40,10 @@
import logging

from airflow import models
-from airflow.contrib.operators import bigquery_to_gcs
-from airflow.contrib.operators import gcs_to_bq
-from airflow.contrib.operators import gcs_to_gcs
-from airflow.operators import dummy_operator
+from airflow.operators import dummy
+from airflow.providers.google.cloud.transfers import bigquery_to_gcs
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
+from airflow.providers.google.cloud.transfers import gcs_to_gcs


# --------------------------------------------------------------------------------
@@ -127,12 +127,12 @@ def read_table_list(table_list_file):
'composer_sample_bq_copy_across_locations',
default_args=default_args,
schedule_interval=None) as dag:
-start = dummy_operator.DummyOperator(
+start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)

-end = dummy_operator.DummyOperator(
+end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
@@ -148,7 +148,7 @@ def read_table_list(table_list_file):
table_source = record['table_source']
table_dest = record['table_dest']

-BQ_to_GCS = bigquery_to_gcs.BigQueryToCloudStorageOperator(
+BQ_to_GCS = bigquery_to_gcs.BigQueryToGCSOperator(
# Replace ":" with valid character for Airflow task
task_id='{}_BQ_to_GCS'.format(table_source.replace(":", "_")),
source_project_dataset_table=table_source,
@@ -157,7 +157,7 @@
export_format='AVRO'
)

-GCS_to_GCS = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
+GCS_to_GCS = gcs_to_gcs.GCSToGCSOperator(
# Replace ":" with valid character for Airflow task
task_id='{}_GCS_to_GCS'.format(table_source.replace(":", "_")),
source_bucket=source_bucket,
@@ -166,7 +166,7 @@
# destination_object='{}-*.avro'.format(table_dest)
)

-GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+GCS_to_BQ = gcs_to_bigquery.GCSToBigQueryOperator(
# Replace ":" with valid character for Airflow task
task_id='{}_GCS_to_BQ'.format(table_dest.replace(":", "_")),
bucket=dest_bucket,
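The renamed transfer operators keep largely the same arguments under the new provider package. A standalone sketch of one of them, with placeholder project, dataset, and bucket names:

    # Placeholder names throughout; not values from this sample.
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
        BigQueryToGCSOperator,
    )

    export_table = BigQueryToGCSOperator(
        task_id="export_table_to_gcs",
        source_project_dataset_table="my-project.my_dataset.my_table",
        destination_cloud_storage_uris=["gs://my-bucket/my_table-*.avro"],
        export_format="AVRO",
    )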
4 changes: 2 additions & 2 deletions composer/workflows/bq_copy_eu_to_us_sample.csv
@@ -1,3 +1,3 @@
Source, Target
-nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014
-nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015
+nyc-tlc.green.trips_2014,nyc_tlc_EU.trips_2014
+nyc-tlc.green.trips_2015,nyc_tlc_EU.trips_2015