Remove GKEStartPodOperator when backporting (#7908)
turbaszek authored Mar 29, 2020
1 parent daad60b · commit 0f19a93
Showing 4 changed files with 93 additions and 2 deletions.
59 changes: 59 additions & 0 deletions (new file)
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Kubernetes Engine.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator, GKEDeleteClusterOperator,
)
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCP_LOCATION = os.environ.get("GCP_GKE_LOCATION", "europe-north1-a")
CLUSTER_NAME = os.environ.get("GCP_GKE_CLUSTER_NAME", "cluster-name")

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}

default_args = {"start_date": days_ago(1)}


with models.DAG(
    "example_gcp_gke_setup",
    default_args=default_args,
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag:
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        body=CLUSTER,
    )

    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
    )

    create_cluster >> delete_cluster
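For contrast: the Airflow 2.0-only companion DAG, example_gcp_gke, runs a pod on this cluster with GKEStartPodOperator — the operator this commit strips from backport packages because it works only on Airflow 2.0 / master (see the airflow_2 test marker below). A minimal sketch of how it is typically wired up; the namespace, image, and pod name values are illustrative assumptions:

from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

pod_task = GKEStartPodOperator(
    task_id="pod_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",  # assumed namespace
    image="perl",         # assumed container image
    name="test-pod",      # assumed pod name
)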
17 changes: 17 additions & 0 deletions backport_packages/setup_backport_packages.py
@@ -170,6 +170,21 @@ def add_provide_context_to_python_operator(node: LN, capture: Capture, filename:
    provide_context_arg.prefix = fn_args.children[0].prefix
    fn_args.append_child(provide_context_arg)

def remove_class(qry, class_name) -> None:
    def _remover(node: LN, capture: Capture, filename: Filename) -> None:
        # node.type carries a fissix grammar symbol number.
        if node.type == 300:
            # Name list (e.g. in an import): drop the class name and any
            # trailing comma so the surrounding list stays valid.
            for ch in node.post_order():
                if isinstance(ch, Leaf) and ch.value == class_name:
                    if ch.next_sibling and ch.next_sibling.value == ",":
                        ch.next_sibling.remove()
                    ch.remove()
        elif node.type == 311:
            # Usage site: remove the whole enclosing parent node.
            node.parent.remove()
        else:
            # Any other match: remove the node itself.
            node.remove()

    qry.select_class(class_name).modify(_remover)

changes = [
    ("airflow.operators.bash", "airflow.operators.bash_operator"),
    ("airflow.operators.python", "airflow.operators.python_operator"),
@@ -228,6 +243,8 @@ def add_provide_context_to_python_operator(node: LN, capture: Capture, filename:
    .modify(add_provide_context_to_python_operator)
)

remove_class(qry, "GKEStartPodOperator")

qry.execute(write=True, silent=False, interactive=False)
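remove_class hooks into the same refactoring query as the module renames above; the select_class / modify / execute chain matches bowler's Query API, so a hedged standalone sketch would look like this (the Query import and source path are assumptions — the script builds qry elsewhere):

from bowler import Query  # assumed import

qry = Query("backport_packages/airflow/")  # hypothetical source path
remove_class(qry, "GKEStartPodOperator")   # queue removal of the 2.0-only operator
qry.execute(write=True, silent=False, interactive=False)  # rewrite sources in place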


12 changes: 11 additions & 1 deletion tests/conftest.py
@@ -178,11 +178,14 @@ def pytest_configure(config):
        "markers", "system(name): mark test to run with named system"
    )
    config.addinivalue_line(
-        "markers", "long_running(name): mark test that run for a long time (many minutes)"
+        "markers", "long_running: mark tests that run for a long time (many minutes)"
    )
    config.addinivalue_line(
        "markers", "credential_file(name): mark tests that require credential file in CREDENTIALS_DIR"
    )
    config.addinivalue_line(
        "markers", "airflow_2: mark tests that work only on Airflow 2.0 / master"
    )


def skip_if_not_marked_with_integration(selected_integrations, item):
@@ -292,6 +295,12 @@ def skip_if_credential_file_missing(item):
                    format(path=credential_path, item=item))


def skip_if_airflow_2_test(item):
    # Skip airflow_2-marked tests when running against an Airflow 1.10 backport.
    for _ in item.iter_markers(name="airflow_2"):
        if os.environ.get("RUN_AIRFLOW_1_10") == "true":
            pytest.skip("The test works only with Airflow 2.0 / master branch")


def pytest_runtest_setup(item):
    selected_integrations = item.config.getoption("--integrations")
    selected_integrations_list = selected_integrations.split(",") if selected_integrations else []
@@ -319,3 +328,4 @@ def pytest_runtest_setup(item):
    if not include_long_running:
        skip_long_running_test(item)
    skip_if_credential_file_missing(item)
    skip_if_airflow_2_test(item)
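Together, the marker registration and skip hook let any test opt out of Airflow 1.10 backport runs; a minimal sketch of the intended usage (the test function is hypothetical):

import pytest

@pytest.mark.airflow_2
def test_master_only_feature():
    # Hypothetical test: pytest_runtest_setup skips it when RUN_AIRFLOW_1_10 == "true".
    assert True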
7 changes: 6 additions & 1 deletion
@@ -24,6 +24,11 @@
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_GKE_KEY)
class KubernetesEngineExampleDagTest(GoogleSystemTest):
    @pytest.mark.airflow_2
    @provide_gcp_context(GCP_GKE_KEY)
-    def test_run_example_dag(self):
+    def test_run_example_gcp_gke(self):
        self.run_dag('example_gcp_gke', CLOUD_DAG_FOLDER)

    @provide_gcp_context(GCP_GKE_KEY)
    def test_run_example_gcp_gke_setup(self):
        self.run_dag('example_gcp_gke_setup', CLOUD_DAG_FOLDER)
