From 0ba0f1fe265a8ad1d64e53bad37c4aec4f150bcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 22 Feb 2024 10:52:00 -0400 Subject: [PATCH] refactor: use ttl of 5m for subscriptions (#1041) --- Review note: this copy differs from the upstream commit in two places. (1) SubscribersMap.cleanUp now evicts a peer only once time.Since(lastSeen) >= timeout; the original comparison was inverted, so it dropped every recently seen subscription on the first tick and never dropped stale ones. (2) TestCleanup sleeps 3s instead of 2s so that at least one cleanup tick is guaranteed to land after the 2s TTL. waku/v2/protocol/filter/filter_test.go | 43 ------------- waku/v2/protocol/filter/options.go | 2 +- waku/v2/protocol/filter/server.go | 10 +-- waku/v2/protocol/filter/subscribers_map.go | 64 +++++++++++-------- .../protocol/filter/subscribers_map_test.go | 29 +++++---- 5 files changed, 62 insertions(+), 86 deletions(-) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index c7e27e710..e5dac7182 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -368,49 +368,6 @@ func (s *FilterTestSuite) TearDownTest() { s.ctxCancel() } -func (s *FilterTestSuite) TestPeerFailure() { - broadcaster2 := relay.NewBroadcaster(10) - s.Require().NoError(broadcaster2.Start(context.Background())) - - // Initial subscribe - s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - - // Simulate there's been a failure before - s.fullNode.subscriptions.FlagAsFailure(s.lightNodeHost.ID()) - - // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) - - s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) - - s.waitForMsg(func() { - s.publishMsg(s.testTopic, s.testContentTopic) - }, s.subDetails[0].C) - - // Failure is removed - s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) - - // Kill the subscriber - s.lightNodeHost.Close() - - time.Sleep(1 * time.Second) - - s.publishMsg(s.testTopic, s.testContentTopic) - - // TODO: find out how to eliminate this sleep - time.Sleep(1 * time.Second) - s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) - - time.Sleep(2 * time.Second) - - s.publishMsg(s.testTopic, s.testContentTopic) - - time.Sleep(2 * time.Second) - - 
s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) // Failed peer has been removed - s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed -} - func (s *FilterTestSuite) TestRunningGuard() { s.lightNode.Stop() diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index f151eb716..afb9f2f1e 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -194,7 +194,7 @@ func WithPeerManager(pm *peermanager.PeerManager) Option { func DefaultOptions() []Option { return []Option{ - WithTimeout(24 * time.Hour), + WithTimeout(5 * time.Minute), WithMaxSubscribers(DefaultMaxSubscriptions), } } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index cea5efab6..29f38e12d 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -82,6 +82,8 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error { wf.WaitGroup().Add(1) go wf.filterListener(wf.Context()) + wf.subscriptions.Start(wf.Context()) + wf.log.Info("filter-subscriber protocol started") return nil } @@ -155,9 +157,11 @@ func (wf *WakuFilterFullNode) reply(ctx context.Context, stream network.Stream, } func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) { - exists := wf.subscriptions.Has(stream.Conn().RemotePeer()) + peerID := stream.Conn().RemotePeer() + exists := wf.subscriptions.Has(peerID) if exists { + wf.subscriptions.Refresh(peerID) wf.reply(ctx, stream, request, http.StatusOK) } else { wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription) @@ -265,7 +269,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge stream, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1) if err != nil { - wf.subscriptions.FlagAsFailure(peerID) if errors.Is(context.DeadlineExceeded, err) { 
wf.metrics.RecordError(pushTimeoutFailure) } else { @@ -284,7 +287,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge wf.metrics.RecordError(writeResponseFailure) } logger.Error("pushing messages to peer", zap.Error(err)) - wf.subscriptions.FlagAsFailure(peerID) if err := stream.Reset(); err != nil { wf.log.Error("resetting connection", zap.Error(err)) } @@ -293,8 +295,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge stream.Close() - wf.subscriptions.FlagAsSuccess(peerID) - logger.Debug("message pushed succesfully") return nil diff --git a/waku/v2/protocol/filter/subscribers_map.go b/waku/v2/protocol/filter/subscribers_map.go index 80a364e11..faa5700ca 100644 --- a/waku/v2/protocol/filter/subscribers_map.go +++ b/waku/v2/protocol/filter/subscribers_map.go @@ -1,6 +1,7 @@ package filter import ( + "context" "encoding/hex" "errors" "sync" @@ -17,14 +18,15 @@ type PubsubTopics map[string]protocol.ContentTopicSet // pubsubTopic => contentT var errNotFound = errors.New("not found") +const cleanupInterval = time.Minute + type SubscribersMap struct { sync.RWMutex items map[peer.ID]PubsubTopics interestMap map[string]PeerSet // key: sha256(pubsubTopic-contentTopic) => peers - timeout time.Duration - failedPeers map[peer.ID]time.Time + lastSeen map[peer.ID]time.Time } func NewSubscribersMap(timeout time.Duration) *SubscribersMap { @@ -32,23 +34,29 @@ func NewSubscribersMap(timeout time.Duration) *SubscribersMap { items: make(map[peer.ID]PubsubTopics), interestMap: make(map[string]PeerSet), timeout: timeout, - failedPeers: make(map[peer.ID]time.Time), + lastSeen: make(map[peer.ID]time.Time), } } +func (sub *SubscribersMap) Start(ctx context.Context) { + go sub.cleanUp(ctx, cleanupInterval) +} + func (sub *SubscribersMap) Clear() { sub.Lock() defer sub.Unlock() sub.items = make(map[peer.ID]PubsubTopics) sub.interestMap = make(map[string]PeerSet) - sub.failedPeers = make(map[peer.ID]time.Time) + 
sub.lastSeen = make(map[peer.ID]time.Time) } func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) { sub.Lock() defer sub.Unlock() + sub.lastSeen[peerID] = time.Now() + pubsubTopicMap, ok := sub.items[peerID] if !ok { pubsubTopicMap = make(PubsubTopics) @@ -105,6 +113,10 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop return errNotFound } + // Updating first the lastSeen since this is a valid activity + // (it will still get deleted if all content topics are removed) + sub.lastSeen[peerID] = time.Now() + // Removing content topics individually for _, c := range contentTopics { c := c @@ -123,6 +135,7 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop if len(sub.items[peerID]) == 0 { delete(sub.items, peerID) + delete(sub.lastSeen, peerID) } return nil @@ -142,6 +155,7 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { } delete(sub.items, peerID) + delete(sub.lastSeen, peerID) return nil } @@ -158,6 +172,7 @@ func (sub *SubscribersMap) RemoveAll() { defer sub.Unlock() sub.items = make(map[peer.ID]PubsubTopics) + sub.lastSeen = make(map[peer.ID]time.Time) } func (sub *SubscribersMap) Count() int { @@ -213,34 +228,31 @@ func getKey(pubsubTopic string, contentTopic string) string { } -func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool { - sub.RLock() - defer sub.RUnlock() - _, ok := sub.failedPeers[peerID] - return ok -} - -func (sub *SubscribersMap) FlagAsSuccess(peerID peer.ID) { +func (sub *SubscribersMap) Refresh(peerID peer.ID) { sub.Lock() defer sub.Unlock() - _, ok := sub.failedPeers[peerID] - if ok { - delete(sub.failedPeers, peerID) - } + sub.lastSeen[peerID] = time.Now() } -func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) { - sub.Lock() - defer sub.Unlock() +func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) { + t := time.NewTicker(cleanupInterval) + defer t.Stop() + + for { + select { + 
case <-ctx.Done(): + return + case <-t.C: + sub.Lock() + for peerID, lastSeen := range sub.lastSeen { + elapsedTime := time.Since(lastSeen) + if elapsedTime >= sub.timeout { + _ = sub.deleteAll(peerID) + } - lastFailure, ok := sub.failedPeers[peerID] - if ok { - elapsedTime := time.Since(lastFailure) - if elapsedTime < sub.timeout { - _ = sub.deleteAll(peerID) + } + sub.Unlock() } - } else { - sub.failedPeers[peerID] = time.Now() } } diff --git a/waku/v2/protocol/filter/subscribers_map_test.go b/waku/v2/protocol/filter/subscribers_map_test.go index b4a0d6b88..7b7dd74b3 100644 --- a/waku/v2/protocol/filter/subscribers_map_test.go +++ b/waku/v2/protocol/filter/subscribers_map_test.go @@ -1,6 +1,7 @@ package filter import ( + "context" "testing" "time" @@ -106,23 +107,29 @@ func TestRemoveBogus(t *testing.T) { require.Error(t, err) } -func TestSuccessFailure(t *testing.T) { - subs := NewSubscribersMap(5 * time.Second) +func TestCleanup(t *testing.T) { + subs := NewSubscribersMap(2 * time.Second) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go subs.cleanUp(ctx, 500*time.Millisecond) + peerId := createPeerID(t) subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) - subs.FlagAsFailure(peerId) - require.True(t, subs.IsFailedPeer(peerId)) + hasSubs := subs.Has(peerId) + require.True(t, hasSubs) - subs.FlagAsFailure(peerId) - require.False(t, subs.Has(peerId)) + _, exists := subs.Get(peerId) + require.True(t, exists) - subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) + time.Sleep(3 * time.Second) - subs.FlagAsFailure(peerId) - require.True(t, subs.IsFailedPeer(peerId)) + hasSubs = subs.Has(peerId) + require.False(t, hasSubs) - subs.FlagAsSuccess(peerId) - require.False(t, subs.IsFailedPeer(peerId)) + _, exists = subs.Get(peerId) + require.False(t, exists) }