Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6f5c326
Add initial support for latency tracking.
tsachiherman Oct 9, 2021
6ede2cc
update unit tests
tsachiherman Oct 9, 2021
2a200fe
Merge branch 'master' into tsachi/measurelatency
tsachiherman Oct 9, 2021
b3d2009
add missing import
tsachiherman Oct 9, 2021
6512348
Merge branch 'master' into tsachi/measurelatency
tsachiherman Oct 11, 2021
126cbf3
hard-code number
tsachiherman Oct 11, 2021
62d06c1
update
tsachiherman Oct 13, 2021
2097a91
add missing file.
tsachiherman Oct 13, 2021
0e26994
update
tsachiherman Oct 13, 2021
c2f887c
update config.
tsachiherman Oct 13, 2021
c411372
update
tsachiherman Oct 13, 2021
2474b1a
add missing file.
tsachiherman Oct 13, 2021
9289343
add logging
nicholasguo Oct 11, 2021
b7eb591
changes
nicholasguo Oct 14, 2021
4a4c87f
subtract latency
nicholasguo Oct 12, 2021
48a8058
fix
nicholasguo Oct 12, 2021
133b572
remove logging
nicholasguo Oct 14, 2021
56074a4
netowrk
nicholasguo Oct 14, 2021
22e46f3
decrease significantMessageThreshold
nicholasguo Oct 15, 2021
a2d6b7d
tighter timing
nicholasguo Oct 14, 2021
e7d326d
updates
nicholasguo Oct 15, 2021
ff71170
loosen thresh
nicholasguo Oct 18, 2021
0fb4c2d
modify log
nicholasguo Oct 18, 2021
8ea96f1
logging
nicholasguo Oct 19, 2021
6aa493e
Merge branch 'master' into nguo/measurelatency
nicholasguo Oct 19, 2021
077e4f3
undo changes
nicholasguo Oct 19, 2021
5a620ae
fix
nicholasguo Oct 19, 2021
b3754aa
modify tests
nicholasguo Oct 20, 2021
7eff12b
make unit tests work
nicholasguo Oct 20, 2021
a46a65d
resolve comment
nicholasguo Oct 20, 2021
8332fd0
use msg received time
nicholasguo Oct 21, 2021
f047181
add log
nicholasguo Oct 21, 2021
3a1a43e
remove a bunch of logging
nicholasguo Oct 22, 2021
4b0284f
address comments
nicholasguo Oct 22, 2021
48bce4d
refactor
nicholasguo Oct 22, 2021
364e328
Revert "remove a bunch of logging"
nicholasguo Oct 22, 2021
2fc552f
Revert "Revert "remove a bunch of logging""
nicholasguo Oct 23, 2021
90ab7de
fix test
nicholasguo Oct 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion node/txnSyncConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func (tsnc *transactionSyncNodeConnector) SendPeerMessage(netPeer interface{}, m
}
}

func (tsnc *transactionSyncNodeConnector) GetPeerLatency(netPeer interface{}) time.Duration {
unicastPeer := netPeer.(network.UnicastPeer)
return unicastPeer.GetConnectionLatency()
}

// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
Expand Down Expand Up @@ -215,7 +220,7 @@ func (tsnc *transactionSyncNodeConnector) Handle(raw network.IncomingMessage) ne
peer = peerData.(*txnsync.Peer)
}

err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence)
err := tsnc.messageHandler(raw.Sender, peer, raw.Data, raw.Sequence, raw.Received)
if err != nil {
return network.OutgoingMessage{
Action: network.Disconnect,
Expand Down
6 changes: 6 additions & 0 deletions txnsync/bloomFilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/binary"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -357,6 +358,11 @@ func (fn *justRandomFakeNode) UpdatePeers(txsyncPeers []*Peer, netPeers []interf
}
func (fn *justRandomFakeNode) SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback) {
}

func (fn *justRandomFakeNode) GetPeerLatency(netPeer interface{}) time.Duration {
return 0
}

func (fn *justRandomFakeNode) GetPendingTransactionGroups() (txGroups []pooldata.SignedTxGroup, latestLocallyOriginatedGroupCounter uint64) {
return
}
Expand Down
6 changes: 5 additions & 1 deletion txnsync/emulatorNode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ func (n *emulatedNode) SendPeerMessage(netPeer interface{}, msg []byte, callback
peer.outSeq++
}

func (n *emulatedNode) GetPeerLatency(netPeer interface{}) time.Duration {
return 0
}

