Skip to content

Commit

Permalink
refactor: use ttl of 5m for subscriptions (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Feb 22, 2024
1 parent 0bdd359 commit 0ba0f1f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 86 deletions.
43 changes: 0 additions & 43 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,49 +368,6 @@ func (s *FilterTestSuite) TearDownTest() {
s.ctxCancel()
}

// TestPeerFailure exercises the full node's push-failure bookkeeping for a
// subscribed light node: a successful push clears an existing failure flag, a
// failed push sets the flag, and a further failed push leaves the peer without
// subscriptions.
func (s *FilterTestSuite) TestPeerFailure() {
// NOTE(review): broadcaster2 is started here but is not referenced again in this test.
broadcaster2 := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster2.Start(context.Background()))

// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Simulate there's been a failure before
s.fullNode.subscriptions.FlagAsFailure(s.lightNodeHost.ID())

// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)

s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))

// Publish and wait until the light node receives the message, i.e. a push succeeded.
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic)
}, s.subDetails[0].C)

// Failure is removed
s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))

// Kill the subscriber
s.lightNodeHost.Close()

time.Sleep(1 * time.Second)

// This publish targets the closed light node, so the push is expected to fail.
s.publishMsg(s.testTopic, s.testContentTopic)

// TODO: find out how to eliminate this sleep
time.Sleep(1 * time.Second)
s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))

time.Sleep(2 * time.Second)

// Second publish towards the dead peer: another failure while it is already flagged.
s.publishMsg(s.testTopic, s.testContentTopic)

time.Sleep(2 * time.Second)

s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) // Peer is still flagged as failed
s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed
}

