Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ The function accepts time in two formats:

- as an :class:`~datetime.time` object

If you want to create a job transfer that copies data from AWS S3 then you must have a connection configured. Information about configuration for AWS is available: :doc:`apache-airflow-providers-amazon:connections/aws`
The selected connection for AWS can be indicated by the parameter ``aws_conn_id``.
If you want to create a transfer job that copies data from AWS S3, you must have an AWS connection configured.
Information about configuration for AWS is available in :doc:`apache-airflow-providers-amazon:connections/aws`.

For parameter definition, take a look at
:class:`~airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceCreateJobOperator`.
Expand All @@ -67,6 +67,8 @@ Using the operator
:start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
:end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]

.. note:: For AWS S3 sources, pass ``aws_conn_id`` to the operator.

.. exampleinclude:: /../../google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:start-after: [START howto_operator_gcp_transfer_create_job_body_aws]
Expand Down Expand Up @@ -128,7 +130,7 @@ More information
See `Google Cloud Transfer Service - REST Resource: transferJobs - Status
<https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#Status>`_

.. _howto/operator:CloudDataTransferServiceUpdateJobOperator:
.. _howto/operator:CloudDataTransferServiceRunJobOperator:

CloudDataTransferServiceRunJobOperator
-----------------------------------------
Expand Down Expand Up @@ -163,13 +165,16 @@ More information
See `Google Cloud Transfer Service - REST Resource: transferJobs - Run
<https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/run>`_

.. _howto/operator:CloudDataTransferServiceRunJobOperator:
.. _howto/operator:CloudDataTransferServiceUpdateJobOperator:

CloudDataTransferServiceUpdateJobOperator
-----------------------------------------

Updates a transfer job.

For AWS S3 sources you must have an AWS connection configured.
Information about configuration for AWS is available in :doc:`apache-airflow-providers-amazon:connections/aws`.

For parameter definition, take a look at
:class:`~airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceUpdateJobOperator`.

Expand All @@ -188,6 +193,20 @@ Using the operator
:start-after: [START howto_operator_gcp_transfer_update_job]
:end-before: [END howto_operator_gcp_transfer_update_job]

.. note:: For AWS S3 updates, pass ``aws_conn_id`` and include ``transferSpec`` in the update payload.
   If your spec uses an IAM role (for example ``roleArn``), static credentials are not injected.

.. exampleinclude:: /../../google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:start-after: [START howto_operator_gcp_transfer_update_job_body_aws]
:end-before: [END howto_operator_gcp_transfer_update_job_body_aws]

.. exampleinclude:: /../../google/tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_update_job]
:end-before: [END howto_operator_gcp_transfer_update_job]