func (n *emulatedNode) GetPendingTransactionGroups() ([]pooldata.SignedTxGroup, uint64) {
return n.txpoolEntries, n.latestLocallyOriginatedGroupCounter
}
Expand Down Expand Up @@ -356,7 +360,7 @@ func (n *emulatedNode) step() {

peer.mu.Unlock()

msgHandler(peer, peer.peer, msgBytes, msgInSeq)
msgHandler(peer, peer.peer, msgBytes, msgInSeq, 0)
n.unblock()
n.waitBlocked()
peer.mu.Lock()
Expand Down
13 changes: 9 additions & 4 deletions txnsync/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type incomingMessage struct {
encodedSize int // the byte length of the incoming network message
bloomFilter *testableBloomFilter
transactionGroups []pooldata.SignedTxGroup
timeReceived int64
}

// incomingMessageQueue manages the global incoming message queue across all the incoming peers.
Expand Down Expand Up @@ -108,7 +109,7 @@ func (imq *incomingMessageQueue) clear(m incomingMessage) {

// incomingMessageHandler
// note - this message is called by the network go-routine dispatch pool, and is not synchronized with the rest of the transaction synchronizer
func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) (err error) {
func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) (err error) {
// increase number of incoming messages metric.
txsyncIncomingMessagesTotal.Inc(nil)

Expand All @@ -120,7 +121,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
}
}()

incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer}
incomingMessage := incomingMessage{networkPeer: networkPeer, sequenceNumber: sequenceNumber, encodedSize: len(message), peer: peer, timeReceived: receivedTimestamp}
_, err = incomingMessage.message.UnmarshalMsg(message)
if err != nil {
// if we received a message that we cannot parse, disconnect.
Expand Down Expand Up @@ -194,7 +195,7 @@ func (s *syncState) evaluateIncomingMessage(message incomingMessage) {
}
if peerInfo.TxnSyncPeer == nil {
// we couldn't really do much about this message previously, since we didn't have the peer.
peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
peer = makePeer(message.networkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(message.networkPeer))
// let the network peer object know about our peer
s.node.UpdatePeers([]*Peer{peer}, []interface{}{message.networkPeer}, 0)
} else {
Expand Down Expand Up @@ -249,7 +250,11 @@ incomingMessageLoop:
}

peer.updateRequestParams(incomingMsg.message.UpdatedRequestParams.Modulator, incomingMsg.message.UpdatedRequestParams.Offset)
peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), incomingMsg.encodedSize)
timeInQueue := time.Duration(0)
if incomingMsg.timeReceived > 0 {
timeInQueue = time.Since(time.Unix(0, incomingMsg.timeReceived))
}
peer.updateIncomingMessageTiming(incomingMsg.message.MsgSync, s.round, s.clock.Since(), timeInQueue, peer.cachedLatency, incomingMsg.encodedSize)

// if the peer's round is more than a single round behind the local node, then we don't want to
// try and load the transactions. The other peer should first catch up before getting transactions.
Expand Down
21 changes: 11 additions & 10 deletions txnsync/incoming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,28 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
cfg := config.GetDefaultLocal()
mNodeConnector := &mockNodeConnector{transactionPoolSize: 3}
s := syncState{
log: wrapLogger(&incLogger, &cfg),
node: mNodeConnector,
log: wrapLogger(&incLogger, &cfg),
node: mNodeConnector,
clock: mNodeConnector.Clock(),
}

// expect UnmarshalMsg error
messageBytes[0] = 0
err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err := s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
msgpe := msgp.TypeError{}
require.True(t, errors.As(err, &msgpe))

// expect wrong version error
message = transactionBlockMessage{Version: -3}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errUnsupportedTransactionSyncMessageVersion, err)

// expect error decoding bloomFilter
message.Version = 1
message.TxnBloomFilter.BloomFilterType = byte(multiHashBloomFilter)
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errInvalidBloomFilter, err)

// error decoding transaction groups
Expand All @@ -89,33 +90,33 @@ func TestAsyncIncomingMessageHandlerAndErrors(t *testing.T) {
require.NoError(t, err)
message.TransactionGroups = packedTransactionGroups{Bytes: []byte{1}}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errDecodingReceivedTransactionGroupsFailed, err)

// error queue full
message.TransactionGroups = packedTransactionGroups{}
messageBytes = message.MarshalMsg(nil)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)

// Success where peer == nil
s.incomingMessagesQ = makeIncomingMessageQueue()
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, nil, messageBytes, sequenceNumber, 0)
require.NoError(t, err)

peer := Peer{}

