diff --git a/network/network.go b/network/network.go index 4a7930ca2796..21e7ee9c5955 100644 --- a/network/network.go +++ b/network/network.go @@ -99,6 +99,8 @@ type network struct { // messages ipSigner *ipSigner + outboundMsgThrottler throttling.OutboundMsgThrottler + // Limits the number of connection attempts based on IP. inboundConnUpgradeThrottler throttling.InboundConnUpgradeThrottler // Listens for and accepts new inbound connections @@ -203,7 +205,6 @@ func NewNetwork( MessageCreator: msgCreator, Log: log, InboundMsgThrottler: inboundMsgThrottler, - OutboundMsgThrottler: outboundMsgThrottler, Network: nil, // This is set below. Router: router, VersionCompatibility: version.GetCompatibility(config.NetworkID), @@ -219,10 +220,11 @@ func NewNetwork( } onCloseCtx, cancel := context.WithCancel(context.Background()) n := &network{ - config: config, - peerConfig: peerConfig, - metrics: metrics, - ipSigner: newIPSigner(&config.MyIP, &peerConfig.Clock, config.TLSKey), + config: config, + peerConfig: peerConfig, + metrics: metrics, + ipSigner: newIPSigner(&config.MyIP, &peerConfig.Clock, config.TLSKey), + outboundMsgThrottler: outboundMsgThrottler, inboundConnUpgradeThrottler: throttling.NewInboundConnUpgradeThrottler(log, config.ThrottlerConfig.InboundConnUpgradeThrottlerConfig), listener: listener, @@ -715,7 +717,7 @@ func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) ids.NodeI // Add a reference to the message so that if it is sent, it won't be // collected until it is done being processed. msg.AddRef() - if peer.Send(msg) { + if peer.Send(n.onCloseCtx, msg) { sentTo.Add(peer.ID()) // TODO: move send fail rate calculations into the peer metrics @@ -982,7 +984,18 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader) error { n.peerConfig.Log.Verbo("starting handshake with %s", nodeID) - peer := peer.Start(n.peerConfig, tlsConn, cert, nodeID) + peer := peer.Start( + n.peerConfig, + tlsConn, + cert, + nodeID, + peer.NewThrottledMessageQueue( + n.peerConfig.Metrics, + nodeID, + n.peerConfig.Log, + n.outboundMsgThrottler, + ), + ) n.connectingPeers.Add(peer) return nil } diff --git a/network/peer/config.go b/network/peer/config.go index 22f190de7b18..6dfca0b3e850 100644 --- a/network/peer/config.go +++ b/network/peer/config.go @@ -27,7 +27,6 @@ type Config struct { MessageCreator message.Creator Log logging.Logger InboundMsgThrottler throttling.InboundMsgThrottler - OutboundMsgThrottler throttling.OutboundMsgThrottler Network Network Router router.InboundHandler VersionCompatibility version.Compatibility diff --git a/network/peer/message_queue.go b/network/peer/message_queue.go new file mode 100644 index 000000000000..092104c60747 --- /dev/null +++ b/network/peer/message_queue.go @@ -0,0 +1,284 @@ +// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package peer + +import ( + "context" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/message" + "github.com/ava-labs/avalanchego/network/throttling" + "github.com/ava-labs/avalanchego/utils/logging" +) + +var ( + _ MessageQueue = &throttledMessageQueue{} + _ MessageQueue = &blockingMessageQueue{} +) + +type SendFailedCallback interface { + SendFailed(message.OutboundMessage) +} + +type SendFailedFunc func(message.OutboundMessage) + +func (f SendFailedFunc) SendFailed(msg message.OutboundMessage) { f(msg) } + +type MessageQueue interface { + // Push attempts to add the message to the queue. If the context is + // canceled, then pushing the message will return `false` and the message + // will not be added to the queue. + Push(ctx context.Context, msg message.OutboundMessage) bool + + // Pop blocks until a message is available and then returns the message. If + // the queue is closed, then `false` is returned. + Pop() (message.OutboundMessage, bool) + + // PopNow attempts to return a message without blocking. If a message is not + // available or the queue is closed, then `false` is returned. + PopNow() (message.OutboundMessage, bool) + + // Close empties the queue and prevents further messages from being pushed + // onto it. After calling close once, future calls to close will do nothing. + Close() +} + +type throttledMessageQueue struct { + onFailed SendFailedCallback + // [id] of the peer we're sending messages to + id ids.NodeID + log logging.Logger + outboundMsgThrottler throttling.OutboundMsgThrottler + + // Signalled when a message is added to the queue and when Close() is + // called. + cond *sync.Cond + + // closed flags whether the send queue has been closed. + // [cond.L] must be held while accessing [closed]. + closed bool + + // queue of the messages + // [cond.L] must be held while accessing [queue]. + queue []message.OutboundMessage +} + +func NewThrottledMessageQueue( + onFailed SendFailedCallback, + id ids.NodeID, + log logging.Logger, + outboundMsgThrottler throttling.OutboundMsgThrottler, +) MessageQueue { + return &throttledMessageQueue{ + onFailed: onFailed, + id: id, + log: log, + outboundMsgThrottler: outboundMsgThrottler, + + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (q *throttledMessageQueue) Push(ctx context.Context, msg message.OutboundMessage) bool { + if err := ctx.Err(); err != nil { + q.log.Debug( + "dropping %s message to %s due to a context error: %s", + msg.Op(), q.id, err, + ) + q.onFailed.SendFailed(msg) + return false + } + + // Acquire space on the outbound message queue, or drop [msg] if we can't. + if !q.outboundMsgThrottler.Acquire(msg, q.id) { + q.log.Debug( + "dropping %s message to %s due to rate-limiting", + msg.Op(), q.id, + ) + q.onFailed.SendFailed(msg) + return false + } + + // Invariant: must call q.outboundMsgThrottler.Release(msg, q.id) when [msg] + // is popped or, if this queue closes before [msg] is popped, when this + // queue closes. + + q.cond.L.Lock() + defer q.cond.L.Unlock() + + if q.closed { + q.log.Debug( + "dropping %s message to %s due to a closed queue", + msg.Op(), q.id, + ) + q.outboundMsgThrottler.Release(msg, q.id) + q.onFailed.SendFailed(msg) + return false + } + + q.queue = append(q.queue, msg) + q.cond.Signal() + return true +} + +func (q *throttledMessageQueue) Pop() (message.OutboundMessage, bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + for { + if q.closed { + return nil, false + } + if len(q.queue) > 0 { + // There is a message + break + } + // Wait until there is a message + q.cond.Wait() + } + + return q.pop(), true +} + +func (q *throttledMessageQueue) PopNow() (message.OutboundMessage, bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + if len(q.queue) == 0 { + // There isn't a message + return nil, false + } + + return q.pop(), true +} + +func (q *throttledMessageQueue) pop() message.OutboundMessage { + msg := q.queue[0] + q.queue[0] = nil + q.queue = q.queue[1:] + + q.outboundMsgThrottler.Release(msg, q.id) + return msg +} + +func (q *throttledMessageQueue) Close() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.closed = true + + for _, msg := range q.queue { + q.outboundMsgThrottler.Release(msg, q.id) + q.onFailed.SendFailed(msg) + } + q.queue = nil + + q.cond.Broadcast() +} + +type blockingMessageQueue struct { + onFailed SendFailedCallback + log logging.Logger + + closeOnce sync.Once + closingLock sync.RWMutex + closing chan struct{} + + // queue of the messages + queue chan message.OutboundMessage +} + +func NewBlockingMessageQueue( + onFailed SendFailedCallback, + log logging.Logger, + bufferSize int, +) MessageQueue { + return &blockingMessageQueue{ + onFailed: onFailed, + log: log, + + closing: make(chan struct{}), + queue: make(chan message.OutboundMessage, bufferSize), + } +} + +func (q *blockingMessageQueue) Push(ctx context.Context, msg message.OutboundMessage) bool { + q.closingLock.RLock() + defer q.closingLock.RUnlock() + + ctxDone := ctx.Done() + select { + case <-q.closing: + q.log.Debug( + "dropping %s message due to a closed queue", + msg.Op(), + ) + q.onFailed.SendFailed(msg) + return false + case <-ctxDone: + q.log.Debug( + "dropping %s message due to a cancelled context", + msg.Op(), + ) + q.onFailed.SendFailed(msg) + return false + default: + } + + select { + case q.queue <- msg: + return true + case <-ctxDone: + q.log.Debug( + "dropping %s message due to a cancelled context", + msg.Op(), + ) + q.onFailed.SendFailed(msg) + return false + case <-q.closing: + q.log.Debug( + "dropping %s message due to a closed queue", + msg.Op(), + ) + q.onFailed.SendFailed(msg) + return false + } +} + +func (q *blockingMessageQueue) Pop() (message.OutboundMessage, bool) { + select { + case msg := <-q.queue: + return msg, true + case <-q.closing: + return nil, false + } +} + +func (q *blockingMessageQueue) PopNow() (message.OutboundMessage, bool) { + select { + case msg := <-q.queue: + return msg, true + default: + return nil, false + } +} + +func (q *blockingMessageQueue) Close() { + q.closeOnce.Do(func() { + close(q.closing) + + q.closingLock.Lock() + defer q.closingLock.Unlock() + + for { + select { + case msg := <-q.queue: + q.onFailed.SendFailed(msg) + default: + return + } + } + }) +} diff --git a/network/peer/message_queue_test.go b/network/peer/message_queue_test.go new file mode 100644 index 000000000000..5db991e567d0 --- /dev/null +++ b/network/peer/message_queue_test.go @@ -0,0 +1,50 @@ +// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package peer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/ava-labs/avalanchego/message" + "github.com/ava-labs/avalanchego/utils/logging" +) + +func TestBlockingMessageQueue(t *testing.T) { + assert := assert.New(t) + + q := NewBlockingMessageQueue( + SendFailedFunc(func(msg message.OutboundMessage) { + t.Fail() + }), + logging.NoLog{}, + 0, + ) + + mc := newMessageCreator(t) + msg, err := mc.Ping() + assert.NoError(err) + + numToSend := 10 + go func() { + for i := 0; i < numToSend; i++ { + q.Push(context.Background(), msg) + } + }() + + for i := 0; i < numToSend; i++ { + _, ok := q.Pop() + assert.True(ok) + } + + _, ok := q.PopNow() + assert.False(ok) + + q.Close() + + _, ok = q.Pop() + assert.False(ok) +} diff --git a/network/peer/peer.go b/network/peer/peer.go index 3e6b2a3aac08..45c4e58cfb12 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -81,7 +81,7 @@ type Peer interface { // Send attempts to send [msg] to the peer. The peer takes ownership of // [msg] for reference counting. This returns false if the message is // guaranteed not to be delivered to the peer. - Send(msg message.OutboundMessage) bool + Send(ctx context.Context, msg message.OutboundMessage) bool // StartClose will begin shutting down the peer. It will not block. StartClose() @@ -109,6 +109,9 @@ type peer struct { // node ID of this peer. id ids.NodeID + // queue of messages to send to this peer. + messageQueue MessageQueue + // ip is the claimed IP the peer gave us in the Version message. ip *SignedIP // version is the claimed version the peer is running that we received in @@ -148,22 +151,6 @@ type peer struct { // onClosed is closed when the peer is closed onClosed chan struct{} - // Signalled when a message is added to [sendQueue], and when [p.closing] is - // set to true. [sendQueueCond.L] must be held when using [sendQueue] and - // [canSend]. - sendQueueCond *sync.Cond - - // closing flags whether the peer has started shutting down. - closing bool - - // canSend flags whether the send queue has been closed. This is separate - // from [closing] because it's possible for the send queue to be flushed - // before [StartClose] is called. - canSend bool - - // queue of the messages to be sent to this peer - sendQueue []message.OutboundMessage - // Unix time of the last message sent and received respectively // Must only be accessed atomically lastSent, lastReceived int64 @@ -174,6 +161,7 @@ func Start( conn net.Conn, cert *x509.Certificate, id ids.NodeID, + messageQueue MessageQueue, ) Peer { onClosingCtx, onClosingCtxCancel := context.WithCancel(context.Background()) p := &peer{ @@ -181,22 +169,16 @@ func Start( conn: conn, cert: cert, id: id, + messageQueue: messageQueue, onFinishHandshake: make(chan struct{}), numExecuting: 3, onClosingCtx: onClosingCtx, onClosingCtxCancel: onClosingCtxCancel, onClosed: make(chan struct{}), - sendQueueCond: sync.NewCond(&sync.Mutex{}), - canSend: true, } p.trackedSubnets.Add(constants.PrimaryNetworkID) - // Make sure that the version is the first message sent - msg, err := p.Network.Version() - p.Log.AssertNoError(err) - p.Send(msg) - go p.readMessages() go p.writeMessages() go p.sendPings() @@ -265,36 +247,8 @@ func (p *peer) ObservedUptime() uint8 { return uptime } -func (p *peer) Send(msg message.OutboundMessage) bool { - // Acquire space on the outbound message queue, or drop [msg] if we can't. - if !p.OutboundMsgThrottler.Acquire(msg, p.id) { - p.Log.Debug( - "dropping %s message to %s due to rate-limiting", - msg.Op(), p.id, - ) - p.Metrics.SendFailed(msg) - return false - } - - // Invariant: must call p.outboundMsgThrottler.Release(msg, p.id) when done - // sending [msg] or when we give up sending [msg]. - - p.sendQueueCond.L.Lock() - defer p.sendQueueCond.L.Unlock() - - if !p.canSend { - p.Log.Debug( - "dropping %s message to %s due to a closed connection", - msg.Op(), p.id, - ) - p.OutboundMsgThrottler.Release(msg, p.id) - p.Metrics.SendFailed(msg) - return false - } - - p.sendQueue = append(p.sendQueue, msg) - p.sendQueueCond.Signal() - return true +func (p *peer) Send(ctx context.Context, msg message.OutboundMessage) bool { + return p.messageQueue.Push(ctx, msg) } func (p *peer) StartClose() { @@ -306,15 +260,7 @@ func (p *peer) StartClose() { ) } - // The lock is grabbed here to avoid any potential race conditions - // causing the [Broadcast] to be dropped. - p.sendQueueCond.L.Lock() - p.closing = true - // Per [p.sendQueueCond]'s spec, it is signalled when [p.closing] is set - // to true so that we exit the WriteMessages goroutine. - p.sendQueueCond.Broadcast() - p.sendQueueCond.L.Unlock() - + p.messageQueue.Close() p.onClosingCtxCancel() }) } @@ -466,118 +412,77 @@ func (p *peer) readMessages() { func (p *peer) writeMessages() { defer func() { - // Release the bytes of the unsent messages to the outbound message - // throttler - p.sendQueueCond.L.Lock() - p.canSend = false - for _, msg := range p.sendQueue { - p.OutboundMsgThrottler.Release(msg, p.id) - p.Metrics.SendFailed(msg) - } - p.sendQueue = nil - p.sendQueueCond.L.Unlock() - p.StartClose() p.close() }() writer := bufio.NewWriterSize(p.conn, p.Config.WriteBufferSize) - for { // When this loop exits, p.sendQueueCond.L is unlocked - msg, ok := p.nextMessageWithoutBlocking() - if !ok { - // Make sure the peer was fully sent all prior messages before - // blocking. - if err := writer.Flush(); err != nil { - p.Log.Verbo( - "couldn't flush writer to %s: %s", - p.id, err, - ) - return - } - msg, ok = p.nextMessageWithBlocking() - if !ok { - // This peer is closing - return - } - } - msgBytes := msg.Bytes() - p.Log.Verbo( - "sending message to %s:\n%s", - p.id, formatting.DumpBytes(msgBytes), - ) + // Make sure that the version is the first message sent + msg, err := p.Network.Version() + p.Log.AssertNoError(err) - msgLen := uint32(len(msgBytes)) - msgLenBytes := [wrappers.IntLen]byte{} - binary.BigEndian.PutUint32(msgLenBytes[:], msgLen) + p.writeMessage(writer, msg) - if err := p.conn.SetWriteDeadline(p.nextTimeout()); err != nil { + for { + msg, ok := p.messageQueue.PopNow() + if ok { + p.writeMessage(writer, msg) + continue + } + + // Make sure the peer was fully sent all prior messages before + // blocking. + if err := writer.Flush(); err != nil { p.Log.Verbo( - "error setting write deadline to %s due to: %s", + "couldn't flush writer to %s: %s", p.id, err, ) - p.OutboundMsgThrottler.Release(msg, p.id) - msg.DecRef() return } - // Write the message - var buf net.Buffers = [][]byte{msgLenBytes[:], msgBytes} - if _, err := io.CopyN(writer, &buf, int64(wrappers.IntLen+msgLen)); err != nil { - p.Log.Verbo("error writing to %s: %s", p.id, err) - p.OutboundMsgThrottler.Release(msg, p.id) - msg.DecRef() + msg, ok = p.messageQueue.Pop() + if !ok { + // This peer is closing return } - p.OutboundMsgThrottler.Release(msg, p.id) - - now := p.Clock.Time().Unix() - atomic.StoreInt64(&p.Config.LastSent, now) - atomic.StoreInt64(&p.lastSent, now) - p.Metrics.Sent(msg) + p.writeMessage(writer, msg) } } -// Returns the next message to send to this peer. -// If there is no message to send or the peer is closing, returns false. -func (p *peer) nextMessageWithoutBlocking() (message.OutboundMessage, bool) { - p.sendQueueCond.L.Lock() - defer p.sendQueueCond.L.Unlock() +func (p *peer) writeMessage(writer io.Writer, msg message.OutboundMessage) { + msgBytes := msg.Bytes() + p.Log.Verbo( + "sending message to %s:\n%s", + p.id, formatting.DumpBytes(msgBytes), + ) - if len(p.sendQueue) == 0 || p.closing { - // There isn't a message to send or the peer is closing. - return nil, false - } + msgLen := uint32(len(msgBytes)) + msgLenBytes := [wrappers.IntLen]byte{} + binary.BigEndian.PutUint32(msgLenBytes[:], msgLen) - msg := p.sendQueue[0] - p.sendQueue[0] = nil - p.sendQueue = p.sendQueue[1:] - return msg, true -} - -// Blocks until there is a message to send to this peer, then returns it. -// Returns false if the peer is closing. -func (p *peer) nextMessageWithBlocking() (message.OutboundMessage, bool) { - p.sendQueueCond.L.Lock() - defer p.sendQueueCond.L.Unlock() + if err := p.conn.SetWriteDeadline(p.nextTimeout()); err != nil { + p.Log.Verbo( + "error setting write deadline to %s due to: %s", + p.id, err, + ) + msg.DecRef() + return + } - for { - if p.closing { - return nil, false - } - if len(p.sendQueue) > 0 { - // There is a message to send - break - } - // Wait until there is a message to send - p.sendQueueCond.Wait() + // Write the message + var buf net.Buffers = [][]byte{msgLenBytes[:], msgBytes} + if _, err := io.CopyN(writer, &buf, int64(wrappers.IntLen+msgLen)); err != nil { + p.Log.Verbo("error writing to %s: %s", p.id, err) + msg.DecRef() + return } - msg := p.sendQueue[0] - p.sendQueue[0] = nil - p.sendQueue = p.sendQueue[1:] - return msg, true + now := p.Clock.Time().Unix() + atomic.StoreInt64(&p.Config.LastSent, now) + atomic.StoreInt64(&p.lastSent, now) + p.Metrics.Sent(msg) } func (p *peer) sendPings() { @@ -612,7 +517,7 @@ func (p *peer) sendPings() { msg, err := p.MessageCreator.Ping() p.Log.AssertNoError(err) - p.Send(msg) + p.Send(p.onClosingCtx, msg) case <-p.onClosingCtx.Done(): return } @@ -655,7 +560,7 @@ func (p *peer) handle(msg message.InboundMessage) { func (p *peer) handlePing(_ message.InboundMessage) { msg, err := p.Network.Pong(p.id) p.Log.AssertNoError(err) - p.Send(msg) + p.Send(p.onClosingCtx, msg) } func (p *peer) handlePong(msg message.InboundMessage) { @@ -788,7 +693,7 @@ func (p *peer) handleVersion(msg message.InboundMessage) { peerlistMsg, err := p.Network.Peers() p.Log.AssertNoError(err) - p.Send(peerlistMsg) + p.Send(p.onClosingCtx, peerlistMsg) } func (p *peer) handlePeerList(msg message.InboundMessage) { diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index c43b339a1901..0c95cacbd068 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -86,7 +86,6 @@ func makeRawTestPeers(t *testing.T) (*rawTestPeer, *rawTestPeer) { MessageCreator: mc, Log: logging.NoLog{}, InboundMsgThrottler: throttling.NewNoInboundThrottler(), - OutboundMsgThrottler: throttling.NewNoOutboundThrottler(), VersionCompatibility: version.GetCompatibility(constants.LocalID), VersionParser: version.DefaultApplicationParser, MySubnets: ids.Set{}, @@ -173,6 +172,12 @@ func makeTestPeers(t *testing.T) (*testPeer, *testPeer) { rawPeer0.conn, rawPeer1.cert, rawPeer1.nodeID, + NewThrottledMessageQueue( + rawPeer0.config.Metrics, + rawPeer1.nodeID, + logging.NoLog{}, + throttling.NewNoOutboundThrottler(), + ), ), inboundMsgChan: rawPeer0.inboundMsgChan, } @@ -182,6 +187,12 @@ func makeTestPeers(t *testing.T) (*testPeer, *testPeer) { rawPeer1.conn, rawPeer0.cert, rawPeer0.nodeID, + NewThrottledMessageQueue( + rawPeer1.config.Metrics, + rawPeer0.nodeID, + logging.NoLog{}, + throttling.NewNoOutboundThrottler(), + ), ), inboundMsgChan: rawPeer1.inboundMsgChan, } @@ -217,6 +228,12 @@ func TestReady(t *testing.T) { rawPeer0.conn, rawPeer1.cert, rawPeer1.nodeID, + NewThrottledMessageQueue( + rawPeer0.config.Metrics, + rawPeer1.nodeID, + logging.NoLog{}, + throttling.NewNoOutboundThrottler(), + ), ) isReady := peer0.Ready() @@ -227,6 +244,12 @@ func TestReady(t *testing.T) { rawPeer1.conn, rawPeer0.cert, rawPeer0.nodeID, + NewThrottledMessageQueue( + rawPeer1.config.Metrics, + rawPeer0.nodeID, + logging.NoLog{}, + throttling.NewNoOutboundThrottler(), + ), ) err := peer0.AwaitReady(context.Background()) @@ -255,7 +278,7 @@ func TestSend(t *testing.T) { outboundGetMsg, err := mc.Get(ids.Empty, 1, time.Second, ids.Empty) assert.NoError(err) - sent := peer0.Send(outboundGetMsg) + sent := peer0.Send(context.Background(), outboundGetMsg) assert.True(sent) inboundGetMsg := <-peer1.inboundMsgChan diff --git a/network/peer/test_peer.go b/network/peer/test_peer.go index 6e64b3f55017..dfd3706166a4 100644 --- a/network/peer/test_peer.go +++ b/network/peer/test_peer.go @@ -26,6 +26,8 @@ import ( "github.com/ava-labs/avalanchego/version" ) +const maxMessageToSend = 1024 + // StartTestPeer provides a simple interface to create a peer that has finished // the p2p handshake. // @@ -94,11 +96,10 @@ func StartTestPeer( } peer := Start( &Config{ - Metrics: metrics, - MessageCreator: mc, - Log: logging.NoLog{}, - InboundMsgThrottler: throttling.NewNoInboundThrottler(), - OutboundMsgThrottler: throttling.NewNoOutboundThrottler(), + Metrics: metrics, + MessageCreator: mc, + Log: logging.NoLog{}, + InboundMsgThrottler: throttling.NewNoInboundThrottler(), Network: NewTestNetwork( mc, networkID, @@ -118,10 +119,24 @@ func StartTestPeer( PongTimeout: constants.DefaultPingPongTimeout, MaxClockDifference: time.Minute, CPUTracker: cpuTracker, + CPUTargeter: tracker.NewCPUTargeter( + &tracker.CPUTargeterConfig{ + VdrCPUAlloc: 10, + MaxNonVdrUsage: 10, + MaxNonVdrNodeUsage: 10, + }, + validators.NewSet(), + cpuTracker, + ), }, conn, cert, peerID, + NewBlockingMessageQueue( + metrics, + logging.NoLog{}, + maxMessageToSend, + ), ) return peer, peer.AwaitReady(ctx) }