@@ -13,20 +13,24 @@ import (
1313 "sync"
1414 "time"
1515
16- "cloud.google.com/go/pubsub"
16+ "cloud.google.com/go/pubsub/v2"
17+ "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
1718 pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context"
1819 "github.com/cloudevents/sdk-go/v2/binding"
20+ "google.golang.org/grpc/codes"
21+ "google.golang.org/grpc/status"
22+ "google.golang.org/protobuf/types/known/durationpb"
1923)
2024
2125type topicInfo struct {
22- topic * pubsub.Topic
26+ topic * pubsub.Publisher
2327 wasCreated bool
2428 once sync.Once
2529 err error
2630}
2731
2832type subInfo struct {
29- sub * pubsub.Subscription
33+ sub * pubsub.Subscriber
3034 wasCreated bool
3135 once sync.Once
3236 err error
@@ -89,8 +93,35 @@ const (
8993var DefaultReceiveSettings = pubsub.ReceiveSettings {
9094 // Pubsub default receive settings will fill in other values.
9195 // https://godoc.org/cloud.google.com/go/pubsub#Client.Subscription
96+ }
9297
93- Synchronous : false ,
98+ // pubsub/v2 dropped Exists methods and suggests optimistically using GetTopic instead
99+ func topicExists (ctx context.Context , client * pubsub.Client , topicID string ) (bool , error ) {
100+ // Check if the topic exists.
101+ _ , err := client .TopicAdminClient .GetTopic (ctx , & pubsubpb.GetTopicRequest {
102+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , client .Project (), topicID ),
103+ })
104+ if err != nil {
105+ if st , ok := status .FromError (err ); ok && st .Code () == codes .NotFound {
106+ return false , nil
107+ }
108+ return false , fmt .Errorf ("unable to get topic %q, %v" , topicID , err )
109+ }
110+ return true , nil
111+ }
112+
113+ func subscriptionExists (ctx context.Context , client * pubsub.Client , subID string ) (bool , error ) {
114+ // Check if the subscription exists.
115+ _ , err := client .SubscriptionAdminClient .GetSubscription (ctx , & pubsubpb.GetSubscriptionRequest {
116+ Subscription : fmt .Sprintf ("projects/%s/subscriptions/%s" , client .Project (), subID ),
117+ })
118+ if err != nil {
119+ if st , ok := status .FromError (err ); ok && st .Code () == codes .NotFound {
120+ return false , nil
121+ }
122+ return false , fmt .Errorf ("unable to get subscription %q, %v" , subID , err )
123+ }
124+ return true , nil
94125}
95126
96127func (c * Connection ) getOrCreateTopicInfo (ctx context.Context , getAlreadyOpenOnly bool ) (* topicInfo , error ) {
@@ -110,9 +141,7 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
110141 // Make sure the topic structure is initialized at most once.
111142 ti .once .Do (func () {
112143 var ok bool
113- // Load the topic.
114- topic := c .Client .Topic (c .TopicID )
115- ok , ti .err = topic .Exists (ctx )
144+ ok , ti .err = topicExists (ctx , c .Client , c .TopicID )
116145 if ti .err != nil {
117146 return
118147 }
@@ -122,14 +151,17 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
122151 ti .err = fmt .Errorf ("protocol not allowed to create topic %q" , c .TopicID )
123152 return
124153 }
125- topic , ti .err = c .Client .CreateTopic (ctx , c .TopicID )
154+ _ , ti .err = c .Client .TopicAdminClient .CreateTopic (ctx , & pubsubpb.Topic {
155+ Name : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), c .TopicID ),
156+ })
126157 if ti .err != nil {
127158 return
128159 }
129160 ti .wasCreated = true
130161 }
131- // Success.
132- ti .topic = topic
162+
163+ // If the topic exists, we can use it
164+ ti .topic = c .Client .Publisher (c .TopicID )
133165
134166 // EnableMessageOrdering is a runtime parameter only and not part of the topic
135167 // Pub/Sub configuration. The Pub/Sub SDK requires this to be set to accept Pub/Sub
@@ -151,7 +183,7 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
151183 return ti , nil
152184}
153185
154- func (c * Connection ) getOrCreateTopic (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Topic , error ) {
186+ func (c * Connection ) getOrCreateTopic (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Publisher , error ) {
155187 ti , err := c .getOrCreateTopicInfo (ctx , getAlreadyOpenOnly )
156188 if ti != nil {
157189 return ti .topic , nil
@@ -170,12 +202,17 @@ func (c *Connection) DeleteTopic(ctx context.Context) error {
170202 if ! ti .wasCreated {
171203 return errors .New ("topic was not created by pubsub protocol" )
172204 }
173- if err := ti .topic .Delete (ctx ); err != nil {
174- return err
175- }
176205
206+ // Stop the publisher and send all messages before deleting the topic
177207 ti .topic .Stop ()
178208
209+ err = c .Client .TopicAdminClient .DeleteTopic (ctx , & pubsubpb.DeleteTopicRequest {
210+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), ti .topic .ID ()),
211+ })
212+ if err != nil {
213+ return err
214+ }
215+
179216 c .initLock .Lock ()
180217 if ti == c .topicInfo {
181218 c .topicInfo = nil
@@ -211,10 +248,8 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
211248
212249 // Make sure the subscription structure is initialized at most once.
213250 si .once .Do (func () {
214- // Load the subscription.
215251 var ok bool
216- sub := c .Client .Subscription (c .SubscriptionID )
217- ok , si .err = sub .Exists (ctx )
252+ ok , si .err = subscriptionExists (ctx , c .Client , c .SubscriptionID )
218253 if si .err != nil {
219254 return
220255 }
@@ -226,7 +261,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
226261 }
227262
228263 // Load the topic.
229- var topic * pubsub.Topic
264+ var topic * pubsub.Publisher
230265 topic , si .err = c .getOrCreateTopic (ctx , false )
231266 if si .err != nil {
232267 return
@@ -235,26 +270,29 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
235270 // Create a new subscription to the previously created topic
236271 // with the given name.
237272 // TODO: allow to use push config + allow setting the SubscriptionConfig.
238- sub , si .err = c .Client .CreateSubscription (ctx , c .SubscriptionID , pubsub.SubscriptionConfig {
239- Topic : topic ,
240- AckDeadline : * c .AckDeadline ,
241- RetentionDuration : * c .RetentionDuration ,
242- EnableMessageOrdering : c .MessageOrdering ,
243- Filter : c .Filter ,
273+ _ , si .err = c .Client .SubscriptionAdminClient .CreateSubscription (ctx , & pubsubpb.Subscription {
274+ Name : fmt .Sprintf ("projects/%s/subscriptions/%s" , c .Client .Project (), c .SubscriptionID ),
275+ Topic : fmt .Sprintf ("projects/%s/topics/%s" , c .Client .Project (), topic .ID ()),
276+ AckDeadlineSeconds : int32 (c .AckDeadline .Seconds ()),
277+ MessageRetentionDuration : durationpb .New (* c .RetentionDuration ),
278+ EnableMessageOrdering : c .MessageOrdering ,
279+ Filter : c .Filter ,
244280 })
245281 if si .err != nil {
246282 return
247283 }
248284
249285 si .wasCreated = true
250286 }
287+
288+ // Success.
289+ si .sub = c .Client .Subscriber (c .SubscriptionID )
290+
251291 if c .ReceiveSettings == nil {
252- sub .ReceiveSettings = DefaultReceiveSettings
292+ si . sub .ReceiveSettings = DefaultReceiveSettings
253293 } else {
254- sub .ReceiveSettings = * c .ReceiveSettings
294+ si . sub .ReceiveSettings = * c .ReceiveSettings
255295 }
256- // Success.
257- si .sub = sub
258296 })
259297 if si .sub == nil {
260298 // Initialization failed, remove this attempt so that future callers
@@ -269,7 +307,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
269307 return si , nil
270308}
271309
272- func (c * Connection ) getOrCreateSubscription (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Subscription , error ) {
310+ func (c * Connection ) getOrCreateSubscription (ctx context.Context , getAlreadyOpenOnly bool ) (* pubsub.Subscriber , error ) {
273311 si , err := c .getOrCreateSubscriptionInfo (ctx , getAlreadyOpenOnly )
274312 if si != nil {
275313 return si .sub , nil
@@ -289,7 +327,9 @@ func (c *Connection) DeleteSubscription(ctx context.Context) error {
289327 if ! si .wasCreated {
290328 return errors .New ("subscription was not created by pubsub protocol" )
291329 }
292- if err := si .sub .Delete (ctx ); err != nil {
330+ if err := c .Client .SubscriptionAdminClient .DeleteSubscription (ctx , & pubsubpb.DeleteSubscriptionRequest {
331+ Subscription : fmt .Sprintf ("projects/%s/subscriptions/%s" , c .Client .Project (), si .sub .ID ()),
332+ }); err != nil {
293333 return err
294334 }
295335
0 commit comments