Skip to content

Commit 9974a69

Browse files
committed
migrate x/sync to p2p
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
1 parent 8545a4c commit 9974a69

File tree

7 files changed

+155
-163
lines changed

7 files changed

+155
-163
lines changed

network/p2p/p2ptest/client.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,19 @@ import (
2121

2222
// NewClient generates a client-server pair and returns the client used to
2323
// communicate with a server with the specified handler
24-
func NewClient(
25-
t *testing.T,
26-
ctx context.Context,
27-
handler p2p.Handler,
28-
clientNodeID ids.NodeID,
29-
serverNodeID ids.NodeID,
30-
) *p2p.Client {
24+
func NewClient(t *testing.T, rootCtx context.Context, handler p2p.Handler) *p2p.Client {
3125
clientSender := &enginetest.Sender{}
3226
serverSender := &enginetest.Sender{}
3327

28+
clientNodeID := ids.GenerateTestNodeID()
3429
clientNetwork, err := p2p.NewNetwork(logging.NoLog{}, clientSender, prometheus.NewRegistry(), "")
3530
require.NoError(t, err)
3631

32+
serverNodeID := ids.GenerateTestNodeID()
3733
serverNetwork, err := p2p.NewNetwork(logging.NoLog{}, serverSender, prometheus.NewRegistry(), "")
3834
require.NoError(t, err)
3935

4036
clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error {
41-
// Send the request asynchronously to avoid deadlock when the server
42-
// sends the response back to the client
4337
go func() {
4438
require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes))
4539
}()
@@ -58,8 +52,6 @@ func NewClient(
5852
}
5953

6054
serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error {
61-
// Send the request asynchronously to avoid deadlock when the server
62-
// sends the response back to the client
6355
go func() {
6456
require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes))
6557
}()
@@ -68,8 +60,6 @@ func NewClient(
6860
}
6961

7062
serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
71-
// Send the request asynchronously to avoid deadlock when the server
72-
// sends the response back to the client
7363
go func() {
7464
require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{
7565
Code: errorCode,
@@ -80,10 +70,10 @@ func NewClient(
8070
return nil
8171
}
8272

83-
require.NoError(t, clientNetwork.Connected(ctx, clientNodeID, nil))
84-
require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil))
85-
require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil))
86-
require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil))
73+
require.NoError(t, clientNetwork.Connected(rootCtx, clientNodeID, nil))
74+
require.NoError(t, clientNetwork.Connected(rootCtx, serverNodeID, nil))
75+
require.NoError(t, serverNetwork.Connected(rootCtx, clientNodeID, nil))
76+
require.NoError(t, serverNetwork.Connected(rootCtx, serverNodeID, nil))
8777

8878
require.NoError(t, serverNetwork.AddHandler(0, handler))
8979
return clientNetwork.NewClient(0)

network/p2p/p2ptest/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestNewClient_AppGossip(t *testing.T) {
2727
},
2828
}
2929

30-
client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID())
30+
client := NewClient(t, ctx, testHandler)
3131
require.NoError(client.AppGossip(ctx, common.SendConfig{}, []byte("foobar")))
3232
<-appGossipChan
3333
}
@@ -94,7 +94,7 @@ func TestNewClient_AppRequest(t *testing.T) {
9494
},
9595
}
9696

