-
Notifications
You must be signed in to change notification settings - Fork 14.7k
/
Copy pathgcp_pubsub_hook.py
284 lines (255 loc) · 12.1 KB
/
gcp_pubsub_hook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from uuid import uuid4
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
def _format_subscription(project, subscription):
return 'projects/{}/subscriptions/{}'.format(project, subscription)
def _format_topic(project, topic):
return 'projects/{}/topics/{}'.format(project, topic)
class PubSubException(Exception):
pass
class PubSubHook(GoogleCloudBaseHook):
"""Hook for accessing Google Pub/Sub.
The GCP project against which actions are applied is determined by
the project embedded in the Connection referenced by gcp_conn_id.
"""
def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
def get_conn(self):
"""Returns a Pub/Sub service object.
:rtype: googleapiclient.discovery.Resource
"""
http_authorized = self._authorize()
return build(
'pubsub', 'v1', http=http_authorized, cache_discovery=False)
def publish(self, project, topic, messages):
"""Publishes messages to a Pub/Sub topic.
:param project: the GCP project ID in which to publish
:type project: str
:param topic: the Pub/Sub topic to which to publish; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param messages: messages to publish; if the data field in a
message is set, it should already be base64 encoded.
:type messages: list of PubSub messages; see
http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
"""
body = {'messages': messages}
full_topic = _format_topic(project, topic)
request = self.get_conn().projects().topics().publish(
topic=full_topic, body=body)
try:
request.execute(num_retries=self.num_retries)
except HttpError as e:
raise PubSubException(
'Error publishing to topic {}'.format(full_topic), e)
def create_topic(self, project, topic, fail_if_exists=False):
"""Creates a Pub/Sub topic, if it does not already exist.
:param project: the GCP project ID in which to create
the topic
:type project: str
:param topic: the Pub/Sub topic name to create; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param fail_if_exists: if set, raise an exception if the topic
already exists
:type fail_if_exists: bool
"""
service = self.get_conn()
full_topic = _format_topic(project, topic)
try:
service.projects().topics().create(
name=full_topic, body={}).execute(num_retries=self.num_retries)
except HttpError as e:
# Status code 409 indicates that the topic already exists.
if str(e.resp['status']) == '409':
message = 'Topic already exists: {}'.format(full_topic)
self.log.warning(message)
if fail_if_exists:
raise PubSubException(message)
else:
raise PubSubException(
'Error creating topic {}'.format(full_topic), e)
def delete_topic(self, project, topic, fail_if_not_exists=False):
"""Deletes a Pub/Sub topic if it exists.
:param project: the GCP project ID in which to delete the topic
:type project: str
:param topic: the Pub/Sub topic name to delete; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param fail_if_not_exists: if set, raise an exception if the topic
does not exist
:type fail_if_not_exists: bool
"""
service = self.get_conn()
full_topic = _format_topic(project, topic)
try:
service.projects().topics().delete(topic=full_topic).execute(num_retries=self.num_retries)
except HttpError as e:
# Status code 409 indicates that the topic was not found
if str(e.resp['status']) == '404':
message = 'Topic does not exist: {}'.format(full_topic)
self.log.warning(message)
if fail_if_not_exists:
raise PubSubException(message)
else:
raise PubSubException(
'Error deleting topic {}'.format(full_topic), e)
def create_subscription(self, topic_project, topic, subscription=None,
subscription_project=None, ack_deadline_secs=10,
fail_if_exists=False):
"""Creates a Pub/Sub subscription, if it does not already exist.
:param topic_project: the GCP project ID of the topic that the
subscription will be bound to.
:type topic_project: str
:param topic: the Pub/Sub topic name that the subscription will be bound
to create; do not include the ``projects/{project}/subscriptions/``
prefix.
:type topic: str
:param subscription: the Pub/Sub subscription name. If empty, a random
name will be generated using the uuid module
:type subscription: str
:param subscription_project: the GCP project ID where the subscription
will be created. If unspecified, ``topic_project`` will be used.
:type subscription_project: str
:param ack_deadline_secs: Number of seconds that a subscriber has to
acknowledge each message pulled from the subscription
:type ack_deadline_secs: int
:param fail_if_exists: if set, raise an exception if the topic
already exists
:type fail_if_exists: bool
:return: subscription name which will be the system-generated value if
the ``subscription`` parameter is not supplied
:rtype: str
"""
service = self.get_conn()
full_topic = _format_topic(topic_project, topic)
if not subscription:
subscription = 'sub-{}'.format(uuid4())
if not subscription_project:
subscription_project = topic_project
full_subscription = _format_subscription(subscription_project,
subscription)
body = {
'topic': full_topic,
'ackDeadlineSeconds': ack_deadline_secs
}
try:
service.projects().subscriptions().create(
name=full_subscription, body=body).execute(num_retries=self.num_retries)
except HttpError as e:
# Status code 409 indicates that the subscription already exists.
if str(e.resp['status']) == '409':
message = 'Subscription already exists: {}'.format(
full_subscription)
self.log.warning(message)
if fail_if_exists:
raise PubSubException(message)
else:
raise PubSubException(
'Error creating subscription {}'.format(full_subscription),
e)
return subscription
def delete_subscription(self, project, subscription,
fail_if_not_exists=False):
"""Deletes a Pub/Sub subscription, if it exists.
:param project: the GCP project ID where the subscription exists
:type project: str
:param subscription: the Pub/Sub subscription name to delete; do not
include the ``projects/{project}/subscriptions/`` prefix.
:type subscription: str
:param fail_if_not_exists: if set, raise an exception if the topic
does not exist
:type fail_if_not_exists: bool
"""
service = self.get_conn()
full_subscription = _format_subscription(project, subscription)
try:
service.projects().subscriptions().delete(
subscription=full_subscription).execute(num_retries=self.num_retries)
except HttpError as e:
# Status code 404 indicates that the subscription was not found
if str(e.resp['status']) == '404':
message = 'Subscription does not exist: {}'.format(
full_subscription)
self.log.warning(message)
if fail_if_not_exists:
raise PubSubException(message)
else:
raise PubSubException(
'Error deleting subscription {}'.format(full_subscription),
e)
def pull(self, project, subscription, max_messages,
return_immediately=False):
"""Pulls up to ``max_messages`` messages from Pub/Sub subscription.
:param project: the GCP project ID where the subscription exists
:type project: str
:param subscription: the Pub/Sub subscription name to pull from; do not
include the 'projects/{project}/topics/' prefix.
:type subscription: str
:param max_messages: The maximum number of messages to return from
the Pub/Sub API.
:type max_messages: int
:param return_immediately: If set, the Pub/Sub API will immediately
return if no messages are available. Otherwise, the request will
block for an undisclosed, but bounded period of time
:type return_immediately: bool
:return: A list of Pub/Sub ReceivedMessage objects each containing
an ``ackId`` property and a ``message`` property, which includes
the base64-encoded message content. See
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage
"""
service = self.get_conn()
full_subscription = _format_subscription(project, subscription)
body = {
'maxMessages': max_messages,
'returnImmediately': return_immediately
}
try:
response = service.projects().subscriptions().pull(
subscription=full_subscription, body=body).execute(num_retries=self.num_retries)
return response.get('receivedMessages', [])
except HttpError as e:
raise PubSubException(
'Error pulling messages from subscription {}'.format(
full_subscription), e)
def acknowledge(self, project, subscription, ack_ids):
"""Pulls up to ``max_messages`` messages from Pub/Sub subscription.
:param project: the GCP project name or ID in which to create
the topic
:type project: str
:param subscription: the Pub/Sub subscription name to delete; do not
include the 'projects/{project}/topics/' prefix.
:type subscription: str
:param ack_ids: List of ReceivedMessage ackIds from a previous pull
response
:type ack_ids: list
"""
service = self.get_conn()
full_subscription = _format_subscription(project, subscription)
try:
service.projects().subscriptions().acknowledge(
subscription=full_subscription, body={'ackIds': ack_ids}
).execute(num_retries=self.num_retries)
except HttpError as e:
raise PubSubException(
'Error acknowledging {} messages pulled from subscription {}'
.format(len(ack_ids), full_subscription), e)