-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix AWS system test example_dynamodb_to_s3 #31362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,9 +97,14 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): | |
|
|
||
| template_fields: Sequence[str] = ( | ||
| *AwsToAwsBaseOperator.template_fields, | ||
| "dynamodb_table_name", | ||
| "s3_bucket_name", | ||
| "file_size", | ||
| "dynamodb_scan_kwargs", | ||
| "s3_key_prefix", | ||
| "dynamodb_table_name", | ||
| "process_func", | ||
| "export_time", | ||
| "export_format", | ||
| ) | ||
|
|
||
| template_fields_renderers = { | ||
|
|
@@ -129,9 +134,6 @@ def __init__( | |
| self.export_time = export_time | ||
| self.export_format = export_format | ||
|
|
||
| if self.export_time and self.export_time > datetime.now(): | ||
| raise ValueError("The export_time parameter cannot be a future time.") | ||
|
|
||
| @cached_property | ||
| def hook(self): | ||
| """Create DynamoDBHook""" | ||
|
|
@@ -148,6 +150,9 @@ def _export_table_to_point_in_time(self): | |
| Export data from start of epoc till `export_time`. Table export will be a snapshot of the table's | ||
| state at this point in time. | ||
| """ | ||
| if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo): | ||
| raise ValueError("The export_time parameter cannot be a future time.") | ||
|
|
||
| client = self.hook.conn.meta.client | ||
| table_description = client.describe_table(TableName=self.dynamodb_table_name) | ||
| response = client.export_table_to_point_in_time( | ||
|
|
@@ -163,7 +168,7 @@ def _export_table_to_point_in_time(self): | |
|
|
||
| def _export_entire_data(self): | ||
| """Export all data from the table.""" | ||
| table = self.hook.get_conn().Table(self.dynamodb_table_name) | ||
| table = self.hook.conn.Table(self.dynamodb_table_name) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. |
||
| scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {} | ||
| err = None | ||
| f: IO[Any] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,9 +56,24 @@ def set_up_table(table_name: str): | |
| boto3.client("dynamodb").get_waiter("table_exists").wait( | ||
| TableName=table_name, WaiterConfig={"Delay": 10, "MaxAttempts": 10} | ||
| ) | ||
| boto3.client("dynamodb").update_continuous_backups( | ||
| TableName=table_name, | ||
| PointInTimeRecoverySpecification={ | ||
| "PointInTimeRecoveryEnabled": True, | ||
| }, | ||
| ) | ||
| table.put_item(Item={"ID": "123", "Value": "Testing"}) | ||
|
|
||
|
|
||
| @task | ||
| def get_export_time(table_name: str): | ||
| r = boto3.client("dynamodb").describe_continuous_backups( | ||
| TableName=table_name, | ||
| ) | ||
|
|
||
| return r["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, that's nicer than my created time idea. Nice find.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks good! thanks for taking care of this. |
||
|
|
||
|
|
||
| @task | ||
| def wait_for_bucket(s3_bucket_name): | ||
| waiter = boto3.client("s3").get_waiter("bucket_exists") | ||
|
|
@@ -128,13 +143,14 @@ def delete_dynamodb_table(table_name: str): | |
| ) | ||
| # [END howto_transfer_dynamodb_to_s3_segmented] | ||
|
|
||
| export_time = get_export_time(table_name) | ||
| # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time] | ||
| backup_db_to_point_in_time = DynamoDBToS3Operator( | ||
| task_id="backup_db_to_point_in_time", | ||
| dynamodb_table_name=table_name, | ||
| file_size=1000, | ||
| s3_bucket_name=bucket_name, | ||
| export_time=datetime.utcnow() - datetime.timedelta(days=7), | ||
| export_time=export_time, | ||
| s3_key_prefix=f"{S3_KEY_PREFIX}-3-", | ||
| ) | ||
| # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time] | ||
|
|
@@ -158,6 +174,7 @@ def delete_dynamodb_table(table_name: str): | |
| backup_db, | ||
| backup_db_segment_1, | ||
| backup_db_segment_2, | ||
| export_time, | ||
| backup_db_to_point_in_time, | ||
| # TEST TEARDOWN | ||
| delete_table, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.