cleanup: Upgrade Composer samples to Airflow 2.0 #5782

Merged 31 commits on May 27, 2021
Commits (31)
442e4bc
modify dag_test_utils to point to branch; add airflow 2 changes
leahecole Apr 15, 2021
1358f27
upgrade airflow_db_cleanup to Airflow 2.0
leahecole May 3, 2021
5f99c94
upgrade hadoop tutorial to Airflow 2.0
leahecole May 3, 2021
3b7c119
upgrade kubernetespodoperator to Airflow 2.0
leahecole May 3, 2021
5a73d13
upgrade dataproc workflow template to Airflow 2.0
leahecole May 3, 2021
861a7aa
remove bashoperator python2 example
leahecole May 3, 2021
bb69630
upgrade DataflowTemplateOperator tutorial to 2.0
leahecole May 3, 2021
1412737
WIP: upgrade bq_notify to Airflow 2
leahecole May 4, 2021
14943c4
upgrade bq_copy_across_locations to Airflow 2.0
leahecole May 5, 2021
f3cc166
upgrade connections to Airflow 2
leahecole May 5, 2021
8ebece8
more airflow 2.0 migration
leahecole May 6, 2021
95c842b
db reset update
leahecole May 6, 2021
cdc0fcc
copy constraints from Airflow
leahecole May 6, 2021
c9db1ee
update noxfile config
leahecole May 6, 2021
916823d
update constraints to py-3.8
leahecole May 6, 2021
b1b4f21
add pip version override
leahecole May 14, 2021
c8f0936
upgrade simple to airflow 2
leahecole May 17, 2021
3a33687
fix lint
leahecole May 17, 2021
127379c
bqnotify cleanup
leahecole May 18, 2021
2f31bf8
add blog/ conftest
leahecole May 24, 2021
50d4b77
fix license header
leahecole May 24, 2021
5c681f4
more license header fixes
leahecole May 24, 2021
f7bf625
license again
leahecole May 24, 2021
466755a
remove unused import from blog conftest
leahecole May 24, 2021
6b72bb7
remove unused requirements
leahecole May 24, 2021
be47adf
remove script used by dag deleted in this PR
leahecole May 24, 2021
6f43148
Add clarifying comment about email operator
leahecole May 26, 2021
5fbad11
add newline back to csv
leahecole May 26, 2021
f9447fa
fix presumed typo
leahecole May 26, 2021
dffb45a
Merge branch 'master' into airflow_migration
dinagraves May 27, 2021
7574007
Merge branch 'master' into airflow_migration
leahecole May 27, 2021
39 changes: 39 additions & 0 deletions composer/blog/gcp-tech-blog/unit-test-dags-cloud-build/conftest.py
@@ -0,0 +1,39 @@
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os

import pytest


# this fixture initializes the Airflow DB once per session
# it is used by DAGs in this blog post code only
@pytest.fixture(scope="session")
def airflow_database():
import airflow.utils.db

# We use separate directory for local db path per session
# by setting AIRFLOW_HOME env var, which is done in noxfile_config.py.

assert('AIRFLOW_HOME' in os.environ)

airflow_home = os.environ["AIRFLOW_HOME"]
airflow_db = f"{airflow_home}/airflow.db"

# reset both resets and initializes a new database
airflow.utils.db.resetdb(rbac=None)

# Making sure we are using a data file there.
assert(os.path.isfile(airflow_db))
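
For context, a test module in the same directory would request this session-scoped fixture by name so that the local metadata database exists before any DAG file is parsed. The sketch below is illustrative only and is not part of this PR; the module name my_dag, its dag attribute, and the task-count check are hypothetical.

# test_my_dag.py -- hypothetical companion test, not included in this PR
from airflow import models


def test_dag_imports_cleanly(airflow_database):
    # Requesting the airflow_database fixture ensures the metadata DB
    # under AIRFLOW_HOME is initialized before the DAG module is imported.
    import my_dag  # hypothetical DAG module sitting next to this test

    dag = my_dag.dag  # assumes the module exposes its DAG object as `dag`
    assert isinstance(dag, models.DAG)
    assert len(dag.tasks) > 0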
25 changes: 23 additions & 2 deletions composer/conftest.py
@@ -1,4 +1,4 @@
# Copyright 2019 Google Inc. All Rights Reserved.
# Copyright 2019 Google LLC All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,27 @@
import pytest


# this fixture initializes the Airflow DB once per session
# it is used by DAGs in both the blogs and workflows directories,
# unless there exists a conftest at a lower level
@pytest.fixture(scope="session")
def airflow_database():
import airflow.utils.db

# We use separate directory for local db path per session
# by setting AIRFLOW_HOME env var, which is done in noxfile_config.py.

assert('AIRFLOW_HOME' in os.environ)

airflow_home = os.environ["AIRFLOW_HOME"]
airflow_db = f"{airflow_home}/airflow.db"

# reset both resets and initializes a new database
airflow.utils.db.resetdb()
leahecole marked this conversation as resolved.
Show resolved Hide resolved

# Making sure we are using a data file there.
assert(os.path.isfile(airflow_db))

# this fixture initializes the Airflow DB once per session
# it is used by DAGs in both the blogs and workflows directories
@pytest.fixture(scope="session")
@@ -34,7 +55,7 @@ def airflow_database():
    airflow_db = f"{airflow_home}/airflow.db"

    # reset both resets and initializes a new database
    airflow.utils.db.resetdb(rbac=None) # this command will change in Airflow 2.0
    airflow.utils.db.resetdb()

    # Making sure we are using a data file there.
    assert(os.path.isfile(airflow_db))
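
The only behavioral change to the existing fixture is the resetdb() call: Airflow 1.10 accepted an rbac argument that Airflow 2.0 removed, so the argument and the trailing comment are dropped. A conftest that had to serve both major versions could guard the call roughly as below; this shim is a sketch for illustration, not code from this PR.

import airflow.utils.db


def reset_airflow_db():
    # Version-tolerant reset: Airflow 2.x dropped the rbac parameter.
    try:
        airflow.utils.db.resetdb()           # Airflow 2.x signature
    except TypeError:
        airflow.utils.db.resetdb(rbac=None)  # Airflow 1.10.x signature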
15 changes: 0 additions & 15 deletions composer/data/python2_script.py

This file was deleted.

4 changes: 2 additions & 2 deletions composer/workflows/airflow_db_cleanup.py
@@ -54,10 +54,10 @@
import airflow
from airflow import settings
from airflow.configuration import conf
from airflow.jobs import BaseJob
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
    TaskInstance, Variable, XCom
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.exc import ProgrammingError
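The import changes above follow the general Airflow 2.0 migration pattern: core operators moved out of the *_operator modules (airflow.operators.python_operator becomes airflow.operators.python) and BaseJob moved into airflow.jobs.base_job. The upgraded samples simply use the new paths, but a DAG that had to keep running on both major versions could hedge its imports as in this sketch (not part of the PR):

try:
    # Airflow 2.x module layout
    from airflow.jobs.base_job import BaseJob
    from airflow.operators.python import PythonOperator
except ImportError:
    # Airflow 1.10.x fallback
    from airflow.jobs import BaseJob
    from airflow.operators.python_operator import PythonOperator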
44 changes: 0 additions & 44 deletions composer/workflows/bashoperator_python2.py

This file was deleted.

26 changes: 0 additions & 26 deletions composer/workflows/bashoperator_python2_test.py

This file was deleted.

18 changes: 9 additions & 9 deletions composer/workflows/bq_copy_across_locations.py
@@ -40,10 +40,10 @@
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import gcs_to_gcs
from airflow.operators import dummy_operator
from airflow.operators import dummy
from airflow.providers.google.cloud.transfers import bigquery_to_gcs
from airflow.providers.google.cloud.transfers import gcs_to_bigquery
from airflow.providers.google.cloud.transfers import gcs_to_gcs


# --------------------------------------------------------------------------------
@@ -127,12 +127,12 @@ def read_table_list(table_list_file):
        'composer_sample_bq_copy_across_locations',
        default_args=default_args,
        schedule_interval=None) as dag:
    start = dummy_operator.DummyOperator(
    start = dummy.DummyOperator(
        task_id='start',
        trigger_rule='all_success'
    )

    end = dummy_operator.DummyOperator(
    end = dummy.DummyOperator(
        task_id='end',
        trigger_rule='all_success'
    )
@@ -148,7 +148,7 @@ def read_table_list(table_list_file):
        table_source = record['table_source']
        table_dest = record['table_dest']

        BQ_to_GCS = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        BQ_to_GCS = bigquery_to_gcs.BigQueryToGCSOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_BQ_to_GCS'.format(table_source.replace(":", "_")),
            source_project_dataset_table=table_source,
@@ -157,7 +157,7 @@
            export_format='AVRO'
        )

        GCS_to_GCS = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
        GCS_to_GCS = gcs_to_gcs.GCSToGCSOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_GCS_to_GCS'.format(table_source.replace(":", "_")),
            source_bucket=source_bucket,
@@ -166,7 +166,7 @@
            # destination_object='{}-*.avro'.format(table_dest)
        )

        GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        GCS_to_BQ = gcs_to_bigquery.GCSToBigQueryOperator(
            # Replace ":" with valid character for Airflow task
            task_id='{}_GCS_to_BQ'.format(table_dest.replace(":", "_")),
            bucket=dest_bucket,
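Besides the module moves, the Google transfer operators themselves were renamed: BigQueryToCloudStorageOperator becomes BigQueryToGCSOperator, GoogleCloudStorageToGoogleCloudStorageOperator becomes GCSToGCSOperator, and GoogleCloudStorageToBigQueryOperator becomes GCSToBigQueryOperator. A minimal standalone DAG using the new names could look roughly like the sketch below; the DAG id, project, dataset, table, and bucket names are placeholders rather than values from this sample.

import datetime

from airflow import models
from airflow.providers.google.cloud.transfers import bigquery_to_gcs, gcs_to_bigquery

with models.DAG(
        'example_bq_export_and_load',          # hypothetical DAG id
        start_date=datetime.datetime(2021, 1, 1),
        schedule_interval=None) as dag:
    export_to_gcs = bigquery_to_gcs.BigQueryToGCSOperator(
        task_id='export_to_gcs',
        source_project_dataset_table='my-project.my_dataset.my_table',    # placeholder
        destination_cloud_storage_uris=['gs://my-bucket/export-*.avro'],  # placeholder
        export_format='AVRO',
    )

    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id='load_to_bq',
        bucket='my-bucket',                                               # placeholder
        source_objects=['export-*.avro'],
        destination_project_dataset_table='my-project.my_dataset.my_copy',  # placeholder
        source_format='AVRO',
        write_disposition='WRITE_TRUNCATE',
    )

    export_to_gcs >> load_to_bq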
4 changes: 2 additions & 2 deletions composer/workflows/bq_copy_eu_to_us_sample.csv
@@ -1,3 +1,3 @@
Source, Target
nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014
nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015
nyc-tlc.green.trips_2014,nyc_tlc_EU.trips_2014
nyc-tlc.green.trips_2015,nyc_tlc_EU.trips_2015