This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit a836208

Revert "stream: limit the number of peers we sync with on bin 0 (#1972)" (#2096)

This reverts commit f114078.
acud authored Feb 7, 2020
1 parent 80acd99 commit a836208
Showing 2 changed files with 21 additions and 69 deletions.
68 changes: 18 additions & 50 deletions network/stream/cursors_test.go
@@ -103,8 +103,6 @@ func TestNodesCorrectBinsDynamic(t *testing.T) {
         nodeCount  = 6
         chunkCount = 500
     )
-    binZeroPeers := 0
-
     opts := &SyncSimServiceOptions{
         InitialChunkCount: chunkCount,
     }
@@ -114,69 +112,53 @@ func TestNodesCorrectBinsDynamic(t *testing.T) {
     }, false)
     defer sim.Close()
 
-    idPivot, err := sim.AddNode()
+    _, err := sim.AddNodesAndConnectStar(2)
     if err != nil {
         t.Fatal(err)
     }
 
-    if len(sim.UpNodeIDs()) != 1 {
-        t.Fatal("node not started")
+    nodeIDs := sim.UpNodeIDs()
+    if len(nodeIDs) != 2 {
+        t.Fatal("not enough nodes up")
     }
 
-    pivotKademlia := nodeKademlia(sim, idPivot)
-    pivotSyncer := nodeRegistry(sim, idPivot)
+    waitForCursors(t, sim, nodeIDs[0], nodeIDs[1], true)
+    waitForCursors(t, sim, nodeIDs[1], nodeIDs[0], true)
 
-    for j := 1; j <= nodeCount; j++ {
+    for j := 2; j <= nodeCount; j++ {
         // append a node to the simulation
-        id, err := sim.AddNode()
+        id, err := sim.AddNodes(1)
         if err != nil {
             t.Fatal(err)
         }
-        err = sim.Net.ConnectNodesStar([]enode.ID{id}, idPivot)
+        err = sim.Net.ConnectNodesStar(id, nodeIDs[0])
         if err != nil {
             t.Fatal(err)
         }
         nodeIDs := sim.UpNodeIDs()
         if len(nodeIDs) != j+1 {
             t.Fatalf("not enough nodes up. got %d, want %d", len(nodeIDs), j+1)
         }
+        idPivot := nodeIDs[0]
 
-        otherKad := sim.MustNodeItem(id, simulation.BucketKeyKademlia).(*network.Kademlia)
-        po := chunk.Proximity(otherKad.BaseAddr(), pivotKademlia.BaseAddr())
-        if po == 0 {
-            binZeroPeers++
-            if binZeroPeers <= maxBinZeroSyncPeers {
-                waitForCursors(t, sim, idPivot, nodeIDs[j], true)
-                waitForCursors(t, sim, nodeIDs[j], idPivot, true)
-            } else {
-                // wait for the peer to get created in the protocol
-                waitForPeer(t, sim, idPivot, id)
-            }
-        } else {
-            waitForCursors(t, sim, idPivot, nodeIDs[j], true)
-            waitForCursors(t, sim, nodeIDs[j], idPivot, true)
-        }
+        waitForCursors(t, sim, idPivot, nodeIDs[j], true)
+        waitForCursors(t, sim, nodeIDs[j], idPivot, true)
 
-        binZeroRun := 0
-        for i := 1; i < len(nodeIDs); i++ {
+        pivotSyncer := nodeRegistry(sim, idPivot)
+        pivotKademlia := nodeKademlia(sim, idPivot)
+        pivotDepth := uint(pivotKademlia.NeighbourhoodDepth())
+
+        for i := 1; i < j; i++ {
             idOther := nodeIDs[i]
             otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia)
 
             po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr())
             pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy()
-            pivotDepth := uint(pivotKademlia.NeighbourhoodDepth())
-            if po == 0 {
-                binZeroRun++
-                if binZeroRun > maxBinZeroSyncPeers {
-                    continue
-                }
-            }
 
             // check that the pivot node is interested just in bins >= depth
             if po >= int(pivotDepth) {
                 othersBins := nodeInitialBinIndexes(sim, idOther)
                 if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil {
-                    t.Error(i, j, po, err)
+                    t.Error(err)
                 }
             }
         }
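For orientation on what the inner loop asserts: `chunk.Proximity` returns the proximity order (PO) of two overlay addresses, i.e. the number of leading bits they share, and the test then checks that the pivot holds cursors only for bins at or above its neighbourhood depth. A minimal sketch of that proximity calculation, for illustration only (the real implementation lives in the chunk package and caps the result at a maximum PO):

```go
package main

import "fmt"

// proximity sketches what chunk.Proximity computes: the number of
// leading bits two addresses share, scanning from the most significant
// bit of the first byte onward.
func proximity(a, b []byte) int {
    for i := range a {
        x := a[i] ^ b[i] // differing bits of byte i
        if x == 0 {
            continue // whole byte identical, 8 more shared bits
        }
        po := i * 8
        for mask := byte(0x80); mask > 0; mask >>= 1 {
            if x&mask != 0 {
                return po // first differing bit found
            }
            po++
        }
    }
    return len(a) * 8 // addresses are identical
}

func main() {
    fmt.Println(proximity([]byte{0x80}, []byte{0x00})) // 0: first bit differs
    fmt.Println(proximity([]byte{0xC0}, []byte{0xD0})) // 3: three leading bits shared
}
```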
@@ -372,20 +354,6 @@ func setupReestablishCursorsSimulation(t *testing.T, tagetPO int) (sim *simulati
     return
 }
 
-func waitForPeer(t *testing.T, sim *simulation.Simulation, pivotEnode, lookupEnode enode.ID) {
-    for i := 0; i < 1000; i++ { // 10s total wait
-        time.Sleep(5 * time.Millisecond)
-        s, ok := sim.Service(serviceNameStream, pivotEnode).(*Registry)
-        if !ok {
-            continue
-        }
-        p := s.getPeer(lookupEnode)
-        if p == nil {
-            continue
-        }
-    }
-}
-
 // waitForCursors checks if the pivot node has some cursors or not
 // by periodically checking for them.
 func waitForCursors(t *testing.T, sim *simulation.Simulation, pivotEnode, lookupEnode enode.ID, wantSome bool) {
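The revert also deletes the `waitForPeer` helper outright. As written it polled for at most 5 seconds (1000 iterations of 5 ms, despite the `// 10s total wait` comment) and neither returned early once the peer appeared nor failed the test when it never did. A helper with the intended behaviour might look like the sketch below, an illustration built on the same repo-internal calls (`sim.Service`, `Registry.getPeer`) rather than code from the repository:

```go
// waitForPeerSketch polls the pivot's stream Registry until it reports
// the peer, failing the test on timeout. Hypothetical illustration.
func waitForPeerSketch(t *testing.T, sim *simulation.Simulation, pivotEnode, lookupEnode enode.ID) {
    t.Helper()
    for i := 0; i < 1000; i++ { // 5s total wait
        time.Sleep(5 * time.Millisecond)
        s, ok := sim.Service(serviceNameStream, pivotEnode).(*Registry)
        if !ok {
            continue
        }
        if p := s.getPeer(lookupEnode); p != nil {
            return // peer exists in the protocol; done
        }
    }
    t.Fatal("timed out waiting for peer")
}
```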
22 changes: 3 additions & 19 deletions network/stream/sync_provider.go
@@ -36,10 +36,9 @@ import (
 )
 
 const (
-    syncStreamName      = "SYNC"
-    cacheCapacity       = 10000
-    setCacheCapacity    = 80000 // 80000 * 32 = ~2.5mb mem footprint, 80K chunks ~=330 megs of data
-    maxBinZeroSyncPeers = 3
+    syncStreamName   = "SYNC"
+    cacheCapacity    = 10000
+    setCacheCapacity = 80000 // 80000 * 32 = ~2.5mb mem footprint, 80K chunks ~=330 megs of data
 )
 
 var (
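As a sanity check on the `setCacheCapacity` comment: 80,000 entries × 32-byte chunk addresses is 2,560,000 bytes, roughly the quoted ~2.5 MB memory footprint, while the chunks those entries represent amount to 80,000 × 4 KB (Swarm's chunk size) ≈ 328 MB, matching the "~330 megs of data".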
@@ -59,7 +58,6 @@ type syncProvider struct {
     setCacheMtx sync.RWMutex  // set cache mutex
     setCache    *lru.Cache    // cache to reduce load on localstore to not set the same chunk as synced
     logger      log.Logger    // logger that appends the base address to loglines
-    binZeroSem  chan struct{} // semaphore to limit number of syncing peers on bin 0
 }
 
 // NewSyncProvider creates a new sync provider that is used by the stream protocol to sink data and control its behaviour
@@ -86,7 +84,6 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, baseAddr *netw
         cache:      c,
         setCache:   sc,
         logger:     log.NewBaseAddressLogger(baseAddr.ShortString()),
-        binZeroSem: make(chan struct{}, maxBinZeroSyncPeers),
     }
 }
 
@@ -317,24 +314,11 @@ func (s *syncProvider) InitPeer(p *Peer) {
         case <-timer.C:
         case <-p.quit:
             return
-        case <-s.quit:
-            return
         }
 
         po := chunk.Proximity(p.BzzAddr.Over(), s.kad.BaseAddr())
         depth := s.kad.NeighbourhoodDepth()
 
-        if po == 0 {
-            select {
-            case s.binZeroSem <- struct{}{}:
-            case <-p.quit:
-                return
-            case <-s.quit:
-                return
-            }
-            defer func() { <-s.binZeroSem }()
-
-        }
         p.logger.Debug("update syncing subscriptions: initial", "po", po, "depth", depth)
 
         subBins, quitBins := syncSubscriptionsDiff(po, -1, depth, s.kad.MaxProxDisplay, s.syncBinsOnlyWithinDepth)
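The block removed from `InitPeer` is the heart of the reverted feature: a buffered channel used as a counting semaphore. A send acquires a slot and blocks once `maxBinZeroSyncPeers` bin-0 peers are already syncing; the deferred receive releases the slot when `InitPeer` returns. Wrapping the acquire in a `select` against `p.quit` and `s.quit` kept a departing peer or a shutting-down provider from blocking forever. A standalone sketch of the same pattern, independent of the stream package:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxConcurrent = 3 // plays the role of maxBinZeroSyncPeers
    sem := make(chan struct{}, maxConcurrent)

    var wg sync.WaitGroup
    for i := 0; i < 6; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem <- struct{}{}        // acquire: blocks while all slots are held
            defer func() { <-sem }() // release on exit
            fmt.Println("worker", id, "holding a slot")
            time.Sleep(50 * time.Millisecond) // stand-in for a sync session
        }(i)
    }
    wg.Wait()
}
```

With the semaphore gone, every bin-0 peer again gets syncing subscriptions immediately, which is what the restored test above asserts.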
