Description
Hello,
I've been testing Watermill with one of our services and noticed a constant memory increase while using it (slight, but still there). After checking the code in my service, I looked at the Watermill implementation, and I might have found an issue in the GoChannel pub/sub implementation.
Explanation
When a new subscriber is created, entries are added to the subscribers map and the subscribersByTopicLock map.
After the context is done or the pub/sub is closing, the subscriber is removed from the subscribers map.
However, only that subscriber's data is removed. If it was the last subscriber of its topic, the entries g.subscribers[topic] and g.subscribersByTopicLock[topic] are not removed.
This suggests that if new topics are generated over time, the maps keep entries for topics that are no longer used.
Proof
I've added the following logs in pubsub.go at L214:
// Log map sizes right before the subscriber is removed.
g.logger.Debug("Removing subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})
g.removeSubscriber(topic, s)
// Log the same sizes again right after removal.
g.logger.Debug("Removed subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})
Then I ran a simple test which creates 100 new subscribers on different topics and closes them after 500ms:
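package gochannel_test

import (
	"context"
	"strconv"
	"testing"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestSubscribe_clean_map(t *testing.T) {
	subCount := 100
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{OutputChannelBuffer: int64(subCount)},
		watermill.NewStdLogger(true, false),
	)
	topicName := "test_topic"
	done := make(chan bool, subCount)

	// Create one subscriber per topic, each on a distinct topic name.
	for i := 0; i < subCount; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		_, err := pubSub.Subscribe(ctx, topicName+"_index_"+strconv.Itoa(i))
		require.NoError(t, err)

		// Cancel the subscriber's context after 500ms.
		go func() {
			<-time.After(500 * time.Millisecond)
			cancel()
			done <- true
		}()
	}

	// Wait until every subscriber's context has been cancelled.
	for i := 0; i < subCount; i++ {
		<-done
	}
	assert.NoError(t, pubSub.Close())
}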
Final debug logs produced from this test:
[watermill] 2023/09/13 15:40:11.697577 pubsub.go:214: level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_47
[watermill] 2023/09/13 15:40:11.697587 pubsub.go:224: level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_47
[watermill] 2023/09/13 15:40:11.697604 pubsub.go:364: level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj
[watermill] 2023/09/13 15:40:11.697615 pubsub.go:214: level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_22
[watermill] 2023/09/13 15:40:11.697630 pubsub.go:224: level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_22
[watermill] 2023/09/13 15:40:11.692512 pubsub.go:364: level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj
[watermill] 2023/09/13 15:40:11.697649 pubsub.go:214: level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_29
[watermill] 2023/09/13 15:40:11.697660 pubsub.go:224: level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_29
[watermill] 2023/09/13 15:40:11.697667 pubsub.go:333: level=INFO msg="Pub/Sub closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj
As you can see, the size of both the subscribers map and the subscribersByTopicLock map remained at 100 even though all subscribers were closed.