Skip to content

Commit

Permalink
Reduce Routing Table churn (#90)
Browse files Browse the repository at this point in the history
* reduce Routing Table churn
  • Loading branch information
aarshkshah1992 authored Jun 4, 2020
1 parent 86c2b9a commit f49a71a
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 78 deletions.
11 changes: 11 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type PeerInfo struct {

// Id of the peer in the DHT XOR keyspace
dhtId ID

// if a bucket is full, this peer can be replaced to make space for a new peer.
replaceable bool
}

// bucket holds a list of peers.
Expand Down Expand Up @@ -76,6 +79,14 @@ func (b *bucket) min(lessThan func(p1 *PeerInfo, p2 *PeerInfo) bool) *PeerInfo {
return minVal
}

// updateAllWith updates all the peers in the bucket by applying the given update function.
func (b *bucket) updateAllWith(updateFnc func(p *PeerInfo)) {
for e := b.list.Front(); e != nil; e = e.Next() {
val := e.Value.(*PeerInfo)
updateFnc(val)
}
}

// return the Ids of all the peers in the bucket.
func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len())
Expand Down
43 changes: 42 additions & 1 deletion bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBucketMinimum(t *testing.T) {
return first.LastUsefulAt.Before(second.LastUsefulAt)
}).Id)

// first is till min
// first is still min
b.pushFront(&PeerInfo{Id: pid2, LastUsefulAt: time.Now().AddDate(1, 0, 0)})
require.Equal(t, pid1, b.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
Expand All @@ -37,3 +37,44 @@ func TestBucketMinimum(t *testing.T) {
return first.LastUsefulAt.Before(second.LastUsefulAt)
}).Id)
}

func TestUpdateAllWith(t *testing.T) {
t.Parallel()

b := newBucket()
// dont crash
b.updateAllWith(func(p *PeerInfo) {})

pid1 := test.RandPeerIDFatal(t)
pid2 := test.RandPeerIDFatal(t)
pid3 := test.RandPeerIDFatal(t)

// peer1
b.pushFront(&PeerInfo{Id: pid1, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)

// peer2
b.pushFront(&PeerInfo{Id: pid2, replaceable: false})
b.updateAllWith(func(p *PeerInfo) {
if p.Id == pid1 {
p.replaceable = false
} else {
p.replaceable = true
}
})
require.True(t, b.getPeer(pid2).replaceable)
require.False(t, b.getPeer(pid1).replaceable)

// peer3
b.pushFront(&PeerInfo{Id: pid3, replaceable: false})
require.False(t, b.getPeer(pid3).replaceable)
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = true
})
require.True(t, b.getPeer(pid1).replaceable)
require.True(t, b.getPeer(pid2).replaceable)
require.True(t, b.getPeer(pid3).replaceable)
}
36 changes: 28 additions & 8 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
// the boolean value will ALWAYS be false i.e. the peer wont be added to the Routing Table it it's not already there.
//
// A return value of false with error=nil indicates that the peer ALREADY exists in the Routing Table.
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

return rt.addPeer(p, queryPeer)
return rt.addPeer(p, queryPeer, isReplaceable)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool, isReplaceable bool) (bool, error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]

Expand Down Expand Up @@ -183,6 +183,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -203,27 +204,31 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
}
}

// the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
minLast := bucket.min(func(first *PeerInfo, second *PeerInfo) bool {
return first.LastUsefulAt.Before(second.LastUsefulAt)
// in that bucket which is replaceable.
// we don't really need a stable sort here as it dosen't matter which peer we evict
// as long as it's a replaceable peer.
replaceablePeer := bucket.min(func(p1 *PeerInfo, p2 *PeerInfo) bool {
return p1.replaceable
})

if time.Since(minLast.LastUsefulAt) > rt.usefulnessGracePeriod {
if replaceablePeer != nil && replaceablePeer.replaceable {
// let's evict it and add the new peer
if rt.removePeer(minLast.Id) {
if rt.removePeer(replaceablePeer.Id) {
bucket.pushFront(&PeerInfo{
Id: p,
LastUsefulAt: lastUsefulAt,
LastSuccessfulOutboundQueryAt: now,
AddedAt: now,
dhtId: ConvertPeerID(p),
replaceable: isReplaceable,
})
rt.PeerAdded(p)
return true, nil
Expand All @@ -237,6 +242,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity
}

// MarkAllPeersIrreplaceable marks all peers in the routing table as irreplaceable
// This means that we will never replace an existing peer in the table to make space for a new peer.
// However, they can still be removed by calling the `RemovePeer` API.
func (rt *RoutingTable) MarkAllPeersIrreplaceable() {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

for i := range rt.buckets {
b := rt.buckets[i]
b.updateAllWith(func(p *PeerInfo) {
p.replaceable = false
})
}
}

// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
Expand Down
4 changes: 2 additions & 2 deletions table_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {

// add peer IDs.
for i, id := range peerIDs {
added, err := rt.TryAddPeer(id, true)
added, err := rt.TryAddPeer(id, true, false)
require.NoError(t, err)
require.True(t, added)
require.Len(t, rt.GetTrackedCplsForRefresh(), minCpl+i+1)
Expand All @@ -83,7 +83,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) {
}

// add our peer ID to max out the table
added, err := rt.TryAddPeer(local, true)
added, err := rt.TryAddPeer(local, true, false)
require.NoError(t, err)
require.True(t, added)

Expand Down
Loading

0 comments on commit f49a71a

Please sign in to comment.