func (s *FilterTestSuite) TestRunningGuard() {

s.lightNode.Stop()
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func WithPeerManager(pm *peermanager.PeerManager) Option {

// DefaultOptions returns the options applied when the caller supplies none:
// a 5-minute timeout (the subscription TTL this change introduces) and the
// default cap on the number of subscribers.
//
// The timeout is listed exactly once; a second, stale WithTimeout entry would
// only be overridden by (or override) this one depending on ordering.
func DefaultOptions() []Option {
	return []Option{
		WithTimeout(5 * time.Minute),
		WithMaxSubscribers(DefaultMaxSubscriptions),
	}
}
10 changes: 5 additions & 5 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {
wf.WaitGroup().Add(1)
go wf.filterListener(wf.Context())

wf.subscriptions.Start(wf.Context())

wf.log.Info("filter-subscriber protocol started")
return nil
}
Expand Down Expand Up @@ -155,9 +157,11 @@ func (wf *WakuFilterFullNode) reply(ctx context.Context, stream network.Stream,
}

func (wf *WakuFilterFullNode) ping(ctx context.Context, stream network.Stream, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(stream.Conn().RemotePeer())
peerID := stream.Conn().RemotePeer()

exists := wf.subscriptions.Has(peerID)
if exists {
wf.subscriptions.Refresh(peerID)
wf.reply(ctx, stream, request, http.StatusOK)
} else {
wf.reply(ctx, stream, request, http.StatusNotFound, peerHasNoSubscription)
Expand Down Expand Up @@ -265,7 +269,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge

stream, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
if errors.Is(context.DeadlineExceeded, err) {
wf.metrics.RecordError(pushTimeoutFailure)
} else {
Expand All @@ -284,7 +287,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
wf.metrics.RecordError(writeResponseFailure)
}
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscriptions.FlagAsFailure(peerID)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
Expand All @@ -293,8 +295,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge

stream.Close()

wf.subscriptions.FlagAsSuccess(peerID)

logger.Debug("message pushed succesfully")

return nil
Expand Down
64 changes: 38 additions & 26 deletions waku/v2/protocol/filter/subscribers_map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"context"
"encoding/hex"
"errors"
"sync"
Expand All @@ -17,38 +18,45 @@ type PubsubTopics map[string]protocol.ContentTopicSet // pubsubTopic => contentT

var errNotFound = errors.New("not found")

const cleanupInterval = time.Minute

// SubscribersMap keeps track of which peers are subscribed to which
// pubsub/content topics, together with per-peer timestamps used to decide
// when a peer's subscriptions should be dropped.
type SubscribersMap struct {
// Embedded lock; the methods of this type acquire it before touching the maps below.
sync.RWMutex

// items holds, for every subscribed peer, its pubsub topics and their content topics.
items map[peer.ID]PubsubTopics
interestMap map[string]PeerSet // key: sha256(pubsubTopic-contentTopic) => peers

// timeout is the duration that elapsed times are compared against when
// deciding whether to remove a peer's subscriptions.
timeout time.Duration
// failedPeers records the time at which a peer was first flagged as failed.
failedPeers map[peer.ID]time.Time
// lastSeen records the time of the most recent subscription activity for a peer.
lastSeen map[peer.ID]time.Time
}

// NewSubscribersMap builds an empty SubscribersMap whose internal maps are
// all allocated and whose timeout is set to the given duration.
func NewSubscribersMap(timeout time.Duration) *SubscribersMap {
	subscribers := new(SubscribersMap)

	subscribers.timeout = timeout
	subscribers.items = map[peer.ID]PubsubTopics{}
	subscribers.interestMap = map[string]PeerSet{}
	subscribers.failedPeers = map[peer.ID]time.Time{}
	subscribers.lastSeen = map[peer.ID]time.Time{}

	return subscribers
}

// Start launches the background cleanup loop, which ticks every
// cleanupInterval and stops once ctx is done.
func (sub *SubscribersMap) Start(ctx context.Context) {
	go func() {
		sub.cleanUp(ctx, cleanupInterval)
	}()
}

// Clear discards every subscription, interest entry, failure flag and
// last-seen timestamp by replacing each internal map with a fresh empty one.
func (sub *SubscribersMap) Clear() {
	sub.Lock()
	defer sub.Unlock()

	sub.lastSeen = map[peer.ID]time.Time{}
	sub.failedPeers = map[peer.ID]time.Time{}
	sub.interestMap = map[string]PeerSet{}
	sub.items = map[peer.ID]PubsubTopics{}
}

func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) {
sub.Lock()
defer sub.Unlock()

sub.lastSeen[peerID] = time.Now()

pubsubTopicMap, ok := sub.items[peerID]
if !ok {
pubsubTopicMap = make(PubsubTopics)
Expand Down Expand Up @@ -105,6 +113,10 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop
return errNotFound
}

// Updating first the lastSeen since this is a valid activity
// (it will still get deleted if all content topics are removed)
sub.lastSeen[peerID] = time.Now()

// Removing content topics individually
for _, c := range contentTopics {
c := c
Expand All @@ -123,6 +135,7 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop

if len(sub.items[peerID]) == 0 {
delete(sub.items, peerID)
delete(sub.lastSeen, peerID)
}

return nil
Expand All @@ -142,6 +155,7 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
}

delete(sub.items, peerID)
delete(sub.lastSeen, peerID)

return nil
}
Expand All @@ -158,6 +172,7 @@ func (sub *SubscribersMap) RemoveAll() {
defer sub.Unlock()

sub.items = make(map[peer.ID]PubsubTopics)
sub.lastSeen = make(map[peer.ID]time.Time)
}

func (sub *SubscribersMap) Count() int {
Expand Down Expand Up @@ -213,34 +228,31 @@ func getKey(pubsubTopic string, contentTopic string) string {

}

// IsFailedPeer reports whether peerID currently has a failure recorded.
func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool {
	sub.RLock()
	defer sub.RUnlock()

	if _, flagged := sub.failedPeers[peerID]; flagged {
		return true
	}
	return false
}

func (sub *SubscribersMap) FlagAsSuccess(peerID peer.ID) {
func (sub *SubscribersMap) Refresh(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()

_, ok := sub.failedPeers[peerID]
if ok {
delete(sub.failedPeers, peerID)
}
sub.lastSeen[peerID] = time.Now()
}

func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) {
t := time.NewTicker(cleanupInterval)
defer t.Stop()

for {
select {
case <-ctx.Done():
return
case <-t.C:
sub.Lock()
for peerID, lastSeen := range sub.lastSeen {
elapsedTime := time.Since(lastSeen)
if elapsedTime < sub.timeout {
_ = sub.deleteAll(peerID)
}

lastFailure, ok := sub.failedPeers[peerID]
if ok {
elapsedTime := time.Since(lastFailure)
if elapsedTime < sub.timeout {
_ = sub.deleteAll(peerID)
}
sub.Unlock()
}
} else {
sub.failedPeers[peerID] = time.Now()
}
}
29 changes: 18 additions & 11 deletions waku/v2/protocol/filter/subscribers_map_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -106,23 +107,29 @@ func TestRemoveBogus(t *testing.T) {
require.Error(t, err)
}

func TestSuccessFailure(t *testing.T) {
subs := NewSubscribersMap(5 * time.Second)
func TestCleanup(t *testing.T) {
subs := NewSubscribersMap(2 * time.Second)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go subs.cleanUp(ctx, 500*time.Millisecond)

peerId := createPeerID(t)

subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"})

subs.FlagAsFailure(peerId)
require.True(t, subs.IsFailedPeer(peerId))
hasSubs := subs.Has(peerId)
require.True(t, hasSubs)

subs.FlagAsFailure(peerId)
require.False(t, subs.Has(peerId))
_, exists := subs.Get(peerId)
require.True(t, exists)

subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"})
time.Sleep(2 * time.Second)

subs.FlagAsFailure(peerId)
require.True(t, subs.IsFailedPeer(peerId))
hasSubs = subs.Has(peerId)
require.False(t, hasSubs)

subs.FlagAsSuccess(peerId)
require.False(t, subs.IsFailedPeer(peerId))
_, exists = subs.Get(peerId)
require.False(t, exists)
}

0 comments on commit 0ba0f1f

Please sign in to comment.