Templating
""""""""""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
SECRET_ACCESS_KEY,
START_TIME_OF_DAY,
STATUS,
TRANSFER_JOB,
TRANSFER_OPTIONS,
TRANSFER_SPEC,
YEAR,
Expand Down Expand Up @@ -353,13 +354,31 @@ def __init__(
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()

def _get_transfer_job_body(self) -> dict:
    """
    Return the ``TransferJob`` dict nested inside the update request body.

    The update/patch request wraps the job under a ``transfer_job``
    (snake_case) or ``transferJob`` (camelCase) key; both spellings are
    supported because the google-api-python-client accepts either form.
    A body without either wrapper key is returned unchanged, so callers
    passing a bare transfer-job dict keep working.
    """
    for wrapper_key in (TRANSFER_JOB, "transferJob"):
        if wrapper_key in self.body:
            return self.body[wrapper_key]
    # No wrapper present: treat the whole body as the transfer job.
    return self.body

def _validate_inputs(self) -> None:
    """Validate the update request body and the required ``job_name``.

    Raises:
        AirflowException: if ``job_name`` is empty or ``None`` (validation
            errors for the body itself are raised by ``TransferJobValidator``).
    """
    # Validate the inner TransferJob dict, unwrapped from the
    # transfer_job / transferJob key if present, not the raw wrapper body.
    TransferJobValidator(body=self._get_transfer_job_body()).validate_body()
    if not self.job_name:
        raise AirflowException("The required parameter 'job_name' is empty or None")

def execute(self, context: Context) -> dict:
TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
TransferJobPreprocessor(
body=self._get_transfer_job_body(), aws_conn_id=self.aws_conn_id
).process_body()
hook = CloudDataTransferServiceHook(
api_version=self.api_version,
gcp_conn_id=self.gcp_conn_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
SCHEDULE_START_DATE,
START_TIME_OF_DAY,
STATUS,
TRANSFER_JOB,
TRANSFER_JOB_FIELD_MASK,
TRANSFER_OPTIONS,
TRANSFER_SPEC,
GcpTransferJobsStatus,
Expand All @@ -56,6 +58,7 @@
CloudDataTransferServiceListOperationsOperator,
CloudDataTransferServicePauseOperationOperator,
CloudDataTransferServiceResumeOperationOperator,
CloudDataTransferServiceUpdateJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
Expand Down Expand Up @@ -107,6 +110,21 @@
aws_to_gcs_transfer_body_2 = deepcopy(aws_to_gcs_transfer_body)
aws_to_gcs_transfer_body_2[JOB_NAME] = GCP_TRANSFER_JOB_2_NAME

# [START howto_operator_gcp_transfer_update_job_body_aws]
# Update request body for transferJobs.patch: the new job state is nested
# under TRANSFER_JOB, and TRANSFER_JOB_FIELD_MASK lists which top-level
# TransferJob fields ("description,transferSpec") the patch should touch.
update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "description_updated",
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
            GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
            TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
        },
    },
    TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}
# [END howto_operator_gcp_transfer_update_job_body_aws]

with DAG(
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
Expand Down Expand Up @@ -187,6 +205,14 @@
)
# [END howto_operator_gcp_transfer_wait_operation]

# [START howto_operator_gcp_transfer_update_job]
# job_name is pulled from the create task's XCom: the service assigns the
# job name at creation time, so it is only known at runtime.
update_transfer_job_s3_to_gcs = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    body=update_body,
)
# [END howto_operator_gcp_transfer_update_job]

create_second_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
task_id="create_transfer_job_s3_to_gcs_2", body=aws_to_gcs_transfer_body_2
)
Expand Down Expand Up @@ -247,6 +273,9 @@
>> get_operation
>> resume_operation
>> wait_for_operation_to_end
# [START howto_operator_gcp_transfer_update_job_aws]
>> update_transfer_job_s3_to_gcs
# [END howto_operator_gcp_transfer_update_job_aws]
>> create_second_transfer_job_from_aws
>> wait_for_operation_to_start_2
>> cancel_operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
SECRET_ACCESS_KEY,
START_TIME_OF_DAY,
STATUS,
TRANSFER_JOB,
TRANSFER_SPEC,
)
from airflow.providers.google.cloud.openlineage.facets import (
Expand Down Expand Up @@ -433,6 +434,88 @@ def test_job_update(self, mock_hook):
mock_hook.return_value.update_transfer_job.assert_called_once_with(job_name=JOB_NAME, body=body)
assert result == VALID_TRANSFER_JOB_GCS

@pytest.mark.skipif(boto3 is None, reason="Skipping test because boto3 is not available")
@mock.patch(
    "airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook"
)
@mock.patch("airflow.providers.google.cloud.operators.cloud_storage_transfer_service.AwsBaseHook")
def test_job_update_with_aws_s3_source_camel_case(self, aws_hook, mock_hook):
    """AWS credentials are injected when the update body uses the camelCase ``transferJob`` wrapper."""
    aws_hook.return_value.get_credentials.return_value = Credentials(
        TEST_AWS_ACCESS_KEY_ID, TEST_AWS_ACCESS_SECRET, None
    )
    mock_hook.return_value.update_transfer_job.return_value = VALID_TRANSFER_JOB_AWS

    update_body = {
        "transferJob": {TRANSFER_SPEC: deepcopy(SOURCE_AWS)},
        "updateTransferJobFieldMask": TRANSFER_SPEC,
    }
    operator = CloudDataTransferServiceUpdateJobOperator(
        job_name=JOB_NAME,
        body=update_body,
        task_id=TASK_ID,
    )
    operator.execute(context=mock.MagicMock())

    # The preprocessor must have written the AWS key into the nested spec.
    injected = update_body["transferJob"][TRANSFER_SPEC][AWS_S3_DATA_SOURCE][AWS_ACCESS_KEY]
    assert injected == TEST_AWS_ACCESS_KEY
    mock_hook.return_value.update_transfer_job.assert_called_once_with(job_name=JOB_NAME, body=update_body)

@pytest.mark.skipif(boto3 is None, reason="Skipping test because boto3 is not available")
@mock.patch(
    "airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook"
)
@mock.patch("airflow.providers.google.cloud.operators.cloud_storage_transfer_service.AwsBaseHook")
def test_job_update_with_aws_s3_source_snake_case(self, aws_hook, mock_hook):
    """AWS credentials are injected when the update body uses the snake_case ``transfer_job`` wrapper."""
    aws_hook.return_value.get_credentials.return_value = Credentials(
        TEST_AWS_ACCESS_KEY_ID, TEST_AWS_ACCESS_SECRET, None
    )
    mock_hook.return_value.update_transfer_job.return_value = VALID_TRANSFER_JOB_AWS

    request_body = {
        TRANSFER_JOB: {TRANSFER_SPEC: deepcopy(SOURCE_AWS)},
        "update_transfer_job_field_mask": TRANSFER_SPEC,
    }
    operator = CloudDataTransferServiceUpdateJobOperator(
        job_name=JOB_NAME,
        body=request_body,
        task_id=TASK_ID,
    )
    operator.execute(context=mock.MagicMock())

    # The preprocessor must have written the AWS key into the nested spec.
    aws_source = request_body[TRANSFER_JOB][TRANSFER_SPEC][AWS_S3_DATA_SOURCE]
    assert aws_source[AWS_ACCESS_KEY] == TEST_AWS_ACCESS_KEY
    mock_hook.return_value.update_transfer_job.assert_called_once_with(job_name=JOB_NAME, body=request_body)

@mock.patch(
    "airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceHook"
)
@mock.patch("airflow.providers.google.cloud.operators.cloud_storage_transfer_service.AwsBaseHook")
def test_job_update_with_aws_role_arn_does_not_inject_credentials(self, aws_hook, mock_hook):
    """Test that AWS credentials are NOT injected when roleArn is present.

    Also verifies that the AWS hook is never consulted (the operator must
    not fetch static credentials for an IAM-role spec) and that the update
    call still reaches the service hook, matching the sibling update tests.
    """
    mock_hook.return_value.update_transfer_job.return_value = VALID_TRANSFER_JOB_AWS_ROLE_ARN

    body = {
        "transferJob": {
            TRANSFER_SPEC: deepcopy(SOURCE_AWS_ROLE_ARN),
        },
        "updateTransferJobFieldMask": TRANSFER_SPEC,
    }
    op = CloudDataTransferServiceUpdateJobOperator(
        job_name=JOB_NAME,
        body=body,
        task_id=TASK_ID,
    )
    op.execute(context=mock.MagicMock())

    # Verify AWS credentials were NOT injected
    assert AWS_ACCESS_KEY not in body["transferJob"][TRANSFER_SPEC][AWS_S3_DATA_SOURCE]
    # The AWS hook must not be used at all when the spec carries a roleArn.
    aws_hook.return_value.get_credentials.assert_not_called()
    # Consistency with the other update tests: the patch still goes through.
    mock_hook.return_value.update_transfer_job.assert_called_once_with(job_name=JOB_NAME, body=body)

# Setting all the operator's input parameters as templated dag_ids
# (could be anything else) just to test if the templating works for all
# fields
Expand Down
Loading