Merged
15 changes: 10 additions & 5 deletions airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -97,9 +97,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):

     template_fields: Sequence[str] = (
         *AwsToAwsBaseOperator.template_fields,
-        "dynamodb_table_name",
         "s3_bucket_name",
+        "file_size",
+        "dynamodb_scan_kwargs",
         "s3_key_prefix",
+        "dynamodb_table_name",
+        "process_func",
+        "export_time",
+        "export_format",
Comment on lines +100 to +107 (Contributor): Thanks.
     )
 
     template_fields_renderers = {
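
Making export_time and export_format templated means they can be supplied as Jinja expressions or XComArgs and resolved just before execute(). Below is a minimal usage sketch, not from this PR: the DAG id, bucket name, and param are illustrative, and render_template_as_native_obj is enabled so the rendered export_time stays a datetime rather than a string.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

with DAG(
    dag_id="dynamodb_export_demo",  # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule=None,
    render_template_as_native_obj=True,  # keep the rendered export_time a datetime
    params={"table": "my_table"},  # illustrative param
) as dag:
    backup = DynamoDBToS3Operator(
        task_id="backup",
        dynamodb_table_name="{{ params.table }}",  # templated table name
        s3_bucket_name="my-bucket",  # assumption: any existing bucket
        file_size=1000,
        export_time="{{ data_interval_start }}",  # newly templated field
        export_format="DYNAMODB_JSON",  # newly templated field
    )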
@@ -129,9 +134,6 @@ def __init__(
         self.export_time = export_time
         self.export_format = export_format
 
-        if self.export_time and self.export_time > datetime.now():
-            raise ValueError("The export_time parameter cannot be a future time.")
-
     @cached_property
     def hook(self):
         """Create DynamoDBHook"""
@@ -148,6 +150,9 @@ def _export_table_to_point_in_time(self):
         Export data from start of epoch till `export_time`. Table export will be a snapshot of the table's
         state at this point in time.
         """
+        if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo):
+            raise ValueError("The export_time parameter cannot be a future time.")
+
         client = self.hook.conn.meta.client
         table_description = client.describe_table(TableName=self.dynamodb_table_name)
         response = client.export_table_to_point_in_time(
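
Two things change here: the check moves from __init__ to execution time (so merely instantiating the operator no longer raises, and a templated export_time can be validated after rendering), and the comparison becomes timezone-safe by passing the value's own tzinfo to datetime.now(). A standalone illustration of the failure mode this avoids (stdlib only, not PR code):

from datetime import datetime, timezone

aware = datetime(2020, 1, 1, tzinfo=timezone.utc)

# Comparing an aware datetime with a naive datetime.now() raises TypeError:
try:
    aware > datetime.now()
except TypeError as err:
    print(err)  # can't compare offset-naive and offset-aware datetimes

# Passing the value's own tzinfo keeps both sides of the comparison consistent:
print(aware > datetime.now(aware.tzinfo))  # aware vs. aware

naive = datetime(2020, 1, 1)
print(naive > datetime.now(naive.tzinfo))  # tzinfo is None, so naive vs. naive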
@@ -163,7 +168,7 @@ def _export_table_to_point_in_time(self):

     def _export_entire_data(self):
         """Export all data from the table."""
-        table = self.hook.get_conn().Table(self.dynamodb_table_name)
+        table = self.hook.conn.Table(self.dynamodb_table_name)
Comment (Contributor): Thanks.
         scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {}
         err = None
         f: IO[Any]
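
The get_conn() call is replaced by the hook's conn attribute, which in AwsBaseHook-derived hooks is a cached property wrapping the underlying boto3 resource. A simplified sketch of that pattern (a stand-in, not the actual hook implementation):

from functools import cached_property

import boto3


class SketchDynamoDBHook:
    """Stand-in illustrating the cached `conn` property pattern."""

    @cached_property
    def conn(self):
        # The real hook builds its session from an Airflow connection;
        # the default boto3 session is used here purely for illustration.
        return boto3.resource("dynamodb")


hook = SketchDynamoDBHook()
table = hook.conn.Table("my_table")  # same access pattern as the new code
client = hook.conn.meta.client  # as used in _export_table_to_point_in_time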
12 changes: 6 additions & 6 deletions tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -70,7 +70,7 @@ def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook):
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
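
Since conn is a property rather than a method, the mock path loses one get_conn.return_value hop and the attribute is set directly on the mocked hook. A self-contained sketch of the difference:

from unittest.mock import MagicMock

mock_aws_dynamodb_hook = MagicMock()
table = MagicMock()

# Old style: get_conn() is a method, so Table lives behind an extra return_value:
mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table

# New style: conn is a property, so the attribute is assigned directly:
mock_aws_dynamodb_hook.return_value.conn.Table = table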
@@ -99,7 +99,7 @@ def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -198,7 +198,7 @@ def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook,
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -234,7 +234,7 @@ def test_dynamodb_to_s3_with_two_different_connections(self, mock_aws_dynamodb_h
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -272,7 +272,7 @@ def test_dynamodb_to_s3_with_just_dest_aws_conn_id(self, mock_aws_dynamodb_hook,
         ]
         table = MagicMock()
         table.return_value.scan.side_effect = responses
-        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+        mock_aws_dynamodb_hook.return_value.conn.Table = table
 
         s3_client = MagicMock()
         s3_client.return_value.upload_file = self.mock_upload_file
@@ -356,4 +356,4 @@ def test_dynamodb_with_future_date(self):
s3_bucket_name="airflow-bucket",
file_size=4000,
export_time=datetime(year=3000, month=1, day=1),
)
).execute(context={})
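
With the check relocated to _export_table_to_point_in_time, instantiation alone no longer raises, so the test must invoke execute(). Roughly, the updated test reads as below; the task id and table name are assumed since the diff does not show them, and the real test is a method on the existing test class:

from datetime import datetime

import pytest

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator


def test_dynamodb_with_future_date():
    with pytest.raises(ValueError, match="The export_time parameter cannot be a future time."):
        DynamoDBToS3Operator(
            task_id="dynamodb_to_s3",  # assumed, not shown in the diff
            dynamodb_table_name="airflow_rocks",  # assumed, not shown in the diff
            s3_bucket_name="airflow-bucket",
            file_size=4000,
            export_time=datetime(year=3000, month=1, day=1),
        ).execute(context={})  # the ValueError now fires here, before any AWS call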
19 changes: 18 additions & 1 deletion tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -56,9 +56,24 @@ def set_up_table(table_name: str):
boto3.client("dynamodb").get_waiter("table_exists").wait(
TableName=table_name, WaiterConfig={"Delay": 10, "MaxAttempts": 10}
)
boto3.client("dynamodb").update_continuous_backups(
TableName=table_name,
PointInTimeRecoverySpecification={
"PointInTimeRecoveryEnabled": True,
},
)
table.put_item(Item={"ID": "123", "Value": "Testing"})


+@task
+def get_export_time(table_name: str):
+    r = boto3.client("dynamodb").describe_continuous_backups(
+        TableName=table_name,
+    )
+
+    return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
Comment (Contributor): Oh, that's nicer than my created time idea. Nice find.
Comment (Contributor): This looks good! thanks for taking care of this.


 @task
 def wait_for_bucket(s3_bucket_name):
     waiter = boto3.client("s3").get_waiter("bucket_exists")
@@ -128,13 +143,14 @@ def delete_dynamodb_table(table_name: str):
     )
     # [END howto_transfer_dynamodb_to_s3_segmented]
 
+    export_time = get_export_time(table_name)
     # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time]
     backup_db_to_point_in_time = DynamoDBToS3Operator(
         task_id="backup_db_to_point_in_time",
         dynamodb_table_name=table_name,
         file_size=1000,
         s3_bucket_name=bucket_name,
-        export_time=datetime.utcnow() - datetime.timedelta(days=7),
+        export_time=export_time,
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
     # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
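
Handing the get_export_time result straight to export_time works because a TaskFlow task returns an XComArg, and Airflow resolves XComArgs appearing in template fields at run time; this is exactly what the template_fields change in the first file enables. A self-contained sketch of the same wiring under assumed names (demo DAG id, table, and bucket):

from datetime import datetime, timezone

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator


@task
def pick_export_time() -> datetime:
    # Stand-in for get_export_time: any timezone-aware, non-future timestamp.
    return datetime(2023, 1, 1, tzinfo=timezone.utc)


with DAG(dag_id="pitr_export_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    export_time = pick_export_time()  # an XComArg, not a datetime yet
    DynamoDBToS3Operator(
        task_id="backup_db_to_point_in_time",
        dynamodb_table_name="my_table",  # assumption
        file_size=1000,
        s3_bucket_name="my-bucket",  # assumption
        export_time=export_time,  # resolved from XCom when templates render
    )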
@@ -158,6 +174,7 @@ def delete_dynamodb_table(table_name: str):
         backup_db,
         backup_db_segment_1,
         backup_db_segment_2,
+        export_time,
         backup_db_to_point_in_time,
         # TEST TEARDOWN
         delete_table,