Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/providers/amazon/aws/transfers/local_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ def __init__(
self.gzip = gzip
self.acl_policy = acl_policy

def _check_inputs(self):
if 's3://' in self.dest_key and self.dest_bucket is not None:
raise TypeError('dest_bucket should be None when dest_key is provided as a full s3:// file path.')

def execute(self, context):
self._check_inputs()
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
s3_hook.load_file(
self.filename,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def __init__(
self.gzip = gzip
self.google_impersonation_chain = google_impersonation_chain

if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
def _check_inputs(self) -> None:
if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
self.log.info(
'Destination Google Cloud Storage path is not a valid '
'"directory", define a path that ends with a slash "/" or '
Expand All @@ -114,6 +115,7 @@ def __init__(
)

def execute(self, context):
self._check_inputs()
azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
files = azure_fileshare_hook.list_files(
share_name=self.share_name, directory_name=self.directory_name
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/transfers/s3_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def __init__(
self.gzip = gzip
self.google_impersonation_chain = google_impersonation_chain

if dest_gcs and not gcs_object_is_directory(self.dest_gcs):
def _check_inputs(self) -> None:
if self.dest_gcs and not gcs_object_is_directory(self.dest_gcs):
self.log.info(
'Destination Google Cloud Storage path is not a valid '
'"directory", define a path that ends with a slash "/" or '
Expand All @@ -158,6 +159,7 @@ def __init__(
)

def execute(self, context):
self._check_inputs()
# use the super method to list all the files in an S3 bucket/key
files = super().execute(context)

Expand Down
19 changes: 10 additions & 9 deletions tests/providers/amazon/aws/transfers/test_local_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ def test_init(self):
assert operator.encrypt == self._config['encrypt']
assert operator.gzip == self._config['gzip']

def test_init_exception(self):
def test_execute_exception(self):
operator = LocalFilesystemToS3Operator(
task_id='file_to_s3_operatro_exception',
dag=self.dag,
filename=self.testfile1,
dest_key=f's3://dummy/{self.dest_key}',
dest_bucket=self.dest_bucket,
**self._config,
)
with self.assertRaises(TypeError):
LocalFilesystemToS3Operator(
task_id='file_to_s3_operatro_exception',
dag=self.dag,
filename=self.testfile1,
dest_key=f's3://dummy/{self.dest_key}',
dest_bucket=self.dest_bucket,
**self._config,
)
operator.execute(None)

@mock_s3
def test_execute(self):
Expand Down