Skip to content

Commit

Permalink
Add option to delete by prefix to S3DeleteObjectsOperator (#9350)
Browse files Browse the repository at this point in the history
Co-authored-by: Felix Uellendall <feluelle@users.noreply.github.com>
GitOrigin-RevId: 52b6efe1ecaae74b9c2497f565e116305d575a76
  • Loading branch information
2 people authored and Cloud Composer Team committed Sep 12, 2024
1 parent dba0128 commit 12be5b2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
18 changes: 15 additions & 3 deletions airflow/providers/amazon/aws/operators/s3_delete_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class S3DeleteObjectsOperator(BaseOperator):
You may specify up to 1000 keys.
:type keys: str or list
:param prefix: Prefix of objects to delete. (templated)
All objects matching this prefix in the bucket will be deleted.
:type prefix: str
:param aws_conn_id: Connection id of the S3 connection to use
:type aws_conn_id: str
:param verify: Whether or not to verify SSL certificates for S3 connection.
Expand All @@ -56,22 +59,31 @@ class S3DeleteObjectsOperator(BaseOperator):
:type verify: bool or str
"""

template_fields = ('keys', 'bucket')
template_fields = ('keys', 'bucket', 'prefix')

@apply_defaults
def __init__(
self,
bucket,
keys,
keys=None,
prefix=None,
aws_conn_id='aws_default',
verify=None,
*args, **kwargs):

if not bool(keys) ^ bool(prefix):
raise ValueError("Either keys or prefix should be set.")

super().__init__(*args, **kwargs)
self.bucket = bucket
self.keys = keys
self.prefix = prefix
self.aws_conn_id = aws_conn_id
self.verify = verify

def execute(self, context):
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)

keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
if keys:
s3_hook.delete_objects(bucket=self.bucket, keys=keys)
30 changes: 30 additions & 0 deletions tests/providers/amazon/aws/operators/test_s3_delete_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,33 @@ def test_s3_delete_multiple_objects(self):
# There should be no object found in the bucket created earlier
self.assertFalse('Contents' in conn.list_objects(Bucket=bucket,
Prefix=key_pattern))

@mock_s3
def test_s3_delete_prefix(self):
bucket = "testbucket"
key_pattern = "path/data"
n_keys = 3
keys = [key_pattern + str(i) for i in range(n_keys)]

conn = boto3.client('s3')
conn.create_bucket(Bucket=bucket)
for k in keys:
conn.upload_fileobj(Bucket=bucket,
Key=k,
Fileobj=io.BytesIO(b"input"))

# The objects should be detected before the DELETE action is taken
objects_in_dest_bucket = conn.list_objects(Bucket=bucket,
Prefix=key_pattern)
self.assertEqual(len(objects_in_dest_bucket['Contents']), n_keys)
self.assertEqual(sorted([x['Key'] for x in objects_in_dest_bucket['Contents']]),
sorted(keys))

op = S3DeleteObjectsOperator(task_id="test_task_s3_delete_prefix",
bucket=bucket,
prefix=key_pattern)
op.execute(None)

# There should be no object found in the bucket created earlier
self.assertFalse('Contents' in conn.list_objects(Bucket=bucket,
Prefix=key_pattern))

0 comments on commit 12be5b2

Please sign in to comment.