@@ -4,11 +4,12 @@ import (
 	"context"
 	"fmt"
 	"maps"
-	"sync"
+	"slices"
 	"testing"
 	"time"

 	"github.com/NethermindEth/juno/consensus/p2p/buffered"
+	"github.com/NethermindEth/juno/consensus/p2p/config"
 	"github.com/NethermindEth/juno/p2p/pubsub/testutils"
 	"github.com/NethermindEth/juno/utils"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -22,17 +23,22 @@ import (
 )

 const (
-	chainID       = "1"
-	protocolID    = "test-buffered-topic-subscription-protocol"
-	topicName     = "test-buffered-topic-subscription-topic"
-	nodeCount     = 20
-	messageCount  = 100
-	logLevel      = zapcore.ErrorLevel
-	retryInterval = 1 * time.Second
+	chainID      = "1"
+	protocolID   = "test-buffered-topic-subscription-protocol"
+	topicName    = "test-buffered-topic-subscription-topic"
+	nodeCount    = 20
+	messageCount = 50
+	logLevel     = zapcore.InfoLevel
+	maxWait      = 5 * time.Second
 )

 type TestMessage = consensus.ConsensusStreamId

+type origin struct {
+	Source int
+	Index  int
+}
+
 func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
 	t.Run(fmt.Sprintf("%d nodes, each sending %d messages", nodeCount, messageCount), func(t *testing.T) {
 		logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
@@ -42,7 +48,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
 		topics := nodes.JoinTopic(t, chainID, protocolID, topicName)

 		messages := make([][]*TestMessage, nodeCount)
-		allMessages := make(map[string]struct{})
+		allMessages := make(map[string]origin)

 		for i := range messages {
 			messages[i] = make([]*TestMessage, messageCount)
@@ -53,46 +59,65 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
 				msgBytes, err := proto.Marshal(msg)
 				require.NoError(t, err)

-				allMessages[string(msgBytes)] = struct{}{}
+				allMessages[string(msgBytes)] = origin{Source: i, Index: j}
 			}
 		}

-		iterator := iter.Iterator[*pubsub.Topic]{MaxGoroutines: len(topics)}
-		wg := sync.WaitGroup{}
-		wg.Add(len(messages))
+		iterator := iter.Iterator[*pubsub.Topic]{MaxGoroutines: nodeCount}
+		finished := make(chan struct{}, nodeCount)
+		liveness := make(chan struct{}, 1)

 		go func() {
 			iterator.ForEachIdx(topics, func(i int, destination **pubsub.Topic) {
 				logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("destination-%d", i))}
 				pending := maps.Clone(allMessages)
+
+				// Ignore the messages we are broadcasting
+				for _, message := range messages[i] {
+					msgBytes, err := proto.Marshal(message)
+					require.NoError(t, err)
+					delete(pending, string(msgBytes))
+				}
+
 				subscription := buffered.NewTopicSubscription(logger, nodeCount*messageCount, func(ctx context.Context, msg *pubsub.Message) {
-					if len(pending) == 0 {
+					msgStr := string(msg.Message.Data)
+					if _, ok := pending[msgStr]; !ok {
 						return
 					}

-					delete(pending, string(msg.Message.Data))
+					select {
+					case liveness <- struct{}{}:
+					default:
+					}
+
+					delete(pending, msgStr)
+
 					if len(pending) == 0 {
-						wg.Done()
+						finished <- struct{}{}
 						logger.Info("all messages received")
 					}
 					logger.Debugw("received", "message", string(msg.Message.Data), "pending", len(pending))
 				})

 				subscription.Loop(t.Context(), *destination)
+				if len(pending) > 0 {
+					logger.Infow("missing messages", "pending", slices.Collect(maps.Values(pending)))
+				}
 			})
 		}()

 		go func() {
-			time.Sleep(1 * time.Second)
 			iterator.ForEachIdx(topics, func(i int, source **pubsub.Topic) {
 				logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("source-%d", i))}
+				rebroadcastInterval := config.DefaultBufferSizes.RebroadcastInterval
+
 				var rebroadcastStrategy buffered.RebroadcastStrategy[*TestMessage]
 				if i%2 == 0 {
-					rebroadcastStrategy = buffered.NewRebroadcastStrategy(retryInterval, func(msg *TestMessage) uint64 {
+					rebroadcastStrategy = buffered.NewRebroadcastStrategy(rebroadcastInterval, func(msg *TestMessage) uint64 {
 						return msg.BlockNumber
 					})
 				}
-				broadcaster := buffered.NewProtoBroadcaster(logger, messageCount, retryInterval, rebroadcastStrategy)
+				broadcaster := buffered.NewProtoBroadcaster(logger, messageCount, rebroadcastInterval, rebroadcastStrategy)
 				go broadcaster.Loop(t.Context(), *source)
 				for _, message := range messages[i] {
 					logger.Debugw("publishing", "message", message)
@@ -101,7 +126,9 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
 			})
 		}()

-		wg.Wait()
+		for range nodeCount {
+			wait(t, liveness, finished)
+		}
 	})

 	t.Run("canceled context", func(t *testing.T) {
@@ -128,6 +155,21 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
 	})
 }

+func wait(t *testing.T, liveness, finished chan struct{}) {
+	t.Helper()
+	for {
+		select {
+		case <-finished:
+			return
+		case <-liveness:
+			continue
+		case <-time.After(maxWait):
+			require.FailNow(t, "liveness check failed")
+			return
+		}
+	}
+}
+
 func getTestMessage(node, messageIndex int) *TestMessage {
 	return &TestMessage{
 		Nonce: uint64(node*messageCount + messageIndex),
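For context, the new wait helper replaces the old sync.WaitGroup with a liveness-gated timeout: every subscriber signals liveness whenever it removes a pending message and signals finished once its pending set is empty, so the test only fails after a full maxWait window with no progress at all. Below is a minimal, self-contained sketch of that pattern outside the test harness; waitForProgress, the worker loop, and the durations are illustrative assumptions, not part of the juno codebase.

	package main

	import (
		"fmt"
		"time"
	)

	const maxWait = 5 * time.Second

	// waitForProgress blocks until finished fires. Each liveness signal
	// restarts the maxWait timer, so the caller only gives up after a
	// full maxWait window with no progress at all.
	func waitForProgress(liveness, finished <-chan struct{}) error {
		for {
			select {
			case <-finished:
				return nil
			case <-liveness:
				continue // progress observed, keep waiting
			case <-time.After(maxWait):
				return fmt.Errorf("no progress within %s", maxWait)
			}
		}
	}

	func main() {
		liveness := make(chan struct{}, 1)
		finished := make(chan struct{}, 1)

		go func() {
			for range 3 {
				time.Sleep(time.Second)
				select {
				case liveness <- struct{}{}: // non-blocking, mirrors the test's select/default
				default:
				}
			}
			finished <- struct{}{}
		}()

		fmt.Println("waitForProgress:", waitForProgress(liveness, finished))
	}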