Skip to content

Commit

Permalink
[AIRFLOW-6973] Make GCSCreateBucketOperator idempotent (#7609)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek authored Mar 4, 2020
1 parent 09fea3c commit cb2f339
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/example_dags/example_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
list_buckets >> delete_bucket_1
upload_file >> delete_bucket_1
create_bucket1 >> upload_file >> delete_bucket_1
transform_file >> delete_bucket_1
upload_file >> transform_file >> delete_bucket_1
gcs_bucket_create_acl_entry_task >> delete_bucket_1
gcs_object_create_acl_entry_task >> delete_bucket_1
download_file >> delete_bucket_1
Expand Down
18 changes: 11 additions & 7 deletions airflow/providers/google/cloud/operators/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from tempfile import NamedTemporaryFile
from typing import Dict, Iterable, List, Optional, Union

from google.api_core.exceptions import Conflict

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.models.xcom import MAX_XCOM_SIZE
Expand Down Expand Up @@ -133,13 +135,15 @@ def execute(self, context):
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)

hook.create_bucket(bucket_name=self.bucket_name,
resource=self.resource,
storage_class=self.storage_class,
location=self.location,
project_id=self.project_id,
labels=self.labels)
try:
hook.create_bucket(bucket_name=self.bucket_name,
resource=self.resource,
storage_class=self.storage_class,
location=self.location,
project_id=self.project_id,
labels=self.labels)
except Conflict: # HTTP 409
self.log.warning("Bucket %s already exists", self.bucket_name)


class GCSListObjectsOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.providers.postgres.hooks.postgres import PostgresHook
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
from tests.test_utils.gcp_system_helpers import GoogleSystemTest, provide_gcp_context
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context

GCS_BUCKET = "postgres_to_gcs_example"
CREATE_QUERY = """
Expand Down Expand Up @@ -79,7 +79,7 @@ def setUp(self):

@provide_gcp_context(GCP_GCS_KEY)
def test_run_example_dag(self):
self.run_dag('example_postgres_to_gcs', 'airflow/example_dags')
self.run_dag('example_postgres_to_gcs', CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_GCS_KEY)
def tearDown(self):
Expand Down

0 comments on commit cb2f339

Please sign in to comment.