Fix CloudDataTransferServiceUpdateJobOperator AWS credential injection for S3 sources (#61611)
Merged
shahar1 merged 2 commits into apache:main on Feb 10, 2026
Conversation
Fix CloudDataTransferServiceUpdateJobOperator AWS credential injection for S3 sources. Fixes apache#22021. The UpdateJobOperator was passing the full patch request body to TransferJobPreprocessor, but that body wraps the TransferJob under a 'transfer_job' (or 'transferJob') key. The preprocessor looks for 'transferSpec' at the top level and thus never found it, skipping AWS credential injection for S3 sources. This fix extracts the inner TransferJob dict from the patch request body before passing it to TransferJobPreprocessor and TransferJobValidator, supporting both snake_case and camelCase key variants. Added tests to verify that AWS credentials are injected when updating transfer jobs with S3 sources, and that role ARN credentials are not overridden.
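The extraction step described above can be sketched as a plain function. This is a minimal stand-in, not the provider's actual code; the helper name mirrors the `_get_transfer_job_body()` method mentioned later in the PR description.

```python
# Minimal sketch (assumed shape, not the actual operator code) of the fix:
# pull the inner TransferJob dict out of a patch request body, accepting
# both the snake_case and camelCase wrapper keys.

def get_transfer_job_body(body: dict) -> dict:
    """Return the nested TransferJob dict, or the body itself if not wrapped."""
    for key in ("transfer_job", "transferJob"):
        if key in body:
            return body[key]
    return body

# Before the fix, the preprocessor looked for 'transferSpec' at the top
# level of the patch body, so injection was silently skipped:
patch_body = {
    "project_id": "my-project",
    "transfer_job": {
        "transferSpec": {"awsS3DataSource": {"bucketName": "src-bucket"}},
    },
}
assert "transferSpec" not in patch_body                      # old lookup fails
assert "transferSpec" in get_transfer_job_body(patch_body)   # fixed lookup
```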
eladkal approved these changes on Feb 7, 2026

Contributor (Author):
@eladkal Thanks for the approval, I'll merge it after testing on a real Dag (that's why it's still drafted).
Contributor:
Ideally we should also have it as a system test.

Contributor (Author):
I created a compact Dag for E2E testing:

```python
from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone

from airflow.models.dag import DAG
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    AWS_S3_DATA_SOURCE,
    BUCKET_NAME,
    DESCRIPTION,
    GCS_DATA_SINK,
    PROJECT_ID,
    SCHEDULE,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    START_TIME_OF_DAY,
    STATUS,
    TRANSFER_JOB,
    TRANSFER_JOB_FIELD_MASK,
    TRANSFER_SPEC,
    GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
    CloudDataTransferServiceDeleteJobOperator,
    CloudDataTransferServiceGetOperationOperator,
    CloudDataTransferServiceListOperationsOperator,
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
)

# Configuration
GCP_PROJECT_ID = "my-gcp-project"
S3_SOURCE_BUCKET = "test-s3-bucket"
GCS_DEST_BUCKET = "test-gcs-bucket"
AWS_CONN_ID = "aws_default"

# Transfer job body: S3 to GCS transfer
s3_to_gcs_transfer_body = {
    DESCRIPTION: "Example S3 to GCS transfer",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2025, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2025, 12, 31).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
    },
}

# Update body for the transfer job.
# The patch request wraps the TransferJob under the TRANSFER_JOB key.
# When the inner TransferJob contains a transferSpec with awsS3DataSource,
# the operator now correctly injects AWS credentials (fix for #22021).
update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "Updated S3 to GCS transfer",
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: S3_SOURCE_BUCKET},
            GCS_DATA_SINK: {BUCKET_NAME: GCS_DEST_BUCKET},
        },
    },
    TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}

with DAG(
    "example_gcp_storage_transfer",
    description="Example DAG for Google Cloud Storage Transfer Service",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "gcp", "transfer"],
) as dag:
    create_transfer_job = CloudDataTransferServiceCreateJobOperator(
        task_id="create_transfer_job",
        body=s3_to_gcs_transfer_body,
        aws_conn_id=AWS_CONN_ID,
    )
    update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
        task_id="update_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        body=update_body,
        aws_conn_id=AWS_CONN_ID,
    )
    run_transfer_job = CloudDataTransferServiceRunJobOperator(
        task_id="run_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )
    list_operations = CloudDataTransferServiceListOperationsOperator(
        task_id="list_operations",
        request_filter={"project_id": GCP_PROJECT_ID},
    )
    get_operation = CloudDataTransferServiceGetOperationOperator(
        task_id="get_operation",
        operation_name="{{ task_instance.xcom_pull('list_operations')[0]['name'] }}",
    )
    delete_transfer_job = CloudDataTransferServiceDeleteJobOperator(
        task_id="delete_transfer_job",
        job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
        project_id=GCP_PROJECT_ID,
    )

    create_transfer_job >> update_transfer_job >> run_transfer_job >> [
        list_operations,
        # delete_transfer_job,
    ]
    list_operations >> get_operation
```

Based on the compact Dag, I updated the existing system test (I assume it's not part of Google's automated runs due to the AWS dependency) and ran it manually.
Also updated docs to reflect the changes. CC: @VladaZakharova @MaksYermak, FYI: if you don't have any objections, I will merge it by Tuesday for the upcoming release.
MaksYermak approved these changes on Feb 9, 2026
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request on Feb 15, 2026
choo121600 pushed a commit to choo121600/airflow that referenced this pull request on Feb 22, 2026
AkshayArali pushed a commit to AkshayArali/airflow_630 that referenced this pull request on Feb 27, 2026
Description

Fixes #22021

The `CloudDataTransferServiceUpdateJobOperator` was passing the full patch request body to `TransferJobPreprocessor`, but that body wraps the `TransferJob` under a `transfer_job` (or `transferJob`) key. The preprocessor looks for `transferSpec` at the top level and thus never found it, skipping AWS credential injection for S3 sources.

Changes

Operator fix (`cloud_storage_transfer_service.py`):
- Added a `_get_transfer_job_body()` method that extracts the inner `TransferJob` dict from the patch request body
- Supports both snake_case (`transfer_job`) and camelCase (`transferJob`) key variants
- Updated `_validate_inputs()` and `execute()` to use the extracted body for preprocessing and validation

Test coverage (`test_cloud_storage_transfer_service.py`):
- `test_job_update_with_aws_s3_source_camel_case`: verifies AWS credentials are injected with a camelCase body
- `test_job_update_with_aws_s3_source_snake_case`: verifies AWS credentials are injected with a snake_case body
- `test_job_update_with_aws_role_arn_does_not_inject_credentials`: verifies role ARN credentials are not overridden

Testing

All 74 tests in the operators test suite pass, including the 3 new tests and the existing update operator tests.
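The role-ARN behavior the tests verify can be illustrated with a small sketch. This is a hedged stand-in, not the hook's actual implementation; the field names (`awsS3DataSource`, `roleArn`, `awsAccessKey`) follow the Storage Transfer API, but the function itself is illustrative.

```python
# Illustrative sketch (assumed shape, not provider code) of the injection
# rule: static AWS credentials are added to an S3 source only when the
# source does not already use role-based auth via roleArn.

def inject_aws_credentials(transfer_spec: dict, access_key_id: str, secret: str) -> None:
    source = transfer_spec.get("awsS3DataSource")
    if source is None:
        return  # no S3 source: nothing to inject
    if "roleArn" in source:
        return  # role-based auth: do not override with static credentials
    source["awsAccessKey"] = {"accessKeyId": access_key_id, "secretAccessKey": secret}

# S3 source without a role ARN: credentials are injected.
spec = {"awsS3DataSource": {"bucketName": "src-bucket"}}
inject_aws_credentials(spec, "example-key-id", "example-secret")
assert "awsAccessKey" in spec["awsS3DataSource"]

# S3 source with a role ARN: left untouched, as the new test asserts.
arn_spec = {"awsS3DataSource": {"bucketName": "src-bucket", "roleArn": "arn:aws:iam::123456789012:role/transfer"}}
inject_aws_credentials(arn_spec, "example-key-id", "example-secret")
assert "awsAccessKey" not in arn_spec["awsS3DataSource"]
```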
Generated-by: GitHub Copilot following the guidelines