This repository has been archived by the owner on Aug 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 111
/
Copy pathprotocol_test.go
113 lines (103 loc) · 3.31 KB
/
protocol_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package pushsync
import (
"context"
"encoding/binary"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
)
// TestProtocol tests the push sync protocol
// push syncer node communicate with storers via mock loopback PubSub
func TestProtocol(t *testing.T) {
timeout := 10 * time.Second
chunkCnt := 1024
tagCnt := 4
storerCnt := 4
sent := &sync.Map{}
store := &sync.Map{}
// mock pubsub messenger
lb := newLoopBack()
// set up a number of storers
storers := make([]*Storer, storerCnt)
for i := 0; i < storerCnt; i++ {
// every chunk is closest to exactly one storer
j := i
isClosestTo := func(addr []byte) bool {
n := int(binary.BigEndian.Uint64(addr[:8]))
log.Debug("closest node?", "n", n, "n%storerCnt", n%storerCnt, "storer", j)
return n%storerCnt == j
}
storers[j] = NewStorer(&testStore{store}, &testPubSub{lb, isClosestTo})
}
tags, tagIDs := setupTags(chunkCnt, tagCnt)
// construct the mock push sync index iterator
tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
// isClosestTo function mocked
isClosestTo := func([]byte) bool { return false }
// start push syncing in a go routine
p := NewPusher(tp, &testPubSub{lb, isClosestTo}, tags)
defer p.Close()
synced := make(map[int]int)
for {
select {
// receive indexes on synced channel when a chunk is set as synced
case idx := <-tp.synced:
n := synced[idx]
synced[idx] = n + 1
case <-time.After(timeout):
t.Fatalf("timeout waiting for all chunks to be synced")
}
// all chunks set as synced
if len(synced) == chunkCnt {
expTotal := int64(chunkCnt / tagCnt)
checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
for i := uint64(0); i < uint64(chunkCnt); i++ {
if n := synced[int(i)]; n != 1 {
t.Fatalf("expected to receive exactly 1 receipt for chunk %v, got %v", i, n)
}
v, ok := store.Load(i)
if !ok {
t.Fatalf("chunk %v not stored", i)
}
if cnt := *(v.(*uint32)); cnt < uint32(storerCnt) {
t.Fatalf("chunk %v expected to be saved at least %v times, got %v", i, storerCnt, cnt)
}
}
return
}
}
}
type testStore struct {
store *sync.Map
}
func (t *testStore) Put(_ context.Context, _ chunk.ModePut, chs ...chunk.Chunk) ([]bool, error) {
exists := make([]bool, len(chs))
for i, ch := range chs {
idx := binary.BigEndian.Uint64(ch.Address()[:8])
var storedCnt uint32 = 1
if v, loaded := t.store.LoadOrStore(idx, &storedCnt); loaded {
atomic.AddUint32(v.(*uint32), 1)
exists[i] = loaded
}
log.Debug("testStore put", "idx", idx)
}
return exists, nil
}