Skip to content

Commit 070aaac

Browse files
authored
Parallelize BatchedParseBlock (#3227)
Signed-off-by: Yacov Manevich <yacov.manevich@avalabs.org>
1 parent 5804bad commit 070aaac

File tree

4 files changed

+196
-9
lines changed

4 files changed

+196
-9
lines changed

vms/proposervm/batched_vm.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ func (vm *VM) GetAncestors(
8181
}
8282

8383
func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.Block, error) {
84-
if vm.batchedVM == nil {
85-
return nil, block.ErrRemoteVMNotImplemented
86-
}
87-
8884
type partialData struct {
8985
index int
9086
block statelessblock.Block
@@ -97,9 +93,11 @@ func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.B
9793
statelessBlockDescs = make([]partialData, 0, len(blks))
9894
innerBlockBytes = make([][]byte, 0, len(blks))
9995
)
96+
97+
parsingResults := statelessblock.ParseBlocks(blks, vm.ctx.ChainID)
98+
10099
for ; blocksIndex < len(blks); blocksIndex++ {
101-
blkBytes := blks[blocksIndex]
102-
statelessBlock, err := statelessblock.Parse(blkBytes, vm.ctx.ChainID)
100+
statelessBlock, err := parsingResults[blocksIndex].Block, parsingResults[blocksIndex].Err
103101
if err != nil {
104102
break
105103
}
@@ -120,7 +118,7 @@ func (vm *VM) BatchedParseBlock(ctx context.Context, blks [][]byte) ([]snowman.B
120118
innerBlockBytes = append(innerBlockBytes, blks[blocksIndex:]...)
121119

122120
// parse all inner blocks at once
123-
innerBlks, err := vm.batchedVM.BatchedParseBlock(ctx, innerBlockBytes)
121+
innerBlks, err := block.BatchedParseBlock(ctx, vm.ChainVM, innerBlockBytes)
124122
if err != nil {
125123
return nil, err
126124
}

vms/proposervm/batched_vm_test.go

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package proposervm
66
import (
77
"bytes"
88
"context"
9+
"crypto"
10+
"encoding/binary"
911
"testing"
1012
"time"
1113

@@ -23,7 +25,10 @@ import (
2325
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
2426
"github.com/ava-labs/avalanchego/snow/snowtest"
2527
"github.com/ava-labs/avalanchego/snow/validators"
28+
"github.com/ava-labs/avalanchego/staking"
2629
"github.com/ava-labs/avalanchego/utils/timer/mockable"
30+
31+
blockbuilder "github.com/ava-labs/avalanchego/vms/proposervm/block"
2732
)
2833

2934
func TestCoreVMNotRemote(t *testing.T) {
@@ -52,8 +57,9 @@ func TestCoreVMNotRemote(t *testing.T) {
5257
require.ErrorIs(err, block.ErrRemoteVMNotImplemented)
5358

5459
var blks [][]byte
55-
_, err = proVM.BatchedParseBlock(context.Background(), blks)
56-
require.ErrorIs(err, block.ErrRemoteVMNotImplemented)
60+
shouldBeEmpty, err := proVM.BatchedParseBlock(context.Background(), blks)
61+
require.NoError(err)
62+
require.Empty(shouldBeEmpty)
5763
}
5864

5965
func TestGetAncestorsPreForkOnly(t *testing.T) {
@@ -583,6 +589,102 @@ func TestBatchedParseBlockPreForkOnly(t *testing.T) {
583589
require.Equal(builtBlk3.ID(), res[2].ID())
584590
}
585591

592+
func TestBatchedParseBlockParallel(t *testing.T) {
593+
parentID := ids.ID{1}
594+
timestamp := time.Unix(123, 0)
595+
pChainHeight := uint64(2)
596+
chainID := ids.GenerateTestID()
597+
598+
vm := VM{
599+
ctx: &snow.Context{ChainID: chainID},
600+
ChainVM: &block.TestVM{
601+
ParseBlockF: func(_ context.Context, rawBlock []byte) (snowman.Block, error) {
602+
return &snowmantest.Block{BytesV: rawBlock}, nil
603+
},
604+
},
605+
}
606+
607+
tlsCert, err := staking.NewTLSCert()
608+
require.NoError(t, err)
609+
610+
cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw)
611+
require.NoError(t, err)
612+
key := tlsCert.PrivateKey.(crypto.Signer)
613+
614+
blockThatCantBeParsed := snowmantest.BuildChild(snowmantest.Genesis)
615+
616+
blocksWithUnparsable := makeParseableBlocks(t, parentID, timestamp, pChainHeight, cert, chainID, key)
617+
blocksWithUnparsable[50] = blockThatCantBeParsed.Bytes()
618+
619+
parsableBlocks := makeParseableBlocks(t, parentID, timestamp, pChainHeight, cert, chainID, key)
620+
621+
for _, testCase := range []struct {
622+
name string
623+
preForkIndex int
624+
rawBlocks [][]byte
625+
}{
626+
{
627+
name: "empty input",
628+
rawBlocks: [][]byte{},
629+
},
630+
{
631+
name: "pre-fork is somewhere in the middle",
632+
rawBlocks: blocksWithUnparsable,
633+
preForkIndex: 50,
634+
},
635+
{
636+
name: "all blocks are post fork",
637+
rawBlocks: parsableBlocks,
638+
preForkIndex: len(parsableBlocks),
639+
},
640+
} {
641+
t.Run(testCase.name, func(t *testing.T) {
642+
require := require.New(t)
643+
blocks, err := vm.BatchedParseBlock(context.Background(), testCase.rawBlocks)
644+
require.NoError(err)
645+
646+
returnedBlockBytes := make([][]byte, len(blocks))
647+
for i, block := range blocks {
648+
returnedBlockBytes[i] = block.Bytes()
649+
}
650+
require.Equal(testCase.rawBlocks, returnedBlockBytes)
651+
652+
for i, block := range blocks {
653+
if i < testCase.preForkIndex {
654+
require.IsType(&postForkBlock{}, block)
655+
} else {
656+
require.IsType(&preForkBlock{}, block)
657+
}
658+
}
659+
})
660+
}
661+
}
662+
663+
func makeParseableBlocks(t *testing.T, parentID ids.ID, timestamp time.Time, pChainHeight uint64, cert *staking.Certificate, chainID ids.ID, key crypto.Signer) [][]byte {
664+
makeSignedBlock := func(i int) []byte {
665+
buff := binary.AppendVarint(nil, int64(i))
666+
667+
signedBlock, err := blockbuilder.Build(
668+
parentID,
669+
timestamp,
670+
pChainHeight,
671+
cert,
672+
buff,
673+
chainID,
674+
key,
675+
)
676+
require.NoError(t, err)
677+
678+
return signedBlock.Bytes()
679+
}
680+
681+
blockBytes := make([][]byte, 100)
682+
for i := range blockBytes {
683+
blockBytes[i] = makeSignedBlock(i)
684+
}
685+
return blockBytes
686+
}
687+
586688
func TestBatchedParseBlockPostForkOnly(t *testing.T) {
587689
require := require.New(t)
588690
var (

vms/proposervm/block/parse.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,36 @@ package block
55

66
import (
77
"fmt"
8+
"sync"
89

910
"github.com/ava-labs/avalanchego/ids"
1011
)
1112

13+
type ParseResult struct {
14+
Block Block
15+
Err error
16+
}
17+
18+
// ParseBlocks parses the given raw blocks into tuples of (Block, error).
19+
// Each ParseResult is returned in the same order as its corresponding bytes in the input.
20+
func ParseBlocks(blks [][]byte, chainID ids.ID) []ParseResult {
21+
results := make([]ParseResult, len(blks))
22+
23+
var wg sync.WaitGroup
24+
wg.Add(len(blks))
25+
26+
for i, blk := range blks {
27+
go func(i int, blkBytes []byte) {
28+
defer wg.Done()
29+
results[i].Block, results[i].Err = Parse(blkBytes, chainID)
30+
}(i, blk)
31+
}
32+
33+
wg.Wait()
34+
35+
return results
36+
}
37+
1238
// Parse a block and verify that the signature attached to the block is valid
1339
// for the certificate provided in the block.
1440
func Parse(bytes []byte, chainID ids.ID) (Block, error) {

vms/proposervm/block/parse_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,69 @@ import (
1414
"github.com/ava-labs/avalanchego/codec"
1515
"github.com/ava-labs/avalanchego/ids"
1616
"github.com/ava-labs/avalanchego/staking"
17+
"github.com/ava-labs/avalanchego/utils/wrappers"
1718
)
1819

20+
func TestParseBlocks(t *testing.T) {
21+
parentID := ids.ID{1}
22+
timestamp := time.Unix(123, 0)
23+
pChainHeight := uint64(2)
24+
innerBlockBytes := []byte{3}
25+
chainID := ids.ID{4}
26+
27+
tlsCert, err := staking.NewTLSCert()
28+
require.NoError(t, err)
29+
30+
cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw)
31+
require.NoError(t, err)
32+
key := tlsCert.PrivateKey.(crypto.Signer)
33+
34+
signedBlock, err := Build(
35+
parentID,
36+
timestamp,
37+
pChainHeight,
38+
cert,
39+
innerBlockBytes,
40+
chainID,
41+
key,
42+
)
43+
require.NoError(t, err)
44+
45+
signedBlockBytes := signedBlock.Bytes()
46+
malformedBlockBytes := make([]byte, len(signedBlockBytes)-1)
47+
copy(malformedBlockBytes, signedBlockBytes)
48+
49+
for _, testCase := range []struct {
50+
name string
51+
input [][]byte
52+
output []ParseResult
53+
}{
54+
{
55+
name: "ValidThenInvalid",
56+
input: [][]byte{signedBlockBytes, malformedBlockBytes},
57+
output: []ParseResult{{Block: &statelessBlock{bytes: signedBlockBytes}}, {Err: wrappers.ErrInsufficientLength}},
58+
},
59+
{
60+
name: "InvalidThenValid",
61+
input: [][]byte{malformedBlockBytes, signedBlockBytes},
62+
output: []ParseResult{{Err: wrappers.ErrInsufficientLength}, {Block: &statelessBlock{bytes: signedBlockBytes}}},
63+
},
64+
} {
65+
t.Run(testCase.name, func(t *testing.T) {
66+
results := ParseBlocks(testCase.input, chainID)
67+
for i := range testCase.output {
68+
if testCase.output[i].Block == nil {
69+
require.Nil(t, results[i].Block)
70+
require.ErrorIs(t, results[i].Err, testCase.output[i].Err)
71+
} else {
72+
require.Equal(t, testCase.output[i].Block.Bytes(), results[i].Block.Bytes())
73+
require.NoError(t, results[i].Err)
74+
}
75+
}
76+
})
77+
}
78+
}
79+
1980
func TestParse(t *testing.T) {
2081
parentID := ids.ID{1}
2182
timestamp := time.Unix(123, 0)

0 commit comments

Comments
 (0)