Skip to content

Commit 48d4075

Browse files
authored
network: add initial support for latency tracking (#3028)
## Summary This PR adds the node ( both client and server ), the ability to measure the time it takes to establish an outgoing connection ( excluding the TCP connection time ). This duration is captured as the initial latency, which would need to get updated via a pingpong style logic. ## Test Plan - [x] Extend existing unit tests - [x] mainnet-model testing is needed as well to confirm correctness
1 parent bd4cd9a commit 48d4075

File tree

14 files changed

+442
-277
lines changed

14 files changed

+442
-277
lines changed

catchup/fetcher_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/url"
2424
"strings"
2525
"testing"
26+
"time"
2627

2728
"github.com/gorilla/mux"
2829
"github.com/stretchr/testify/require"
@@ -291,6 +292,11 @@ func (p *testUnicastPeer) IsOutgoing() bool {
291292
return false
292293
}
293294

295+
// GetConnectionLatency returns the connection latency between the local node and this peer.
296+
func (p *testUnicastPeer) GetConnectionLatency() time.Duration {
297+
return time.Duration(0)
298+
}
299+
294300
func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback network.UnicastWebsocketMessageStateCallback) error {
295301
ps := p.gn.(*httpTestPeerSource)
296302
var dispather network.MessageHandler

catchup/peerSelector_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ func (d *mockUnicastPeer) IsOutgoing() bool {
6666
return false
6767
}
6868

69+
// GetConnectionLatency returns the connection latency between the local node and this peer.
70+
func (d *mockUnicastPeer) GetConnectionLatency() time.Duration {
71+
return time.Duration(0)
72+
}
73+
6974
func TestPeerAddress(t *testing.T) {
7075
partitiontest.PartitionTest(t)
7176

config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type Local struct {
6363
// Version tracks the current version of the defaults so we can migrate old -> new
6464
// This is specifically important whenever we decide to change the default value
6565
// for an existing parameter. This field tag must be updated any time we add a new version.
66-
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17"`
66+
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18"`
6767

6868
// environmental (may be overridden)
6969
// When enabled, stores blocks indefinitally, otherwise, only the most recents blocks
@@ -84,7 +84,7 @@ type Local struct {
8484
MaxConnectionsPerIP int `version[3]:"30"`
8585

8686
// 0 == disable
87-
PeerPingPeriodSeconds int `version[0]:"0"`
87+
PeerPingPeriodSeconds int `version[0]:"0" version[18]:"10"`
8888

8989
// for https serving
9090
TLSCertFile string `version[0]:""`

config/local_defaults.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package config
2121

2222
var defaultLocal = Local{
23-
Version: 17,
23+
Version: 18,
2424
AccountUpdatesStatsInterval: 5000000000,
2525
AccountsRebuildSynchronousMode: 1,
2626
AnnounceParticipationKey: true,
@@ -92,7 +92,7 @@ var defaultLocal = Local{
9292
OutgoingMessageFilterBucketSize: 128,
9393
ParticipationKeysRefreshInterval: 60000000000,
9494
PeerConnectionsUpdateInterval: 3600,
95-
PeerPingPeriodSeconds: 0,
95+
PeerPingPeriodSeconds: 10,
9696
PriorityPeers: map[string]bool{},
9797
PublicAddress: "",
9898
ReconnectTime: 60000000000,

installer/config.json.example

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"Version": 17,
2+
"Version": 18,
33
"AccountUpdatesStatsInterval": 5000000000,
44
"AccountsRebuildSynchronousMode": 1,
55
"AnnounceParticipationKey": true,
@@ -71,7 +71,7 @@
7171
"OutgoingMessageFilterBucketSize": 128,
7272
"ParticipationKeysRefreshInterval": 60000000000,
7373
"PeerConnectionsUpdateInterval": 3600,
74-
"PeerPingPeriodSeconds": 0,
74+
"PeerPingPeriodSeconds": 10,
7575
"PriorityPeers": {},
7676
"PublicAddress": "",
7777
"ReconnectTime": 60000000000,

network/latencyTracker.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright (C) 2019-2021 Algorand, Inc.
2+
// This file is part of go-algorand
3+
//
4+
// go-algorand is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as
6+
// published by the Free Software Foundation, either version 3 of the
7+
// License, or (at your option) any later version.
8+
//
9+
// go-algorand is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
16+
17+
package network
18+
19+
import (
20+
"errors"
21+
"net"
22+
"strconv"
23+
"sync/atomic"
24+
"time"
25+
26+
"github.com/algorand/websocket"
27+
28+
"github.com/algorand/go-deadlock"
29+
30+
"github.com/algorand/go-algorand/config"
31+
)
32+
33+
const pongMessageWriteDuration = time.Second
34+
const pingMessageWriteDuration = time.Second
35+
36+
var errInvalidPongMessageContent = errors.New("invalid pong message content")
37+
var errInvalidPingMessageContent = errors.New("invalid ping message content")
38+
39+
// latencyTracker works in conjunction with the wspeer in measuring the
40+
// communication latency over the websocket connection.
41+
type latencyTracker struct {
42+
// receivedPacketCounter is a counter for all incoming messages
43+
// placed here to be aligned with 64bit address.
44+
receivedPacketCounter uint64
45+
46+
// latency is the effective latency of the connection.
47+
// placed here to be aligned with 64bit address.
48+
latency int64
49+
50+
// lastPingSentTime is the timestamp at which we last sent a message.
51+
// this variable is only touched by checkPingSending, and therefore doesn't
52+
// need to be syncronized. The "clone" of this variable lastPingSentTimeSynced,
53+
// is being used by both the checkPingSending as well as by the pongHandler
54+
// and therefore require synchronization.
55+
lastPingSentTime int64
56+
57+
// static variables
58+
// ( doesn't get changed after init, hence, no synchronization needed )
59+
60+
// conn is the underlying connection object.
61+
conn wsPeerWebsocketConn
62+
63+
// enabled indicates whether the pingpong is currently enabled or not.
64+
enabled bool
65+
66+
// pingInterval is the max interval at which the client would send ping messages.
67+
pingInterval time.Duration
68+
69+
// lastPingMu synchronize the protected variables that might be modified across
70+
// the checkPingSending and the pongHandler. All the variable below this point
71+
// need to be syncronized with the mutex.
72+
lastPingMu deadlock.Mutex
73+
74+
// lastPingID is the last ping ID, a monotonic growing number used to ensure
75+
// that the pong message we've receive corresponds to the latest ping message
76+
// that we've sent.
77+
lastPingID uint64
78+
79+
// lastPingReceivedCounter stores message counter at the time we sent the ping.
80+
// In order to ensure the timing accuracy, we want to have no other messages
81+
// being exchanged. This, of course, would only delay the ping-pong until a
82+
// better measurement could be taken.
83+
lastPingReceivedCounter uint64
84+
85+
// lastPingSentTimeSynced, as stated above, is the syncronized version of lastPingSentTime.
86+
// it is used only in the case where we end up sending the ping message.
87+
lastPingSentTimeSynced int64
88+
}
89+
90+
func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) {
91+
lt.conn = conn
92+
lt.enabled = cfg.PeerPingPeriodSeconds > 0 && cfg.EnablePingHandler
93+
lt.latency = int64(initialConnectionLatency)
94+
lt.pingInterval = time.Duration(cfg.PeerPingPeriodSeconds) * time.Second
95+
conn.SetPingHandler(lt.pingHandler)
96+
conn.SetPongHandler(lt.pongHandler)
97+
}
98+
99+
func (lt *latencyTracker) pingHandler(message string) error {
100+
if _, err := strconv.Atoi(message); err != nil {
101+
return errInvalidPingMessageContent
102+
}
103+
err := lt.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(pongMessageWriteDuration))
104+
if err == websocket.ErrCloseSent {
105+
return nil
106+
} else if e, ok := err.(net.Error); ok && e.Temporary() {
107+
return nil
108+
}
109+
return err
110+
}
111+
112+
func (lt *latencyTracker) pongHandler(message string) error {
113+
pongID, err := strconv.Atoi(message)
114+
if err != nil {
115+
return errInvalidPongMessageContent
116+
}
117+
118+
lt.lastPingMu.Lock()
119+
defer lt.lastPingMu.Unlock()
120+
121+
if uint64(pongID) != lt.lastPingID {
122+
// we've sent more than one ping since; ignore this message.
123+
return nil
124+
}
125+
if lt.receivedPacketCounter != lt.lastPingReceivedCounter {
126+
// we've received other messages since the one that we sent. The timing
127+
// here would not be accurate.
128+
return nil
129+
}
130+
lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced)
131+
roundtripDuration := time.Since(lastPingSentTime)
132+
atomic.StoreInt64(&lt.latency, roundtripDuration.Nanoseconds())
133+
return nil
134+
}
135+
136+
func (lt *latencyTracker) getConnectionLatency() time.Duration {
137+
return time.Duration(atomic.LoadInt64(&lt.latency))
138+
}
139+
140+
func (lt *latencyTracker) checkPingSending(now *time.Time) error {
141+
if !lt.enabled {
142+
return nil
143+
}
144+
if now.Sub(time.Unix(0, lt.lastPingSentTime)) < lt.pingInterval {
145+
return nil
146+
}
147+
148+
// it looks like it's time to send a ping :
149+
lt.lastPingMu.Lock()
150+
defer lt.lastPingMu.Unlock()
151+
152+
lt.lastPingID++
153+
err := lt.conn.WriteControl(websocket.PingMessage, []byte(strconv.Itoa(int(lt.lastPingID))), now.Add(pingMessageWriteDuration))
154+
if err == websocket.ErrCloseSent {
155+
return nil
156+
} else if e, ok := err.(net.Error); ok && e.Temporary() {
157+
return nil
158+
}
159+
if err != nil {
160+
return err
161+
}
162+
lt.lastPingSentTimeSynced = now.UnixNano()
163+
lt.lastPingReceivedCounter = atomic.LoadUint64(&lt.receivedPacketCounter)
164+
lt.lastPingSentTime = lt.lastPingSentTimeSynced
165+
return nil
166+
}
167+
168+
func (lt *latencyTracker) increaseReceivedCounter() {
169+
atomic.AddUint64(&lt.receivedPacketCounter, 1)
170+
}

network/latencyTracker_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (C) 2019-2021 Algorand, Inc.
2+
// This file is part of go-algorand
3+
//
4+
// go-algorand is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as
6+
// published by the Free Software Foundation, either version 3 of the
7+
// License, or (at your option) any later version.
8+
//
9+
// go-algorand is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
16+
17+
package network
18+
19+
import (
20+
"context"
21+
"math/rand"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/require"
26+
27+
"github.com/algorand/go-algorand/protocol"
28+
"github.com/algorand/go-algorand/test/partitiontest"
29+
)
30+
31+
func TestLatencyTracker(t *testing.T) {
32+
partitiontest.PartitionTest(t)
33+
34+
netA := makeTestFilterWebsocketNode(t, "a")
35+
netA.config.GossipFanout = 1
36+
netA.config.PeerPingPeriodSeconds = 2
37+
netA.Start()
38+
defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()
39+
40+
netB := makeTestFilterWebsocketNode(t, "b")
41+
netB.config.GossipFanout = 1
42+
addrA, postListen := netA.Address()
43+
require.True(t, postListen)
44+
t.Log(addrA)
45+
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
46+
47+
netB.Start()
48+
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
49+
counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
50+
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
51+
debugTag2 := protocol.ProposalPayloadTag
52+
counter2 := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
53+
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: debugTag2, MessageHandler: counter2}})
54+
55+
readyTimeout := time.NewTimer(2 * time.Second)
56+
waitReady(t, netA, readyTimeout.C)
57+
waitReady(t, netB, readyTimeout.C)
58+
59+
msg := make([]byte, 200)
60+
rand.Read(msg)
61+
var lastMsgTime time.Time
62+
63+
var connLatencyInitialA time.Duration
64+
// wait for up to 20 seconds for the network latency to be established.
65+
startTime := time.Now()
66+
for {
67+
if time.Since(lastMsgTime) > 100*time.Millisecond {
68+
netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
69+
lastMsgTime = time.Now()
70+
}
71+
72+
connLatencyA := netA.peers[0].GetConnectionLatency()
73+
if connLatencyA == time.Duration(0) {
74+
require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
75+
time.Sleep(time.Millisecond)
76+
continue
77+
}
78+
require.LessOrEqual(t, connLatencyA.Nanoseconds(), (20 * time.Second).Nanoseconds())
79+
connLatencyInitialA = connLatencyA
80+
break
81+
}
82+
83+
// wait for up to 20 seconds for the network latency to be established.
84+
startTime = time.Now()
85+
lastMsgTime = time.Time{}
86+
for {
87+
if time.Since(lastMsgTime) > 100*time.Millisecond {
88+
netB.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
89+
lastMsgTime = time.Now()
90+
}
91+
92+
connLatencyB := netB.peers[0].GetConnectionLatency()
93+
if connLatencyB == time.Duration(0) {
94+
require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
95+
time.Sleep(time.Millisecond)
96+
continue
97+
}
98+
require.LessOrEqual(t, connLatencyB.Nanoseconds(), (20 * time.Second).Nanoseconds())
99+
break
100+
}
101+
102+
// send the given message until we get a different latency.
103+
// wait for up to 20 seconds for the network latency to be established.
104+
startTime = time.Now()
105+
lastMsgTime = time.Time{}
106+
for {
107+
if time.Since(lastMsgTime) > 100*time.Millisecond {
108+
netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
109+
lastMsgTime = time.Now()
110+
}
111+
112+
connLatencyA := netA.peers[0].GetConnectionLatency()
113+
if connLatencyA != connLatencyInitialA {
114+
require.NotEqual(t, connLatencyA.Nanoseconds(), int64(0))
115+
waitTime := time.Since(lastMsgTime)
116+
require.Less(t, waitTime.Seconds(), float64(netA.config.PeerPingPeriodSeconds*2))
117+
break
118+
}
119+
time.Sleep(time.Millisecond)
120+
}
121+
}

0 commit comments

Comments
 (0)