Skip to content

Commit 21c752e

Browse files
author
nicholasguoalgorand
authored
txnsync: Use dynamic maxEncodedTransactionGroups (#3168)
## Summary Change constants in the txnsync around message size limits into variables based on maxTxPoolSize. This will allow for support of larger message sizes when dealing with larger load. ## Test Plan Reran existing tests
1 parent 5eb09af commit 21c752e

File tree

7 files changed

+42
-18
lines changed

7 files changed

+42
-18
lines changed

txnsync/encodedgroups_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func TestBadBitmask(t *testing.T) {
3131
partitiontest.PartitionTest(t)
3232

33-
txnGroups, genesisID, genesisHash, err := txnGroupsData(96)
33+
txnGroups, genesisID, genesisHash, err := txnGroupsData(50)
3434
require.NoError(t, err)
3535

3636
var s syncState
@@ -41,7 +41,7 @@ func TestBadBitmask(t *testing.T) {
4141
require.Equal(t, errIndexNotFound, err)
4242
}
4343

44-
// corrupted bitmask may bcause panic during decoding. This test is to make sure it is an error and not a panic
44+
// corrupted bitmask may cause panic during decoding. This test is to make sure it is an error and not a panic
4545
func badEncodeTransactionGroups(t *testing.T, s *syncState, inTxnGroups []pooldata.SignedTxGroup, dataExchangeRate uint64) (packedTransactionGroups, error) {
4646
txnCount := 0
4747
for _, txGroup := range inTxnGroups {

txnsync/encodedgroupstypes.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ import (
2626
"github.com/algorand/go-algorand/protocol"
2727
)
2828

29-
const maxEncodedTransactionGroups = 30000
30-
const maxEncodedTransactionGroupEntries = 30000
31-
const maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
32-
const maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
33-
const maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize
29+
// set in init() in service.go
30+
var maxEncodedTransactionGroups int
31+
var maxEncodedTransactionGroupEntries int
32+
var maxBitmaskSize int
33+
var maxSignatureBytes int
34+
var maxAddressBytes int
3435

3536
var errInvalidTxType = errors.New("invalid txtype")
3637

txnsync/exchange.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ import (
2222
)
2323

2424
const txnBlockMessageVersion = 1
25-
const maxBloomFilterSize = 100000
2625
const maxAcceptedMsgSeq = 64
27-
const maxEncodedTransactionGroupBytes = 10000000
28-
const maxProposalSize = 350000
26+
27+
// set in init() in service.go
28+
var maxBloomFilterSize int
29+
var maxEncodedTransactionGroupBytes int
30+
31+
var maxProposalSize = 350000
2932

3033
type transactionBlockMessage struct {
3134
_struct struct{} `codec:",omitempty,omitemptyarray"` //nolint:structcheck,unused

txnsync/incoming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
5959
_, err = incomingMessage.message.UnmarshalMsg(message)
6060
if err != nil {
6161
// if we received a message that we cannot parse, disconnect.
62-
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.")
62+
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer: %v, bytes: %d", err, len(message))
6363
s.incomingMessagesQ.erase(peer, networkPeer)
6464
return err
6565
}

txnsync/service.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,25 @@ func MakeTransactionSyncService(log logging.Logger, conn NodeConnector, isRelay
5151
}
5252
s.state.service = s
5353
s.state.xorBuilder.MaxIterations = 10
54+
55+
setTransactionSyncVariables(cfg)
5456
return s
5557
}
5658

59+
func setTransactionSyncVariables(cfg config.Local) {
60+
if cfg.TxPoolSize < maxEncodedTransactionGroups {
61+
return
62+
}
63+
maxEncodedTransactionGroups = cfg.TxPoolSize
64+
maxEncodedTransactionGroupEntries = cfg.TxPoolSize
65+
maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
66+
maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
67+
maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize
68+
69+
maxBloomFilterSize = cfg.TxPoolSize * 5 // 32 bit xor uses slightly more than 4 bytes/element.
70+
maxEncodedTransactionGroupBytes = cfg.TxPoolSize * 10000 // assume each transaction takes 10KB, as a worst-case-scenario for bounding purposes only.
71+
}
72+
5773
// Start starts the transaction sync
5874
func (s *Service) Start() {
5975
s.ctx, s.cancelCtx = context.WithCancel(context.Background())
@@ -76,3 +92,7 @@ func (s *Service) Stop() {
7692
func (s *Service) GetIncomingMessageHandler() IncomingMessageHandler {
7793
return s.state.asyncIncomingMessageHandler
7894
}
95+
96+
func init() {
97+
setTransactionSyncVariables(config.GetDefaultLocal())
98+
}

txnsync/txngroups.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene
152152
return nil, err
153153
}
154154

155-
if stub.TransactionGroupCount > maxEncodedTransactionGroups {
155+
if stub.TransactionGroupCount > uint64(maxEncodedTransactionGroups) {
156156
return nil, errors.New("invalid TransactionGroupCount")
157157
}
158158

@@ -191,7 +191,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene
191191

192192
func decompressTransactionGroupsBytes(data []byte, lenDecompressedBytes uint64) (decoded []byte, err error) {
193193
compressionRatio := lenDecompressedBytes / uint64(len(data)) // data should have been compressed between 0 and 95%
194-
if lenDecompressedBytes > maxEncodedTransactionGroupBytes || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
194+
if lenDecompressedBytes > uint64(maxEncodedTransactionGroupBytes) || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
195195
return nil, fmt.Errorf("invalid lenDecompressedBytes: %d, len(data): %d", lenDecompressedBytes, len(data))
196196
}
197197

txnsync/txngroups_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func txnGroupsData(numBlocks int) (txnGroups []pooldata.SignedTxGroup, genesisID
232232
func TestTxnGroupEncodingLarge(t *testing.T) {
233233
partitiontest.PartitionTest(t)
234234

235-
txnGroups, genesisID, genesisHash, err := txnGroupsData(969)
235+
txnGroups, genesisID, genesisHash, err := txnGroupsData(500)
236236
require.NoError(t, err)
237237

238238
var s syncState
@@ -268,10 +268,10 @@ func TestTxnGroupEncodingLarge(t *testing.T) {
268268
}
269269
}
270270
require.Equal(t, 2, len(count))
271-
require.Equal(t, 18351, count["axfer"])
272-
require.Equal(t, 1663, count["pay"])
273-
require.Equal(t, 20005, sigs)
274-
require.Equal(t, 9, msigs)
271+
require.Equal(t, 9834, count["axfer"])
272+
require.Equal(t, 850, count["pay"])
273+
require.Equal(t, 10678, sigs)
274+
require.Equal(t, 6, msigs)
275275
require.Equal(t, 0, lsigs)
276276
}
277277

0 commit comments

Comments
 (0)