// error when placing the peer message on the main queue (incomingMessages cannot accept messages)
s.incomingMessagesQ = incomingMessageQueue{}
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
require.Equal(t, errTransactionSyncIncomingMessageQueueFull, err)

s.incomingMessagesQ = makeIncomingMessageQueue()
err = nil
// fill up the incoming message queue (one was already added)
for x := 1; x <= messageOrderingHeapLimit; x++ {
require.NoError(t, err)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber)
err = s.asyncIncomingMessageHandler(nil, &peer, messageBytes, sequenceNumber, 0)
}
require.Equal(t, errHeapReachedCapacity, err)
}
Expand Down
5 changes: 4 additions & 1 deletion txnsync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package txnsync

import (
"time"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/pooldata"
"github.com/algorand/go-algorand/util/timers"
Expand Down Expand Up @@ -46,7 +48,7 @@ type Event struct {
}

// IncomingMessageHandler is the signature of the incoming message handler used by the transaction sync to receive network messages
type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64) error
type IncomingMessageHandler func(networkPeer interface{}, peer *Peer, message []byte, sequenceNumber uint64, receivedTimestamp int64) error

// SendMessageCallback define a message sent feedback for performing message tracking
type SendMessageCallback func(enqueued bool, sequenceNumber uint64) error
Expand Down Expand Up @@ -79,6 +81,7 @@ type NodeConnector interface {
// across all the connected peers.
UpdatePeers(txsyncPeers []*Peer, netPeers []interface{}, peersAverageDataExchangeRate uint64)
SendPeerMessage(netPeer interface{}, msg []byte, callback SendMessageCallback)
GetPeerLatency(netPeer interface{}) time.Duration
// GetPendingTransactionGroups is called by the transaction sync when it needs to look into the transaction
// pool and get the updated set of pending transactions. The second returned argument is the latest locally originated
// group counter within the given transaction groups list. If there is no group that is locally originated, the expected
Expand Down
9 changes: 8 additions & 1 deletion txnsync/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func (s *syncState) onNewRoundEvent(ent Event) {
if !s.isRelay {
s.nextOffsetRollingCh = s.clock.TimeoutAt(kickoffTime + 2*s.lastBeta)
}
s.updatePeersLatency(peers)
s.updatePeersRequestParams(peers)
}

Expand Down Expand Up @@ -395,7 +396,7 @@ func (s *syncState) getPeers() (result []*Peer) {
// some of the network peers might not have a sync peer, so we need to create one for these.
for _, peerInfo := range peersInfo {
if peerInfo.TxnSyncPeer == nil {
syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log)
syncPeer := makePeer(peerInfo.NetworkPeer, peerInfo.IsOutgoing, s.isRelay, &s.config, s.log, s.node.GetPeerLatency(peerInfo.NetworkPeer))
peerInfo.TxnSyncPeer = syncPeer
updatedNetworkPeers = append(updatedNetworkPeers, peerInfo.NetworkPeer)
updatedNetworkPeersSync = append(updatedNetworkPeersSync, syncPeer)
Expand Down Expand Up @@ -435,3 +436,9 @@ func (s *syncState) updatePeersRequestParams(peers []*Peer) {
}
}
}

func (s *syncState) updatePeersLatency(peers []*Peer) {
for _, peer := range peers {
peer.cachedLatency = s.node.GetPeerLatency(peer.networkPeer)
}
}
25 changes: 16 additions & 9 deletions txnsync/outgoing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ type sentMessageMetadata struct {
// could be a lengthy operation which does't need to be blocking the main loop. Moving the actual encoding into an
// execution pool thread frees up the main loop, allowing smoother operation.
type messageAsyncEncoder struct {
state *syncState
messageData sentMessageMetadata
roundClock timers.WallClock
peerDataExchangeRate uint64
state *syncState
messageData sentMessageMetadata
roundClock timers.WallClock
lastReceivedMessageTimestamp time.Duration
peerDataExchangeRate uint64
// sentMessagesCh is a copy of the outgoingMessagesCallbackCh in the syncState object. We want to create a copy of
// the channel so that in case of a txnsync restart ( i.e. fast catchup ), we can still generate a new channel
// without triggering a data race. The alternative is to block the txnsync.Shutdown() until we receive the feedback
Expand Down Expand Up @@ -94,6 +95,12 @@ func (encoder *messageAsyncEncoder) asyncEncodeAndSend(interface{}) interface{}
encoder.messageData.transactionGroups = nil // clear out to allow GC to reclaim
}

if encoder.lastReceivedMessageTimestamp >= 0 {
// adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
// the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
encoder.messageData.message.MsgSync.ResponseElapsedTime = uint64((encoder.roundClock.Since() - encoder.lastReceivedMessageTimestamp).Nanoseconds())
}

encodedMessage := encoder.messageData.message.MarshalMsg(getMessageBuffer())
encoder.messageData.encodedMessageSize = len(encodedMessage)
// now that the message is ready, we can discard the encoded transaction group slice to allow the GC to collect it.
Expand Down Expand Up @@ -143,7 +150,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
for _, peer := range peers {
msgEncoder := &messageAsyncEncoder{state: s, roundClock: s.clock, peerDataExchangeRate: peer.dataExchangeRate, sentMessagesCh: s.outgoingMessagesCallbackCh}
profAssembleMessage.start()
msgEncoder.messageData, assembledBloomFilter = s.assemblePeerMessage(peer, &pendingTransactions)
msgEncoder.messageData, assembledBloomFilter, msgEncoder.lastReceivedMessageTimestamp = s.assemblePeerMessage(peer, &pendingTransactions)
profAssembleMessage.end()
isPartialMessage := msgEncoder.messageData.partialMessage
// The message that we've just encoded is expected to be sent out with the next sequence number.
Expand Down Expand Up @@ -183,7 +190,7 @@ func (s *syncState) sendMessageLoop(currentTime time.Duration, deadline timers.D
}
}

func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter) {
func (s *syncState) assemblePeerMessage(peer *Peer, pendingTransactions *pendingTransactionGroupsSnapshot) (metaMessage sentMessageMetadata, assembledBloomFilter bloomFilter, lastReceivedMessageTimestamp time.Duration) {
metaMessage = sentMessageMetadata{
peer: peer,
message: &transactionBlockMessage{
Expand Down Expand Up @@ -281,10 +288,10 @@ notxns:
}

metaMessage.message.MsgSync.RefTxnBlockMsgSeq = peer.nextReceivedMessageSeq - 1
// signify that timestamp is not set
lastReceivedMessageTimestamp = time.Duration(-1)
if peer.lastReceivedMessageTimestamp != 0 && peer.lastReceivedMessageLocalRound == s.round {
// adding a nanosecond to the elapsed time is meaningless for the data rate calculation, but would ensure that
// the ResponseElapsedTime field has a clear distinction between "being set" vs. "not being set"
metaMessage.message.MsgSync.ResponseElapsedTime = uint64((s.clock.Since() - peer.lastReceivedMessageTimestamp).Nanoseconds()) + 1
lastReceivedMessageTimestamp = peer.lastReceivedMessageTimestamp
// reset the lastReceivedMessageTimestamp so that we won't be using that again on a subsequent outgoing message.
peer.lastReceivedMessageTimestamp = 0
}
Expand Down
14 changes: 7 additions & 7 deletions txnsync/outgoing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,14 @@ func TestAssemblePeerMessage_messageConstBloomFilter(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.Equal(s.lastBloomFilter, expectedFilter)
}

Expand Down Expand Up @@ -330,14 +330,14 @@ func TestAssemblePeerMessage_messageConstBloomFilterNonRelay(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateLateBloom

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.NotEqual(s.lastBloomFilter, expectedFilter)
}

Expand All @@ -362,14 +362,14 @@ func TestAssemblePeerMessage_messageConstNextMinDelay_messageConstUpdateRequestP
s.isRelay = true
s.lastBeta = 123 * time.Nanosecond

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, responseTime := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(metaMessage.message.UpdatedRequestParams.Modulator, byte(222))
a.Equal(metaMessage.message.UpdatedRequestParams.Offset, byte(111))
a.Equal(metaMessage.peer, &peer)
a.Equal(metaMessage.message.Version, int32(txnBlockMessageVersion))
a.Equal(metaMessage.message.Round, s.round)
a.True(metaMessage.message.MsgSync.ResponseElapsedTime != 0)
a.True(responseTime >= 0)
a.Equal(metaMessage.message.MsgSync.NextMsgMinDelay, uint64(s.lastBeta.Nanoseconds())*2)

}
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestAssemblePeerMessage_messageConstTransactions(t *testing.T) {
peer.isOutgoing = true
peer.state = peerStateHoldsoff

metaMessage, _ := s.assemblePeerMessage(&peer, &pendingTransactions)
metaMessage, _, _ := s.assemblePeerMessage(&peer, &pendingTransactions)

a.Equal(len(metaMessage.transactionGroups), 1)
a.True(reflect.DeepEqual(metaMessage.transactionGroups[0], pendingTransactions.pendingTransactionsGroups[0]))
Expand Down
Loading