Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -291,6 +292,11 @@ func (p *testUnicastPeer) IsOutgoing() bool {
return false
}

// GetConnectionLatency reports the connection latency between the local node and this peer.
// This test peer always reports a zero latency.
func (p *testUnicastPeer) GetConnectionLatency() time.Duration {
	var latency time.Duration
	return latency
}

func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag, callback network.UnicastWebsocketMessageStateCallback) error {
ps := p.gn.(*httpTestPeerSource)
var dispather network.MessageHandler
Expand Down
5 changes: 5 additions & 0 deletions catchup/peerSelector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func (d *mockUnicastPeer) IsOutgoing() bool {
return false
}

// GetConnectionLatency reports the connection latency between the local node and this peer.
// This mock peer always reports a zero latency.
func (d *mockUnicastPeer) GetConnectionLatency() time.Duration {
	var latency time.Duration
	return latency
}

func TestPeerAddress(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17"`
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18"`

// environmental (may be overridden)
// When enabled, stores blocks indefinitely, otherwise, only the most recent blocks
Expand All @@ -84,7 +84,7 @@ type Local struct {
MaxConnectionsPerIP int `version[3]:"30"`

// 0 == disable
PeerPingPeriodSeconds int `version[0]:"0"`
PeerPingPeriodSeconds int `version[0]:"0" version[18]:"10"`

// for https serving
TLSCertFile string `version[0]:""`
Expand Down
4 changes: 2 additions & 2 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package config

var defaultLocal = Local{
Version: 17,
Version: 18,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AnnounceParticipationKey: true,
Expand Down Expand Up @@ -92,7 +92,7 @@ var defaultLocal = Local{
OutgoingMessageFilterBucketSize: 128,
ParticipationKeysRefreshInterval: 60000000000,
PeerConnectionsUpdateInterval: 3600,
PeerPingPeriodSeconds: 0,
PeerPingPeriodSeconds: 10,
PriorityPeers: map[string]bool{},
PublicAddress: "",
ReconnectTime: 60000000000,
Expand Down
4 changes: 2 additions & 2 deletions installer/config.json.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"Version": 17,
"Version": 18,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AnnounceParticipationKey": true,
Expand Down Expand Up @@ -71,7 +71,7 @@
"OutgoingMessageFilterBucketSize": 128,
"ParticipationKeysRefreshInterval": 60000000000,
"PeerConnectionsUpdateInterval": 3600,
"PeerPingPeriodSeconds": 0,
"PeerPingPeriodSeconds": 10,
"PriorityPeers": {},
"PublicAddress": "",
"ReconnectTime": 60000000000,
Expand Down
170 changes: 170 additions & 0 deletions network/latencyTracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"errors"
"net"
"strconv"
"sync/atomic"
"time"

"github.com/algorand/websocket"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/config"
)

// Write deadlines applied to outgoing websocket control messages.
const (
	// pongMessageWriteDuration bounds the time allowed for writing a pong message.
	pongMessageWriteDuration = time.Second
	// pingMessageWriteDuration bounds the time allowed for writing a ping message.
	pingMessageWriteDuration = time.Second
)

// Errors returned by the ping/pong handlers when the message payload
// cannot be parsed as an integer.
var (
	errInvalidPongMessageContent = errors.New("invalid pong message content")
	errInvalidPingMessageContent = errors.New("invalid ping message content")
)

// latencyTracker works in conjunction with the wspeer in measuring the
// communication latency over the websocket connection. It sends ping
// messages carrying a numeric ID and derives the latency from the time it
// takes for the matching pong message to arrive.
type latencyTracker struct {
// receivedPacketCounter is a counter for all incoming messages.
// It is incremented atomically by increaseReceivedCounter.
// placed here to be aligned with 64bit address.
receivedPacketCounter uint64

// latency is the effective latency of the connection, in nanoseconds.
// It is stored and loaded atomically.
// placed here to be aligned with 64bit address.
latency int64

// lastPingSentTime is the timestamp ( in nanoseconds ) at which we last sent a ping message.
// this variable is only touched by checkPingSending, and therefore doesn't
// need to be synchronized. The "clone" of this variable, lastPingSentTimeSynced,
// is being used by both the checkPingSending as well as by the pongHandler
// and therefore requires synchronization.
lastPingSentTime int64

// static variables
// ( don't get changed after init, hence, no synchronization needed )

// conn is the underlying connection object.
conn wsPeerWebsocketConn

// enabled indicates whether the pingpong is currently enabled or not.
enabled bool

// pingInterval is the minimal time that needs to elapse since the last ping
// before checkPingSending would send another ping message.
pingInterval time.Duration

// lastPingMu synchronizes the protected variables that might be modified across
// the checkPingSending and the pongHandler. All the variables below this point
// need to be synchronized with the mutex.
lastPingMu deadlock.Mutex

// lastPingID is the last ping ID, a monotonic growing number used to ensure
// that the pong message we've received corresponds to the latest ping message
// that we've sent.
lastPingID uint64

// lastPingReceivedCounter stores the message counter at the time we sent the ping.
// In order to ensure the timing accuracy, we want to have no other messages
// being exchanged. This, of course, would only delay the ping-pong until a
// better measurement could be taken.
lastPingReceivedCounter uint64

// lastPingSentTimeSynced, as stated above, is the synchronized version of lastPingSentTime.
// it is updated only in the case where we end up sending the ping message.
lastPingSentTimeSynced int64
}

// init binds the tracker to the given connection, seeds the latency with
// initialConnectionLatency, derives the ping settings from cfg and installs
// the tracker's ping and pong handlers on the connection.
func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) {
	pingPeriodSeconds := cfg.PeerPingPeriodSeconds

	lt.conn = conn
	// sending pings requires both the ping handler to be enabled and a positive period.
	lt.enabled = cfg.EnablePingHandler && pingPeriodSeconds > 0
	lt.latency = int64(initialConnectionLatency)
	lt.pingInterval = time.Duration(pingPeriodSeconds) * time.Second

	conn.SetPingHandler(lt.pingHandler)
	conn.SetPongHandler(lt.pongHandler)
}

// pingHandler responds to an incoming ping message by echoing its payload
// back in a pong message.
// It returns errInvalidPingMessageContent when the payload is not an integer.
// A write that fails with websocket.ErrCloseSent or with a temporary network
// error is not reported; any other write error is returned to the caller.
func (lt *latencyTracker) pingHandler(message string) error {
	if _, parseErr := strconv.Atoi(message); parseErr != nil {
		return errInvalidPingMessageContent
	}

	deadline := time.Now().Add(pongMessageWriteDuration)
	writeErr := lt.conn.WriteControl(websocket.PongMessage, []byte(message), deadline)
	if writeErr == nil || writeErr == websocket.ErrCloseSent {
		return nil
	}
	if netErr, isNetErr := writeErr.(net.Error); isNetErr && netErr.Temporary() {
		return nil
	}
	return writeErr
}

// pongHandler processes an incoming pong message and, when the measurement
// is usable, updates the connection latency with the observed roundtrip time.
// It returns errInvalidPongMessageContent when the payload is not an integer,
// and nil otherwise. The measurement is discarded when the pong doesn't match
// the latest ping that was sent, or when other messages were received since
// that ping was sent.
func (lt *latencyTracker) pongHandler(message string) error {
	pongID, err := strconv.Atoi(message)
	if err != nil {
		return errInvalidPongMessageContent
	}

	lt.lastPingMu.Lock()
	defer lt.lastPingMu.Unlock()

	if uint64(pongID) != lt.lastPingID {
		// we've sent more than one ping since; ignore this message.
		return nil
	}
	// receivedPacketCounter is incremented via atomic.AddUint64 without holding
	// lastPingMu, so it has to be read atomically here to avoid a data race.
	if atomic.LoadUint64(&lt.receivedPacketCounter) != lt.lastPingReceivedCounter {
		// we've received other messages since the one that we sent. The timing
		// here would not be accurate.
		return nil
	}
	lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced)
	roundtripDuration := time.Since(lastPingSentTime)
	atomic.StoreInt64(&lt.latency, roundtripDuration.Nanoseconds())
	return nil
}

// getConnectionLatency returns the most recently recorded connection latency.
// The value is loaded atomically.
func (lt *latencyTracker) getConnectionLatency() time.Duration {
	nanos := atomic.LoadInt64(&lt.latency)
	return time.Duration(nanos)
}

// checkPingSending sends a ping message when the tracker is enabled and at
// least pingInterval has elapsed, relative to now, since the last ping was sent.
// A write that fails with websocket.ErrCloseSent or with a temporary network
// error is not reported; any other write error is returned to the caller.
// The ping bookkeeping is updated only after a successful write.
func (lt *latencyTracker) checkPingSending(now *time.Time) error {
	if !lt.enabled {
		return nil
	}
	previousPing := time.Unix(0, lt.lastPingSentTime)
	if now.Sub(previousPing) < lt.pingInterval {
		return nil
	}

	// enough time has passed; send a new ping.
	lt.lastPingMu.Lock()
	defer lt.lastPingMu.Unlock()

	lt.lastPingID++
	payload := []byte(strconv.Itoa(int(lt.lastPingID)))
	deadline := now.Add(pingMessageWriteDuration)
	if writeErr := lt.conn.WriteControl(websocket.PingMessage, payload, deadline); writeErr != nil {
		if writeErr == websocket.ErrCloseSent {
			return nil
		}
		if netErr, isNetErr := writeErr.(net.Error); isNetErr && netErr.Temporary() {
			return nil
		}
		return writeErr
	}

	// record when the ping was sent, and how many messages were received by
	// then, so that pongHandler can validate and time the response.
	sentAt := now.UnixNano()
	lt.lastPingSentTimeSynced = sentAt
	lt.lastPingReceivedCounter = atomic.LoadUint64(&lt.receivedPacketCounter)
	lt.lastPingSentTime = sentAt
	return nil
}

// increaseReceivedCounter atomically increments the received messages counter by one.
func (lt *latencyTracker) increaseReceivedCounter() {
	const increment = 1
	atomic.AddUint64(&lt.receivedPacketCounter, increment)
}
121 changes: 121 additions & 0 deletions network/latencyTracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"context"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)

// TestLatencyTracker connects two test websocket nodes and verifies that each
// side reports a non-zero connection latency, and that the latency reported
// by node A changes over time while messages are being broadcast.
func TestLatencyTracker(t *testing.T) {
	partitiontest.PartitionTest(t)

	netA := makeTestFilterWebsocketNode(t, "a")
	netA.config.GossipFanout = 1
	netA.config.PeerPingPeriodSeconds = 2
	netA.Start()
	defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()

	netB := makeTestFilterWebsocketNode(t, "b")
	netB.config.GossipFanout = 1
	addrA, postListen := netA.Address()
	require.True(t, postListen)
	t.Log(addrA)
	netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)

	netB.Start()
	defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()
	counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
	netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}})
	debugTag2 := protocol.ProposalPayloadTag
	counter2 := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})}
	netB.RegisterHandlers([]TaggedMessageHandler{{Tag: debugTag2, MessageHandler: counter2}})

	readyTimeout := time.NewTimer(2 * time.Second)
	// release the timer once the test is done with it.
	defer readyTimeout.Stop()
	waitReady(t, netA, readyTimeout.C)
	waitReady(t, netB, readyTimeout.C)

	msg := make([]byte, 200)
	rand.Read(msg)
	var lastMsgTime time.Time

	var connLatencyInitialA time.Duration
	// wait for up to 20 seconds for the network latency to be established on A.
	startTime := time.Now()
	for {
		// broadcast a message roughly every 100ms.
		if time.Since(lastMsgTime) > 100*time.Millisecond {
			netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
			lastMsgTime = time.Now()
		}

		connLatencyA := netA.peers[0].GetConnectionLatency()
		if connLatencyA == time.Duration(0) {
			require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
			time.Sleep(time.Millisecond)
			continue
		}
		require.LessOrEqual(t, connLatencyA.Nanoseconds(), (20 * time.Second).Nanoseconds())
		connLatencyInitialA = connLatencyA
		break
	}

	// wait for up to 20 seconds for the network latency to be established on B.
	startTime = time.Now()
	lastMsgTime = time.Time{}
	for {
		if time.Since(lastMsgTime) > 100*time.Millisecond {
			netB.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
			lastMsgTime = time.Now()
		}

		connLatencyB := netB.peers[0].GetConnectionLatency()
		if connLatencyB == time.Duration(0) {
			require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
			time.Sleep(time.Millisecond)
			continue
		}
		require.LessOrEqual(t, connLatencyB.Nanoseconds(), (20 * time.Second).Nanoseconds())
		break
	}

	// send the given message until we get a different latency.
	// wait for up to 20 seconds for the network latency to be updated.
	startTime = time.Now()
	lastMsgTime = time.Time{}
	for {
		if time.Since(lastMsgTime) > 100*time.Millisecond {
			netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
			lastMsgTime = time.Now()
		}

		connLatencyA := netA.peers[0].GetConnectionLatency()
		if connLatencyA != connLatencyInitialA {
			require.NotEqual(t, connLatencyA.Nanoseconds(), int64(0))
			waitTime := time.Since(lastMsgTime)
			require.Less(t, waitTime.Seconds(), float64(netA.config.PeerPingPeriodSeconds*2))
			break
		}
		// fail the test rather than spinning forever if the latency never changes.
		require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds())
		time.Sleep(time.Millisecond)
	}
}
Loading