Skip to content

Commit

Permalink
[AIRFLOW-6118] [AIP-21] Rename Pubsub operators and hook (#7046)
Browse files Browse the repository at this point in the history
PR contains changes regarding AIP-21 (renaming GCP operators and hooks):

* renamed GCP modules
* adde deprecation warnings to the contrib modules
* fixed tests
* updated UPDATING.md
  • Loading branch information
michalslowikowski00 authored and potiuk committed Jan 8, 2020
1 parent 78d8fe6 commit 4f8592a
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 62 deletions.
10 changes: 5 additions & 5 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -605,11 +605,11 @@ The following table shows changes in import paths.
|airflow.contrib.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |airflow.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |airflow.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator |airflow.operators.postgres_to_gcs.PostgresToGoogleCloudStorageOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator |
|airflow.contrib.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |airflow.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |
|airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor |airflow.gcp.sensors.bigquery.BigQueryTableExistenceSensor |
|airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceWaitForJobStatusSensor |airflow.gcp.sensors.cloud_storage_transfer_service.DataTransferServiceJobStatusSensor |
Expand Down
96 changes: 89 additions & 7 deletions airflow/contrib/operators/pubsub_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,99 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`."""
"""This module is deprecated.
Please use `airflow.providers.google.cloud.operators.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.providers.google.cloud.operators.pubsub import ( # noqa
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
from airflow.providers.google.cloud.operators.pubsub import (
PubSubCreateSubscriptionOperator, PubSubCreateTopicOperator, PubSubDeleteSubscriptionOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator,
)

warnings.warn(
"This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`.",
DeprecationWarning, stacklevel=2
"""This module is deprecated.
"Please use `airflow.providers.google.cloud.operators.pubsub`.""",
DeprecationWarning,
stacklevel=2,
)


class PubSubPublishOperator(PubSubPublishMessageOperator):
"""
This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubPublishMessageOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubPublishMessageOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)


class PubSubSubscriptionCreateOperator(PubSubCreateSubscriptionOperator):
"""
This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubCreateSubscriptionOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubCreateSubscriptionOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)


class PubSubSubscriptionDeleteOperator(PubSubDeleteSubscriptionOperator):
"""
This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubDeleteSubscriptionOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubDeleteSubscriptionOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)


class PubSubTopicCreateOperator(PubSubCreateTopicOperator):
"""
This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubCreateTopicOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubCreateTopicOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)


class PubSubTopicDeleteOperator(PubSubDeleteTopicOperator):
"""
This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubDeleteTopicOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use `airflow.gcp.operators.pubsub.PubSubDeleteTopicOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
14 changes: 7 additions & 7 deletions airflow/providers/google/cloud/example_dags/example_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.pubsub import (
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
PubSubCreateSubscriptionOperator, PubSubCreateTopicOperator, PubSubDeleteSubscriptionOperator,
PubSubDeleteTopicOperator, PubSubPublishMessageOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
from airflow.utils.dates import days_ago
Expand All @@ -51,13 +51,13 @@
schedule_interval=None, # Override to match your needs
) as example_dag:
# [START howto_operator_gcp_pubsub_create_topic]
create_topic = PubSubTopicCreateOperator(
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_create_topic]

# [START howto_operator_gcp_pubsub_create_subscription]
subscribe_task = PubSubSubscriptionCreateOperator(
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
)
# [END howto_operator_gcp_pubsub_create_subscription]
Expand All @@ -80,7 +80,7 @@
# [END howto_operator_gcp_pubsub_pull_messages_result]

# [START howto_operator_gcp_pubsub_publish]
publish_task = PubSubPublishOperator(
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC,
Expand All @@ -89,15 +89,15 @@
# [END howto_operator_gcp_pubsub_publish]

# [START howto_operator_gcp_pubsub_unsubscribe]
unsubscribe_task = PubSubSubscriptionDeleteOperator(
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)
# [END howto_operator_gcp_pubsub_unsubscribe]

# [START howto_operator_gcp_pubsub_delete_topic]
delete_topic = PubSubTopicDeleteOperator(
delete_topic = PubSubDeleteTopicOperator(
task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)
# [END howto_operator_gcp_pubsub_delete_topic]
Expand Down
20 changes: 10 additions & 10 deletions airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
from airflow.utils.decorators import apply_defaults


class PubSubTopicCreateOperator(BaseOperator):
class PubSubCreateTopicOperator(BaseOperator):
"""Create a PubSub topic.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubTopicCreateOperator`
:ref:`howto/operator:PubSubCreateTopicOperator`
By default, if the topic already exists, this operator will
not cause the DAG to fail. ::
Expand Down Expand Up @@ -167,12 +167,12 @@ def execute(self, context):
self.log.info("Created topic %s", self.topic)


class PubSubSubscriptionCreateOperator(BaseOperator):
class PubSubCreateSubscriptionOperator(BaseOperator):
"""Create a PubSub subscription.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubSubscriptionCreateOperator`
:ref:`howto/operator:PubSubCreateSubscriptionOperator`
By default, the subscription will be created in ``topic_project``. If
``subscription_project`` is specified and the GCP credentials allow, the
Expand Down Expand Up @@ -359,12 +359,12 @@ def execute(self, context):
return result


