Skip to content

Commit e4e9cd4

Browse files
committed
Parallelize BatchedParseBlock
Make BatchedParseBlock parse blocks (and verify their signature) in parallel. Prior to this code change, batch parsing of blocks in the proposer VM would only work if the wrapped VMs support batch processing. Now, if the wrapped VM doesn't support batch processing, it simply parses them one by one, but the proposer VM still parses in parallel. Signed-off-by: Yacov Manevich <yacov.manevich@avalabs.org>
1 parent f6701a7 commit e4e9cd4

File tree

4 files changed

+184
-9
lines changed

4 files changed

+184
-9
lines changed

vms/proposervm/batched_vm.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ func (vm *VM) GetAncestors(
8181
}
8282

8383
func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.Block, error) {
84-
if vm.batchedVM == nil {
85-
return nil, block.ErrRemoteVMNotImplemented
86-
}
87-
8884
type partialData struct {
8985
index int
9086
block statelessblock.Block
@@ -97,9 +93,11 @@ func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.B
9793
statelessBlockDescs = make([]partialData, 0, len(blks))
9894
innerBlockBytes = make([][]byte, 0, len(blks))
9995
)
96+
97+
parsingResults := statelessblock.ParseBlocks(blks, vm.ctx.ChainID)
98+
10099
for ; blocksIndex < len(blks); blocksIndex++ {
101-
blkBytes := blks[blocksIndex]
102-
statelessBlock, err := statelessblock.Parse(blkBytes, vm.ctx.ChainID)
100+
statelessBlock, err := parsingResults[blocksIndex].Block, parsingResults[blocksIndex].Err
103101
if err != nil {
104102
break
105103
}
@@ -120,7 +118,7 @@ func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.B
120118
innerBlockBytes = append(innerBlockBytes, blks[blocksIndex:]...)
121119

122120
// parse all inner blocks at once
123-
innerBlks, err := vm.batchedVM.BatchedParseBlock(ctx, innerBlockBytes)
121+
innerBlks, err := block.BatchedParseBlock(ctx, vm.ChainVM, innerBlockBytes)
124122
if err != nil {
125123
return nil, err
126124
}

vms/proposervm/batched_vm_test.go

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ package proposervm
66
import (
77
"bytes"
88
"context"
9+
"crypto"
10+
"encoding/binary"
11+
"reflect"
912
"testing"
1013
"time"
1114

@@ -23,7 +26,10 @@ import (
2326
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
2427
"github.com/ava-labs/avalanchego/snow/snowtest"
2528
"github.com/ava-labs/avalanchego/snow/validators"
29+
"github.com/ava-labs/avalanchego/staking"
2630
"github.com/ava-labs/avalanchego/utils/timer/mockable"
31+
32+
blockbuilder "github.com/ava-labs/avalanchego/vms/proposervm/block"
2733
)
2834

2935
func TestCoreVMNotRemote(t *testing.T) {
@@ -52,8 +58,9 @@ func TestCoreVMNotRemote(t *testing.T) {
5258
require.ErrorIs(err, block.ErrRemoteVMNotImplemented)
5359

5460
var blks [][]byte
55-
_, err = proVM.BatchedParseBlock(context.Background(), blks)
56-
require.ErrorIs(err, block.ErrRemoteVMNotImplemented)
61+
shouldBeEmpty, err := proVM.BatchedParseBlock(context.Background(), blks)
62+
require.NoError(err)
63+
require.Empty(shouldBeEmpty)
5764
}
5865

5966
func TestGetAncestorsPreForkOnly(t *testing.T) {
@@ -583,6 +590,105 @@ func TestBatchedParseBlockPreForkOnly(t *testing.T) {
583590
require.Equal(builtBlk3.ID(), res[2].ID())
584591
}
585592

593+
func TestBatchedParseBlockParallel(t *testing.T) {
594+
req := require.New(t)
595+
596+
parentID := ids.ID{1}
597+
timestamp := time.Unix(123, 0)
598+
pChainHeight := uint64(2)
599+
chainID := ids.GenerateTestID()
600+
601+
vm := VM{
602+
ctx: &snow.Context{ChainID: chainID},
603+
ChainVM: &block.TestVM{
604+
ParseBlockF: func(_ context.Context, rawBlock []byte) (snowman.Block, error) {
605+
return &snowmantest.Block{BytesV: rawBlock}, nil
606+
},
607+
},
608+
}
609+
610+
tlsCert, err := staking.NewTLSCert()
611+
req.NoError(err)
612+
613+
cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw)
614+
req.NoError(err)
615+
key := tlsCert.PrivateKey.(crypto.Signer)
616+
617+
blockThatCantBeParsed := snowmantest.BuildChild(snowmantest.Genesis)
618+
619+
blockBatch1 := makeParseAbleBlocks(parentID, timestamp, pChainHeight, cert, chainID, key, req)
620+
blockBatch1[50] = blockThatCantBeParsed.Bytes()
621+
622+
blockBatch2 := makeParseAbleBlocks(parentID, timestamp, pChainHeight, cert, chainID, key, req)
623+
624+
for _, testCase := range []struct {
625+
name string
626+
preForkIndex int
627+
rawBlocks [][]byte
628+
}{
629+
{
630+
name: "empty input",
631+
},
632+
{
633+
name: "pre-fork is somewhere in the middle",
634+
rawBlocks: blockBatch1,
635+
preForkIndex: 50,
636+
},
637+
{
638+
name: "all blocks are post fork",
639+
rawBlocks: blockBatch2,
640+
preForkIndex: 9999,
641+
},
642+
} {
643+
t.Run(testCase.name, func(t *testing.T) {
644+
require := require.New(t)
645+
blocks, err := vm.BatchedParseBlock(context.Background(), testCase.rawBlocks)
646+
require.NoError(err)
647+
648+
var expectedBlocks [][]byte
649+
for _, block := range blocks {
650+
expectedBlocks = append(expectedBlocks, block.Bytes())
651+
}
652+
653+
require.Equal(expectedBlocks, testCase.rawBlocks)
654+
655+
for i, block := range blocks {
656+
if i < testCase.preForkIndex {
657+
require.Equal(reflect.TypeOf(block), reflect.TypeOf(&postForkBlock{}))
658+
} else {
659+
require.Equal(reflect.TypeOf(block), reflect.TypeOf(&preForkBlock{}))
660+
}
661+
}
662+
})
663+
}
664+
}
665+
666+
func makeParseAbleBlocks(parentID ids.ID, timestamp time.Time, pChainHeight uint64, cert *staking.Certificate, chainID ids.ID, key crypto.Signer, req *require.Assertions) [][]byte {
667+
makeSignedBlock := func(i int) []byte {
668+
buff := make([]byte, 2)
669+
binary.BigEndian.PutUint16(buff, uint16(i))
670+
671+
signedBlock, err := blockbuilder.Build(
672+
parentID,
673+
timestamp,
674+
pChainHeight,
675+
cert,
676+
buff,
677+
chainID,
678+
key,
679+
)
680+
req.NoError(err)
681+
682+
return signedBlock.Bytes()
683+
}
684+
685+
var blockBatch1 [][]byte
686+
for i := 0; i < 100; i++ {
687+
blockBatch1 = append(blockBatch1, makeSignedBlock(i))
688+
}
689+
return blockBatch1
690+
}
691+
586692
func TestBatchedParseBlockPostForkOnly(t *testing.T) {
587693
require := require.New(t)
588694
var (

vms/proposervm/block/parse.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,36 @@ package block
55

66
import (
77
"fmt"
8+
"sync"
89

910
"github.com/ava-labs/avalanchego/ids"
1011
)
1112

13+
type ParseResult struct {
14+
Block Block
15+
Err error
16+
}
17+
18+
// ParseBlocks would parse the given raw blocks into tuples of (Block, error).
19+
// Each ParseResult in the output is returned in the same order as its corresponding raw block in the input.
20+
func ParseBlocks(blks [][]byte, chainID ids.ID) []ParseResult {
21+
results := make([]ParseResult, len(blks))
22+
23+
var wg sync.WaitGroup
24+
wg.Add(len(blks))
25+
26+
for i, blk := range blks {
27+
go func(i int, blkBytes []byte) {
28+
defer wg.Done()
29+
results[i].Block, results[i].Err = Parse(blkBytes, chainID)
30+
}(i, blk)
31+
}
32+
33+
wg.Wait()
34+
35+
return results
36+
}
37+
1238
// Parse a block and verify that the signature attached to the block is valid
1339
// for the certificate provided in the block.
1440
func Parse(bytes []byte, chainID ids.ID) (Block, error) {

vms/proposervm/block/parse_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,58 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/ava-labs/avalanchego/utils/wrappers"
13+
1214
"github.com/stretchr/testify/require"
1315

1416
"github.com/ava-labs/avalanchego/codec"
1517
"github.com/ava-labs/avalanchego/ids"
1618
"github.com/ava-labs/avalanchego/staking"
19+
"github.com/ava-labs/avalanchego/utils/wrappers"
1720
)
1821

22+
func TestParseBlocks(t *testing.T) {
23+
parentID := ids.ID{1}
24+
timestamp := time.Unix(123, 0)
25+
pChainHeight := uint64(2)
26+
innerBlockBytes := []byte{3}
27+
chainID := ids.ID{4}
28+
29+
tlsCert, err := staking.NewTLSCert()
30+
require.NoError(t, err)
31+
32+
cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw)
33+
require.NoError(t, err)
34+
key := tlsCert.PrivateKey.(crypto.Signer)
35+
36+
signedBlock, err := Build(
37+
parentID,
38+
timestamp,
39+
pChainHeight,
40+
cert,
41+
innerBlockBytes,
42+
chainID,
43+
key,
44+
)
45+
require.NoError(t, err)
46+
47+
signedBlockBytes := signedBlock.Bytes()
48+
malformedBlockBytes := make([]byte, len(signedBlockBytes)-1)
49+
copy(malformedBlockBytes, signedBlockBytes)
50+
51+
results := ParseBlocks([][]byte{signedBlockBytes, malformedBlockBytes}, chainID)
52+
require.Equal(t, results[0].Block.Bytes(), signedBlockBytes)
53+
require.NoError(t, results[0].Err)
54+
require.ErrorIs(t, results[1].Err, wrappers.ErrInsufficientLength)
55+
require.Nil(t, results[1].Block)
56+
57+
results = ParseBlocks([][]byte{malformedBlockBytes, signedBlockBytes}, chainID)
58+
require.Equal(t, results[1].Block.Bytes(), signedBlockBytes)
59+
require.NoError(t, results[1].Err)
60+
require.ErrorIs(t, results[0].Err, wrappers.ErrInsufficientLength)
61+
require.Nil(t, results[0].Block)
62+
}
63+
1964
func TestParse(t *testing.T) {
2065
parentID := ids.ID{1}
2166
timestamp := time.Unix(123, 0)

0 commit comments

Comments
 (0)