Skip to content

Commit e78c68b

Browse files
committed
feat: Rebroadcast latest prevote and precommit
1 parent bf548a0 commit e78c68b

File tree

8 files changed

+141
-51
lines changed

8 files changed

+141
-51
lines changed

consensus/p2p/buffered/buffered_test.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,19 @@ import (
2626
)
2727

2828
const (
29-
topicName = "test-buffered-topic-subscription"
30-
discoveryServiceTag = "test-buffered-topic-subscription-discovery"
31-
nodeCount = 20
32-
messageCount = 100
33-
throttledRate = 5 * nodeCount * time.Millisecond
34-
logLevel = zapcore.ErrorLevel
35-
retryInterval = 1 * time.Second
29+
topicName = "test-buffered-topic-subscription"
30+
nodeCount = 20
31+
messageCount = 100
32+
logLevel = zapcore.ErrorLevel
33+
retryInterval = 1 * time.Second
3634
)
3735

36+
type TestMessage = consensus.ConsensusStreamId
37+
3838
type node struct {
3939
host host.Host
4040
topic *pubsub.Topic
41-
messages []proto.Message
41+
messages []*TestMessage
4242
}
4343

4444
func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
@@ -67,7 +67,7 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
6767
allMessages := make(map[string]struct{})
6868

6969
for i := range nodes {
70-
nodes[i].messages = make([]proto.Message, messageCount)
70+
nodes[i].messages = make([]*TestMessage, messageCount)
7171
for j := range nodes[i].messages {
7272
msg := getTestMessage(i, j)
7373
nodes[i].messages[j] = msg
@@ -88,6 +88,10 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
8888
logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("destination-%d", i))}
8989
pending := maps.Clone(allMessages)
9090
subscription := buffered.NewTopicSubscription(logger, nodeCount*messageCount, func(ctx context.Context, msg *pubsub.Message) {
91+
if len(pending) == 0 {
92+
return
93+
}
94+
9195
delete(pending, string(msg.Message.Data))
9296
if len(pending) == 0 {
9397
wg.Done()
@@ -104,12 +108,17 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
104108
time.Sleep(1 * time.Second)
105109
iterator.ForEachIdx(nodes, func(i int, source *node) {
106110
logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("source-%d", i))}
107-
broadcaster := buffered.NewProtoBroadcaster(logger, messageCount, retryInterval)
111+
var rebroadcastStrategy buffered.RebroadcastStrategy[*TestMessage] = buffered.NoRebroadcast[*TestMessage]{}
112+
if i%2 == 0 {
113+
rebroadcastStrategy = buffered.NewRebroadcastLatest(retryInterval, func(msg *TestMessage) uint64 {
114+
return msg.BlockNumber
115+
})
116+
}
117+
broadcaster := buffered.NewProtoBroadcaster(logger, messageCount, retryInterval, rebroadcastStrategy)
108118
go broadcaster.Loop(t.Context(), source.topic)
109119
for _, message := range source.messages {
110120
logger.Debugw("publishing", "message", message)
111121
broadcaster.Broadcast(t.Context(), message)
112-
time.Sleep(throttledRate)
113122
}
114123
})
115124
}()
@@ -138,8 +147,8 @@ func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
138147
})
139148
}
140149

