Skip to content

Commit 6026279

Browse files
Add p2p sdk (#1799)
Co-authored-by: Stephen Buttolph <stephen@avalabs.org>
1 parent 6b87540 commit 6026279

File tree

12 files changed

+1430
-9
lines changed

12 files changed

+1430
-9
lines changed

network/p2p/client.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
11+
"github.com/ava-labs/avalanchego/ids"
12+
"github.com/ava-labs/avalanchego/snow/engine/common"
13+
"github.com/ava-labs/avalanchego/utils/set"
14+
)
15+
16+
var (
17+
ErrAppRequestFailed = errors.New("app request failed")
18+
ErrRequestPending = errors.New("request pending")
19+
ErrNoPeers = errors.New("no peers")
20+
)
21+
22+
// AppResponseCallback is called upon receiving an AppResponse for an AppRequest
23+
// issued by Client.
24+
// Callers should check [err] to see whether the AppRequest failed or not.
25+
type AppResponseCallback func(
26+
nodeID ids.NodeID,
27+
responseBytes []byte,
28+
err error,
29+
)
30+
31+
// CrossChainAppResponseCallback is called upon receiving an
32+
// CrossChainAppResponse for a CrossChainAppRequest issued by Client.
33+
// Callers should check [err] to see whether the AppRequest failed or not.
34+
type CrossChainAppResponseCallback func(
35+
chainID ids.ID,
36+
responseBytes []byte,
37+
err error,
38+
)
39+
40+
type Client struct {
41+
handlerPrefix []byte
42+
router *Router
43+
sender common.AppSender
44+
}
45+
46+
// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
47+
// If a specific node needs to be requested, use AppRequest instead.
48+
// See AppRequest for more docs.
49+
func (c *Client) AppRequestAny(
50+
ctx context.Context,
51+
appRequestBytes []byte,
52+
onResponse AppResponseCallback,
53+
) error {
54+
c.router.lock.RLock()
55+
peers := c.router.peers.Sample(1)
56+
c.router.lock.RUnlock()
57+
58+
if len(peers) != 1 {
59+
return ErrNoPeers
60+
}
61+
62+
nodeIDs := set.Set[ids.NodeID]{
63+
peers[0]: struct{}{},
64+
}
65+
return c.AppRequest(ctx, nodeIDs, appRequestBytes, onResponse)
66+
}
67+
68+
// AppRequest issues an arbitrary request to a node.
69+
// [onResponse] is invoked upon an error or a response.
70+
func (c *Client) AppRequest(
71+
ctx context.Context,
72+
nodeIDs set.Set[ids.NodeID],
73+
appRequestBytes []byte,
74+
onResponse AppResponseCallback,
75+
) error {
76+
c.router.lock.Lock()
77+
defer c.router.lock.Unlock()
78+
79+
appRequestBytes = c.prefixMessage(appRequestBytes)
80+
for nodeID := range nodeIDs {
81+
requestID := c.router.requestID
82+
if _, ok := c.router.pendingAppRequests[requestID]; ok {
83+
return fmt.Errorf(
84+
"failed to issue request with request id %d: %w",
85+
requestID,
86+
ErrRequestPending,
87+
)
88+
}
89+
90+
if err := c.sender.SendAppRequest(
91+
ctx,
92+
set.Set[ids.NodeID]{nodeID: struct{}{}},
93+
requestID,
94+
appRequestBytes,
95+
); err != nil {
96+
return err
97+
}
98+
99+
c.router.pendingAppRequests[requestID] = onResponse
100+
c.router.requestID++
101+
}
102+
103+
return nil
104+
}
105+
106+
// AppGossip sends a gossip message to a random set of peers.
107+
func (c *Client) AppGossip(
108+
ctx context.Context,
109+
appGossipBytes []byte,
110+
) error {
111+
return c.sender.SendAppGossip(
112+
ctx,
113+
c.prefixMessage(appGossipBytes),
114+
)
115+
}
116+
117+
// AppGossipSpecific sends a gossip message to a predetermined set of peers.
118+
func (c *Client) AppGossipSpecific(
119+
ctx context.Context,
120+
nodeIDs set.Set[ids.NodeID],
121+
appGossipBytes []byte,
122+
) error {
123+
return c.sender.SendAppGossipSpecific(
124+
ctx,
125+
nodeIDs,
126+
c.prefixMessage(appGossipBytes),
127+
)
128+
}
129+
130+
// CrossChainAppRequest sends a cross chain app request to another vm.
131+
// [onResponse] is invoked upon an error or a response.
132+
func (c *Client) CrossChainAppRequest(
133+
ctx context.Context,
134+
chainID ids.ID,
135+
appRequestBytes []byte,
136+
onResponse CrossChainAppResponseCallback,
137+
) error {
138+
c.router.lock.Lock()
139+
defer c.router.lock.Unlock()
140+
141+
requestID := c.router.requestID
142+
if _, ok := c.router.pendingCrossChainAppRequests[requestID]; ok {
143+
return fmt.Errorf(
144+
"failed to issue request with request id %d: %w",
145+
requestID,
146+
ErrRequestPending,
147+
)
148+
}
149+
150+
if err := c.sender.SendCrossChainAppRequest(
151+
ctx,
152+
chainID,
153+
c.router.requestID,
154+
c.prefixMessage(appRequestBytes),
155+
); err != nil {
156+
return err
157+
}
158+
159+
c.router.pendingCrossChainAppRequests[requestID] = onResponse
160+
c.router.requestID++
161+
162+
return nil
163+
}
164+
165+
// prefixMessage prefixes the original message with the handler identifier
166+
// corresponding to this client.
167+
//
168+
// Only gossip and request messages need to be prefixed.
169+
// Response messages don't need to be prefixed because request ids are tracked
170+
// which map to the expected response handler.
171+
func (c *Client) prefixMessage(src []byte) []byte {
172+
messageBytes := make([]byte, len(c.handlerPrefix)+len(src))
173+
copy(messageBytes, c.handlerPrefix)
174+
copy(messageBytes[len(c.handlerPrefix):], src)
175+
return messageBytes
176+
}

