Skip to content

Commit

Permalink
client: resume subs only if present
Browse files Browse the repository at this point in the history
This patch only sends a PublishRequest after a reconnect when active
subscriptions are present. Sending a PublishRequest when no
subscriptions are present can confuse some servers.
  • Loading branch information
Daniele Vasselli authored and magiconair committed Mar 22, 2023
1 parent 2c767e1 commit ffb1c5f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
21 changes: 15 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,10 @@ func (c *Client) monitor(ctx context.Context) {
c.pauseSubscriptions(ctx)

var (
subsToRepublish []uint32 // subscription ids for which to send republish requests
subsToRecreate []uint32 // subscription ids which need to be recreated as new subscriptions
availableSeqs map[uint32][]uint32
subsToRepublish []uint32 // subscription ids for which to send republish requests
subsToRecreate []uint32 // subscription ids which need to be recreated as new subscriptions
availableSeqs map[uint32][]uint32 // available sequence numbers per subscription
activeSubs int // number of active subscriptions to resume/recreate
)

for action != none {
Expand Down Expand Up @@ -485,11 +486,13 @@ func (c *Client) monitor(ctx context.Context) {
// Assume that subsToRecreate and subsToRepublish have been
// populated in the previous step.

activeSubs = 0
for _, id := range subsToRepublish {
if err := c.republishSubscription(ctx, id, availableSeqs[id]); err != nil {
dlog.Printf("republish of subscription %d failed", id)
subsToRecreate = append(subsToRecreate, id)
}
activeSubs++
}

for _, id := range subsToRecreate {
Expand All @@ -498,6 +501,7 @@ func (c *Client) monitor(ctx context.Context) {
action = recreateSession
continue
}
activeSubs++
}

c.setState(Connected)
Expand All @@ -521,9 +525,14 @@ func (c *Client) monitor(ctx context.Context) {
<-c.sechanErr
}

dlog.Printf("resuming subscriptions")
c.resumeSubscriptions(ctx)
dlog.Printf("resumed subscriptions")
switch {
case activeSubs > 0:
dlog.Printf("resuming %d subscriptions", activeSubs)
c.resumeSubscriptions(ctx)
dlog.Printf("resumed %d subscriptions", activeSubs)
default:
dlog.Printf("no subscriptions to resume")
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ publish:

default:
// send publish request and handle response
//
// publish() blocks until a PublishResponse
// is received or the context is cancelled.
if err := c.publish(ctx); err != nil {
dlog.Print("error: ", err.Error())
c.pauseSubscriptions(ctx)
Expand Down

0 comments on commit ffb1c5f

Please sign in to comment.