class PubSubTopicDeleteOperator(BaseOperator):
class PubSubDeleteTopicOperator(BaseOperator):
"""Delete a PubSub topic.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubTopicDeleteOperator`
:ref:`howto/operator:PubSubDeleteTopicOperator`
By default, if the topic does not exist, this operator will
not cause the DAG to fail. ::
Expand Down Expand Up @@ -471,12 +471,12 @@ def execute(self, context):
self.log.info("Deleted topic %s", self.topic)


class PubSubSubscriptionDeleteOperator(BaseOperator):
class PubSubDeleteSubscriptionOperator(BaseOperator):
"""Delete a PubSub subscription.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubSubscriptionDeleteOperator`
:ref:`howto/operator:PubSubDeleteSubscriptionOperator`
By default, if the subscription does not exist, this operator will
not cause the DAG to fail. ::
Expand Down Expand Up @@ -585,12 +585,12 @@ def execute(self, context):
self.log.info("Deleted subscription %s", self.subscription)


class PubSubPublishOperator(BaseOperator):
class PubSubPublishMessageOperator(BaseOperator):
"""Publish messages to a PubSub topic.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubPublishOperator`
:ref:`howto/operator:PubSubPublishMessageOperator`
Each Task publishes all provided messages to the same topic
in a single GCP project. If the topic does not exist, this
Expand Down
20 changes: 10 additions & 10 deletions docs/howto/operator/gcp/pubsub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,42 @@ Prerequisite Tasks

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:PubSubTopicCreateOperator:
.. _howto/operator:PubSubCreateTopicOperator:

Creating a PubSub topic
^^^^^^^^^^^^^^^^^^^^^^^

The PubSub topic is a named resource to which messages are sent by publishers.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator` operator creates a topic.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator` operator creates a topic.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_create_topic]
:end-before: [END howto_operator_gcp_pubsub_create_topic]


.. _howto/operator:PubSubSubscriptionCreateOperator:
.. _howto/operator:PubSubCreateSubscriptionOperator:

Creating a PubSub subscription
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A ``Subscription`` is a named resource representing the stream of messages from a single, specific topic,
to be delivered to the subscribing application.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator` operator creates the subscription.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator` operator creates the subscription.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_create_subscription]
:end-before: [END howto_operator_gcp_pubsub_create_subscription]


.. _howto/operator:PubSubPublishOperator:
.. _howto/operator:PubSubPublishMessageOperator:

Publishing PubSub messages
^^^^^^^^^^^^^^^^^^^^^^^^^^

A ``Message`` is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator` operator would publish messages.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator` operator would publish messages.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
:language: python
Expand Down Expand Up @@ -105,25 +105,25 @@ To pull messages from XCom use the :class:`~airflow.operators.bash_operator.Bash
:end-before: [END howto_operator_gcp_pubsub_pull_messages_result]


.. _howto/operator:PubSubSubscriptionDeleteOperator:
.. _howto/operator:PubSubDeleteSubscriptionOperator:

Deleting a PubSub subscription
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator` operator deletes the subscription.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator` operator deletes the subscription.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_unsubscribe]
:end-before: [END howto_operator_gcp_pubsub_unsubscribe]


.. _howto/operator:PubSubTopicDeleteOperator:
.. _howto/operator:PubSubDeleteTopicOperator:

Deleting a PubSub topic
^^^^^^^^^^^^^^^^^^^^^^^

The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator` operator deletes topic.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator` operator deletes topic.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
:language: python
Expand Down
11 changes: 6 additions & 5 deletions tests/providers/google/cloud/hooks/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ def test_delete_subscription_api_call_error(self, mock_service):

@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
@mock.patch(PUBSUB_STRING.format('uuid4'), new_callable=mock.Mock(return_value=lambda: TEST_UUID))
def test_create_subscription_without_subscription_name(self, mock_uuid, mock_service): # noqa # pylint: disable=unused-argument,line-too-long
def test_create_subscription_without_subscription_name(self, mock_uuid,
mock_service): # noqa # pylint: disable=unused-argument,line-too-long
create_method = mock_service.create_subscription
expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, 'sub-%s' % TEST_UUID)

Expand Down Expand Up @@ -367,7 +368,7 @@ def test_pull_no_messages(self, mock_service):
self.assertListEqual([], response)

@parameterized.expand([
(exception, ) for exception in [
(exception,) for exception in [
HttpError(resp={'status': '404'}, content=EMPTY_CONTENT),
GoogleAPICallError("API Call Error")
]
Expand Down Expand Up @@ -406,7 +407,7 @@ def test_acknowledge(self, mock_service):
)

@parameterized.expand([
(exception, ) for exception in [
(exception,) for exception in [
HttpError(resp={'status': '404'}, content=EMPTY_CONTENT),
GoogleAPICallError("API Call Error")
]
Expand All @@ -431,7 +432,7 @@ def test_acknowledge_fails_on_exception(self, exception, mock_service):
)

@parameterized.expand([
(messages, ) for messages in [
(messages,) for messages in [
[{"data": b'test'}],
[{"data": b''}],
[{"data": b'test', "attributes": {"weight": "100kg"}}],
Expand All @@ -443,7 +444,7 @@ def test_messages_validation_positive(self, messages):
PubSubHook._validate_messages(messages)

@parameterized.expand([
([("wrong type", )], "Wrong message type. Must be a dictionary."),
([("wrong type",)], "Wrong message type. Must be a dictionary."),
([{"wrong_key": b'test'}], "Wrong message. Dictionary must contain 'data' or 'attributes'."),
([{"data": 'wrong string'}], "Wrong message. 'data' must be send as a bytestring"),
([{"data": None}], "Wrong message. 'data' must be send as a bytestring"),
Expand Down
Loading

0 comments on commit 4f8592a

Please sign in to comment.