Skip to content

Commit c24515d

Browse files
nicholasguoalgorandnicholasguo
authored andcommitted
txnsync: Use dynamic maxEncodedTransactionGroups (algorand#3168)
Change constants in the txnsync around message size limits into variables based on maxTxPoolSize. This will allow for support of larger message sizes when dealing with larger load. Reran existing tests
1 parent 7dc0b70 commit c24515d

File tree

7 files changed

+42
-18
lines changed

7 files changed

+42
-18
lines changed

txnsync/encodedgroups_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func TestBadBitmask(t *testing.T) {
3131
partitiontest.PartitionTest(t)
3232

33-
txnGroups, genesisID, genesisHash, err := txnGroupsData(96)
33+
txnGroups, genesisID, genesisHash, err := txnGroupsData(50)
3434
require.NoError(t, err)
3535

3636
var s syncState
@@ -41,7 +41,7 @@ func TestBadBitmask(t *testing.T) {
4141
require.Equal(t, errIndexNotFound, err)
4242
}
4343

44-
// corrupted bitmask may bcause panic during decoding. This test is to make sure it is an error and not a panic
44+
// corrupted bitmask may cause panic during decoding. This test is to make sure it is an error and not a panic
4545
func badEncodeTransactionGroups(t *testing.T, s *syncState, inTxnGroups []pooldata.SignedTxGroup, dataExchangeRate uint64) (packedTransactionGroups, error) {
4646
txnCount := 0
4747
for _, txGroup := range inTxnGroups {

txnsync/encodedgroupstypes.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ import (
2626
"github.com/algorand/go-algorand/protocol"
2727
)
2828

29-
const maxEncodedTransactionGroups = 30000
30-
const maxEncodedTransactionGroupEntries = 30000
31-
const maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
32-
const maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
33-
const maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize
29+
// set in init() in service.go
30+
var maxEncodedTransactionGroups int
31+
var maxEncodedTransactionGroupEntries int
32+
var maxBitmaskSize int
33+
var maxSignatureBytes int
34+
var maxAddressBytes int
3435

3536
var errInvalidTxType = errors.New("invalid txtype")
3637

txnsync/exchange.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ import (
2222
)
2323

2424
const txnBlockMessageVersion = 1
25-
const maxBloomFilterSize = 100000
2625
const maxAcceptedMsgSeq = 64
27-
const maxEncodedTransactionGroupBytes = 10000000
28-
const maxProposalSize = 3500000 // 10K * 32 + sizeof(block header)-
26+
27+
// set in init() in service.go
28+
var maxBloomFilterSize int
29+
var maxEncodedTransactionGroupBytes int
30+
31+
const maxProposalSize = 3500000 // 10K * 32 + sizeof(block header)
2932

3033
type transactionBlockMessage struct {
3134
_struct struct{} `codec:",omitempty,omitemptyarray"` //nolint:structcheck,unused

txnsync/incoming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (s *syncState) asyncIncomingMessageHandler(networkPeer interface{}, peer *P
6060
_, err = incomingMessage.message.UnmarshalMsg(message)
6161
if err != nil {
6262
// if we received a message that we cannot parse, disconnect.
63-
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer.")
63+
s.log.Infof("received unparsable transaction sync message from peer. disconnecting from peer: %v, bytes: %d", err, len(message))
6464
s.incomingMessagesQ.erase(peer, networkPeer)
6565
return err
6666
}

txnsync/service.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,25 @@ func MakeTransactionSyncService(log logging.Logger, conn NodeConnector, isRelay
5151
}
5252
s.state.service = s
5353
s.state.xorBuilder.MaxIterations = 10
54+
55+
setTransactionSyncVariables(cfg)
5456
return s
5557
}
5658

59+
func setTransactionSyncVariables(cfg config.Local) {
60+
if cfg.TxPoolSize < maxEncodedTransactionGroups {
61+
return
62+
}
63+
maxEncodedTransactionGroups = cfg.TxPoolSize
64+
maxEncodedTransactionGroupEntries = cfg.TxPoolSize
65+
maxBitmaskSize = (maxEncodedTransactionGroupEntries+7)/8 + 1
66+
maxSignatureBytes = maxEncodedTransactionGroupEntries * len(crypto.Signature{})
67+
maxAddressBytes = maxEncodedTransactionGroupEntries * crypto.DigestSize
68+
69+
maxBloomFilterSize = cfg.TxPoolSize * 5 // 32 bit xor uses slightly more than 4 bytes/element.
70+
maxEncodedTransactionGroupBytes = cfg.TxPoolSize * 10000 // assume each transaction takes 10KB, as a worst-case-scenario for bounding purposes only.
71+
}
72+
5773
// Start starts the transaction sync
5874
func (s *Service) Start() {
5975
s.ctx, s.cancelCtx = context.WithCancel(context.Background())
@@ -76,3 +92,7 @@ func (s *Service) Stop() {
7692
func (s *Service) GetIncomingMessageHandler() IncomingMessageHandler {
7793
return s.state.asyncIncomingMessageHandler
7894
}
95+
96+
func init() {
97+
setTransactionSyncVariables(config.GetDefaultLocal())
98+
}

txnsync/txngroups.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene
152152
return nil, err
153153
}
154154

155-
if stub.TransactionGroupCount > maxEncodedTransactionGroups {
155+
if stub.TransactionGroupCount > uint64(maxEncodedTransactionGroups) {
156156
return nil, errors.New("invalid TransactionGroupCount")
157157
}
158158

@@ -191,7 +191,7 @@ func decodeTransactionGroups(ptg packedTransactionGroups, genesisID string, gene
191191

192192
func decompressTransactionGroupsBytes(data []byte, lenDecompressedBytes uint64) (decoded []byte, err error) {
193193
compressionRatio := lenDecompressedBytes / uint64(len(data)) // data should have been compressed between 0 and 95%
194-
if lenDecompressedBytes > maxEncodedTransactionGroupBytes || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
194+
if lenDecompressedBytes > uint64(maxEncodedTransactionGroupBytes) || compressionRatio <= 0 || compressionRatio >= maxCompressionRatio {
195195
return nil, fmt.Errorf("invalid lenDecompressedBytes: %d, len(data): %d", lenDecompressedBytes, len(data))
196196
}
197197

txnsync/txngroups_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func txnGroupsData(numBlocks int) (txnGroups []pooldata.SignedTxGroup, genesisID
232232
func TestTxnGroupEncodingLarge(t *testing.T) {
233233
partitiontest.PartitionTest(t)
234234

235-
txnGroups, genesisID, genesisHash, err := txnGroupsData(969)
235+
txnGroups, genesisID, genesisHash, err := txnGroupsData(500)
236236
require.NoError(t, err)
237237

238238
var s syncState
@@ -268,10 +268,10 @@ func TestTxnGroupEncodingLarge(t *testing.T) {
268268
}
269269
}
270270
require.Equal(t, 2, len(count))
271-
require.Equal(t, 18351, count["axfer"])
272-
require.Equal(t, 1663, count["pay"])
273-
require.Equal(t, 20005, sigs)
274-
require.Equal(t, 9, msigs)
271+
require.Equal(t, 9834, count["axfer"])
272+
require.Equal(t, 850, count["pay"])
273+
require.Equal(t, 10678, sigs)
274+
require.Equal(t, 6, msigs)
275275
require.Equal(t, 0, lsigs)
276276
}
277277

0 commit comments

Comments
 (0)