From ddf9013098b09176d7b34861b2357ded50b9fe26 Mon Sep 17 00:00:00 2001
From: chethanuk-plutoflume
Date: Sun, 5 Jun 2022 10:18:04 +0100
Subject: [PATCH] AIP-47 - Migrate databricks DAGs to new design #22442 (#24203)

---
 .../databricks/example_dags/__init__.py          | 17 -----------------
 .../index.rst                                    |  2 +-
 .../operators/copy_into.rst                      |  2 +-
 .../operators/repos_create.rst                   |  2 +-
 .../operators/repos_delete.rst                   |  2 +-
 .../operators/repos_update.rst                   |  2 +-
 .../operators/sql.rst                            |  8 ++++----
 .../operators/submit_run.rst                     |  4 ++--
 .../providers/databricks}/example_databricks.py  | 17 ++++++++++++++++-
 .../databricks}/example_databricks_repos.py      | 17 ++++++++++++++++-
 .../databricks}/example_databricks_sql.py        | 17 ++++++++++++++++-
 11 files changed, 59 insertions(+), 31 deletions(-)
 delete mode 100644 airflow/providers/databricks/example_dags/__init__.py
 rename {airflow/providers/databricks/example_dags => tests/system/providers/databricks}/example_databricks.py (84%)
 rename {airflow/providers/databricks/example_dags => tests/system/providers/databricks}/example_databricks_repos.py (84%)
 rename {airflow/providers/databricks/example_dags => tests/system/providers/databricks}/example_databricks_sql.py (89%)

diff --git a/airflow/providers/databricks/example_dags/__init__.py b/airflow/providers/databricks/example_dags/__init__.py
deleted file mode 100644
index 217e5db9607827..00000000000000
--- a/airflow/providers/databricks/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-databricks/index.rst b/docs/apache-airflow-providers-databricks/index.rst
index fed33f60c9953c..f41834a95b2688 100644
--- a/docs/apache-airflow-providers-databricks/index.rst
+++ b/docs/apache-airflow-providers-databricks/index.rst
@@ -39,7 +39,7 @@ Content
     :maxdepth: 1
     :caption: Resources
 
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/databricks/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/databricks>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-databricks/>
     Installing from sources <installing-providers-from-sources>
diff --git a/docs/apache-airflow-providers-databricks/operators/copy_into.rst b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
index 79716c256f6a90..71a3fa9e89ad8c 100644
--- a/docs/apache-airflow-providers-databricks/operators/copy_into.rst
+++ b/docs/apache-airflow-providers-databricks/operators/copy_into.rst
@@ -46,7 +46,7 @@ Importing CSV data
 
 An example usage of the DatabricksCopyIntoOperator to import CSV data into a table is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_copy_into]
     :end-before: [END howto_operator_databricks_copy_into]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_create.rst b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
index fc04340796d495..6611a51cd6c155 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_create.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_create.rst
@@ -63,7 +63,7 @@ Create a Databricks Repo
 
 An example usage of the DatabricksReposCreateOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_create]
     :end-before: [END howto_operator_databricks_repo_create]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
index e359deb7c9170d..74d4b62972a146 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_delete.rst
@@ -55,7 +55,7 @@ Deleting Databricks Repo by specifying path
 
 An example usage of the DatabricksReposDeleteOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_delete]
     :end-before: [END howto_operator_databricks_repo_delete]
diff --git a/docs/apache-airflow-providers-databricks/operators/repos_update.rst b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
index 0f63c2468551ec..56af4edabbcf65 100644
--- a/docs/apache-airflow-providers-databricks/operators/repos_update.rst
+++ b/docs/apache-airflow-providers-databricks/operators/repos_update.rst
@@ -60,7 +60,7 @@ Updating Databricks Repo by specifying path
 
 An example usage of the DatabricksReposUpdateOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
     :language: python
     :start-after: [START howto_operator_databricks_repo_update]
     :end-before: [END howto_operator_databricks_repo_update]
diff --git a/docs/apache-airflow-providers-databricks/operators/sql.rst b/docs/apache-airflow-providers-databricks/operators/sql.rst
index 93a3b88007a44e..7e80a6b7cffd08 100644
--- a/docs/apache-airflow-providers-databricks/operators/sql.rst
+++ b/docs/apache-airflow-providers-databricks/operators/sql.rst
@@ -49,7 +49,7 @@ Selecting data
 
 An example usage of the DatabricksSqlOperator to select data from a table is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_select]
     :end-before: [END howto_operator_databricks_sql_select]
@@ -59,7 +59,7 @@ Selecting data into a file
 
 An example usage of the DatabricksSqlOperator to select data from a table and store in a file is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_select_file]
     :end-before: [END howto_operator_databricks_sql_select_file]
@@ -69,7 +69,7 @@ Executing multiple statements
 
 An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_multiple]
     :end-before: [END howto_operator_databricks_sql_multiple]
@@ -80,7 +80,7 @@ Executing multiple statements from a file
 
 An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
     :language: python
     :start-after: [START howto_operator_databricks_sql_multiple_file]
     :end-before: [END howto_operator_databricks_sql_multiple_file]
diff --git a/docs/apache-airflow-providers-databricks/operators/submit_run.rst b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
index 81f9dfd32f382c..1b32b771de7527 100644
--- a/docs/apache-airflow-providers-databricks/operators/submit_run.rst
+++ b/docs/apache-airflow-providers-databricks/operators/submit_run.rst
@@ -61,7 +61,7 @@ Specifying parameters as JSON
 
 An example usage of the DatabricksSubmitRunOperator is as follows:
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
     :language: python
     :start-after: [START howto_operator_databricks_json]
     :end-before: [END howto_operator_databricks_json]
@@ -71,7 +71,7 @@ Using named parameters
 
 You can also use named parameters to initialize the operator and run the job.
 
-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
     :language: python
     :start-after: [START howto_operator_databricks_named]
     :end-before: [END howto_operator_databricks_named]
diff --git a/airflow/providers/databricks/example_dags/example_databricks.py b/tests/system/providers/databricks/example_databricks.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks.py
rename to tests/system/providers/databricks/example_databricks.py
index bea9038afeb0a0..48a3ca922a0bba 100644
--- a/airflow/providers/databricks/example_dags/example_databricks.py
+++ b/tests/system/providers/databricks/example_databricks.py
@@ -31,13 +31,17 @@
 https://docs.databricks.com/api/latest/jobs.html#runstate
 """
 
+import os
 from datetime import datetime
 
 from airflow import DAG
 from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_operator"
+
 with DAG(
-    dag_id='example_databricks_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
@@ -73,3 +77,14 @@
     )
     # [END howto_operator_databricks_named]
     notebook_task >> spark_jar_task
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/databricks/example_dags/example_databricks_repos.py b/tests/system/providers/databricks/example_databricks_repos.py
similarity index 84%
rename from airflow/providers/databricks/example_dags/example_databricks_repos.py
rename to tests/system/providers/databricks/example_databricks_repos.py
index e33d32044f5dfc..eb76abcf198ac4 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_repos.py
+++ b/tests/system/providers/databricks/example_databricks_repos.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 from datetime import datetime
 
 from airflow import DAG
@@ -30,8 +31,11 @@
     'databricks_conn_id': 'databricks',
 }
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_repos_operator"
+
 with DAG(
-    dag_id='example_databricks_repos_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     default_args=default_args,
@@ -72,3 +76,14 @@
     # [END howto_operator_databricks_repo_delete]
 
     (create_repo >> update_repo >> notebook_task >> delete_repo)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/databricks/example_dags/example_databricks_sql.py b/tests/system/providers/databricks/example_databricks_sql.py
similarity index 89%
rename from airflow/providers/databricks/example_dags/example_databricks_sql.py
rename to tests/system/providers/databricks/example_databricks_sql.py
index 6032c0fb030322..33ee37c9728562 100644
--- a/airflow/providers/databricks/example_dags/example_databricks_sql.py
+++ b/tests/system/providers/databricks/example_databricks_sql.py
@@ -31,6 +31,7 @@
 https://docs.databricks.com/api/latest/jobs.html#runstate
 """
 
+import os
 from datetime import datetime
 
 from airflow import DAG
@@ -39,8 +40,11 @@
     DatabricksSqlOperator,
 )
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_sql_operator"
+
 with DAG(
-    dag_id='example_databricks_sql_operator',
+    dag_id=DAG_ID,
     schedule_interval='@daily',
     start_date=datetime(2021, 1, 1),
     tags=['example'],
@@ -111,3 +115,14 @@
     # [END howto_operator_databricks_copy_into]
 
     (create >> create_file >> import_csv >> select >> select_into_file)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
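
Note: the watcher()/get_test_run() lines appended to each example above are the standard AIP-47 system-test hooks. As a rough sketch of how they fit together (the real helpers live in tests/system/utils/ in the apache/airflow repo; the bodies below are illustrative assumptions, not the exact upstream code):

    # Illustrative sketch only -- names mirror the imports used in the patch;
    # see tests/system/utils/ in apache/airflow for the real implementations.
    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule


    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        # Runs only when at least one upstream task has failed, so a DAG whose
        # teardown succeeds (trigger rule ALL_DONE) still reports overall failure.
        raise AirflowException("Failing task because one or more upstream tasks failed.")


    def get_test_run(dag):
        # Assumed shape: wraps the DAG in a pytest-discoverable callable so that
        #   pytest tests/system/providers/databricks/example_databricks.py
        # executes the example end to end.
        def test_run():
            dag.clear()
            dag.run()

        return test_run

With this wiring, list(dag.tasks) >> watcher() puts every task upstream of the watcher, and pytest picks up the module-level test_run as an ordinary test.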