Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion airflow/providers/amazon/aws/operators/rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ class RdsStartExportTaskOperator(RdsBaseOperator):
:param s3_prefix: The Amazon S3 bucket prefix to use as the file name and path of the exported snapshot.
:param export_only: The data to be exported from the snapshot.
:param wait_for_completion: If True, waits for the DB snapshot export to complete. (default: True)
:param waiter_interval: The number of seconds to wait between checks of the export task status. (default: 30)
:param waiter_max_attempts: The maximum number of status checks to make before failing. (default: 40)
"""

template_fields = (
Expand All @@ -351,6 +353,8 @@ def __init__(
s3_prefix: str = "",
export_only: list[str] | None = None,
wait_for_completion: bool = True,
waiter_interval: int = 30,
waiter_max_attempts: int = 40,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -363,6 +367,8 @@ def __init__(
self.s3_prefix = s3_prefix
self.export_only = export_only or []
self.wait_for_completion = wait_for_completion
self.waiter_interval = waiter_interval
self.waiter_max_attempts = waiter_max_attempts

def execute(self, context: Context) -> str:
self.log.info("Starting export task %s for snapshot %s", self.export_task_identifier, self.source_arn)
Expand All @@ -378,7 +384,12 @@ def execute(self, context: Context) -> str:
)

if self.wait_for_completion:
self.hook.wait_for_export_task_state(self.export_task_identifier, target_state="complete")
self.hook.wait_for_export_task_state(
export_task_id=self.export_task_identifier,
target_state="complete",
check_interval=self.waiter_interval,
max_attempts=self.waiter_max_attempts,
)
return json.dumps(start_export, default=str)


Expand Down