97-
client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID())
97+
client := NewClient(t, ctx, testHandler)
9898
require.NoError(tt.appRequestF(
9999
ctx,
100100
client,

x/sync/client_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ func newDefaultDBConfig() merkledb.Config {
3838
}
3939
}
4040

41-
func newFlakyRangeProofHandler(
41+
func newModifiedRangeProofHandler(
4242
t *testing.T,
4343
db merkledb.MerkleDB,
4444
modifyResponse func(response *merkledb.RangeProof),
4545
) p2p.Handler {
46-
handler := NewGetRangeProofHandler(logging.NoLog{}, db)
46+
handler := NewSyncGetRangeProofHandler(logging.NoLog{}, db)
4747

4848
c := counter{m: 2}
4949
return &p2p.TestHandler{
@@ -74,12 +74,12 @@ func newFlakyRangeProofHandler(
7474
}
7575
}
7676

77-
func newFlakyChangeProofHandler(
77+
func newModifiedChangeProofHandler(
7878
t *testing.T,
7979
db merkledb.MerkleDB,
8080
modifyResponse func(response *merkledb.ChangeProof),
8181
) p2p.Handler {
82-
handler := NewGetChangeProofHandler(logging.NoLog{}, db)
82+
handler := NewSyncGetChangeProofHandler(logging.NoLog{}, db)
8383

8484
c := counter{m: 2}
8585
return &p2p.TestHandler{
@@ -145,14 +145,3 @@ func (c *counter) Inc() int {
145145
c.i++
146146
return result
147147
}
148-
149-
type waitingHandler struct {
150-
p2p.NoOpHandler
151-
handler p2p.Handler
152-
updatedRootChan chan struct{}
153-
}
154-
155-
func (w *waitingHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) {
156-
<-w.updatedRootChan
157-
return w.handler.AppRequest(ctx, nodeID, deadline, requestBytes)
158-
}

x/sync/manager.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ var (
4141
ErrAlreadyStarted = errors.New("cannot start a Manager that has already been started")
4242
ErrAlreadyClosed = errors.New("Manager is closed")
4343
ErrNoRangeProofClientProvided = errors.New("range proof client is a required field of the sync config")
44-
ErrNoChangeProofClientProvided = errors.New("change proof client is a required field of the sync config")
44+
ErrNoChangeProofClientProvided = errors.New("change proofclient is a required field of the sync config")
4545
ErrNoDatabaseProvided = errors.New("sync database is a required field of the sync config")
4646
ErrNoLogProvided = errors.New("log is a required field of the sync config")
4747
ErrZeroWorkLimit = errors.New("simultaneous work limit must be greater than 0")
@@ -305,12 +305,7 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) {
305305
return
306306
}
307307

308-
select {
309-
case <-ctx.Done():
310-
m.finishWorkItem()
311-
return
312-
case <-time.After(waitTime):
313-
}
308+
<-time.After(waitTime)
314309

315310
if work.localRootID == ids.Empty {
316311
// the keys in this range have not been downloaded, so get all key/values
@@ -373,8 +368,7 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) {
373368
defer m.finishWorkItem()
374369

375370
if err := m.handleChangeProofResponse(ctx, targetRootID, work, request, responseBytes, err); err != nil {
376-
// TODO log responses
377-
m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
371+
m.config.Log.Debug("dropping response", zap.Error(err))
378372
m.retryWork(work)
379373
return
380374
}
@@ -431,8 +425,7 @@ func (m *Manager) requestRangeProof(ctx context.Context, work *workItem) {
431425
defer m.finishWorkItem()
432426

433427
if err := m.handleRangeProofResponse(ctx, targetRootID, work, request, responseBytes, appErr); err != nil {
434-
// TODO log responses
435-
m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request))
428+
m.config.Log.Debug("dropping response", zap.Error(err))
436429
m.retryWork(work)
437430
return
438431
}
@@ -468,11 +461,10 @@ func (m *Manager) retryWork(work *workItem) {
468461
m.workLock.Lock()
469462
m.unprocessedWork.Insert(work)
470463
m.workLock.Unlock()
471-
m.unprocessedWorkCond.Signal()
472464
}
473465

474466
// Returns an error if we should drop the response
475-
func (m *Manager) shouldHandleResponse(
467+
func (m *Manager) handleResponse(
476468
bytesLimit uint32,
477469
responseBytes []byte,
478470
err error,
@@ -507,7 +499,7 @@ func (m *Manager) handleRangeProofResponse(
507499
responseBytes []byte,
508500
err error,
509501
) error {
510-
if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil {
502+
if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil {
511503
return err
512504
}
513505

@@ -558,7 +550,7 @@ func (m *Manager) handleChangeProofResponse(
558550
responseBytes []byte,
559551
err error,
560552
) error {
561-
if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil {
553+
if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil {
562554
return err
563555
}
564556

@@ -614,6 +606,7 @@ func (m *Manager) handleChangeProofResponse(
614606

615607
m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof)
616608
case *pb.SyncGetChangeProofResponse_RangeProof:
609+
617610
var rangeProof merkledb.RangeProof
618611
if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil {
619612
return err

x/sync/network_server.go

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ var (
4949
errInvalidBounds = errors.New("start key is greater than end key")
5050
errInvalidRootHash = fmt.Errorf("root hash must have length %d", hashing.HashLen)
5151

52-
_ p2p.Handler = (*GetChangeProofHandler)(nil)
53-
_ p2p.Handler = (*GetRangeProofHandler)(nil)
52+
_ p2p.Handler = (*SyncGetChangeProofHandler)(nil)
53+
_ p2p.Handler = (*SyncGetRangeProofHandler)(nil)
5454
)
5555

5656
func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] {
@@ -60,30 +60,30 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] {
6060
return maybe.Nothing[[]byte]()
6161
}
6262

63-
func NewGetChangeProofHandler(log logging.Logger, db DB) *GetChangeProofHandler {
64-
return &GetChangeProofHandler{
63+
func NewSyncGetChangeProofHandler(log logging.Logger, db DB) *SyncGetChangeProofHandler {
64+
return &SyncGetChangeProofHandler{
6565
log: log,
6666
db: db,
6767
}
6868
}
6969

70-
type GetChangeProofHandler struct {
70+
type SyncGetChangeProofHandler struct {
7171
log logging.Logger
7272
db DB
7373
}
7474

75-
func (*GetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {}
75+
func (*SyncGetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {}
7676

77-
func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
78-
req := &pb.SyncGetChangeProofRequest{}
79-
if err := proto.Unmarshal(requestBytes, req); err != nil {
77+
func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
78+
request := &pb.SyncGetChangeProofRequest{}
79+
if err := proto.Unmarshal(requestBytes, request); err != nil {
8080
return nil, &common.AppError{
8181
Code: p2p.ErrUnexpected.Code,
8282
Message: fmt.Sprintf("failed to unmarshal request: %s", err),
8383
}
8484
}
8585

86-
if err := validateChangeProofRequest(req); err != nil {
86+
if err := validateChangeProofRequest(request); err != nil {
8787
return nil, &common.AppError{
8888
Code: p2p.ErrUnexpected.Code,
8989
Message: fmt.Sprintf("invalid request: %s", err),
@@ -92,21 +92,21 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _
9292

9393
// override limits if they exceed caps
9494
var (
95-
keyLimit = min(req.KeyLimit, maxKeyValuesLimit)
96-
bytesLimit = min(int(req.BytesLimit), maxByteSizeLimit)
97-
start = maybeBytesToMaybe(req.StartKey)
98-
end = maybeBytesToMaybe(req.EndKey)
95+
keyLimit = min(request.KeyLimit, maxKeyValuesLimit)
96+
bytesLimit = min(int(request.BytesLimit), maxByteSizeLimit)
97+
start = maybeBytesToMaybe(request.StartKey)
98+
end = maybeBytesToMaybe(request.EndKey)
9999
)
100100

101-
startRoot, err := ids.ToID(req.StartRootHash)
101+
startRoot, err := ids.ToID(request.StartRootHash)
102102
if err != nil {
103103
return nil, &common.AppError{
104104
Code: p2p.ErrUnexpected.Code,
105105
Message: fmt.Sprintf("failed to parse start root hash: %s", err),
106106
}
107107
}
108108

109-
endRoot, err := ids.ToID(req.EndRootHash)
109+
endRoot, err := ids.ToID(request.EndRootHash)
110110
if err != nil {
111111
return nil, &common.AppError{
112112
Code: p2p.ErrUnexpected.Code,
@@ -120,7 +120,6 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _
120120
if !errors.Is(err, merkledb.ErrInsufficientHistory) {
121121
// We should only fail to get a change proof if we have insufficient history.
122122
// Other errors are unexpected.
123-
// TODO define custom errors
124123
return nil, &common.AppError{
125124
Code: p2p.ErrUnexpected.Code,
126125
Message: fmt.Sprintf("failed to get change proof: %s", err),
@@ -141,11 +140,11 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _
141140
ctx,
142141
g.db,
143142
&pb.SyncGetRangeProofRequest{
144-
RootHash: req.EndRootHash,
145-
StartKey: req.StartKey,
146-
EndKey: req.EndKey,
147-
KeyLimit: req.KeyLimit,
148-
BytesLimit: req.BytesLimit,
143+
RootHash: request.EndRootHash,
144+
StartKey: request.StartKey,
145+
EndKey: request.EndKey,
146+
KeyLimit: request.KeyLimit,
147+
BytesLimit: request.BytesLimit,
149148
},
150149
func(rangeProof *merkledb.RangeProof) ([]byte, error) {
151150
return proto.Marshal(&pb.SyncGetChangeProofResponse{
@@ -192,44 +191,48 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _
192191
}
193192
}
194193

195-
func NewGetRangeProofHandler(log logging.Logger, db DB) *GetRangeProofHandler {
196-
return &GetRangeProofHandler{
194+
func (*SyncGetChangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) {
195+
return nil, nil
196+
}
197+
198+
func NewSyncGetRangeProofHandler(log logging.Logger, db DB) *SyncGetRangeProofHandler {
199+
return &SyncGetRangeProofHandler{
197200
log: log,
198201
db: db,
199202
}
200203
}
201204

202-
type GetRangeProofHandler struct {
205+
type SyncGetRangeProofHandler struct {
203206
log logging.Logger
204207
db DB
205208
}
206209

207-
func (*GetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {}
210+
func (*SyncGetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {}
208211

209-
func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
210-
req := &pb.SyncGetRangeProofRequest{}
211-
if err := proto.Unmarshal(requestBytes, req); err != nil {
212+
func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) {
213+
request := &pb.SyncGetRangeProofRequest{}
214+
if err := proto.Unmarshal(requestBytes, request); err != nil {
212215
return nil, &common.AppError{
213216
Code: p2p.ErrUnexpected.Code,
214217
Message: fmt.Sprintf("failed to unmarshal request: %s", err),
215218
}
216219
}
217220

218-
if err := validateRangeProofRequest(req); err != nil {
221+
if err := validateRangeProofRequest(request); err != nil {
219222
return nil, &common.AppError{
220223
Code: p2p.ErrUnexpected.Code,
221224
Message: fmt.Sprintf("invalid range proof request: %s", err),
222225
}
223226
}
224227

225228
// override limits if they exceed caps
226-
req.KeyLimit = min(req.KeyLimit, maxKeyValuesLimit)
227-
req.BytesLimit = min(req.BytesLimit, maxByteSizeLimit)
229+
request.KeyLimit = min(request.KeyLimit, maxKeyValuesLimit)
230+
request.BytesLimit = min(request.BytesLimit, maxByteSizeLimit)
228231

229232
proofBytes, err := getRangeProof(
230233
ctx,
231-
g.db,
232-
req,
234+
s.db,
235+
request,
233236
func(rangeProof *merkledb.RangeProof) ([]byte, error) {
234237
return proto.Marshal(rangeProof.ToProto())
235238
},
@@ -244,6 +247,10 @@ func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ t
244247
return proofBytes, nil
245248
}
246249

250+
func (*SyncGetRangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) {
251+
return nil, nil
252+
}
253+
247254
// Get the range proof specified by [req].
248255
// If the generated proof is too large, the key limit is reduced
249256
// and the proof is regenerated. This process is repeated until

0 commit comments

Comments
 (0)