Skip to content

Commit

Permalink
feat(dot/network): Add warp sync request handler (#4186)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimartiro authored Sep 25, 2024
1 parent 8d277e6 commit 2fdf95a
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 0 deletions.
1 change: 1 addition & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Config struct {
// Service interfaces
BlockState BlockState
Syncer Syncer
WarpSyncProvider WarpSyncProvider
TransactionHandler TransactionHandler

// Used to specify the address broadcasted to other peers, and avoids using pubip.Get
Expand Down
40 changes: 40 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package messages

import (
"fmt"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
)

// WarpProofRequest is a struct for p2p warp proof request
type WarpProofRequest struct {
Begin common.Hash
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
}

// Encode encodes the warp sync request
func (wpr *WarpProofRequest) Encode() ([]byte, error) {
if wpr == nil {
return nil, fmt.Errorf("cannot encode nil WarpProofRequest")
}
return scale.Marshal(*wpr)
}

// String returns the string representation of a WarpProofRequest
func (wpr *WarpProofRequest) String() string {
if wpr == nil {
return "WarpProofRequest=nil"
}

return fmt.Sprintf("WarpProofRequest begin=%v", wpr.Begin)
}

var _ P2PMessage = (*WarpProofRequest)(nil)
55 changes: 55 additions & 0 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dot/network/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ package network
//go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry
//go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer
//go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState
//go:generate mockgen -destination=mock_warp_sync_provider_test.go -package $GOPACKAGE . WarpSyncProvider
//go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler
//go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream
6 changes: 6 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (

// the following are sub-protocols used by the node
SyncID = "/sync/2"
WarpSyncID = "/sync/warp"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -129,6 +130,7 @@ type Service struct {
blockState BlockState
syncer Syncer
transactionHandler TransactionHandler
warpSyncProvider WarpSyncProvider

// Configuration options
noBootstrap bool
Expand Down Expand Up @@ -215,6 +217,7 @@ func NewService(cfg *Config) (*Service, error) {
noBootstrap: cfg.NoBootstrap,
noMDNS: cfg.NoMDNS,
syncer: cfg.Syncer,
warpSyncProvider: cfg.WarpSyncProvider,
notificationsProtocols: make(map[MessageType]*notificationsProtocol),
lightRequest: make(map[peer.ID]struct{}),
telemetryInterval: cfg.telemetryInterval,
Expand Down Expand Up @@ -252,8 +255,11 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String())

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)
s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream)

// register block announce protocol
err := s.RegisterNotificationsProtocol(
Expand Down
79 changes: 79 additions & 0 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"errors"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
// size is reached.
generate(start common.Hash) (encodedProof []byte, err error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
// use the backend to generate the warp proof
proof, err := s.warpSyncProvider.generate(req.Begin)
if err != nil {
return nil, err
}
// send the response through pendingResponse channel
return proof, nil
}

func (s *Service) handleWarpSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
return
}

s.readStream(stream, decodeWarpSyncMessage, s.handleWarpSyncMessage, MaxBlockResponseSize)
}

func decodeWarpSyncMessage(in []byte, _ peer.ID, _ bool) (messages.P2PMessage, error) {
msg := new(messages.WarpProofRequest)
err := msg.Decode(in)
return msg, err
}

func (s *Service) handleWarpSyncMessage(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
if msg == nil {
return nil
}

defer func() {
err := stream.Close()
if err != nil && errors.Is(err, ErrStreamReset) {
logger.Warnf("failed to close stream: %s", err)
}
}()

if req, ok := msg.(*messages.WarpProofRequest); ok {
resp, err := s.handleWarpSyncRequest(*req)
if err != nil {
logger.Debugf("cannot create response for request: %s", err)
return nil
}

if _, err = stream.Write(resp); err != nil {
logger.Debugf("failed to send WarpSyncResponse message to peer %s: %s", stream.Conn().RemotePeer(), err)
return err
}

logger.Debugf("successfully respond with WarpSyncResponse message to peer %s with proof %v",
stream.Conn().RemotePeer(),
resp,
)
} else {
logger.Debugf("received invalid message in warp sync handler: %v", msg)
}

return nil
}
110 changes: 110 additions & 0 deletions dot/network/warp_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"fmt"
"testing"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
)

func TestDecodeWarpSyncMessage(t *testing.T) {
t.Parallel()

// Basic WarpProofRequest
testWarpReqMessage := &messages.WarpProofRequest{
Begin: common.MustBlake2bHash([]byte("test")),
}

// Test encoding
reqEnc, err := testWarpReqMessage.Encode()
require.NoError(t, err)

//Expected encoded message compared with substrate impl
require.Equal(t, []byte{
0x92, 0x8b, 0x20, 0x36, 0x69, 0x43, 0xe2, 0xaf, 0xd1, 0x1e, 0xbc,
0xe, 0xae, 0x2e, 0x53, 0xa9, 0x3b, 0xf1, 0x77, 0xa4, 0xfc, 0xf3, 0x5b,
0xcc, 0x64, 0xd5, 0x3, 0x70, 0x4e, 0x65, 0xe2, 0x2,
}, reqEnc)

// Test decoding
testPeer := peer.ID("me")
msg, err := decodeWarpSyncMessage(reqEnc, testPeer, true)
require.NoError(t, err)

req, ok := msg.(*messages.WarpProofRequest)
require.True(t, ok)
require.Equal(t, testWarpReqMessage, req)
}

// createServiceWithWarpSyncHelper creates a basic service with warp sync handler support
func createServiceWithWarpSyncHelper(t *testing.T, warpSyncProvider WarpSyncProvider) *Service {
t.Helper()

config := &Config{
BasePath: t.TempDir(),
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
WarpSyncProvider: warpSyncProvider,
}

srvc := createTestService(t, config)
srvc.noGossip = true
handler := newTestStreamHandler(decodeSyncMessage)
srvc.host.registerStreamHandler(srvc.host.protocolID, handler.handleStream)

return srvc
}

func TestHandleWarpSyncRequestOk(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected result
expectedProof := []byte{0x01}

ctrl := gomock.NewController(t)
warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(expectedProof, nil).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting proof
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.NoError(t, err)
require.Equal(t, expectedProof, resp)
}

func TestHandleWarpSyncRequestError(t *testing.T) {
t.Parallel()

// Creates warp sync provider mock to generate proofs with the expected erro
expectedError := fmt.Errorf("error generating proof")
ctrl := gomock.NewController(t)

warpSyncProvider := NewMockWarpSyncProvider(ctrl)
warpSyncProvider.EXPECT().generate(common.EmptyHash).Return(nil, expectedError).Times(1)

// Initiate service using the warp sync provider mock
srvc := createServiceWithWarpSyncHelper(t, warpSyncProvider)

// Handle request and check resulting error
req := messages.WarpProofRequest{
Begin: common.EmptyHash,
}

resp, err := srvc.handleWarpSyncRequest(req)
require.Nil(t, resp)
require.ErrorIs(t, err, expectedError)
}
3 changes: 3 additions & 0 deletions dot/rpc/modules/system_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func newNetworkService(t *testing.T) *network.Service {
blockStateMock.EXPECT().
GetHighestFinalisedHeader().
Return(types.NewEmptyHeader(), nil).AnyTimes()
blockStateMock.EXPECT().
GenesisHash().
Return(common.MustBlake2bHash([]byte("genesis"))).AnyTimes()

syncerMock := NewMockSyncer(ctrl)

Expand Down

0 comments on commit 2fdf95a

Please sign in to comment.