
Fix CloudDataTransferServiceUpdateJobOperator AWS credential injection for S3 sources #61611

Merged
shahar1 merged 2 commits into apache:main from shahar1:fix/22021-transfer-job-update-aws-credentials
Feb 10, 2026
Conversation


@shahar1 shahar1 commented Feb 7, 2026

Description

Fixes #22021

The CloudDataTransferServiceUpdateJobOperator was passing the full patch request body to TransferJobPreprocessor, but that body wraps the TransferJob under a transfer_job (or transferJob) key. The preprocessor looks for transferSpec at the top level and thus never found it, skipping AWS credential injection for S3 sources.

Changes

  1. Operator Fix (cloud_storage_transfer_service.py):

    • Added _get_transfer_job_body() method that extracts the inner TransferJob dict from the patch request body
    • Supports both snake_case (transfer_job) and camelCase (transferJob) key variants
    • Updated _validate_inputs() and execute() to use the extracted body for preprocessing and validation
  2. Test Coverage (test_cloud_storage_transfer_service.py):

    • Added test_job_update_with_aws_s3_source_camel_case: Verifies AWS credentials are injected with camelCase body
    • Added test_job_update_with_aws_s3_source_snake_case: Verifies AWS credentials are injected with snake_case body
    • Added test_job_update_with_aws_role_arn_does_not_inject_credentials: Verifies role ARN credentials are not overridden

Testing

All 74 tests in the operators test suite pass, including the 3 new tests and the existing update operator tests.
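As a rough illustration of what the three new tests assert, here is a self-contained stand-in for the credential-injection step. `inject_aws_credentials` is a hypothetical simplification written for this sketch; the real suite exercises the provider's TransferJobPreprocessor with mocked AWS hooks:

```python
# Hypothetical stand-in for the preprocessor's injection step, used only to
# illustrate the behavior the new tests verify.

def inject_aws_credentials(transfer_job: dict, access_key: dict) -> dict:
    spec = transfer_job.get("transferSpec", {})
    s3 = spec.get("awsS3DataSource")
    # Inject only when there is an S3 source and no role ARN already set.
    if s3 is not None and "roleArn" not in s3:
        s3["awsAccessKey"] = access_key
    return transfer_job

creds = {"accessKeyId": "AKIA...", "secretAccessKey": "..."}

# S3 source without a role ARN: credentials are injected.
job = {"transferSpec": {"awsS3DataSource": {"bucketName": "b"}}}
out = inject_aws_credentials(job, creds)
assert out["transferSpec"]["awsS3DataSource"]["awsAccessKey"] == creds

# S3 source with a role ARN: credentials are NOT injected or overridden.
job_arn = {
    "transferSpec": {
        "awsS3DataSource": {"bucketName": "b", "roleArn": "arn:aws:iam::1:role/x"}
    }
}
out_arn = inject_aws_credentials(job_arn, creds)
assert "awsAccessKey" not in out_arn["transferSpec"]["awsS3DataSource"]
```

The camelCase and snake_case test variants differ only in which wrapper key holds the TransferJob before the operator unwraps it.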



Generated-by: GitHub Copilot following the guidelines

…on for S3 sources

Fixes apache#22021

The UpdateJobOperator was passing the full patch request body to
TransferJobPreprocessor, but that body wraps the TransferJob under a
'transfer_job' (or 'transferJob') key. The preprocessor looks for
'transferSpec' at the top level and thus never found it, skipping
AWS credential injection for S3 sources.

This fix extracts the inner TransferJob dict from the patch request body
before passing it to TransferJobPreprocessor and TransferJobValidator,
supporting both snake_case and camelCase key variants.

Added tests to verify AWS credentials are injected when updating transfer
jobs with S3 sources, and that role ARN credentials are not overridden.
boring-cyborg bot added the area:providers and provider:google (Google (including GCP) related issues) labels Feb 7, 2026
shahar1 changed the title from "Fix: CloudDataTransferServiceUpdateJobOperator AWS credential injection for S3 sources" to "Fix CloudDataTransferServiceUpdateJobOperator AWS credential injection for S3 sources" Feb 7, 2026

shahar1 commented Feb 7, 2026

@eladkal Thanks for the approval, I'll merge it after testing on a real Dag (that's why it's still drafted)


eladkal commented Feb 7, 2026

> @eladkal Thanks for the approval, I'll merge it after testing on a real Dag (that's why it's still drafted)

ideally we should have it also as a system test

@shahar1 shahar1 marked this pull request as ready for review February 7, 2026 22:14

shahar1 commented Feb 7, 2026

I created a compact Dag for testing E2E:

Click here to see its code
from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone

from airflow.models.dag import DAG
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    AWS_S3_DATA_SOURCE,
    BUCKET_NAME,
    DESCRIPTION,
    GCS_DATA_SINK,
    PROJECT_ID,
    SCHEDULE,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    START_TIME_OF_DAY,
    STATUS,
    TRANSFER_JOB,
    TRANSFER_JOB_FIELD_MASK,
    TRANSFER_SPEC,
    GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
    CloudDataTransferServiceDeleteJobOperator,
    CloudDataTransferServiceGetOperationOperator,
    CloudDataTransferServiceListOperationsOperator,
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
)

# Configuration
GCP_PROJECT_ID = "my-gcp-project"
S3_SOURCE_BUCKET = "test-s3-bucket"
GCS_DEST_BUCKET = "test-gcs-bucket"
AWS_CONN_ID = "aws_default"

# Transfer job body: S3 to GCS transfer
s3_to_gcs_transfer_body = {
    DESCRIPTION: "Example S3 to GCS transfer",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2025, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2025, 12, 31).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
    },
}

# Update body for the transfer job.
# The patch request wraps the TransferJob under TRANSFER_JOB key.
# When the inner TransferJob contains a transferSpec with awsS3DataSource,
# the operator now correctly injects AWS credentials (fix for #22021).
update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "Updated S3 to GCS transfer",
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
            GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
        },
    },
    TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}

with DAG(
    "example_gcp_storage_transfer",
    description="Example DAG for Google Cloud Storage Transfer Service",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "gcp", "transfer"],
) as dag:

    create_transfer_job = CloudDataTransferServiceCreateJobOperator(
        task_id="create_transfer_job",
        body=s3_to_gcs_transfer_body,
        aws_conn_id=AWS_CONN_ID,
    )

    update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
        task_id="update_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        body=update_body,
        aws_conn_id=AWS_CONN_ID,
    )

    run_transfer_job = CloudDataTransferServiceRunJobOperator(
        task_id="run_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )

    list_operations = CloudDataTransferServiceListOperationsOperator(
        task_id="list_operations",
        request_filter={"project_id": GCP_PROJECT_ID},
    )

    get_operation = CloudDataTransferServiceGetOperationOperator(
        task_id="get_operation",
        operation_name="{{ task_instance.xcom_pull('list_operations')[0]['name'] }}",
    )

    delete_transfer_job = CloudDataTransferServiceDeleteJobOperator(
        task_id="delete_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )

    create_transfer_job >> update_transfer_job >> run_transfer_job >> [
        list_operations,
        # delete_transfer_job,
    ]
    list_operations >> get_operation
[image: Dag run results]
  • 1st run - E2E after applying the change to ensure that it works
  • 2nd run - E2E without the deletion step, to check that the files are actually present
  • 3rd run - from main, expected to fail on the update step

Based on the compact Dag, I updated the existing system test (I assume it's not part of Google's automated runs due to the AWS dependency) and ran it manually:

[image: system test run results]

Also updated docs to reflect the changes.

CC: @VladaZakharova @MaksYermak - FYI, if you don't have any objections, I will merge it by Tuesday for the upcoming release.

@shahar1 shahar1 merged commit 246a386 into apache:main Feb 10, 2026
115 of 123 checks passed
@shahar1 shahar1 deleted the fix/22021-transfer-job-update-aws-credentials branch February 10, 2026 06:35
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request Feb 15, 2026
choo121600 pushed a commit to choo121600/airflow that referenced this pull request Feb 22, 2026
AkshayArali pushed a commit to AkshayArali/airflow_630 that referenced this pull request Feb 27, 2026

Labels

area:providers, provider:google (Google (including GCP) related issues)


Development

Successfully merging this pull request may close these issues.

Google Cloud Transfer operators: bug in transfer service update job operator

3 participants