Skip to content

Commit 11ca40c

Browse files
Merge pull request #236 from StephenButtolph/network-upgrade
Network upgrade
2 parents 96198ef + ced4a37 commit 11ca40c

File tree

13 files changed

+329
-270
lines changed

13 files changed

+329
-270
lines changed

api/admin/service.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import (
1515
"github.com/ava-labs/gecko/network"
1616
"github.com/ava-labs/gecko/snow/engine/common"
1717
"github.com/ava-labs/gecko/utils/logging"
18+
"github.com/ava-labs/gecko/version"
1819

1920
cjson "github.com/ava-labs/gecko/utils/json"
2021
)
2122

2223
// Admin is the API service for node admin management
2324
type Admin struct {
25+
version version.Version
2426
nodeID ids.ShortID
2527
networkID uint32
2628
log logging.Logger
@@ -31,12 +33,13 @@ type Admin struct {
3133
}
3234

3335
// NewService returns a new admin API service
34-
func NewService(nodeID ids.ShortID, networkID uint32, log logging.Logger, chainManager chains.Manager, peers network.Network, httpServer *api.Server) *common.HTTPHandler {
36+
func NewService(version version.Version, nodeID ids.ShortID, networkID uint32, log logging.Logger, chainManager chains.Manager, peers network.Network, httpServer *api.Server) *common.HTTPHandler {
3537
newServer := rpc.NewServer()
3638
codec := cjson.NewCodec()
3739
newServer.RegisterCodec(codec, "application/json")
3840
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")
3941
newServer.RegisterService(&Admin{
42+
version: version,
4043
nodeID: nodeID,
4144
networkID: networkID,
4245
log: log,
@@ -47,6 +50,19 @@ func NewService(nodeID ids.ShortID, networkID uint32, log logging.Logger, chainM
4750
return &common.HTTPHandler{Handler: newServer}
4851
}
4952

53+
// GetNodeVersionReply are the results from calling GetNodeVersion
54+
type GetNodeVersionReply struct {
55+
Version string `json:"version"`
56+
}
57+
58+
// GetNodeVersion returns the version this node is running
59+
func (service *Admin) GetNodeVersion(_ *http.Request, _ *struct{}, reply *GetNodeVersionReply) error {
60+
service.log.Debug("Admin: GetNodeVersion called")
61+
62+
reply.Version = service.version.String()
63+
return nil
64+
}
65+
5066
// GetNodeIDReply are the results from calling GetNodeID
5167
type GetNodeIDReply struct {
5268
NodeID ids.ShortID `json:"nodeID"`

network/builder_test.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,8 @@ func TestBuildGetPeerList(t *testing.T) {
7979

8080
func TestBuildPeerList(t *testing.T) {
8181
ips := []utils.IPDesc{
82-
utils.IPDesc{
83-
IP: net.IPv6loopback,
84-
Port: 12345,
85-
},
86-
utils.IPDesc{
87-
IP: net.IPv6loopback,
88-
Port: 54321,
89-
},
82+
{IP: net.IPv6loopback, Port: 12345},
83+
{IP: net.IPv6loopback, Port: 54321},
9084
}
9185

9286
msg, err := TestBuilder.PeerList(ips)

network/commands.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -188,22 +188,22 @@ const (
188188
var (
189189
Messages = map[Op][]Field{
190190
// Handshake:
191-
GetVersion: []Field{},
192-
Version: []Field{NetworkID, NodeID, MyTime, IP, VersionStr},
193-
GetPeerList: []Field{},
194-
PeerList: []Field{Peers},
191+
GetVersion: {},
192+
Version: {NetworkID, NodeID, MyTime, IP, VersionStr},
193+
GetPeerList: {},
194+
PeerList: {Peers},
195195
// Bootstrapping:
196-
GetAcceptedFrontier: []Field{ChainID, RequestID},
197-
AcceptedFrontier: []Field{ChainID, RequestID, ContainerIDs},
198-
GetAccepted: []Field{ChainID, RequestID, ContainerIDs},
199-
Accepted: []Field{ChainID, RequestID, ContainerIDs},
200-
GetAncestors: []Field{ChainID, RequestID, ContainerID},
201-
MultiPut: []Field{ChainID, RequestID, MultiContainerBytes},
196+
GetAcceptedFrontier: {ChainID, RequestID},
197+
AcceptedFrontier: {ChainID, RequestID, ContainerIDs},
198+
GetAccepted: {ChainID, RequestID, ContainerIDs},
199+
Accepted: {ChainID, RequestID, ContainerIDs},
200+
GetAncestors: {ChainID, RequestID, ContainerID},
201+
MultiPut: {ChainID, RequestID, MultiContainerBytes},
202202
// Consensus:
203-
Get: []Field{ChainID, RequestID, ContainerID},
204-
Put: []Field{ChainID, RequestID, ContainerID, ContainerBytes},
205-
PushQuery: []Field{ChainID, RequestID, ContainerID, ContainerBytes},
206-
PullQuery: []Field{ChainID, RequestID, ContainerID},
207-
Chits: []Field{ChainID, RequestID, ContainerIDs},
203+
Get: {ChainID, RequestID, ContainerID},
204+
Put: {ChainID, RequestID, ContainerID, ContainerBytes},
205+
PushQuery: {ChainID, RequestID, ContainerID, ContainerBytes},
206+
PullQuery: {ChainID, RequestID, ContainerID},
207+
Chits: {ChainID, RequestID, ContainerIDs},
208208
}
209209
)

network/network.go

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,21 @@ import (
2727
"github.com/ava-labs/gecko/version"
2828
)
2929

30+
// reasonable default values
3031
const (
31-
defaultInitialReconnectDelay = time.Second
32-
defaultMaxReconnectDelay = time.Hour
33-
DefaultMaxMessageSize uint32 = 1 << 21
34-
defaultSendQueueSize = 1 << 10
35-
defaultMaxClockDifference = time.Minute
36-
defaultPeerListGossipSpacing = time.Minute
37-
defaultPeerListGossipSize = 100
38-
defaultPeerListStakerGossipFraction = 2
39-
defaultGetVersionTimeout = 2 * time.Second
40-
defaultAllowPrivateIPs = true
41-
defaultGossipSize = 50
32+
defaultInitialReconnectDelay = time.Second
33+
defaultMaxReconnectDelay = time.Hour
34+
DefaultMaxMessageSize uint32 = 1 << 21
35+
defaultSendQueueSize = 1 << 10
36+
defaultMaxNetworkPendingSendBytes = 1 << 29 // 512MB
37+
defaultNetworkPendingSendBytesToRateLimit = defaultMaxNetworkPendingSendBytes / 4
38+
defaultMaxClockDifference = time.Minute
39+
defaultPeerListGossipSpacing = time.Minute
40+
defaultPeerListGossipSize = 100
41+
defaultPeerListStakerGossipFraction = 2
42+
defaultGetVersionTimeout = 2 * time.Second
43+
defaultAllowPrivateIPs = true
44+
defaultGossipSize = 50
4245
)
4346

4447
// Network defines the functionality of the networking library.
@@ -102,23 +105,26 @@ type network struct {
102105
clock timer.Clock
103106
lastHeartbeat int64
104107

105-
initialReconnectDelay time.Duration
106-
maxReconnectDelay time.Duration
107-
maxMessageSize uint32
108-
sendQueueSize int
109-
maxClockDifference time.Duration
110-
peerListGossipSpacing time.Duration
111-
peerListGossipSize int
112-
peerListStakerGossipFraction int
113-
getVersionTimeout time.Duration
114-
allowPrivateIPs bool
115-
gossipSize int
108+
initialReconnectDelay time.Duration
109+
maxReconnectDelay time.Duration
110+
maxMessageSize uint32
111+
sendQueueSize int
112+
maxNetworkPendingSendBytes int
113+
networkPendingSendBytesToRateLimit int
114+
maxClockDifference time.Duration
115+
peerListGossipSpacing time.Duration
116+
peerListGossipSize int
117+
peerListStakerGossipFraction int
118+
getVersionTimeout time.Duration
119+
allowPrivateIPs bool
120+
gossipSize int
116121

117122
executor timer.Executor
118123

119124
b Builder
120125

121126
stateLock sync.Mutex
127+
pendingBytes int
122128
closed bool
123129
disconnectedIPs map[string]struct{}
124130
connectedIPs map[string]struct{}
@@ -164,6 +170,8 @@ func NewDefaultNetwork(
164170
defaultMaxReconnectDelay,
165171
DefaultMaxMessageSize,
166172
defaultSendQueueSize,
173+
defaultMaxNetworkPendingSendBytes,
174+
defaultNetworkPendingSendBytesToRateLimit,
167175
defaultMaxClockDifference,
168176
defaultPeerListGossipSpacing,
169177
defaultPeerListGossipSize,
@@ -193,6 +201,8 @@ func NewNetwork(
193201
maxReconnectDelay time.Duration,
194202
maxMessageSize uint32,
195203
sendQueueSize int,
204+
maxNetworkPendingSendBytes int,
205+
networkPendingSendBytesToRateLimit int,
196206
maxClockDifference time.Duration,
197207
peerListGossipSpacing time.Duration,
198208
peerListGossipSize int,
@@ -202,35 +212,37 @@ func NewNetwork(
202212
gossipSize int,
203213
) Network {
204214
net := &network{
205-
log: log,
206-
id: id,
207-
ip: ip,
208-
networkID: networkID,
209-
version: version,
210-
parser: parser,
211-
listener: listener,
212-
dialer: dialer,
213-
serverUpgrader: serverUpgrader,
214-
clientUpgrader: clientUpgrader,
215-
vdrs: vdrs,
216-
router: router,
217-
nodeID: rand.Uint32(),
218-
initialReconnectDelay: initialReconnectDelay,
219-
maxReconnectDelay: maxReconnectDelay,
220-
maxMessageSize: maxMessageSize,
221-
sendQueueSize: sendQueueSize,
222-
maxClockDifference: maxClockDifference,
223-
peerListGossipSpacing: peerListGossipSpacing,
224-
peerListGossipSize: peerListGossipSize,
225-
peerListStakerGossipFraction: peerListStakerGossipFraction,
226-
getVersionTimeout: getVersionTimeout,
227-
allowPrivateIPs: allowPrivateIPs,
228-
gossipSize: gossipSize,
215+
log: log,
216+
id: id,
217+
ip: ip,
218+
networkID: networkID,
219+
version: version,
220+
parser: parser,
221+
listener: listener,
222+
dialer: dialer,
223+
serverUpgrader: serverUpgrader,
224+
clientUpgrader: clientUpgrader,
225+
vdrs: vdrs,
226+
router: router,
227+
nodeID: rand.Uint32(),
228+
initialReconnectDelay: initialReconnectDelay,
229+
maxReconnectDelay: maxReconnectDelay,
230+
maxMessageSize: maxMessageSize,
231+
sendQueueSize: sendQueueSize,
232+
maxNetworkPendingSendBytes: maxNetworkPendingSendBytes,
233+
networkPendingSendBytesToRateLimit: networkPendingSendBytesToRateLimit,
234+
maxClockDifference: maxClockDifference,
235+
peerListGossipSpacing: peerListGossipSpacing,
236+
peerListGossipSize: peerListGossipSize,
237+
peerListStakerGossipFraction: peerListStakerGossipFraction,
238+
getVersionTimeout: getVersionTimeout,
239+
allowPrivateIPs: allowPrivateIPs,
240+
gossipSize: gossipSize,
229241

230242
disconnectedIPs: make(map[string]struct{}),
231243
connectedIPs: make(map[string]struct{}),
232244
retryDelay: make(map[string]time.Duration),
233-
myIPs: map[string]struct{}{ip.String(): struct{}{}},
245+
myIPs: map[string]struct{}{ip.String(): {}},
234246
peers: make(map[[20]byte]*peer),
235247
}
236248
net.initialize(registerer)
@@ -738,11 +750,12 @@ func (n *network) connectTo(ip utils.IPDesc) {
738750

739751
if delay == 0 {
740752
delay = n.initialReconnectDelay
741-
} else {
742-
delay *= 2
743753
}
754+
755+
delay = time.Duration(float64(delay) * (1 + rand.Float64()))
744756
if delay > n.maxReconnectDelay {
745-
delay = n.maxReconnectDelay
757+
// set the timeout to [.75, 1) * maxReconnectDelay
758+
delay = time.Duration(float64(n.maxReconnectDelay) * (3 + rand.Float64()) / 4)
746759
}
747760

748761
n.stateLock.Lock()

network/peer.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ type peer struct {
3131
// state lock held.
3232
closed bool
3333

34+
// number of bytes currently in the send queue, is only modified when the
35+
// network state lock is held.
36+
pendingBytes int
37+
3438
// queue of messages this connection is attempting to send the peer. Is
3539
// closed when the connection is closed.
3640
sender chan []byte
@@ -155,6 +159,10 @@ func (p *peer) WriteMessages() {
155159
p.id,
156160
formatting.DumpBytes{Bytes: msg})
157161

162+
p.net.stateLock.Lock()
163+
p.pendingBytes -= len(msg)
164+
p.net.stateLock.Unlock()
165+
158166
packer := wrappers.Packer{Bytes: make([]byte, len(msg)+wrappers.IntLen)}
159167
packer.PackBytes(msg)
160168
msg = packer.Bytes
@@ -184,8 +192,22 @@ func (p *peer) send(msg Msg) bool {
184192
p.net.log.Debug("dropping message to %s due to a closed connection", p.id)
185193
return false
186194
}
195+
196+
msgBytes := msg.Bytes()
197+
newPendingBytes := p.net.pendingBytes + len(msgBytes)
198+
newConnPendingBytes := p.pendingBytes + len(msgBytes)
199+
if newPendingBytes > p.net.networkPendingSendBytesToRateLimit && // Check to see if we should be enforcing any rate limiting
200+
uint32(p.pendingBytes) > p.net.maxMessageSize && // this connection should have a minimum allowed bandwidth
201+
(newPendingBytes > p.net.maxNetworkPendingSendBytes || // Check to see if this message would put too much memory into the network
202+
newConnPendingBytes > p.net.maxNetworkPendingSendBytes/20) { // Check to see if this connection is using too much memory
203+
p.net.log.Debug("dropping message to %s due to a send queue with too many bytes", p.id)
204+
return false
205+
}
206+
187207
select {
188-
case p.sender <- msg.Bytes():
208+
case p.sender <- msgBytes:
209+
p.net.pendingBytes = newPendingBytes
210+
p.pendingBytes = newConnPendingBytes
189211
return true
190212
default:
191213
p.net.log.Debug("dropping message to %s due to a full send queue", p.id)

node/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ var (
5656
genesisHashKey = []byte("genesisID")
5757

5858
// Version is the version of this code
59-
Version = version.NewDefaultVersion("avalanche", 0, 5, 4)
59+
Version = version.NewDefaultVersion("avalanche", 0, 5, 5)
6060
versionParser = version.NewDefaultParser()
6161
)
6262

@@ -461,7 +461,7 @@ func (n *Node) initMetricsAPI() {
461461
func (n *Node) initAdminAPI() {
462462
if n.Config.AdminAPIEnabled {
463463
n.Log.Info("initializing Admin API")
464-
service := admin.NewService(n.ID, n.Config.NetworkID, n.Log, n.chainManager, n.Net, &n.APIServer)
464+
service := admin.NewService(Version, n.ID, n.Config.NetworkID, n.Log, n.chainManager, n.Net, &n.APIServer)
465465
n.APIServer.AddRoute(service, &sync.RWMutex{}, "admin", "", n.HTTPLog)
466466
}
467467
}

snow/engine/avalanche/bootstrapper.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,11 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
147147
numDropped: b.numBSDroppedVtx,
148148
vtx: vtx,
149149
}); err == nil {
150-
b.numFetched++ // Progress tracker
151150
b.numBSBlockedVtx.Inc()
151+
b.numFetched++ // Progress tracker
152+
if b.numFetched%common.StatusUpdateFrequency == 0 {
153+
b.BootstrapConfig.Context.Log.Info("fetched %d vertices", b.numFetched)
154+
}
152155
} else {
153156
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked: %s", err)
154157
}
@@ -169,10 +172,6 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
169172
}
170173
b.processedCache.Put(vtx.ID(), nil)
171174
}
172-
173-
if b.numFetched%common.StatusUpdateFrequency == 0 {
174-
b.BootstrapConfig.Context.Log.Info("fetched %d vertices", b.numFetched)
175-
}
176175
}
177176

178177
if err := b.VtxBlocked.Commit(); err != nil {

snow/engine/snowman/bootstrapper.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ type bootstrapper struct {
3535
// true if all of the vertices in the original accepted frontier have been processed
3636
processedStartingAcceptedFrontier bool
3737

38-
// Number of blocks processed
39-
numProcessed uint32
38+
// Number of blocks fetched
39+
numFetched uint32
4040

4141
// tracks which validators were asked for which containers in which requests
4242
outstandingRequests common.Requests
@@ -183,16 +183,16 @@ func (b *bootstrapper) process(blk snowman.Block) error {
183183
status := blk.Status()
184184
blkID := blk.ID()
185185
for status == choices.Processing {
186-
b.numProcessed++ // Progress tracker
187-
if b.numProcessed%common.StatusUpdateFrequency == 0 { // Periodically print progress
188-
b.BootstrapConfig.Context.Log.Info("processed %d blocks", b.numProcessed)
189-
}
190186
if err := b.Blocked.Push(&blockJob{
191187
numAccepted: b.numBootstrapped,
192188
numDropped: b.numDropped,
193189
blk: blk,
194190
}); err == nil {
195191
b.numBlocked.Inc()
192+
b.numFetched++ // Progress tracker
193+
if b.numFetched%common.StatusUpdateFrequency == 0 { // Periodically print progress
194+
b.BootstrapConfig.Context.Log.Info("fetched %d blocks", b.numFetched)
195+
}
196196
}
197197

198198
if err := b.Blocked.Commit(); err != nil {

0 commit comments

Comments
 (0)