Skip to content

Commit 91c34d2

Browse files
committed
add sdk router to network
1 parent 3f5dc8a commit 91c34d2

File tree

3 files changed

+109
-39
lines changed

3 files changed

+109
-39
lines changed

peer/network.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
"golang.org/x/sync/semaphore"
1414

15+
"github.com/ava-labs/avalanchego/network/p2p"
16+
1517
"github.com/ethereum/go-ethereum/log"
1618

1719
"github.com/ava-labs/avalanchego/codec"
@@ -87,23 +89,25 @@ type network struct {
8789
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
8890
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
8991
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
90-
appSender common.AppSender // avalanchego AppSender for sending messages
91-
codec codec.Manager // Codec used for parsing messages
92-
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
93-
appRequestHandler message.RequestHandler // maps request type => handler
94-
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
95-
gossipHandler message.GossipHandler // maps gossip type => handler
96-
peers *peerTracker // tracking of peers & bandwidth
97-
appStats stats.RequestHandlerStats // Provide request handler metrics
98-
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
92+
router *p2p.Router
93+
appSender common.AppSender // avalanchego AppSender for sending messages
94+
codec codec.Manager // Codec used for parsing messages
95+
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
96+
appRequestHandler message.RequestHandler // maps request type => handler
97+
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
98+
gossipHandler message.GossipHandler // maps gossip type => handler
99+
peers *peerTracker // tracking of peers & bandwidth
100+
appStats stats.RequestHandlerStats // Provide request handler metrics
101+
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
99102

100103
// Set to true when Shutdown is called, after which all operations on this
101104
// struct are no-ops.
102105
closed utils.Atomic[bool]
103106
}
104107

105-
func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
108+
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
106109
return &network{
110+
router: router,
107111
appSender: appSender,
108112
codec: codec,
109113
crossChainCodec: crossChainCodec,
@@ -335,8 +339,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
335339

336340
var req message.Request
337341
if _, err := n.codec.Unmarshal(request, &req); err != nil {
338-
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
339-
return nil
342+
log.Debug("forwarding unknown AppRequest to sdk", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
343+
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
340344
}
341345

342346
bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
@@ -366,7 +370,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
366370
// Error returned by this function is expected to be treated as fatal by the engine
367371
// If [requestID] is not known, this function will emit a log and return a nil error.
368372
// If the response handler returns an error it is propagated as a fatal error.
369-
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
373+
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
370374
n.lock.Lock()
371375
defer n.lock.Unlock()
372376

@@ -378,9 +382,8 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
378382

379383
handler, exists := n.markRequestFulfilled(requestID)
380384
if !exists {
381-
// Should never happen since the engine should be managing outstanding requests
382-
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
383-
return nil
385+
log.Debug("forwarding unknown AppResponse to sdk", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
386+
return n.router.AppResponse(ctx, nodeID, requestID, response)
384387
}
385388

386389
// We must release the slot
@@ -395,7 +398,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
395398
// - request times out before a response is provided
396399
// error returned by this function is expected to be treated as fatal by the engine
397400
// returns error only when the response handler returns an error
398-
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
401+
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
399402
n.lock.Lock()
400403
defer n.lock.Unlock()
401404

@@ -407,9 +410,8 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request
407410

408411
handler, exists := n.markRequestFulfilled(requestID)
409412
if !exists {
410-
// Should never happen since the engine should be managing outstanding requests
411-
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
412-
return nil
413+
log.Debug("forwarding unknown AppRequestFailed to sdk", "nodeID", nodeID, "requestID", requestID)
414+
return n.router.AppRequestFailed(ctx, nodeID, requestID)
413415
}
414416

415417
// We must release the slot

peer/network_test.go

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/ava-labs/avalanchego/network/p2p"
1516
"github.com/ava-labs/avalanchego/snow/engine/common"
17+
"github.com/ava-labs/avalanchego/utils/logging"
1618
"github.com/ava-labs/avalanchego/utils/set"
1719
ethcommon "github.com/ethereum/go-ethereum/common"
1820

@@ -49,11 +51,13 @@ var (
4951

5052
_ message.CrossChainRequest = &ExampleCrossChainRequest{}
5153
_ message.CrossChainRequestHandler = &testCrossChainHandler{}
54+
55+
_ p2p.Handler = &testSDKHandler{}
5256
)
5357

5458
func TestNetworkDoesNotConnectToItself(t *testing.T) {
5559
selfNodeID := ids.GenerateTestNodeID()
56-
n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1)
60+
n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1)
5761
assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion))
5862
assert.EqualValues(t, 0, n.Size())
5963
}
@@ -89,7 +93,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) {
8993

9094
codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
9195
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
92-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
96+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
9397
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
9498
client := NewNetworkClient(net)
9599
nodeID := ids.GenerateTestNodeID()
@@ -164,7 +168,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {
164168

165169
codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
166170
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
167-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
171+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
168172
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
169173
client := NewNetworkClient(net)
170174

@@ -244,7 +248,7 @@ func TestAppRequestOnShutdown(t *testing.T) {
244248

245249
codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
246250
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
247-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
251+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
248252
client := NewNetworkClient(net)
249253
nodeID := ids.GenerateTestNodeID()
250254
require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))
@@ -293,7 +297,7 @@ func TestRequestMinVersion(t *testing.T) {
293297
}
294298

295299
// passing nil as codec works because the net.AppRequest is never called
296-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
300+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
297301
client := NewNetworkClient(net)
298302
requestMessage := TestMessage{Message: "this is a request"}
299303
requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
@@ -356,7 +360,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) {
356360
processingDuration: 500 * time.Millisecond,
357361
}
358362

