Data doesn't get removed from subscribers and subscribersByTopicLock maps (GoChannel pub/sub) #391

Open
@avlajcic-axilis

Hello,
I've been testing Watermill with one of our services and noticed a constant memory increase while using it (slight, but still there). After checking the code in my service, I went through the Watermill implementation, and I might have found an issue in the GoChannel pub/sub implementation.

Explanation

When we create a new subscriber, new data is added to the subscribers map and the subscribersByTopicLock map.
After the context is done or the pub/sub is closing, the subscriber is removed from the subscribers map.
However, only that subscriber's data is removed from the map. If it was the last subscriber of the topic, the entries g.subscribers[topic] and g.subscribersByTopicLock[topic] themselves are not removed.

This leads me to believe that if we generate new topics over time, the maps will still contain entries for topics that are no longer used.
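To illustrate the pattern described above, here is a minimal, self-contained sketch. It is not Watermill's code: the `registry` type, its fields, and the `cleanup` flag are hypothetical stand-ins for the subscribers and subscribersByTopicLock bookkeeping. With `cleanup` set to false, removing the last subscriber leaves an empty slice and a lock behind for every topic; with it set to true, both entries are deleted.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, simplified model of per-topic bookkeeping:
// a slice of subscriber IDs per topic plus one lock per topic.
type registry struct {
	mu          sync.Mutex
	subscribers map[string][]int
	topicLocks  sync.Map // topic -> *sync.Mutex
}

func newRegistry() *registry {
	return &registry{subscribers: make(map[string][]int)}
}

// add registers a subscriber and makes sure the topic has a lock entry.
func (r *registry) add(topic string, id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topicLocks.LoadOrStore(topic, &sync.Mutex{})
	r.subscribers[topic] = append(r.subscribers[topic], id)
}

// remove drops one subscriber from the topic's slice. When cleanup is true
// and the topic has no subscribers left, the topic's entries are deleted
// from both maps as well.
func (r *registry) remove(topic string, id int, cleanup bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	subs := r.subscribers[topic]
	for i, s := range subs {
		if s == id {
			r.subscribers[topic] = append(subs[:i], subs[i+1:]...)
			break
		}
	}
	if cleanup && len(r.subscribers[topic]) == 0 {
		delete(r.subscribers, topic)
		r.topicLocks.Delete(topic)
	}
}

// sizes reports how many topic entries each map currently holds.
func (r *registry) sizes() (topics, locks int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topicLocks.Range(func(_, _ any) bool {
		locks++
		return true
	})
	return len(r.subscribers), locks
}

// leakDemo subscribes to and unsubscribes from 100 distinct topics.
func leakDemo(cleanup bool) (topics, locks int) {
	r := newRegistry()
	for i := 0; i < 100; i++ {
		topic := fmt.Sprintf("test_topic_index_%d", i)
		r.add(topic, i)
		r.remove(topic, i, cleanup)
	}
	return r.sizes()
}

func main() {
	fmt.Println(leakDemo(false)) // prints "100 100": empty entries remain
	fmt.Println(leakDemo(true))  // prints "0 0": entries are deleted
}
```

In a real implementation, deleting a per-topic lock would likely need extra care, since another goroutine might still hold or be waiting on it; this sketch sidesteps that by doing everything under a single mutex.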

Proof

I've added the following logs in pubsub.go at line 214:

g.logger.Debug("Removing subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})
g.removeSubscriber(topic, s)
g.logger.Debug("Removed subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})

Then I ran a simple test that creates 100 new subscribers with different topics and closes them after 500ms:

func TestSubscribe_clean_map(t *testing.T) {
	subCount := 100
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{OutputChannelBuffer: int64(subCount)},
		watermill.NewStdLogger(true, false),
	)
	topicName := "test_topic"

	done := make(chan bool, subCount)

	for i := 0; i < subCount; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		_, err := pubSub.Subscribe(ctx, topicName+"_index_"+strconv.Itoa(i))
		require.NoError(t, err)
		go func() {
			// Cancel this subscriber's context after 500ms.
			<-time.After(500 * time.Millisecond)
			cancel()
			done <- true
		}()
	}

	for i := 0; i < subCount; i++ {
		<-done
	}
	assert.NoError(t, pubSub.Close())
}

Final debug logs produced from this test:

[watermill] 2023/09/13 15:40:11.697577 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_47 
[watermill] 2023/09/13 15:40:11.697587 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_47 
[watermill] 2023/09/13 15:40:11.697604 pubsub.go:364: 	level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 
[watermill] 2023/09/13 15:40:11.697615 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_22 
[watermill] 2023/09/13 15:40:11.697630 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_22 
[watermill] 2023/09/13 15:40:11.692512 pubsub.go:364: 	level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 
[watermill] 2023/09/13 15:40:11.697649 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_29 
[watermill] 2023/09/13 15:40:11.697660 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_29 
[watermill] 2023/09/13 15:40:11.697667 pubsub.go:333: 	level=INFO  msg="Pub/Sub closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 

As you can see, the sizes of the subscribers and subscribersByTopicLock maps remained at 100 even though all subscribers are closed.