network/p2p/handler.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package p2p
5+
6+
import (
7+
"context"
8+
"time"
9+
10+
"go.uber.org/zap"
11+
12+
"github.com/ava-labs/avalanchego/ids"
13+
"github.com/ava-labs/avalanchego/message"
14+
"github.com/ava-labs/avalanchego/snow/engine/common"
15+
"github.com/ava-labs/avalanchego/utils/logging"
16+
)
17+
18+
// Handler is the server-side logic for virtual machine application protocols.
19+
type Handler interface {
20+
// AppGossip is called when handling an AppGossip message.
21+
AppGossip(
22+
ctx context.Context,
23+
nodeID ids.NodeID,
24+
gossipBytes []byte,
25+
) error
26+
// AppRequest is called when handling an AppRequest message.
27+
// Returns the bytes for the response corresponding to [requestBytes]
28+
AppRequest(
29+
ctx context.Context,
30+
nodeID ids.NodeID,
31+
deadline time.Time,
32+
requestBytes []byte,
33+
) ([]byte, error)
34+
// CrossChainAppRequest is called when handling a CrossChainAppRequest
35+
// message.
36+
// Returns the bytes for the response corresponding to [requestBytes]
37+
CrossChainAppRequest(
38+
ctx context.Context,
39+
chainID ids.ID,
40+
deadline time.Time,
41+
requestBytes []byte,
42+
) ([]byte, error)
43+
}
44+
45+
// responder automatically sends the response for a given request
46+
type responder struct {
47+
handlerID uint64
48+
handler Handler
49+
log logging.Logger
50+
sender common.AppSender
51+
}
52+
53+
func (r *responder) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
54+
appResponse, err := r.handler.AppRequest(ctx, nodeID, deadline, request)
55+
if err != nil {
56+
r.log.Debug("failed to handle message",
57+
zap.Stringer("messageOp", message.AppRequestOp),
58+
zap.Stringer("nodeID", nodeID),
59+
zap.Uint32("requestID", requestID),
60+
zap.Time("deadline", deadline),
61+
zap.Uint64("handlerID", r.handlerID),
62+
zap.Binary("message", request),
63+
)
64+
return nil
65+
}
66+
67+
return r.sender.SendAppResponse(ctx, nodeID, requestID, appResponse)
68+
}
69+
70+
func (r *responder) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
71+
err := r.handler.AppGossip(ctx, nodeID, msg)
72+
if err != nil {
73+
r.log.Debug("failed to handle message",
74+
zap.Stringer("messageOp", message.AppGossipOp),
75+
zap.Stringer("nodeID", nodeID),
76+
zap.Uint64("handlerID", r.handlerID),
77+
zap.Binary("message", msg),
78+
)
79+
}
80+
return nil
81+
}
82+
83+
func (r *responder) CrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
84+
appResponse, err := r.handler.CrossChainAppRequest(ctx, chainID, deadline, request)
85+
if err != nil {
86+
r.log.Debug("failed to handle message",
87+
zap.Stringer("messageOp", message.CrossChainAppRequestOp),
88+
zap.Stringer("chainID", chainID),
89+
zap.Uint32("requestID", requestID),
90+
zap.Time("deadline", deadline),
91+
zap.Uint64("handlerID", r.handlerID),
92+
zap.Binary("message", request),
93+
)
94+
return nil
95+
}
96+
97+
return r.sender.SendCrossChainAppResponse(ctx, chainID, requestID, appResponse)
98+
}

network/p2p/mocks/mock_handler.go

Lines changed: 84 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)