359-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
363+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
360364
net.SetRequestHandler(requestHandler)
361365
nodeID := ids.GenerateTestNodeID()
362366

@@ -396,7 +400,7 @@ func TestGossip(t *testing.T) {
396400
}
397401

398402
gossipHandler := &testGossipHandler{}
399-
clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
403+
clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
400404
clientNetwork.SetGossipHandler(gossipHandler)
401405

402406
assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion))
@@ -423,7 +427,7 @@ func TestHandleInvalidMessages(t *testing.T) {
423427
requestID := uint32(1)
424428
sender := testAppSender{}
425429

426-
clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
430+
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
427431
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
428432
clientNetwork.SetRequestHandler(&testRequestHandler{})
429433

@@ -457,12 +461,11 @@ func TestHandleInvalidMessages(t *testing.T) {
457461
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), garbageResponse))
458462
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), emptyResponse))
459463
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), nilResponse))
460-
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
461-
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
462-
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
463-
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
464-
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
465-
assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID))
464+
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
465+
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
466+
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
467+
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
468+
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
466469
}
467470

468471
func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
@@ -473,7 +476,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
473476
requestID := uint32(1)
474477
sender := testAppSender{}
475478

476-
clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
479+
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
477480
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
478481
clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler
479482

@@ -513,7 +516,7 @@ func TestCrossChainAppRequest(t *testing.T) {
513516
},
514517
}
515518

516-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
519+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
517520
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
518521
client := NewNetworkClient(net)
519522

@@ -568,7 +571,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {
568571

569572
codecManager := buildCodec(t, TestMessage{})
570573
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
571-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
574+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
572575
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
573576
client := NewNetworkClient(net)
574577

@@ -628,7 +631,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
628631
}
629632
codecManager := buildCodec(t, TestMessage{})
630633
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
631-
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
634+
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
632635
client := NewNetworkClient(net)
633636

634637
exampleCrossChainRequest := ExampleCrossChainRequest{
@@ -649,6 +652,48 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
649652
require.True(t, called)
650653
}
651654

655+
func TestSDKRouting(t *testing.T) {
656+
require := require.New(t)
657+
sender := &testAppSender{
658+
sendAppRequestFn: func(s set.Set[ids.NodeID], u uint32, bytes []byte) error {
659+
return nil
660+
},
661+
sendAppResponseFn: func(id ids.NodeID, u uint32, bytes []byte) error {
662+
return nil
663+
},
664+
}
665+
protocol := 0
666+
handler := &testSDKHandler{}
667+
router := p2p.NewRouter(logging.NoLog{}, sender)
668+
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
669+
require.NoError(err)
670+
671+
networkCodec := codec.NewManager(0)
672+
crossChainCodec := codec.NewManager(0)
673+
674+
network := NewNetwork(
675+
router,
676+
nil,
677+
networkCodec,
678+
crossChainCodec,
679+
ids.EmptyNodeID,
680+
1,
681+
1,
682+
)
683+
684+
nodeID := ids.GenerateTestNodeID()
685+
foobar := append([]byte{byte(protocol)}, []byte("foobar")...)
686+
err = network.AppRequest(context.Background(), nodeID, 0, time.Time{}, foobar)
687+
require.NoError(err)
688+
require.True(handler.appRequested)
689+
690+
err = network.AppResponse(context.Background(), ids.GenerateTestNodeID(), 0, foobar)
691+
require.ErrorIs(err, p2p.ErrUnrequestedResponse)
692+
693+
err = network.AppRequestFailed(context.Background(), nodeID, 0)
694+
require.ErrorIs(err, p2p.ErrUnrequestedResponse)
695+
}
696+
652697
func buildCodec(t *testing.T, types ...interface{}) codec.Manager {
653698
codecManager := codec.NewDefaultManager()
654699
c := linearcodec.NewDefault()
@@ -850,3 +895,22 @@ type testCrossChainHandler struct {
850895
func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) {
851896
return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"})
852897
}
898+
899+
type testSDKHandler struct {
900+
appRequested bool
901+
}
902+
903+
func (t *testSDKHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
904+
// TODO implement me
905+
panic("implement me")
906+
}
907+
908+
func (t *testSDKHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
909+
t.appRequested = true
910+
return nil, nil
911+
}
912+
913+
func (t *testSDKHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) {
914+
// TODO implement me
915+
panic("implement me")
916+
}

plugin/evm/vm.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"time"
1818

1919
avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics"
20+
"github.com/ava-labs/avalanchego/network/p2p"
2021

2122
"github.com/ava-labs/coreth/consensus/dummy"
2223
corethConstants "github.com/ava-labs/coreth/constants"
@@ -276,6 +277,8 @@ type VM struct {
276277
client peer.NetworkClient
277278
networkCodec codec.Manager
278279

280+
router *p2p.Router
281+
279282
// Metrics
280283
multiGatherer avalanchegoMetrics.MultiGatherer
281284

@@ -506,8 +509,9 @@ func (vm *VM) Initialize(
506509
}
507510

508511
// initialize peer network
512+
vm.router = p2p.NewRouter(vm.ctx.Log, appSender)
509513
vm.networkCodec = message.Codec
510-
vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
514+
vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
511515
vm.client = peer.NewNetworkClient(vm.Network)
512516

513517
if err := vm.initializeChain(lastAcceptedHash); err != nil {

0 commit comments

Comments
 (0)