Factor out message queue from peer implementation (ava-labs#1484)
StephenButtolph authored May 18, 2022
1 parent 2fd7d90 commit e5f9ae4
Showing 7 changed files with 457 additions and 168 deletions.
27 changes: 20 additions & 7 deletions network/network.go
@@ -99,6 +99,8 @@ type network struct {
// messages
ipSigner *ipSigner

+ outboundMsgThrottler throttling.OutboundMsgThrottler

// Limits the number of connection attempts based on IP.
inboundConnUpgradeThrottler throttling.InboundConnUpgradeThrottler
// Listens for and accepts new inbound connections
@@ -203,7 +205,6 @@ func NewNetwork(
MessageCreator: msgCreator,
Log: log,
InboundMsgThrottler: inboundMsgThrottler,
- OutboundMsgThrottler: outboundMsgThrottler,
Network: nil, // This is set below.
Router: router,
VersionCompatibility: version.GetCompatibility(config.NetworkID),
@@ -219,10 +220,11 @@
}
onCloseCtx, cancel := context.WithCancel(context.Background())
n := &network{
- config: config,
- peerConfig: peerConfig,
- metrics: metrics,
- ipSigner: newIPSigner(&config.MyIP, &peerConfig.Clock, config.TLSKey),
+ config: config,
+ peerConfig: peerConfig,
+ metrics: metrics,
+ ipSigner: newIPSigner(&config.MyIP, &peerConfig.Clock, config.TLSKey),
+ outboundMsgThrottler: outboundMsgThrottler,

inboundConnUpgradeThrottler: throttling.NewInboundConnUpgradeThrottler(log, config.ThrottlerConfig.InboundConnUpgradeThrottlerConfig),
listener: listener,
@@ -715,7 +717,7 @@ func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) ids.NodeI
// Add a reference to the message so that if it is sent, it won't be
// collected until it is done being processed.
msg.AddRef()
- if peer.Send(msg) {
+ if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
@@ -982,7 +984,18 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader) error {

n.peerConfig.Log.Verbo("starting handshake with %s", nodeID)

- peer := peer.Start(n.peerConfig, tlsConn, cert, nodeID)
+ peer := peer.Start(
+ n.peerConfig,
+ tlsConn,
+ cert,
+ nodeID,
+ peer.NewThrottledMessageQueue(
+ n.peerConfig.Metrics,
+ nodeID,
+ n.peerConfig.Log,
+ n.outboundMsgThrottler,
+ ),
+ )
n.connectingPeers.Add(peer)
return nil
}
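With this change the outbound queue becomes an injected dependency of peer.Start instead of something the peer builds from its shared Config. As a rough illustration of what that allows (a sketch, not code from this commit), a caller that does not need rate limiting, such as a test harness, could hand the peer a blocking queue instead. Here cfg, conn, cert, and nodeID are assumed to already be in scope from the usual TLS upgrade path, and the buffer size of 16 is arbitrary:

// Illustrative sketch only, not part of this commit.
messageQueue := peer.NewBlockingMessageQueue(
    cfg.Metrics, // Metrics satisfies SendFailedCallback, as in the upgrade() call above
    cfg.Log,
    16, // arbitrary buffer size
)
p := peer.Start(cfg, conn, cert, nodeID, messageQueue)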
1 change: 0 additions & 1 deletion network/peer/config.go
@@ -27,7 +27,6 @@ type Config struct {
MessageCreator message.Creator
Log logging.Logger
InboundMsgThrottler throttling.InboundMsgThrottler
- OutboundMsgThrottler throttling.OutboundMsgThrottler
Network Network
Router router.InboundHandler
VersionCompatibility version.Compatibility
284 changes: 284 additions & 0 deletions network/peer/message_queue.go
@@ -0,0 +1,284 @@
// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package peer

import (
"context"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/utils/logging"
)

var (
_ MessageQueue = &throttledMessageQueue{}
_ MessageQueue = &blockingMessageQueue{}
)

type SendFailedCallback interface {
SendFailed(message.OutboundMessage)
}

type SendFailedFunc func(message.OutboundMessage)

func (f SendFailedFunc) SendFailed(msg message.OutboundMessage) { f(msg) }

type MessageQueue interface {
// Push attempts to add the message to the queue. If the context is
// canceled, then pushing the message will return `false` and the message
// will not be added to the queue.
Push(ctx context.Context, msg message.OutboundMessage) bool

// Pop blocks until a message is available and then returns the message. If
// the queue is closed, then `false` is returned.
Pop() (message.OutboundMessage, bool)

// PopNow attempts to return a message without blocking. If a message is not
// available or the queue is closed, then `false` is returned.
PopNow() (message.OutboundMessage, bool)

// Close empties the queue and prevents further messages from being pushed
// onto it. After calling close once, future calls to close will do nothing.
Close()
}

type throttledMessageQueue struct {
onFailed SendFailedCallback
// [id] of the peer we're sending messages to
id ids.NodeID
log logging.Logger
outboundMsgThrottler throttling.OutboundMsgThrottler

// Signalled when a message is added to the queue and when Close() is
// called.
cond *sync.Cond

// closed flags whether the send queue has been closed.
// [cond.L] must be held while accessing [closed].
closed bool

// queue of the messages
// [cond.L] must be held while accessing [queue].
queue []message.OutboundMessage
}

func NewThrottledMessageQueue(
onFailed SendFailedCallback,
id ids.NodeID,
log logging.Logger,
outboundMsgThrottler throttling.OutboundMsgThrottler,
) MessageQueue {
return &throttledMessageQueue{
onFailed: onFailed,
id: id,
log: log,
outboundMsgThrottler: outboundMsgThrottler,

cond: sync.NewCond(&sync.Mutex{}),
}
}

func (q *throttledMessageQueue) Push(ctx context.Context, msg message.OutboundMessage) bool {
if err := ctx.Err(); err != nil {
q.log.Debug(
"dropping %s message to %s due to a context error: %s",
msg.Op(), q.id, err,
)
q.onFailed.SendFailed(msg)
return false
}

// Acquire space on the outbound message queue, or drop [msg] if we can't.
if !q.outboundMsgThrottler.Acquire(msg, q.id) {
q.log.Debug(
"dropping %s message to %s due to rate-limiting",
msg.Op(), q.id,
)
q.onFailed.SendFailed(msg)
return false
}

// Invariant: must call q.outboundMsgThrottler.Release(msg, q.id) when [msg]
// is popped or, if this queue closes before [msg] is popped, when this
// queue closes.

q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.closed {
q.log.Debug(
"dropping %s message to %s due to a closed queue",
msg.Op(), q.id,
)
q.outboundMsgThrottler.Release(msg, q.id)
q.onFailed.SendFailed(msg)
return false
}

q.queue = append(q.queue, msg)
q.cond.Signal()
return true
}

func (q *throttledMessageQueue) Pop() (message.OutboundMessage, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

for {
if q.closed {
return nil, false
}
if len(q.queue) > 0 {
// There is a message
break
}
// Wait until there is a message
q.cond.Wait()
}

return q.pop(), true
}

func (q *throttledMessageQueue) PopNow() (message.OutboundMessage, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if len(q.queue) == 0 {
// There isn't a message
return nil, false
}

return q.pop(), true
}

func (q *throttledMessageQueue) pop() message.OutboundMessage {
msg := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]

q.outboundMsgThrottler.Release(msg, q.id)
return msg
}

func (q *throttledMessageQueue) Close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.closed = true

for _, msg := range q.queue {
q.outboundMsgThrottler.Release(msg, q.id)
q.onFailed.SendFailed(msg)
}
q.queue = nil

q.cond.Broadcast()
}

type blockingMessageQueue struct {
onFailed SendFailedCallback
log logging.Logger

closeOnce sync.Once
closingLock sync.RWMutex
closing chan struct{}

// queue of the messages
queue chan message.OutboundMessage
}

func NewBlockingMessageQueue(
onFailed SendFailedCallback,
log logging.Logger,
bufferSize int,
) MessageQueue {
return &blockingMessageQueue{
onFailed: onFailed,
log: log,

closing: make(chan struct{}),
queue: make(chan message.OutboundMessage, bufferSize),
}
}

func (q *blockingMessageQueue) Push(ctx context.Context, msg message.OutboundMessage) bool {
q.closingLock.RLock()
defer q.closingLock.RUnlock()

ctxDone := ctx.Done()
select {
case <-q.closing:
q.log.Debug(
"dropping %s message due to a closed queue",
msg.Op(),
)
q.onFailed.SendFailed(msg)
return false
case <-ctxDone:
q.log.Debug(
"dropping %s message due to a cancelled context",
msg.Op(),
)
q.onFailed.SendFailed(msg)
return false
default:
}

select {
case q.queue <- msg:
return true
case <-ctxDone:
q.log.Debug(
"dropping %s message due to a cancelled context",
msg.Op(),
)
q.onFailed.SendFailed(msg)
return false
case <-q.closing:
q.log.Debug(
"dropping %s message due to a closed queue",
msg.Op(),
)
q.onFailed.SendFailed(msg)
return false
}
}

func (q *blockingMessageQueue) Pop() (message.OutboundMessage, bool) {
select {
case msg := <-q.queue:
return msg, true
case <-q.closing:
return nil, false
}
}

func (q *blockingMessageQueue) PopNow() (message.OutboundMessage, bool) {
select {
case msg := <-q.queue:
return msg, true
default:
return nil, false
}
}

func (q *blockingMessageQueue) Close() {
q.closeOnce.Do(func() {
close(q.closing)

q.closingLock.Lock()
defer q.closingLock.Unlock()

for {
select {
case msg := <-q.queue:
q.onFailed.SendFailed(msg)
default:
return
}
}
})
}
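To make the intended usage concrete, here is a minimal sketch of how a sender and a peer's write loop might drive either queue implementation. The function names and the writeMsg callback are illustrative assumptions, not part of this commit; only the MessageQueue calls come from the interface above.

package peer

import (
    "context"

    "github.com/ava-labs/avalanchego/message"
)

// exampleSend pushes [msg] onto [q]. Push returns false when the message is
// dropped (throttled, context canceled, or queue closed); in that case the
// queue has already invoked the SendFailed callback, so the caller has
// nothing left to clean up.
func exampleSend(ctx context.Context, q MessageQueue, msg message.OutboundMessage) bool {
    return q.Push(ctx, msg)
}

// exampleWriteLoop drains [q]: Pop blocks until a message is available and
// returns false once Close is called, which ends the loop.
func exampleWriteLoop(q MessageQueue, writeMsg func(message.OutboundMessage)) {
    for {
        msg, ok := q.Pop()
        if !ok {
            return
        }
        writeMsg(msg)
    }
}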