-
Notifications
You must be signed in to change notification settings - Fork 523
network: add initial support for latency tracking #3028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tsachiherman
merged 21 commits into
algorand:master
from
tsachiherman:tsachi/measurelatency
Oct 19, 2021
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
6f5c326
Add initial support for latency tracking.
tsachiherman 6ede2cc
update unit tests
tsachiherman 2a200fe
Merge branch 'master' into tsachi/measurelatency
tsachiherman b3d2009
add missing import
tsachiherman 6512348
Merge branch 'master' into tsachi/measurelatency
tsachiherman 126cbf3
hard-code number
tsachiherman 62d06c1
update
tsachiherman 2097a91
add missing file.
tsachiherman 0e26994
update
tsachiherman c2f887c
update config.
tsachiherman c411372
update
tsachiherman 2474b1a
add missing file.
tsachiherman bc92323
Merge branch 'master' into tsachi/measurelatency
tsachiherman 62c7544
rollback few changes.
tsachiherman ff0dfba
add few more comments.
tsachiherman 3e4d96d
update
tsachiherman 841b160
fix typos
tsachiherman 3f57b0f
add unit test.
tsachiherman 937375e
improve test.
tsachiherman f1a6fa9
rollback unit test.
tsachiherman 0de295d
Merge branch 'master' into tsachi/measurelatency
tsachiherman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| // Copyright (C) 2019-2021 Algorand, Inc. | ||
| // This file is part of go-algorand | ||
| // | ||
| // go-algorand is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // go-algorand is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with go-algorand. If not, see <https://www.gnu.org/licenses/>. | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| "errors" | ||
| "net" | ||
| "strconv" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/algorand/websocket" | ||
|
|
||
| "github.com/algorand/go-deadlock" | ||
|
|
||
| "github.com/algorand/go-algorand/config" | ||
| ) | ||
|
|
||
| const pongMessageWriteDuration = time.Second | ||
| const pingMessageWriteDuration = time.Second | ||
|
|
||
| var errInvalidPongMessageContent = errors.New("invalid pong message content") | ||
| var errInvalidPingMessageContent = errors.New("invalid ping message content") | ||
|
|
||
| // latencyTracker works in conjunction with the wspeer in measuring the | ||
| // communication latency over the websocket connection. | ||
| type latencyTracker struct { | ||
| // receivedPacketCounter is a counter for all incoming messages | ||
| // placed here to be aligned with 64bit address. | ||
| receivedPacketCounter uint64 | ||
|
|
||
| // latency is the effective latency of the connection. | ||
| // placed here to be aligned with 64bit address. | ||
| latency int64 | ||
|
|
||
| // lastPingSentTime is the timestamp at which we last sent a message. | ||
| // this variable is only touched by checkPingSending, and therefore doesn't | ||
| // need to be syncronized. The "clone" of this variable lastPingSentTimeSynced, | ||
| // is being used by both the checkPingSending as well as by the pongHandler | ||
| // and therefore require synchronization. | ||
| lastPingSentTime int64 | ||
|
|
||
| // static variables | ||
| // ( doesn't get changed after init, hence, no synchronization needed ) | ||
|
|
||
| // conn is the underlying connection object. | ||
| conn wsPeerWebsocketConn | ||
|
|
||
| // enabled indicates whether the pingpong is currently enabled or not. | ||
| enabled bool | ||
|
|
||
| // pingInterval is the max interval at which the client would send ping messages. | ||
| pingInterval time.Duration | ||
|
|
||
| // lastPingMu synchronize the protected variables that might be modified across | ||
| // the checkPingSending and the pongHandler. All the variable below this point | ||
| // need to be syncronized with the mutex. | ||
| lastPingMu deadlock.Mutex | ||
|
|
||
| // lastPingID is the last ping ID, a monotonic growing number used to ensure | ||
| // that the pong message we've receive corresponds to the latest ping message | ||
| // that we've sent. | ||
| lastPingID uint64 | ||
|
|
||
| // lastPingReceivedCounter stores message counter at the time we sent the ping. | ||
| // In order to ensure the timing accuracy, we want to have no other messages | ||
| // being exchanged. This, of course, would only delay the ping-pong until a | ||
| // better measurement could be taken. | ||
| lastPingReceivedCounter uint64 | ||
|
|
||
| // lastPingSentTimeSynced, as stated above, is the syncronized version of lastPingSentTime. | ||
| // it is used only in the case where we end up sending the ping message. | ||
| lastPingSentTimeSynced int64 | ||
| } | ||
|
|
||
| func (lt *latencyTracker) init(conn wsPeerWebsocketConn, cfg config.Local, initialConnectionLatency time.Duration) { | ||
| lt.conn = conn | ||
| lt.enabled = cfg.PeerPingPeriodSeconds > 0 && cfg.EnablePingHandler | ||
| lt.latency = int64(initialConnectionLatency) | ||
| lt.pingInterval = time.Duration(cfg.PeerPingPeriodSeconds) * time.Second | ||
| conn.SetPingHandler(lt.pingHandler) | ||
| conn.SetPongHandler(lt.pongHandler) | ||
| } | ||
|
|
||
| func (lt *latencyTracker) pingHandler(message string) error { | ||
| if _, err := strconv.Atoi(message); err != nil { | ||
| return errInvalidPingMessageContent | ||
| } | ||
| err := lt.conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(pongMessageWriteDuration)) | ||
| if err == websocket.ErrCloseSent { | ||
| return nil | ||
| } else if e, ok := err.(net.Error); ok && e.Temporary() { | ||
| return nil | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| func (lt *latencyTracker) pongHandler(message string) error { | ||
| pongID, err := strconv.Atoi(message) | ||
| if err != nil { | ||
| return errInvalidPongMessageContent | ||
| } | ||
|
|
||
| lt.lastPingMu.Lock() | ||
| defer lt.lastPingMu.Unlock() | ||
|
|
||
| if uint64(pongID) != lt.lastPingID { | ||
| // we've sent more than one ping since; ignore this message. | ||
| return nil | ||
| } | ||
| if lt.receivedPacketCounter != lt.lastPingReceivedCounter { | ||
| // we've received other messages since the one that we sent. The timing | ||
| // here would not be accurate. | ||
| return nil | ||
| } | ||
| lastPingSentTime := time.Unix(0, lt.lastPingSentTimeSynced) | ||
| roundtripDuration := time.Since(lastPingSentTime) | ||
| atomic.StoreInt64(<.latency, roundtripDuration.Nanoseconds()) | ||
| return nil | ||
| } | ||
|
|
||
| func (lt *latencyTracker) getConnectionLatency() time.Duration { | ||
| return time.Duration(atomic.LoadInt64(<.latency)) | ||
| } | ||
|
|
||
| func (lt *latencyTracker) checkPingSending(now *time.Time) error { | ||
| if !lt.enabled { | ||
| return nil | ||
| } | ||
| if now.Sub(time.Unix(0, lt.lastPingSentTime)) < lt.pingInterval { | ||
| return nil | ||
| } | ||
|
|
||
| // it looks like it's time to send a ping : | ||
| lt.lastPingMu.Lock() | ||
| defer lt.lastPingMu.Unlock() | ||
|
|
||
| lt.lastPingID++ | ||
| err := lt.conn.WriteControl(websocket.PingMessage, []byte(strconv.Itoa(int(lt.lastPingID))), now.Add(pingMessageWriteDuration)) | ||
| if err == websocket.ErrCloseSent { | ||
| return nil | ||
| } else if e, ok := err.(net.Error); ok && e.Temporary() { | ||
| return nil | ||
| } | ||
| if err != nil { | ||
| return err | ||
| } | ||
| lt.lastPingSentTimeSynced = now.UnixNano() | ||
| lt.lastPingReceivedCounter = atomic.LoadUint64(<.receivedPacketCounter) | ||
| lt.lastPingSentTime = lt.lastPingSentTimeSynced | ||
| return nil | ||
| } | ||
|
|
||
| func (lt *latencyTracker) increaseReceivedCounter() { | ||
| atomic.AddUint64(<.receivedPacketCounter, 1) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| // Copyright (C) 2019-2021 Algorand, Inc. | ||
| // This file is part of go-algorand | ||
| // | ||
| // go-algorand is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // go-algorand is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with go-algorand. If not, see <https://www.gnu.org/licenses/>. | ||
|
|
||
| package network | ||
|
|
||
| import ( | ||
| "context" | ||
| "math/rand" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/algorand/go-algorand/protocol" | ||
| "github.com/algorand/go-algorand/test/partitiontest" | ||
| ) | ||
|
|
||
| func TestLatencyTracker(t *testing.T) { | ||
| partitiontest.PartitionTest(t) | ||
|
|
||
| netA := makeTestFilterWebsocketNode(t, "a") | ||
| netA.config.GossipFanout = 1 | ||
| netA.config.PeerPingPeriodSeconds = 2 | ||
| netA.Start() | ||
| defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() | ||
|
|
||
| netB := makeTestFilterWebsocketNode(t, "b") | ||
| netB.config.GossipFanout = 1 | ||
| addrA, postListen := netA.Address() | ||
| require.True(t, postListen) | ||
| t.Log(addrA) | ||
| netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole) | ||
|
|
||
| netB.Start() | ||
| defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() | ||
| counter := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})} | ||
| netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: counter}}) | ||
| debugTag2 := protocol.ProposalPayloadTag | ||
| counter2 := &messageCounterHandler{t: t, limit: 1, done: make(chan struct{})} | ||
| netB.RegisterHandlers([]TaggedMessageHandler{{Tag: debugTag2, MessageHandler: counter2}}) | ||
|
|
||
| readyTimeout := time.NewTimer(2 * time.Second) | ||
| waitReady(t, netA, readyTimeout.C) | ||
| waitReady(t, netB, readyTimeout.C) | ||
|
|
||
| msg := make([]byte, 200) | ||
| rand.Read(msg) | ||
| var lastMsgTime time.Time | ||
|
|
||
| var connLatencyInitialA time.Duration | ||
| // wait for up to 20 seconds for the network latency to be established. | ||
| startTime := time.Now() | ||
| for { | ||
| if time.Since(lastMsgTime) > 100*time.Millisecond { | ||
| netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) | ||
| lastMsgTime = time.Now() | ||
| } | ||
|
|
||
| connLatencyA := netA.peers[0].GetConnectionLatency() | ||
| if connLatencyA == time.Duration(0) { | ||
| require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds()) | ||
| time.Sleep(time.Millisecond) | ||
| continue | ||
| } | ||
| require.LessOrEqual(t, connLatencyA.Nanoseconds(), (20 * time.Second).Nanoseconds()) | ||
| connLatencyInitialA = connLatencyA | ||
| break | ||
| } | ||
|
|
||
| // wait for up to 20 seconds for the network latency to be established. | ||
| startTime = time.Now() | ||
| lastMsgTime = time.Time{} | ||
| for { | ||
| if time.Since(lastMsgTime) > 100*time.Millisecond { | ||
| netB.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) | ||
| lastMsgTime = time.Now() | ||
| } | ||
|
|
||
| connLatencyB := netB.peers[0].GetConnectionLatency() | ||
| if connLatencyB == time.Duration(0) { | ||
| require.LessOrEqual(t, time.Since(startTime).Nanoseconds(), (20 * time.Second).Nanoseconds()) | ||
| time.Sleep(time.Millisecond) | ||
| continue | ||
| } | ||
| require.LessOrEqual(t, connLatencyB.Nanoseconds(), (20 * time.Second).Nanoseconds()) | ||
| break | ||
| } | ||
|
|
||
| // send the given message until we get a different latency. | ||
| // wait for up to 20 seconds for the network latency to be established. | ||
| startTime = time.Now() | ||
| lastMsgTime = time.Time{} | ||
| for { | ||
| if time.Since(lastMsgTime) > 100*time.Millisecond { | ||
| netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil) | ||
| lastMsgTime = time.Now() | ||
| } | ||
|
|
||
| connLatencyA := netA.peers[0].GetConnectionLatency() | ||
| if connLatencyA != connLatencyInitialA { | ||
| require.NotEqual(t, connLatencyA.Nanoseconds(), int64(0)) | ||
| waitTime := time.Since(lastMsgTime) | ||
| require.Less(t, waitTime.Seconds(), float64(netA.config.PeerPingPeriodSeconds*2)) | ||
| break | ||
| } | ||
| time.Sleep(time.Millisecond) | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.