141-
func getTestMessage(node, messageIndex int) proto.Message {
142-
return &consensus.ConsensusStreamId{
150+
func getTestMessage(node, messageIndex int) *TestMessage {
151+
return &TestMessage{
143152
Nonce: uint64(node*messageCount + messageIndex),
144153
}
145154
}
@@ -158,7 +167,12 @@ func getNode(t *testing.T) node {
158167
)
159168
require.NoError(t, err)
160169

161-
gossipSub, err := pubsub.NewGossipSub(t.Context(), host)
170+
gossipSub, err := pubsub.NewGossipSub(
171+
t.Context(),
172+
host,
173+
pubsub.WithValidateQueueSize(nodeCount*messageCount),
174+
pubsub.WithPeerOutboundQueueSize(nodeCount*messageCount),
175+
)
162176
require.NoError(t, err)
163177

164178
topic, err := gossipSub.Join(topicName)
@@ -171,7 +185,7 @@ func getNode(t *testing.T) node {
171185
return node{
172186
host: host,
173187
topic: topic,
174-
messages: make([]proto.Message, messageCount),
188+
messages: make([]*TestMessage, messageCount),
175189
}
176190
}
177191

consensus/p2p/buffered/proto_broadcaster.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,36 @@ import (
1010
"google.golang.org/protobuf/proto"
1111
)
1212

13-
type ProtoBroadcaster struct {
14-
log utils.Logger
15-
ch chan proto.Message
16-
retryInterval time.Duration
13+
type ProtoBroadcaster[M proto.Message] struct {
14+
log utils.Logger
15+
ch chan M
16+
retryInterval time.Duration
17+
rebroadcastStrategy RebroadcastStrategy[M]
1718
}
1819

19-
func NewProtoBroadcaster(log utils.Logger, bufferSize int, retryInterval time.Duration) ProtoBroadcaster {
20-
return ProtoBroadcaster{
21-
log: log,
22-
ch: make(chan proto.Message, bufferSize),
23-
retryInterval: retryInterval,
20+
func NewProtoBroadcaster[M proto.Message](
21+
log utils.Logger,
22+
bufferSize int,
23+
retryInterval time.Duration,
24+
rebroadcastStrategy RebroadcastStrategy[M],
25+
) ProtoBroadcaster[M] {
26+
return ProtoBroadcaster[M]{
27+
log: log,
28+
ch: make(chan M, bufferSize),
29+
retryInterval: retryInterval,
30+
rebroadcastStrategy: rebroadcastStrategy,
2431
}
2532
}
2633

27-
func (b ProtoBroadcaster) Broadcast(ctx context.Context, msg proto.Message) {
34+
func (b ProtoBroadcaster[M]) Broadcast(ctx context.Context, msg M) {
2835
select {
2936
case <-ctx.Done():
3037
return
3138
case b.ch <- msg:
3239
}
3340
}
3441

35-
func (b ProtoBroadcaster) Loop(ctx context.Context, topic *pubsub.Topic) {
42+
func (b ProtoBroadcaster[M]) Loop(ctx context.Context, topic *pubsub.Topic) {
3643
readinessOpt := pubsub.WithReadiness(pubsub.MinTopicSize(1))
3744
for {
3845
select {
@@ -53,6 +60,13 @@ func (b ProtoBroadcaster) Loop(ctx context.Context, topic *pubsub.Topic) {
5360
}
5461
break
5562
}
63+
b.rebroadcastStrategy.Receive(msg, msgBytes)
64+
case <-b.rebroadcastStrategy.RebroadcastTrigger():
65+
for msgBytes := range b.rebroadcastStrategy.GetRebroadcastedMessages() {
66+
if err := topic.Publish(ctx, msgBytes, readinessOpt); err != nil && !errors.Is(err, context.Canceled) {
67+
b.log.Errorw("unable to rebroadcast message", "error", err)
68+
}
69+
}
5670
}
5771
}
5872
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package buffered
2+
3+
import (
4+
"iter"
5+
"maps"
6+
"time"
7+
8+
"google.golang.org/protobuf/proto"
9+
)
10+
11+
type RebroadcastStrategy[M proto.Message] interface {
12+
Receive(M, []byte)
13+
GetRebroadcastedMessages() iter.Seq[[]byte]
14+
RebroadcastTrigger() <-chan time.Time
15+
}
16+
17+
type NoRebroadcast[M proto.Message] struct{}
18+
19+
func (s NoRebroadcast[M]) Receive(msg M, msgBytes []byte) {}
20+
21+
func (s NoRebroadcast[M]) RebroadcastTrigger() <-chan time.Time {
22+
return nil
23+
}
24+
25+
func (s NoRebroadcast[M]) GetRebroadcastedMessages() iter.Seq[[]byte] {
26+
return func(yield func([]byte) bool) {}
27+
}
28+
29+
type RebroadcastLatest[M proto.Message, K comparable] struct {
30+
rebroadcastInterval time.Duration
31+
getKey func(M) K
32+
cache map[K][]byte
33+
}
34+
35+
func NewRebroadcastLatest[M proto.Message, K comparable](rebroadcastInterval time.Duration, getKey func(M) K) RebroadcastLatest[M, K] {
36+
return RebroadcastLatest[M, K]{
37+
rebroadcastInterval: rebroadcastInterval,
38+
getKey: getKey,
39+
cache: make(map[K][]byte),
40+
}
41+
}
42+
43+
func (s RebroadcastLatest[M, K]) Receive(msg M, msgBytes []byte) {
44+
key := s.getKey(msg)
45+
s.cache[key] = msgBytes
46+
}
47+
48+
func (s RebroadcastLatest[M, K]) GetRebroadcastedMessages() iter.Seq[[]byte] {
49+
return maps.Values(s.cache)
50+
}
51+
52+
func (s RebroadcastLatest[M, K]) RebroadcastTrigger() <-chan time.Time {
53+
return time.After(s.rebroadcastInterval)
54+
}

consensus/p2p/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type BufferSizes struct {
2222
VoteProtoBroadcaster int
2323
PubSubQueueSize int
2424
RetryInterval time.Duration
25+
RebroadcastInterval time.Duration
2526
}
2627

2728
var DefaultBufferSizes = BufferSizes{
@@ -36,4 +37,5 @@ var DefaultBufferSizes = BufferSizes{
3637
VoteProtoBroadcaster: 1024,
3738
PubSubQueueSize: 1024,
3839
RetryInterval: 1 * time.Second,
40+
RebroadcastInterval: 5 * time.Second,
3941
}

consensus/p2p/p2p.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ func New(
7777
voteBroadcaster := vote.NewVoteBroadcaster(
7878
log,
7979
vote.StarknetVoteAdapter,
80-
bufferSizeConfig.VoteProtoBroadcaster,
81-
bufferSizeConfig.RetryInterval,
80+
bufferSizeConfig,
8281
)
8382
broadcasters := Broadcasters[starknet.Value, starknet.Hash, address.Address]{
8483
ProposalBroadcaster: &proposalBroadcaster,

consensus/p2p/proposer/proposal_broadcaster.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ import (
1010
"github.com/NethermindEth/juno/utils"
1111
pubsub "github.com/libp2p/go-libp2p-pubsub"
1212
"github.com/sourcegraph/conc"
13+
"github.com/starknet-io/starknet-p2pspecs/p2p/proto/consensus/consensus"
1314
)
1415

1516
type proposalBroadcaster[V types.Hashable[H], H types.Hash, A types.Addr] struct {
1617
log utils.Logger
1718
proposalAdapter ProposerAdapter[V, H, A]
1819
proposalStore *proposal.ProposalStore[H]
19-
broadcaster buffered.ProtoBroadcaster
20+
broadcaster buffered.ProtoBroadcaster[*consensus.StreamMessage]
2021
proposals chan types.Proposal[V, H, A]
2122
}
2223

@@ -27,11 +28,12 @@ func NewProposalBroadcaster[V types.Hashable[H], H types.Hash, A types.Addr](
2728
bufferSize int,
2829
retryInterval time.Duration,
2930
) proposalBroadcaster[V, H, A] {
31+
rebroadcastStrategy := buffered.NoRebroadcast[*consensus.StreamMessage]{}
3032
return proposalBroadcaster[V, H, A]{
3133
log: log,
3234
proposalAdapter: proposalAdapter,
3335
proposalStore: proposalStore,
34-
broadcaster: buffered.NewProtoBroadcaster(log, bufferSize, retryInterval),
36+
broadcaster: buffered.NewProtoBroadcaster(log, bufferSize, retryInterval, rebroadcastStrategy),
3537
proposals: make(chan types.Proposal[V, H, A], bufferSize),
3638
}
3739
}

consensus/p2p/vote/vote_broadcasters.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,36 @@ package vote
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/NethermindEth/juno/consensus/p2p/buffered"
7+
"github.com/NethermindEth/juno/consensus/p2p/config"
88
"github.com/NethermindEth/juno/consensus/types"
99
"github.com/NethermindEth/juno/utils"
1010
"github.com/starknet-io/starknet-p2pspecs/p2p/proto/consensus/consensus"
1111
)
1212

1313
type voteBroadcaster[H types.Hash, A types.Addr] struct {
14-
buffered.ProtoBroadcaster
14+
buffered.ProtoBroadcaster[*consensus.Vote]
1515
log utils.Logger
1616
voteAdapter VoteAdapter[H, A]
1717
}
1818

1919
func NewVoteBroadcaster[H types.Hash, A types.Addr](
2020
log utils.Logger,
2121
voteAdapter VoteAdapter[H, A],
22-
bufferSize int,
23-
retryInterval time.Duration,
22+
bufferSizeConfig *config.BufferSizes,
2423
) voteBroadcaster[H, A] {
2524
return voteBroadcaster[H, A]{
26-
log: log,
27-
voteAdapter: voteAdapter,
28-
ProtoBroadcaster: buffered.NewProtoBroadcaster(log, bufferSize, retryInterval),
25+
log: log,
26+
voteAdapter: voteAdapter,
27+
ProtoBroadcaster: buffered.NewProtoBroadcaster(
28+
log,
29+
bufferSizeConfig.VoteProtoBroadcaster,
30+
bufferSizeConfig.RetryInterval,
31+
buffered.NewRebroadcastLatest(bufferSizeConfig.RebroadcastInterval, func(msg *consensus.Vote) consensus.Vote_VoteType {
32+
return msg.VoteType
33+
}),
34+
),
2935
}
3036
}
3137

0 commit comments

Comments
 (0)