diff --git a/composer/blog/gcp-tech-blog/unit-test-dags-cloud-build/conftest.py b/composer/blog/gcp-tech-blog/unit-test-dags-cloud-build/conftest.py
new file mode 100644
index 000000000000..ae2105370999
--- /dev/null
+++ b/composer/blog/gcp-tech-blog/unit-test-dags-cloud-build/conftest.py
@@ -0,0 +1,39 @@
+# Copyright 2021 Google LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# https://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+
+import pytest
+
+
+# this fixture initializes the Airflow DB once per session
+# it is used by DAGs in this blog post code only
+@pytest.fixture(scope="session")
+def airflow_database():
+    import airflow.utils.db
+
+    # We use a separate directory for the local db path per session
+    # by setting the AIRFLOW_HOME env var, which is done in noxfile_config.py.
+
+    assert('AIRFLOW_HOME' in os.environ)
+
+    airflow_home = os.environ["AIRFLOW_HOME"]
+    airflow_db = f"{airflow_home}/airflow.db"
+
+    # resetdb both resets and initializes the database
+    airflow.utils.db.resetdb(rbac=None)
+
+    # Make sure the database file was created.
+    assert(os.path.isfile(airflow_db))
diff --git a/composer/conftest.py b/composer/conftest.py
index 908edb39419b..0bdc0dbd4f07 100644
--- a/composer/conftest.py
+++ b/composer/conftest.py
@@ -1,4 +1,4 @@
-# Copyright 2019 Google Inc. All Rights Reserved.
+# Copyright 2019 Google LLC All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,6 +19,27 @@
 import pytest
 
 
+# this fixture initializes the Airflow DB once per session
+# it is used by DAGs in both the blogs and workflows directories,
+# unless there exists a conftest at a lower level
+@pytest.fixture(scope="session")
+def airflow_database():
+    import airflow.utils.db
+
+    # We use a separate directory for the local db path per session
+    # by setting the AIRFLOW_HOME env var, which is done in noxfile_config.py.
+
+    assert('AIRFLOW_HOME' in os.environ)
+
+    airflow_home = os.environ["AIRFLOW_HOME"]
+    airflow_db = f"{airflow_home}/airflow.db"
+
+    # resetdb both resets and initializes the database
+    airflow.utils.db.resetdb()
+
+    # Make sure the database file was created.
+    assert(os.path.isfile(airflow_db))
+
 # this fixture initializes the Airflow DB once per session
 # it is used by DAGs in both the blogs and workflows directories
 @pytest.fixture(scope="session")
@@ -34,7 +55,7 @@ def airflow_database():
     airflow_db = f"{airflow_home}/airflow.db"
 
     # reset both resets and initializes a new database
-    airflow.utils.db.resetdb(rbac=None)  # this command will change in Airflow 2.0
+    airflow.utils.db.resetdb()
 
     # Making sure we are using a data file there.
assert(os.path.isfile(airflow_db)) diff --git a/composer/data/python2_script.py b/composer/data/python2_script.py deleted file mode 100644 index aec80592f75f..000000000000 --- a/composer/data/python2_script.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -print 'Output from Python 2.' # noqa diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py index a2e90de8fe70..75799745dfd9 100644 --- a/composer/workflows/airflow_db_cleanup.py +++ b/composer/workflows/airflow_db_cleanup.py @@ -54,10 +54,10 @@ import airflow from airflow import settings from airflow.configuration import conf -from airflow.jobs import BaseJob +from airflow.jobs.base_job import BaseJob from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \ TaskInstance, Variable, XCom -from airflow.operators.python_operator import PythonOperator +from airflow.operators.python import PythonOperator import dateutil.parser from sqlalchemy import and_, func from sqlalchemy.exc import ProgrammingError diff --git a/composer/workflows/bashoperator_python2.py b/composer/workflows/bashoperator_python2.py deleted file mode 100644 index 29bcdb7af795..000000000000 --- a/composer/workflows/bashoperator_python2.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START composer_bashoperator_python2] -import datetime - -from airflow import models -from airflow.operators import bash_operator - - -yesterday = datetime.datetime.combine( - datetime.datetime.today() - datetime.timedelta(1), - datetime.datetime.min.time()) - - -default_dag_args = { - # Setting start date as yesterday starts the DAG immediately when it is - # detected in the Cloud Storage bucket. - 'start_date': yesterday, -} - -with models.DAG( - 'composer_sample_bashoperator_python2', - schedule_interval=datetime.timedelta(days=1), - default_args=default_dag_args) as dag: - - run_python2 = bash_operator.BashOperator( - task_id='run_python2', - # This example runs a Python script from the data folder to prevent - # Airflow from attempting to parse the script as a DAG. 
- bash_command='python2 /home/airflow/gcs/data/python2_script.py', - ) -# [END composer_bashoperator_python2] diff --git a/composer/workflows/bashoperator_python2_test.py b/composer/workflows/bashoperator_python2_test.py deleted file mode 100644 index d666b84f1ec7..000000000000 --- a/composer/workflows/bashoperator_python2_test.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import internal_unit_testing - - -def test_dag_import(): - """Test that the DAG file can be successfully imported. - - This tests that the DAG can be parsed, but does not run it in an Airflow - environment. This is a recommended confidence check by the official Airflow - docs: https://airflow.incubator.apache.org/tutorial.html#testing - """ - from . import bashoperator_python2 as module - internal_unit_testing.assert_has_valid_dag(module) diff --git a/composer/workflows/bq_copy_across_locations.py b/composer/workflows/bq_copy_across_locations.py index dc68a7d67dba..dcebf3cdac11 100644 --- a/composer/workflows/bq_copy_across_locations.py +++ b/composer/workflows/bq_copy_across_locations.py @@ -40,10 +40,10 @@ import logging from airflow import models -from airflow.contrib.operators import bigquery_to_gcs -from airflow.contrib.operators import gcs_to_bq -from airflow.contrib.operators import gcs_to_gcs -from airflow.operators import dummy_operator +from airflow.operators import dummy +from airflow.providers.google.cloud.transfers import bigquery_to_gcs +from airflow.providers.google.cloud.transfers import gcs_to_bigquery +from airflow.providers.google.cloud.transfers import gcs_to_gcs # -------------------------------------------------------------------------------- @@ -127,12 +127,12 @@ def read_table_list(table_list_file): 'composer_sample_bq_copy_across_locations', default_args=default_args, schedule_interval=None) as dag: - start = dummy_operator.DummyOperator( + start = dummy.DummyOperator( task_id='start', trigger_rule='all_success' ) - end = dummy_operator.DummyOperator( + end = dummy.DummyOperator( task_id='end', trigger_rule='all_success' ) @@ -148,7 +148,7 @@ def read_table_list(table_list_file): table_source = record['table_source'] table_dest = record['table_dest'] - BQ_to_GCS = bigquery_to_gcs.BigQueryToCloudStorageOperator( + BQ_to_GCS = bigquery_to_gcs.BigQueryToGCSOperator( # Replace ":" with valid character for Airflow task task_id='{}_BQ_to_GCS'.format(table_source.replace(":", "_")), source_project_dataset_table=table_source, @@ -157,7 +157,7 @@ def read_table_list(table_list_file): export_format='AVRO' ) - GCS_to_GCS = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator( + GCS_to_GCS = gcs_to_gcs.GCSToGCSOperator( # Replace ":" with valid character for Airflow task task_id='{}_GCS_to_GCS'.format(table_source.replace(":", "_")), source_bucket=source_bucket, @@ -166,7 +166,7 @@ def read_table_list(table_list_file): # destination_object='{}-*.avro'.format(table_dest) ) - GCS_to_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + 
GCS_to_BQ = gcs_to_bigquery.GCSToBigQueryOperator( # Replace ":" with valid character for Airflow task task_id='{}_GCS_to_BQ'.format(table_dest.replace(":", "_")), bucket=dest_bucket, diff --git a/composer/workflows/bq_copy_eu_to_us_sample.csv b/composer/workflows/bq_copy_eu_to_us_sample.csv index fd529d9cde8c..f4d96b32bbda 100644 --- a/composer/workflows/bq_copy_eu_to_us_sample.csv +++ b/composer/workflows/bq_copy_eu_to_us_sample.csv @@ -1,3 +1,3 @@ Source, Target -nyc-tlc:green.trips_2014,nyc_tlc_EU.trips_2014 -nyc-tlc:green.trips_2015,nyc_tlc_EU.trips_2015 \ No newline at end of file +nyc-tlc.green.trips_2014,nyc_tlc_EU.trips_2014 +nyc-tlc.green.trips_2015,nyc_tlc_EU.trips_2015 diff --git a/composer/workflows/bq_notify.py b/composer/workflows/bq_notify.py index 9c1ec1575b80..af72b574fe72 100644 --- a/composer/workflows/bq_notify.py +++ b/composer/workflows/bq_notify.py @@ -33,26 +33,26 @@ # [START composer_notify_failure] from airflow import models # [END composer_notify_failure] -from airflow.contrib.operators import bigquery_get_data -# [START composer_bigquery] -from airflow.contrib.operators import bigquery_operator -# [END composer_bigquery] -from airflow.contrib.operators import bigquery_to_gcs # [START composer_bash_bq] -from airflow.operators import bash_operator +from airflow.operators import bash # [END composer_bash_bq] # [START composer_email] -from airflow.operators import email_operator +from airflow.operators import email # [END composer_email] +# [START composer_bigquery] +from airflow.providers.google.cloud.operators import bigquery +from airflow.providers.google.cloud.transfers import bigquery_to_gcs +# [END composer_bigquery] from airflow.utils import trigger_rule bq_dataset_name = 'airflow_bq_notify_dataset_{{ ds_nodash }}' -bq_recent_questions_table_id = bq_dataset_name + '.recent_questions' -BQ_MOST_POPULAR_TABLE_NAME = 'most_popular' -bq_most_popular_table_id = bq_dataset_name + '.' + BQ_MOST_POPULAR_TABLE_NAME -output_file = 'gs://{gcs_bucket}/recent_questionsS.csv'.format( - gcs_bucket=models.Variable.get('gcs_bucket')) +bq_recent_questions_table_id = 'recent_questions' +bq_most_popular_table_id = 'most_popular' +gcs_bucket = models.Variable.get('gcs_bucket') +output_file = f'{gcs_bucket}/recent_questions.csv' +location = 'US' +project_id = models.Variable.get('gcp_project') # Data from the month of January 2018 # You may change the query dates to get data from a different time range. You @@ -63,6 +63,22 @@ max_query_date = '2018-02-01' min_query_date = '2018-01-01' +RECENT_QUESTIONS_QUERY = f""" + SELECT owner_display_name, title, view_count + FROM `bigquery-public-data.stackoverflow.posts_questions` + WHERE creation_date < CAST('{max_query_date}' AS TIMESTAMP) + AND creation_date >= CAST('{min_query_date}' AS TIMESTAMP) + ORDER BY view_count DESC + LIMIT 100 + """ + +MOST_POPULAR_QUERY = f""" + SELECT title, view_count + FROM `{project_id}.{bq_dataset_name}.{bq_recent_questions_table_id}` + ORDER BY view_count DESC + LIMIT 1 + """ + yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) @@ -76,7 +92,7 @@ 'email_on_retry': False, 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), - 'project_id': models.Variable.get('gcp_project') + 'project_id': project_id } with models.DAG( @@ -87,62 +103,70 @@ # [START composer_bash_bq] # Create BigQuery output dataset. 
- make_bq_dataset = bash_operator.BashOperator( + make_bq_dataset = bash.BashOperator( task_id='make_bq_dataset', # Executing 'bq' command requires Google Cloud SDK which comes # preinstalled in Cloud Composer. - bash_command='bq ls {} || bq mk {}'.format( - bq_dataset_name, bq_dataset_name)) + bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}') # [END composer_bash_bq] # [START composer_bigquery] - # Query recent StackOverflow questions. - bq_recent_questions_query = bigquery_operator.BigQueryOperator( - task_id='bq_recent_questions_query', - sql=""" - SELECT owner_display_name, title, view_count - FROM `bigquery-public-data.stackoverflow.posts_questions` - WHERE creation_date < CAST('{max_date}' AS TIMESTAMP) - AND creation_date >= CAST('{min_date}' AS TIMESTAMP) - ORDER BY view_count DESC - LIMIT 100 - """.format(max_date=max_query_date, min_date=min_query_date), - use_legacy_sql=False, - destination_dataset_table=bq_recent_questions_table_id) + bq_recent_questions_query = bigquery.BigQueryInsertJobOperator( + task_id="bq_recent_questions_query", + configuration={ + "query": { + "query": RECENT_QUESTIONS_QUERY, + "useLegacySql": False, + "destinationTable": { + "projectId": project_id, + "datasetId": bq_dataset_name, + "tableId": bq_recent_questions_table_id + } + } + }, + location=location, + ) # [END composer_bigquery] # Export query result to Cloud Storage. - export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator( + export_questions_to_gcs = bigquery_to_gcs.BigQueryToGCSOperator( task_id='export_recent_questions_to_gcs', - source_project_dataset_table=bq_recent_questions_table_id, + source_project_dataset_table=f"{project_id}.{bq_dataset_name}.{bq_recent_questions_table_id}", destination_cloud_storage_uris=[output_file], export_format='CSV') # Perform most popular question query. - bq_most_popular_query = bigquery_operator.BigQueryOperator( - task_id='bq_most_popular_question_query', - sql=""" - SELECT title, view_count - FROM `{table}` - ORDER BY view_count DESC - LIMIT 1 - """.format(table=bq_recent_questions_table_id), - use_legacy_sql=False, - destination_dataset_table=bq_most_popular_table_id) + bq_most_popular_query = bigquery.BigQueryInsertJobOperator( + task_id="bq_most_popular_question_query", + configuration={ + "query": { + "query": MOST_POPULAR_QUERY, + "useLegacySql": False, + "destinationTable": { + "projectId": project_id, + "datasetId": bq_dataset_name, + "tableId": bq_most_popular_table_id + } + } + }, + location=location, + ) # Read most popular question from BigQuery to XCom output. # XCom is the best way to communicate between operators, but can only # transfer small amounts of data. For passing large amounts of data, store # the data in Cloud Storage and pass the path to the data if necessary. 
# https://airflow.apache.org/concepts.html#xcoms - bq_read_most_popular = bigquery_get_data.BigQueryGetDataOperator( + bq_read_most_popular = bigquery.BigQueryGetDataOperator( task_id='bq_read_most_popular', dataset_id=bq_dataset_name, - table_id=BQ_MOST_POPULAR_TABLE_NAME) + table_id=bq_most_popular_table_id) # [START composer_email] - # Send email confirmation - email_summary = email_operator.EmailOperator( + # Send email confirmation (you will need to set up the email operator + # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification + # for more info on configuring the email operator in Cloud Composer) + email_summary = email.EmailOperator( task_id='email_summary', to=models.Variable.get('email'), subject='Sample BigQuery notify data ready', @@ -167,7 +191,7 @@ # Delete BigQuery dataset # Delete the bq table - delete_bq_dataset = bash_operator.BashOperator( + delete_bq_dataset = bash.BashOperator( task_id='delete_bq_dataset', bash_command='bq rm -r -f %s' % bq_dataset_name, trigger_rule=trigger_rule.TriggerRule.ALL_DONE) diff --git a/composer/workflows/connections.py b/composer/workflows/connections.py index 129a4286978e..ea98c03ba797 100644 --- a/composer/workflows/connections.py +++ b/composer/workflows/connections.py @@ -17,7 +17,7 @@ import datetime from airflow import models -from airflow.contrib.operators import bigquery_operator +from airflow.providers.google.cloud.operators import bigquery yesterday = datetime.datetime.combine( @@ -38,21 +38,39 @@ schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag: # [START composer_connections_default] - task_default = bigquery_operator.BigQueryOperator( + task_default = bigquery.BigQueryInsertJobOperator( task_id='task_default_connection', - sql='SELECT 1', use_legacy_sql=False) + configuration={ + "query": { + "query": 'SELECT 1', + "useLegacySql": False + } + } + ) # [END composer_connections_default] # [START composer_connections_explicit] - task_explicit = bigquery_operator.BigQueryOperator( + # Composer creates a 'google_cloud_default' connection by default. + task_explicit = bigquery.BigQueryInsertJobOperator( task_id='task_explicit_connection', - sql='SELECT 1', use_legacy_sql=False, - # Composer creates a 'google_cloud_default' connection by default. - bigquery_conn_id='google_cloud_default') + gcp_conn_id='google_cloud_default', + configuration={ + "query": { + "query": 'SELECT 1', + "useLegacySql": False + } + } + ) # [END composer_connections_explicit] # [START composer_connections_custom] - task_custom = bigquery_operator.BigQueryOperator( + # Set a gcp_conn_id to use a connection that you have created. + task_custom = bigquery.BigQueryInsertJobOperator( task_id='task_custom_connection', - sql='SELECT 1', use_legacy_sql=False, - # Set a connection ID to use a connection that you have created. 
- bigquery_conn_id='my_gcp_connection') + gcp_conn_id='my_gcp_connection', + configuration={ + "query": { + "query": 'SELECT 1', + "useLegacySql": False + } + } + ) # [END composer_connections_custom] diff --git a/composer/workflows/constraints.txt b/composer/workflows/constraints.txt index ca0e831ad6aa..819204e3f98f 100644 --- a/composer/workflows/constraints.txt +++ b/composer/workflows/constraints.txt @@ -1,2 +1,488 @@ -pandas-gbq==0.14.1 # must be under 0.15.0 until https://github.com/apache/airflow/issues/15113 is addressed -SQLAlchemy==1.3.23 # must be under 1.4 until at least Airflow 2.0 (check airflow setup.py for restrictions) +# This file is from https://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.8.txt +APScheduler==3.6.3 +Authlib==0.15.3 +Babel==2.9.0 +Flask-AppBuilder==3.1.1 +Flask-Babel==1.0.0 +Flask-Bcrypt==0.7.1 +Flask-Caching==1.9.0 +Flask-JWT-Extended==3.25.1 +Flask-Login==0.4.1 +Flask-OAuthlib==0.9.5 +Flask-OpenID==1.2.5 +Flask-SQLAlchemy==2.4.4 +Flask-WTF==0.14.3 +Flask==1.1.2 +GitPython==3.1.13 +HeapDict==1.0.1 +JPype1==1.2.1 +JayDeBeApi==1.2.3 +Jinja2==2.11.3 +Mako==1.1.4 +Markdown==3.3.3 +MarkupSafe==1.1.1 +PyHive==0.6.3 +PyJWT==1.7.1 +PyNaCl==1.4.0 +PySmbClient==0.1.5 +PyYAML==5.4.1 +Pygments==2.8.0 +SQLAlchemy-JSONField==1.0.0 +SQLAlchemy-Utils==0.36.8 +SQLAlchemy==1.3.23 +Sphinx==3.5.0 +Unidecode==1.2.0 +WTForms==2.3.3 +Werkzeug==1.0.1 +adal==1.2.6 +aiohttp==3.7.3 +alabaster==0.7.12 +alembic==1.5.4 +amqp==2.6.1 +analytics-python==1.2.9 +ansiwrap==0.8.4 +apache-airflow-providers-amazon==1.1.0 +apache-airflow-providers-apache-cassandra==1.0.1 +apache-airflow-providers-apache-druid==1.0.1 +apache-airflow-providers-apache-hdfs==1.0.1 +apache-airflow-providers-apache-hive==1.0.1 +apache-airflow-providers-apache-kylin==1.0.1 +apache-airflow-providers-apache-livy==1.0.1 +apache-airflow-providers-apache-pig==1.0.1 +apache-airflow-providers-apache-pinot==1.0.1 +apache-airflow-providers-apache-spark==1.0.1 +apache-airflow-providers-apache-sqoop==1.0.1 +apache-airflow-providers-celery==1.0.1 +apache-airflow-providers-cloudant==1.0.1 +apache-airflow-providers-cncf-kubernetes==1.0.1 +apache-airflow-providers-databricks==1.0.1 +apache-airflow-providers-datadog==1.0.1 +apache-airflow-providers-dingding==1.0.1 +apache-airflow-providers-discord==1.0.1 +apache-airflow-providers-docker==1.0.1 +apache-airflow-providers-elasticsearch==1.0.1 +apache-airflow-providers-exasol==1.1.0 +apache-airflow-providers-facebook==1.0.1 +apache-airflow-providers-ftp==1.0.1 +apache-airflow-providers-google==1.0.0 +apache-airflow-providers-grpc==1.0.1 +apache-airflow-providers-hashicorp==1.0.1 +apache-airflow-providers-http==1.1.0 +apache-airflow-providers-imap==1.0.1 +apache-airflow-providers-jdbc==1.0.1 +apache-airflow-providers-jenkins==1.0.1 +apache-airflow-providers-jira==1.0.1 +apache-airflow-providers-microsoft-azure==1.1.0 +apache-airflow-providers-microsoft-mssql==1.0.1 +apache-airflow-providers-microsoft-winrm==1.0.1 +apache-airflow-providers-mongo==1.0.1 +apache-airflow-providers-mysql==1.0.1 +apache-airflow-providers-odbc==1.0.1 +apache-airflow-providers-openfaas==1.1.0 +apache-airflow-providers-opsgenie==1.0.1 +apache-airflow-providers-oracle==1.0.1 +apache-airflow-providers-pagerduty==1.0.1 +apache-airflow-providers-papermill==1.0.1 +apache-airflow-providers-plexus==1.0.1 +apache-airflow-providers-postgres==1.0.1 +apache-airflow-providers-presto==1.0.1 +apache-airflow-providers-qubole==1.0.1 +apache-airflow-providers-redis==1.0.1 
+apache-airflow-providers-salesforce==1.0.1 +apache-airflow-providers-samba==1.0.1 +apache-airflow-providers-segment==1.0.1 +apache-airflow-providers-sendgrid==1.0.1 +apache-airflow-providers-sftp==1.1.0 +apache-airflow-providers-singularity==1.0.1 +apache-airflow-providers-slack==2.0.0 +apache-airflow-providers-snowflake==1.1.0 +apache-airflow-providers-sqlite==1.0.1 +apache-airflow-providers-ssh==1.1.0 +apache-airflow-providers-telegram==1.0.1 +apache-airflow-providers-vertica==1.0.1 +apache-airflow-providers-yandex==1.0.1 +apache-airflow-providers-zendesk==1.0.1 +apipkg==1.5 +apispec==3.3.2 +appdirs==1.4.4 +argcomplete==1.12.2 +arrow==0.17.0 +asn1crypto==1.4.0 +astroid==2.4.2 +async-generator==1.10 +async-timeout==3.0.1 +atlasclient==1.0.0 +attrs==20.3.0 +aws-sam-translator==1.34.0 +aws-xray-sdk==2.6.0 +azure-batch==10.0.0 +azure-common==1.1.26 +azure-core==1.11.0 +azure-cosmos==3.2.0 +azure-datalake-store==0.0.51 +azure-identity==1.5.0 +azure-keyvault-certificates==4.2.1 +azure-keyvault-keys==4.3.1 +azure-keyvault-secrets==4.2.0 +azure-keyvault==4.1.0 +azure-kusto-data==0.0.45 +azure-mgmt-containerinstance==1.5.0 +azure-mgmt-core==1.2.2 +azure-mgmt-datalake-nspkg==3.0.1 +azure-mgmt-datalake-store==0.5.0 +azure-mgmt-nspkg==3.0.2 +azure-mgmt-resource==15.0.0 +azure-nspkg==3.0.2 +azure-storage-blob==12.7.1 +azure-storage-common==2.1.0 +azure-storage-file==2.1.0 +backcall==0.2.0 +bcrypt==3.2.0 +beautifulsoup4==4.7.1 +billiard==3.6.3.0 +black==20.8b1 +blinker==1.4 +boto3==1.15.18 +boto==2.49.0 +botocore==1.18.18 +bowler==0.9.0 +cached-property==1.5.2 +cachetools==4.2.1 +cassandra-driver==3.20.2 +cattrs==1.2.0 +celery==4.4.7 +certifi==2020.12.5 +cffi==1.14.5 +cfgv==3.2.0 +cfn-lint==0.44.7 +cgroupspy==0.1.6 +chardet==3.0.4 +click==7.1.2 +clickclick==20.10.2 +cloudant==2.14.0 +cloudpickle==1.4.1 +colorama==0.4.4 +colorlog==4.7.2 +commonmark==0.9.1 +connexion==2.7.0 +coverage==5.4 +croniter==0.3.37 +cryptography==3.4.5 +curlify==2.2.1 +cx-Oracle==8.1.0 +dask==2021.2.0 +datadog==0.40.0 +decorator==4.4.2 +defusedxml==0.6.0 +dill==0.3.3 +distlib==0.3.1 +distributed==2.19.0 +dnspython==1.16.0 +docker-pycreds==0.4.0 +docker==3.7.3 +docopt==0.6.2 +docutils==0.16 +ecdsa==0.14.1 +elasticsearch-dbapi==0.1.0 +elasticsearch-dsl==7.3.0 +elasticsearch==7.5.1 +email-validator==1.1.2 +entrypoints==0.3 +eventlet==0.30.1 +execnet==1.8.0 +facebook-business==9.0.2 +fastavro==1.3.1 +filelock==3.0.12 +fissix==20.8.0 +flake8-colors==0.1.9 +flake8==3.8.4 +flaky==3.7.0 +flower==0.9.7 +freezegun==1.1.0 +fsspec==0.8.5 +future==0.18.2 +gcsfs==0.7.2 +gevent==21.1.2 +gitdb==4.0.5 +github3.py==1.3.0 +google-ads==7.0.0 +google-api-core==1.26.0 +google-api-python-client==1.12.8 +google-auth-httplib2==0.0.4 +google-auth-oauthlib==0.4.2 +google-auth==1.26.1 +google-cloud-automl==1.0.1 +google-cloud-bigquery-datatransfer==1.1.1 +google-cloud-bigquery-storage==2.2.1 +google-cloud-bigquery==2.8.0 +google-cloud-bigtable==1.7.0 +google-cloud-container==1.0.1 +google-cloud-core==1.6.0 +google-cloud-datacatalog==0.7.0 +google-cloud-dataproc==1.1.1 +google-cloud-dlp==1.0.0 +google-cloud-kms==1.4.0 +google-cloud-language==1.3.0 +google-cloud-logging==1.15.1 +google-cloud-memcache==0.3.0 +google-cloud-monitoring==1.1.0 +google-cloud-os-login==1.0.0 +google-cloud-pubsub==1.7.0 +google-cloud-redis==1.0.0 +google-cloud-secret-manager==1.0.0 +google-cloud-spanner==1.19.1 +google-cloud-speech==1.3.2 +google-cloud-storage==1.36.0 +google-cloud-tasks==1.5.0 +google-cloud-texttospeech==1.0.1 +google-cloud-translate==1.7.0 
+google-cloud-videointelligence==1.16.1 +google-cloud-vision==1.0.0 +google-crc32c==1.1.2 +google-resumable-media==1.2.0 +googleapis-common-protos==1.52.0 +graphviz==0.16 +greenlet==1.0.0 +grpc-google-iam-v1==0.12.3 +grpcio-gcp==0.2.2 +grpcio==1.35.0 +gunicorn==19.10.0 +hdfs==2.5.8 +hmsclient==0.1.1 +httplib2==0.19.0 +humanize==3.2.0 +hvac==0.10.8 +identify==1.5.13 +idna==2.10 +imagesize==1.2.0 +importlib-metadata==1.7.0 +importlib-resources==1.5.0 +inflection==0.5.1 +iniconfig==1.1.1 +ipdb==0.13.4 +ipython-genutils==0.2.0 +ipython==7.20.0 +iso8601==0.1.14 +isodate==0.6.0 +isort==5.7.0 +itsdangerous==1.1.0 +jedi==0.18.0 +jira==2.0.0 +jmespath==0.10.0 +json-merge-patch==0.2 +jsondiff==1.2.0 +jsonpatch==1.28 +jsonpath-ng==1.5.2 +jsonpickle==2.0.0 +jsonpointer==2.0 +jsonschema==3.2.0 +junit-xml==1.9 +jupyter-client==6.1.11 +jupyter-core==4.7.1 +jwcrypto==0.8 +kombu==4.6.11 +kubernetes==11.0.0 +kylinpy==2.8.4 +lazy-object-proxy==1.4.3 +ldap3==2.9 +libcst==0.3.17 +lockfile==0.12.2 +marshmallow-enum==1.5.1 +marshmallow-oneofschema==2.1.0 +marshmallow-sqlalchemy==0.23.1 +marshmallow==3.10.0 +mccabe==0.6.1 +mock==4.0.2 +mongomock==3.22.1 +more-itertools==8.7.0 +moreorless==0.3.0 +moto==1.3.16 +msal-extensions==0.3.0 +msal==1.9.0 +msgpack==1.0.2 +msrest==0.6.21 +msrestazure==0.6.4 +multi-key-dict==2.0.3 +multidict==5.1.0 +mypy-extensions==0.4.3 +mypy==0.770 +mysql-connector-python==8.0.22 +mysqlclient==1.3.14 +natsort==7.1.1 +nbclient==0.5.2 +nbformat==5.1.2 +nest-asyncio==1.5.1 +networkx==2.5 +nodeenv==1.5.0 +nteract-scrapbook==0.4.2 +ntlm-auth==1.5.0 +numpy==1.20.1 +oauthlib==2.1.0 +openapi-spec-validator==0.2.9 +oscrypto==1.2.1 +packaging==20.9 +pandas-gbq==0.14.1 +pandas==1.2.2 +papermill==2.3.2 +parameterized==0.8.1 +paramiko==2.7.2 +parso==0.8.1 +pathspec==0.8.1 +pbr==5.5.1 +pdpyras==4.1.2 +pendulum==2.1.2 +pexpect==4.8.0 +pickleshare==0.7.5 +pinotdb==0.3.3 +pipdeptree==2.0.0 +pluggy==0.13.1 +ply==3.11 +portalocker==1.7.1 +pre-commit==2.10.1 +presto-python-client==0.7.0 +prison==0.1.3 +prometheus-client==0.8.0 +prompt-toolkit==3.0.16 +proto-plus==1.13.0 +protobuf==3.14.0 +psutil==5.8.0 +psycopg2-binary==2.8.6 +ptyprocess==0.7.0 +py4j==0.10.9 +py==1.10.0 +pyOpenSSL==19.1.0 +pyarrow==3.0.0 +pyasn1-modules==0.2.8 +pyasn1==0.4.8 +pycodestyle==2.6.0 +pycountry==20.7.3 +pycparser==2.20 +pycryptodomex==3.10.1 +pydata-google-auth==1.1.0 +pydruid==0.6.2 +pyenchant==3.2.0 +pyexasol==0.17.0 +pyflakes==2.2.0 +pykerberos==1.2.1 +pylint==2.6.0 +pymongo==3.11.3 +pymssql==2.1.5 +pyodbc==4.0.30 +pyparsing==2.4.7 +pyrsistent==0.17.3 +pysftp==0.2.9 +pyspark==3.0.1 +pytest-cov==2.11.1 +pytest-forked==1.3.0 +pytest-instafail==0.4.2 +pytest-rerunfailures==9.1.1 +pytest-timeouts==1.2.1 +pytest-xdist==2.2.1 +pytest==6.2.2 +python-daemon==2.2.4 +python-dateutil==2.8.1 +python-editor==1.0.4 +python-http-client==3.3.2 +python-jenkins==1.7.0 +python-jose==3.2.0 +python-ldap==3.3.1 +python-nvd3==0.15.0 +python-slugify==4.0.1 +python-telegram-bot==13.0 +python3-openid==3.2.0 +pytz==2020.5 +pytzdata==2020.1 +pywinrm==0.4.1 +pyzmq==22.0.3 +qds-sdk==1.16.1 +redis==3.5.3 +regex==2020.11.13 +requests-kerberos==0.12.0 +requests-mock==1.8.0 +requests-ntlm==1.1.0 +requests-oauthlib==1.1.0 +requests-toolbelt==0.9.1 +requests==2.23.0 +responses==0.12.1 +rich==9.2.0 +rsa==4.7 +s3transfer==0.3.4 +sasl==0.2.1 +semver==2.13.0 +sendgrid==6.6.0 +sentinels==1.0.0 +sentry-sdk==0.20.1 +setproctitle==1.2.2 +simple-salesforce==1.10.1 +six==1.15.0 +slack-sdk==3.3.2 +slackclient==2.9.3 +smmap==3.0.5 +snakebite-py3==3.0.5 +snowballstemmer==2.1.0 
+snowflake-connector-python==2.3.10 +snowflake-sqlalchemy==1.2.4 +sortedcontainers==2.3.0 +soupsieve==2.2 +sphinx-airflow-theme==0.0.2 +sphinx-argparse==0.2.5 +sphinx-autoapi==1.0.0 +sphinx-copybutton==0.3.1 +sphinx-jinja==1.1.1 +sphinx-rtd-theme==0.5.1 +sphinxcontrib-applehelp==1.0.2 +sphinxcontrib-devhelp==1.0.2 +sphinxcontrib-dotnetdomain==0.4 +sphinxcontrib-golangdomain==0.2.0.dev0 +sphinxcontrib-htmlhelp==1.0.3 +sphinxcontrib-httpdomain==1.7.0 +sphinxcontrib-jsmath==1.0.1 +sphinxcontrib-qthelp==1.0.3 +sphinxcontrib-redoc==1.6.0 +sphinxcontrib-serializinghtml==1.1.4 +sphinxcontrib-spelling==5.2.1 +spython==0.0.85 +sshpubkeys==3.3.1 +sshtunnel==0.1.5 +starkbank-ecdsa==1.1.0 +statsd==3.3.0 +swagger-ui-bundle==0.0.8 +tableauserverclient==0.14.1 +tabulate==0.8.7 +tblib==1.7.0 +tenacity==6.2.0 +termcolor==1.1.0 +testfixtures==6.17.1 +text-unidecode==1.3 +textwrap3==0.9.2 +thrift-sasl==0.4.2 +thrift==0.13.0 +toml==0.10.2 +toolz==0.11.1 +tornado==6.1 +tqdm==4.56.2 +traitlets==5.0.5 +typed-ast==1.4.2 +typing-extensions==3.7.4.3 +typing-inspect==0.6.0 +tzlocal==2.1 +unicodecsv==0.14.1 +uritemplate==3.0.1 +urllib3==1.25.11 +vertica-python==1.0.1 +vine==1.3.0 +virtualenv==20.4.2 +volatile==2.1.0 +watchtower==0.7.3 +wcwidth==0.2.5 +websocket-client==0.57.0 +wrapt==1.12.1 +xmltodict==0.12.0 +yamllint==1.26.0 +yandexcloud==0.73.0 +yarl==1.6.3 +zdesk==2.7.1 +zict==2.0.0 +zipp==3.4.0 +zope.event==4.5.0 +zope.interface==5.2.0 \ No newline at end of file diff --git a/composer/workflows/dataflowtemplateoperator_tutorial.py b/composer/workflows/dataflowtemplateoperator_tutorial.py index 34d79c3cb3e4..44c5c21ee8d6 100644 --- a/composer/workflows/dataflowtemplateoperator_tutorial.py +++ b/composer/workflows/dataflowtemplateoperator_tutorial.py @@ -34,13 +34,12 @@ import datetime from airflow import models -from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator from airflow.utils.dates import days_ago bucket_path = models.Variable.get("bucket_path") project_id = models.Variable.get("project_id") gce_zone = models.Variable.get("gce_zone") -gce_region = models.Variable.get("gce_region") default_args = { @@ -48,8 +47,6 @@ "start_date": days_ago(1), "dataflow_default_options": { "project": project_id, - # Set to your region - "region": gce_region, # Set to your zone "zone": gce_zone, # This is a subfolder for storing temporary files, like the staged pipeline job. @@ -68,7 +65,7 @@ schedule_interval=datetime.timedelta(days=1), # Override to match your needs ) as dag: - start_template_job = DataflowTemplateOperator( + start_template_job = DataflowTemplatedJobStartOperator( # The task id of your job task_id="dataflow_operator_transform_csv_to_bq", # The name of the template that you're using. 
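For context on the dataflowtemplateoperator_tutorial.py change above, here is a minimal sketch of how DataflowTemplatedJobStartOperator from the Airflow 2 Google provider is typically configured; the DAG id, template URI, parameters, and location below are illustrative assumptions, not values taken from this diff.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

with models.DAG(
    "dataflow_template_sketch",  # hypothetical DAG id, not from this change
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # Launch a classic Dataflow template. The template URI and its parameters
    # are placeholders for illustration only.
    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="start_template_job",
        template="gs://dataflow-templates/latest/Word_Count",
        parameters={
            "inputFile": "gs://example-bucket/input.txt",
            "output": "gs://example-bucket/wordcount/output",
        },
        location="us-central1",
    )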
diff --git a/composer/workflows/dataflowtemplateoperator_tutorial_test.py b/composer/workflows/dataflowtemplateoperator_tutorial_test.py index 69724c7c3399..19cc39ea599d 100644 --- a/composer/workflows/dataflowtemplateoperator_tutorial_test.py +++ b/composer/workflows/dataflowtemplateoperator_tutorial_test.py @@ -23,12 +23,10 @@ def set_variables(airflow_database): models.Variable.set("bucket_path", "gs://example_bucket") models.Variable.set("project_id", "example-project") models.Variable.set("gce_zone", "us-central1-f") - models.Variable.set("gce_region", "us-central1-f") yield models.Variable.delete('bucket_path') models.Variable.delete('project_id') models.Variable.delete('gce_zone') - models.Variable.delete('gce_region') def test_dag_import(): diff --git a/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py index b970e3de6ae7..315a2ff4fde2 100644 --- a/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py +++ b/composer/workflows/dataproc_workflow_template_instantiate_operator_tutorial.py @@ -25,7 +25,7 @@ import datetime from airflow import models -from airflow.contrib.operators import dataproc_operator +from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator from airflow.utils.dates import days_ago project_id = models.Variable.get("project_id") @@ -48,7 +48,7 @@ schedule_interval=datetime.timedelta(days=1), # Override to match your needs ) as dag: - start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator( + start_template_job = DataprocInstantiateWorkflowTemplateOperator( # The task id of your job task_id="dataproc_workflow_dag", # The template id of your workflow diff --git a/composer/workflows/hadoop_tutorial.py b/composer/workflows/hadoop_tutorial.py index 2906005ac1e6..303e1ecbcf6f 100644 --- a/composer/workflows/hadoop_tutorial.py +++ b/composer/workflows/hadoop_tutorial.py @@ -30,7 +30,7 @@ import os from airflow import models -from airflow.contrib.operators import dataproc_operator +from airflow.providers.google.cloud.operators import dataproc from airflow.utils import trigger_rule # Output file for Cloud Dataproc job. @@ -45,6 +45,26 @@ input_file = 'gs://pub/shakespeare/rose.txt' wordcount_args = ['wordcount', input_file, output_file] +HADOOP_JOB = { + "reference": {"project_id": models.Variable.get('gcp_project')}, + "placement": {"cluster_name": 'composer-hadoop-tutorial-cluster-{{ ds_nodash }}'}, + "hadoop_job": { + "main_jar_file_uri": WORDCOUNT_JAR, + "args": wordcount_args, + }, +} + +CLUSTER_CONFIG = { + "master_config": { + "num_instances": 1, + "machine_type_uri": "n1-standard-2" + }, + "worker_config": { + "num_instances": 2, + "machine_type_uri": "n1-standard-2" + }, +} + yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) @@ -60,9 +80,12 @@ # If a task fails, retry it once after waiting at least 5 minutes 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), - 'project_id': models.Variable.get('gcp_project') + 'project_id': models.Variable.get('gcp_project'), + 'location': models.Variable.get('gce_region'), + } + # [START composer_hadoop_schedule] with models.DAG( 'composer_hadoop_tutorial', @@ -72,28 +95,25 @@ # [END composer_hadoop_schedule] # Create a Cloud Dataproc cluster. 
- create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator( + create_dataproc_cluster = dataproc.DataprocCreateClusterOperator( task_id='create_dataproc_cluster', # Give the cluster a unique name by appending the date scheduled. # See https://airflow.apache.org/code.html#default-variables cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', - num_workers=2, - zone=models.Variable.get('gce_zone'), - master_machine_type='n1-standard-2', - worker_machine_type='n1-standard-2') + cluster_config=CLUSTER_CONFIG, + region=models.Variable.get('gce_region')) # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster # master node. - run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator( + run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator( task_id='run_dataproc_hadoop', - main_jar=WORDCOUNT_JAR, - cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', - arguments=wordcount_args) + job=HADOOP_JOB) # Delete Cloud Dataproc cluster. - delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator( + delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator( task_id='delete_dataproc_cluster', cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', + region=models.Variable.get('gce_region'), # Setting trigger_rule to ALL_DONE causes the cluster to be deleted # even if the Dataproc job fails. trigger_rule=trigger_rule.TriggerRule.ALL_DONE) diff --git a/composer/workflows/hadoop_tutorial_test.py b/composer/workflows/hadoop_tutorial_test.py index eaa7fe8372e8..0ef5ab889237 100644 --- a/composer/workflows/hadoop_tutorial_test.py +++ b/composer/workflows/hadoop_tutorial_test.py @@ -22,11 +22,11 @@ def set_variables(airflow_database): models.Variable.set('gcs_bucket', 'example-bucket') models.Variable.set('gcp_project', 'example-project') - models.Variable.set('gce_zone', 'us-central1-f') + models.Variable.set('gce_region', 'us-central1') yield models.Variable.delete('gcs_bucket') models.Variable.delete('gcp_project') - models.Variable.delete('gce_zone') + models.Variable.delete('gce_region') def test_dag_import(): diff --git a/composer/workflows/kubernetes_pod_operator.py b/composer/workflows/kubernetes_pod_operator.py index a5933d5418ba..e76f1e7407b2 100644 --- a/composer/workflows/kubernetes_pod_operator.py +++ b/composer/workflows/kubernetes_pod_operator.py @@ -18,8 +18,8 @@ import datetime from airflow import models -from airflow.contrib.kubernetes import secret -from airflow.contrib.operators import kubernetes_pod_operator +from airflow.kubernetes.secret import Secret +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator # A Secret is an object that contains a small amount of sensitive data such as @@ -29,7 +29,7 @@ # exposure. # [START composer_kubernetespodoperator_secretobject] -secret_env = secret.Secret( +secret_env = Secret( # Expose the secret as environment variable. deploy_type='env', # The name of the environment variable, since deploy_type is `env` rather @@ -39,7 +39,7 @@ secret='airflow-secrets', # Key of a secret stored in this Secret object key='sql_alchemy_conn') -secret_volume = secret.Secret( +secret_volume = Secret( deploy_type='volume', # Path where we mount the secret as volume deploy_target='/var/secrets/google', @@ -66,7 +66,7 @@ # created upon environment creation. 
# [START composer_kubernetespodoperator_minconfig] - kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator( + kubernetes_min_pod = KubernetesPodOperator( # The ID specified for the task. task_id='pod-ex-minimum', # Name of task you want to run, used to generate Pod ID. @@ -90,7 +90,7 @@ image='gcr.io/gcp-runtimes/ubuntu_18_0_4') # [END composer_kubernetespodoperator_minconfig] # [START composer_kubernetespodoperator_templateconfig] - kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator( + kubenetes_template_ex = KubernetesPodOperator( task_id='ex-kube-templates', name='ex-kube-templates', namespace='default', @@ -119,7 +119,7 @@ config_file="{{ conf.get('core', 'kube_config') }}") # [END composer_kubernetespodoperator_templateconfig] # [START composer_kubernetespodoperator_secretconfig] - kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator( + kubernetes_secret_vars_ex = KubernetesPodOperator( task_id='ex-kube-secrets', name='ex-kube-secrets', namespace='default', @@ -135,7 +135,7 @@ 'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'}) # [END composer_kubernetespodoperator_secretconfig] # [START composer_kubernetespodaffinity] - kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator( + kubernetes_affinity_ex = KubernetesPodOperator( task_id='ex-pod-affinity', name='ex-pod-affinity', namespace='default', @@ -177,7 +177,7 @@ }) # [END composer_kubernetespodaffinity] # [START composer_kubernetespodoperator_fullconfig] - kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator( + kubernetes_full_pod = KubernetesPodOperator( task_id='ex-all-configs', name='pi', namespace='default', diff --git a/composer/workflows/noxfile_config.py b/composer/workflows/noxfile_config.py index bc8c408b1fb0..5b78c2458b23 100644 --- a/composer/workflows/noxfile_config.py +++ b/composer/workflows/noxfile_config.py @@ -30,8 +30,7 @@ TEST_CONFIG_OVERRIDE = { # You can opt out from the test for specific Python versions. - # Skipping for Python 3.9 due to numpy compilation failure. - "ignored_versions": ["2.7", "3.9"], + 'ignored_versions': ["2.7", "3.6", "3.7", "3.9"], # Composer w/ Airflow 2 only supports Python 3.8 # Old samples are opted out of enforcing Python type hints # All new samples should feature them "enforce_type_hints": False, @@ -41,6 +40,10 @@ # to use your own Cloud project. "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": "20.2.4", # A dictionary you want to inject into your test. Don't put any # secrets here. These values will override predefined values. "envs": {"AIRFLOW_HOME": _tmpdir.name}, diff --git a/composer/workflows/pythonvirtualenvoperator_python2.py b/composer/workflows/pythonvirtualenvoperator_python2.py deleted file mode 100644 index af8c983a9ddd..000000000000 --- a/composer/workflows/pythonvirtualenvoperator_python2.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START composer_pythonvirtualenvoperator_python2] -import datetime - -from airflow import models -from airflow.operators import python_operator - - -def python2_function(): - """A function which has not been converted to Python 3.""" - # Use the global variable virtualenv_string_args to pass in values when the - # Python version differs from that used by the Airflow process. - global virtualenv_string_args - - # Imports must happen within the function when run with the - # PythonVirtualenvOperator. - import cStringIO - import logging - - arg0 = virtualenv_string_args[0] - buffer = cStringIO.StringIO() - buffer.write('Wrote an ASCII string to buffer:\n') - buffer.write(arg0) - logging.info(buffer.getvalue()) - - -yesterday = datetime.datetime.combine( - datetime.datetime.today() - datetime.timedelta(1), - datetime.datetime.min.time()) - - -default_dag_args = { - # Setting start date as yesterday starts the DAG immediately when it is - # detected in the Cloud Storage bucket. - 'start_date': yesterday, -} - -with models.DAG( - 'composer_sample_pythonvirtualenvoperator_python2', - schedule_interval=datetime.timedelta(days=1), - default_args=default_dag_args) as dag: - - # Use the PythonVirtualenvOperator to select an explicit python_version. - run_python2 = python_operator.PythonVirtualenvOperator( - task_id='run_python2', - python_callable=python2_function, - python_version='2', - string_args=['An example input string'], - ) -# [END composer_pythonvirtualenvoperator_python2] diff --git a/composer/workflows/pythonvirtualenvoperator_python2_test.py b/composer/workflows/pythonvirtualenvoperator_python2_test.py deleted file mode 100644 index cf9d8085d1e3..000000000000 --- a/composer/workflows/pythonvirtualenvoperator_python2_test.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2018 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import internal_unit_testing - - -def test_dag_import(): - """Test that the DAG file can be successfully imported. - - This tests that the DAG can be parsed, but does not run it in an Airflow - environment. This is a recommended confidence check by the official Airflow - docs: https://airflow.incubator.apache.org/tutorial.html#testing - """ - from . 
import pythonvirtualenvoperator_python2 as module
-    internal_unit_testing.assert_has_valid_dag(module)
diff --git a/composer/workflows/quickstart.py b/composer/workflows/quickstart.py
index 13a68fb738de..cbad953bea31 100644
--- a/composer/workflows/quickstart.py
+++ b/composer/workflows/quickstart.py
@@ -16,7 +16,7 @@
 import datetime
 
 import airflow
-from airflow.operators import bash_operator
+from airflow.operators import bash
 
 
 YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
@@ -38,6 +38,6 @@
         schedule_interval=datetime.timedelta(days=1)) as dag:
 
     # Print the dag_run id from the Airflow logs
-    print_dag_run_conf = bash_operator.BashOperator(
+    print_dag_run_conf = bash.BashOperator(
         task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
 # [END composer_quickstart]
diff --git a/composer/workflows/requirements-test.txt b/composer/workflows/requirements-test.txt
index 31e58332d69f..736228fce0c4 100644
--- a/composer/workflows/requirements-test.txt
+++ b/composer/workflows/requirements-test.txt
@@ -1,2 +1,2 @@
 pytest==6.2.4
-cloud-composer-dag-test-utils==0.0.1
+cloud-composer-dag-test-utils==1.0.0
diff --git a/composer/workflows/requirements.txt b/composer/workflows/requirements.txt
index 545f3928e1f1..db269b932f81 100644
--- a/composer/workflows/requirements.txt
+++ b/composer/workflows/requirements.txt
@@ -1,6 +1,3 @@
-apache-airflow[gcp]==1.10.14
-kubernetes==17.17.0
-scipy==1.4.1; python_version > '3.0'
-scipy==1.2.3; python_version < '3.0'
-numpy==1.19.5; python_version > '3.0'
-numpy==1.16.6; python_version < '3.0'
+apache-airflow[google]==2.0.1
+apache-airflow-providers-cncf-kubernetes==1.2.0
+scipy==1.4.1; python_version > '3.0'
\ No newline at end of file
diff --git a/composer/workflows/unit_testing_cycle.py b/composer/workflows/unit_testing_cycle.py
index ae30f015026a..aa8a609c46b8 100644
--- a/composer/workflows/unit_testing_cycle.py
+++ b/composer/workflows/unit_testing_cycle.py
@@ -17,7 +17,7 @@
 import datetime
 
 from airflow import models
-from airflow.operators import dummy_operator
+from airflow.operators import dummy
 
 
 yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
@@ -30,6 +30,5 @@
         'composer_sample_cycle',
         schedule_interval=datetime.timedelta(days=1),
         default_args=default_dag_args) as dag:
-    start = dummy_operator.DummyOperator(task_id='oops_a_cycle')
-    end = dummy_operator.DummyOperator(task_id='oops_a_cycle')
-    start >> end
+    start = dummy.DummyOperator(task_id='oops_a_cycle')
+    start >> start
diff --git a/composer/workflows/unit_testing_variables.py b/composer/workflows/unit_testing_variables.py
index c1c6e06a48b3..5a8a3750e15d 100644
--- a/composer/workflows/unit_testing_variables.py
+++ b/composer/workflows/unit_testing_variables.py
@@ -17,8 +17,8 @@
 import datetime
 
 from airflow import models
-from airflow.operators import bash_operator
-from airflow.operators import dummy_operator
+from airflow.operators import bash
+from airflow.operators import dummy
 
 
 yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
@@ -28,12 +28,12 @@
 }
 
 with models.DAG(
-        'composer_sample_cycle',
+        'composer_sample_variables',
         schedule_interval=datetime.timedelta(days=1),
         default_args=default_dag_args) as dag:
-    start = dummy_operator.DummyOperator(task_id='start')
-    end = dummy_operator.DummyOperator(task_id='end')
-    variable_example = bash_operator.BashOperator(
+    start = dummy.DummyOperator(task_id='start')
+    end = dummy.DummyOperator(task_id='end')
+    variable_example = bash.BashOperator(
         task_id='variable_example',
         bash_command='echo project_id=' + models.Variable.get('gcp_project'))
     start >> variable_example >> end
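To validate the migrated DAGs locally, the *_test.py files in this change pair the session-scoped airflow_database fixture with internal_unit_testing.assert_has_valid_dag from cloud-composer-dag-test-utils. A minimal sketch of that pattern, assuming a hypothetical DAG module my_dag that reads the gcp_project Airflow Variable; the module name and variable value are illustrative, not part of this diff.

import internal_unit_testing  # provided by cloud-composer-dag-test-utils
import pytest

from airflow import models


@pytest.fixture(autouse=True, scope="function")
def set_variables(airflow_database):
    # Stub the Variables the DAG reads before the module is imported.
    models.Variable.set("gcp_project", "example-project")
    yield
    models.Variable.delete("gcp_project")


def test_dag_import():
    # Importing the module parses the DAG; assert_has_valid_dag checks that
    # the module defines at least one valid DAG and has no import errors.
    from . import my_dag as module  # hypothetical DAG module
    internal_unit_testing.assert_has_valid_dag(module)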