Skip to content

Commit 38fd65d

Browse files
authored
Rename DataSync Hook and Operator (#20328)
1 parent af0fb4f commit 38fd65d

File tree

7 files changed

+100
-67
lines changed

7 files changed

+100
-67
lines changed

airflow/providers/amazon/aws/example_dags/example_datasync_1.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from os import getenv
3131

3232
from airflow import models
33-
from airflow.providers.amazon.aws.operators.datasync import AWSDataSyncOperator
33+
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
3434

3535
# [START howto_operator_datasync_1_args_1]
3636
TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
@@ -52,7 +52,7 @@
5252
) as dag:
5353

5454
# [START howto_operator_datasync_1_1]
55-
datasync_task_1 = AWSDataSyncOperator(task_id="datasync_task_1", task_arn=TASK_ARN)
55+
datasync_task_1 = DataSyncOperator(task_id="datasync_task_1", task_arn=TASK_ARN)
5656
# [END howto_operator_datasync_1_1]
5757

5858
with models.DAG(
@@ -61,7 +61,7 @@
6161
schedule_interval=None, # Override to match your needs
6262
) as dag:
6363
# [START howto_operator_datasync_1_2]
64-
datasync_task_2 = AWSDataSyncOperator(
64+
datasync_task_2 = DataSyncOperator(
6565
task_id="datasync_task_2",
6666
source_location_uri=SOURCE_LOCATION_URI,
6767
destination_location_uri=DESTINATION_LOCATION_URI,

airflow/providers/amazon/aws/example_dags/example_datasync_2.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from os import getenv
4040

4141
from airflow import models
42-
from airflow.providers.amazon.aws.operators.datasync import AWSDataSyncOperator
42+
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
4343

4444
# [START howto_operator_datasync_2_args]
4545
SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
@@ -78,7 +78,7 @@
7878
) as dag:
7979

8080
# [START howto_operator_datasync_2]
81-
datasync_task = AWSDataSyncOperator(
81+
datasync_task = DataSyncOperator(
8282
task_id="datasync_task",
8383
source_location_uri=SOURCE_LOCATION_URI,
8484
destination_location_uri=DESTINATION_LOCATION_URI,

airflow/providers/amazon/aws/hooks/datasync.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
"""Interact with AWS DataSync, using the AWS ``boto3`` library."""
1919

2020
import time
21+
import warnings
2122
from typing import List, Optional
2223

2324
from airflow.exceptions import AirflowBadRequest, AirflowException, AirflowTaskTimeout
2425
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
2526

2627

27-
class AWSDataSyncHook(AwsBaseHook):
28+
class DataSyncHook(AwsBaseHook):
2829
"""
2930
Interact with AWS DataSync.
3031
@@ -33,7 +34,7 @@ class AWSDataSyncHook(AwsBaseHook):
3334
3435
.. seealso::
3536
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
36-
:class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`
37+
:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
3738
3839
:param wait_interval_seconds: Time to wait between two
3940
consecutive calls to check TaskExecution status. Defaults to 30 seconds.
@@ -316,3 +317,18 @@ def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int =
316317
if iterations <= 0:
317318
raise AirflowTaskTimeout("Max iterations exceeded!")
318319
raise AirflowException(f"Unknown status: {status}") # Should never happen
320+
321+
322+
class AWSDataSyncHook(DataSyncHook):
323+
"""
324+
This hook is deprecated.
325+
Please use :class:`airflow.providers.amazon.aws.hooks.datasync.DataSyncHook`.
326+
"""
327+
328+
def __init__(self, *args, **kwargs):
329+
warnings.warn(
330+
"This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.datasync.DataSyncHook`.",
331+
DeprecationWarning,
332+
stacklevel=2,
333+
)
334+
super().__init__(*args, **kwargs)

airflow/providers/amazon/aws/operators/datasync.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@
1919

2020
import logging
2121
import random
22+
import warnings
2223
from typing import List, Optional
2324

2425
from airflow.exceptions import AirflowException, AirflowTaskTimeout
2526
from airflow.models import BaseOperator
26-
from airflow.providers.amazon.aws.hooks.datasync import AWSDataSyncHook
27+
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
2728

2829

29-
class AWSDataSyncOperator(BaseOperator):
30+
class DataSyncOperator(BaseOperator):
3031
r"""Find, Create, Update, Execute and Delete AWS DataSync Tasks.
3132
3233
If ``do_xcom_push`` is True, then the DataSync TaskArn and TaskExecutionArn
3334
which were executed will be pushed to an XCom.
3435
3536
.. seealso::
3637
For more information on how to use this operator, take a look at the guide:
37-
:ref:`howto/operator:AWSDataSyncOperator`
38+
:ref:`howto/operator:DataSyncOperator`
3839
3940
.. note:: There may be 0, 1, or many existing DataSync Tasks defined in your AWS
4041
environment. The default behavior is to create a new Task if there are 0, or
@@ -185,7 +186,7 @@ def __init__(
185186
)
186187

187188
# Others
188-
self.hook: Optional[AWSDataSyncHook] = None
189+
self.hook: Optional[DataSyncHook] = None
189190
# Candidates - these are found in AWS as possible things
190191
# for us to use
191192
self.candidate_source_location_arns: Optional[List[str]] = None
@@ -196,15 +197,15 @@ def __init__(
196197
self.destination_location_arn: Optional[str] = None
197198
self.task_execution_arn: Optional[str] = None
198199

199-
def get_hook(self) -> AWSDataSyncHook:
200-
"""Create and return AWSDataSyncHook.
200+
def get_hook(self) -> DataSyncHook:
201+
"""Create and return DataSyncHook.
201202
202-
:return AWSDataSyncHook: An AWSDataSyncHook instance.
203+
:return DataSyncHook: An DataSyncHook instance.
203204
"""
204205
if self.hook:
205206
return self.hook
206207

207-
self.hook = AWSDataSyncHook(
208+
self.hook = DataSyncHook(
208209
aws_conn_id=self.aws_conn_id,
209210
wait_interval_seconds=self.wait_interval_seconds,
210211
)
@@ -345,7 +346,7 @@ def _update_datasync_task(self) -> None:
345346
self.log.info("Updated TaskArn %s", self.task_arn)
346347

347348
def _execute_datasync_task(self) -> None:
348-
"""Create and monitor an AWSDataSync TaskExecution for a Task."""
349+
"""Create and monitor an AWS DataSync TaskExecution for a Task."""
349350
if not self.task_arn:
350351
raise AirflowException("Missing TaskArn")
351352

@@ -407,3 +408,19 @@ def _get_location_arns(self, location_uri) -> List[str]:
407408
location_arns = self.get_hook().get_location_arns(location_uri)
408409
self.log.info("Found LocationArns %s for LocationUri %s", location_arns, location_uri)
409410
return location_arns
411+
412+
413+
class AWSDataSyncOperator(DataSyncOperator):
414+
"""
415+
This operator is deprecated.
416+
Please use :class:`airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`.
417+
"""
418+
419+
def __init__(self, *args, **kwargs):
420+
warnings.warn(
421+
"This operator is deprecated. Please use "
422+
"`airflow.providers.amazon.aws.operators.datasync.DataSyncHook`.",
423+
DeprecationWarning,
424+
stacklevel=2,
425+
)
426+
super().__init__(*args, **kwargs)

docs/apache-airflow-providers-amazon/operators/datasync.rst

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
under the License.
1717
1818
19-
.. _howto/operator:AWSDataSyncOperator:
19+
.. _howto/operator:DataSyncOperator:
2020

2121
AWS DataSync Operator
2222
=====================
@@ -25,13 +25,13 @@ Overview
2525
--------
2626

2727
Two example_dags are provided which showcase the
28-
:class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`
28+
:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
2929
in action.
3030

3131
- example_datasync_1.py
3232
- example_datasync_2.py
3333

34-
Both examples use the :class:`~airflow.providers.amazon.aws.hooks.datasync.AWSDataSyncHook`
34+
Both examples use the :class:`~airflow.providers.amazon.aws.hooks.datasync.DataSyncHook`
3535
to create a boto3 DataSync client. This hook in turn uses the :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
3636

3737
Note this guide differentiates between an *Airflow task* (identified by a task_id on Airflow),
@@ -65,7 +65,7 @@ These examples rely on the following variables, which can be passed via OS envir
6565
Get DataSync Tasks
6666
""""""""""""""""""
6767

68-
The :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator` can execute a specific
68+
The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` can execute a specific
6969
TaskArn by specifying the ``task_arn`` parameter. This is useful when you know the TaskArn you want to execute.
7070

7171
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_1.py
@@ -80,7 +80,7 @@ Alternatively, the operator can search in AWS DataSync for a Task based on
8080

8181
In AWS, DataSync Tasks are linked to source and destination Locations. A location has a LocationURI and
8282
is referenced by a LocationArn much like other AWS resources.
83-
The :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`
83+
The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
8484
can iterate all DataSync Tasks for their source and destination LocationArns. Then it checks
8585
each LocationArn to see if its the URIs match the desired source / destination URI.
8686

@@ -103,7 +103,7 @@ Purpose
103103
"""""""
104104

105105
Show how DataSync Tasks and Locations can be automatically created, deleted and updated using the
106-
:class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`.
106+
:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`.
107107

108108
Find and update a DataSync Task, or create one if it doesn't exist. Update the Task, then execute it.
109109
Finally, delete it.
@@ -121,7 +121,7 @@ This example relies on the following variables, which can be passed via OS envir
121121
Get, Create, Update, Run and Delete DataSync Tasks
122122
""""""""""""""""""""""""""""""""""""""""""""""""""
123123

124-
The :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator` is used
124+
The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` is used
125125
as before but with some extra arguments.
126126

127127
Most of the arguments (``CREATE_*_KWARGS``) provide a way for the operator to automatically create a Task
@@ -149,7 +149,7 @@ Operator behaviour
149149
DataSync Task execution behaviour
150150
"""""""""""""""""""""""""""""""""
151151

152-
Once the :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator` has identified
152+
Once the :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` has identified
153153
the correct TaskArn to run (either because you specified it, or because it was found), it will then be
154154
executed. Whenever an AWS DataSync Task is executed it creates an AWS DataSync TaskExecution, identified
155155
by a TaskExecutionArn.
@@ -163,7 +163,7 @@ for inspection.
163163
Finally, both the TaskArn and the TaskExecutionArn are returned from the ``execute()`` method, and pushed to
164164
an XCom automatically if ``do_xcom_push=True``.
165165

166-
The :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator` supports
166+
The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` supports
167167
optional passing of additional kwargs to the underlying ``boto3.start_task_execution()`` API.
168168
This is done with the ``task_execution_kwargs`` parameter.
169169
This is useful for example to limit bandwidth or filter included files - refer to the boto3 Datasync
@@ -172,12 +172,12 @@ documentation for more details.
172172
TaskArn selection behaviour
173173
"""""""""""""""""""""""""""
174174

175-
The :class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`
175+
The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
176176
may find 0, 1, or many AWS DataSync Tasks with a matching ``source_location_uri`` and
177177
``destination_location_uri``. The operator must decide what to do in each of these scenarios.
178178

179179
To override the default behaviour, simply create an operator which inherits
180-
:class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator`
180+
:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
181181
and re-implement the ``choose_task`` and ``choose_location`` methods
182182
to suit your use case.
183183

@@ -204,7 +204,7 @@ TaskArn creation behaviour
204204
"""""""""""""""""""""""""""
205205

206206
When creating a Task, the
207-
:class:`~airflow.providers.amazon.aws.operators.datasync.AWSDataSyncOperator` will try to find
207+
:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` will try to find
208208
and use existing LocationArns rather than creating new ones. If multiple LocationArns match the
209209
specified URIs then we need to choose one to use. In this scenario, the operator behaves similarly
210210
to how it chooses a single Task from many Tasks:

tests/providers/amazon/aws/hooks/test_datasync.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,22 @@
2424
from moto import mock_datasync
2525

2626
from airflow.exceptions import AirflowTaskTimeout
27-
from airflow.providers.amazon.aws.hooks.datasync import AWSDataSyncHook
27+
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
2828

2929

3030
@mock_datasync
31-
class TestAwsDataSyncHook(unittest.TestCase):
31+
class TestDataSyncHook(unittest.TestCase):
3232
def test_get_conn(self):
33-
hook = AWSDataSyncHook(aws_conn_id="aws_default")
33+
hook = DataSyncHook(aws_conn_id="aws_default")
3434
assert hook.get_conn() is not None
3535

3636

37-
# Explanation of: @mock.patch.object(AWSDataSyncHook, 'get_conn')
37+
# Explanation of: @mock.patch.object(DataSyncHook, 'get_conn')
3838
# base_aws.py fiddles with config files and changes the region
3939
# If you have any ~/.credentials then aws_hook uses it for the region
4040
# This region might not match us-east-1 used for the mocked self.client
4141

42-
# Once patched, the AWSDataSyncHook.get_conn method is mocked and passed to the test as
42+
# Once patched, the DataSyncHook.get_conn method is mocked and passed to the test as
4343
# mock_get_conn. We then override it to just return the locally created self.client instead of
4444
# the one created by the AWS self.hook.
4545

@@ -48,8 +48,8 @@ def test_get_conn(self):
4848

4949

5050
@mock_datasync
51-
@mock.patch.object(AWSDataSyncHook, "get_conn")
52-
class TestAWSDataSyncHookMocked(unittest.TestCase):
51+
@mock.patch.object(DataSyncHook, "get_conn")
52+
class TestDataSyncHookMocked(unittest.TestCase):
5353
def __init__(self, *args, **kwargs):
5454
super().__init__(*args, **kwargs)
5555
self.source_server_hostname = "host"
@@ -65,7 +65,7 @@ def __init__(self, *args, **kwargs):
6565

6666
def setUp(self):
6767
self.client = boto3.client("datasync", region_name="us-east-1")
68-
self.hook = AWSDataSyncHook(aws_conn_id="aws_default", wait_interval_seconds=0)
68+
self.hook = DataSyncHook(aws_conn_id="aws_default", wait_interval_seconds=0)
6969

7070
# Create default locations and tasks
7171
self.source_location_arn = self.client.create_location_smb(

0 commit comments

Comments
 (0)