Skip to content

Commit c2f010b

Browse files
Turn list of txs into tx sequence (#683)
* Turn list of txs into tx sequence * Add agent comments * Remove unused code and address comments * add some more comments * address more comments' * lint * Remove :::warning --------- Co-authored-by: Anusha <anushachillara@gmail.com>
1 parent 0fc8ad1 commit c2f010b

File tree

8 files changed

+205
-212
lines changed

8 files changed

+205
-212
lines changed

cmd/simulator/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ Once you've built AvalancheGo, open the AvalancheGo directory in a separate term
3838
./build/avalanchego --staking-enabled=false --network-id=local
3939
```
4040

41-
:::warning
41+
WARNING:
42+
4243
The staking-enabled flag is only for local testing. Disabling staking serves two functions explicitly for testing purposes:
4344

4445
1. Ignore stake weight on the P-Chain and count each connected peer as having a stake weight of 1
4546
2. Automatically opts in to validate every Subnet
46-
:::
4747

4848
Once you have AvalancheGo running locally, it will be running an HTTP Server on the default port `9650`. This means that the RPC Endpoint for the C-Chain will be http://127.0.0.1:9650/ext/bc/C/rpc and ws://127.0.0.1:9650/ext/bc/C/ws for WebSocket connections.
4949

5050
Now, we can run the simulator command to simulate some load on the local C-Chain for 30s:
5151

5252
```bash
53-
./simulator --timeout=1m --concurrency=1 --max-fee-cap=300 --max-tip-cap=10 --txs-per-worker=50
53+
./simulator --timeout=1m --workers=1 --max-fee-cap=300 --max-tip-cap=10 --txs-per-worker=50
5454
```
5555

5656
## Command Line Flags

cmd/simulator/config/flags.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626
KeyDirKey = "key-dir"
2727
VersionKey = "version"
2828
TimeoutKey = "timeout"
29+
BatchSizeKey = "batch-size"
2930
)
3031

3132
var (
@@ -42,6 +43,7 @@ type Config struct {
4243
TxsPerWorker uint64 `json:"txs-per-worker"`
4344
KeyDir string `json:"key-dir"`
4445
Timeout time.Duration `json:"timeout"`
46+
BatchSize uint64 `json:"batch-size"`
4547
}
4648

4749
func BuildConfig(v *viper.Viper) (Config, error) {
@@ -53,6 +55,7 @@ func BuildConfig(v *viper.Viper) (Config, error) {
5355
TxsPerWorker: v.GetUint64(TxsPerWorkerKey),
5456
KeyDir: v.GetString(KeyDirKey),
5557
Timeout: v.GetDuration(TimeoutKey),
58+
BatchSize: v.GetUint64(BatchSizeKey),
5659
}
5760
if len(c.Endpoints) == 0 {
5861
return c, ErrNoEndpoints
@@ -114,4 +117,5 @@ func addSimulatorFlags(fs *pflag.FlagSet) {
114117
fs.String(KeyDirKey, ".simulator/keys", "Specify the directory to save private keys in (INSECURE: only use for testing)")
115118
fs.Duration(TimeoutKey, 5*time.Minute, "Specify the timeout for the simulator to complete (0 indicates no timeout)")
116119
fs.String(LogLevelKey, "info", "Specify the log level to use in the simulator")
120+
fs.Uint64(BatchSizeKey, 100, "Specify the batchsize for the worker to issue and confirm txs")
117121
}

cmd/simulator/load/funder.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"math/big"
1111

1212
"github.com/ava-labs/subnet-evm/cmd/simulator/key"
13+
"github.com/ava-labs/subnet-evm/cmd/simulator/txs"
1314
"github.com/ava-labs/subnet-evm/core/types"
1415
"github.com/ava-labs/subnet-evm/ethclient"
1516
"github.com/ava-labs/subnet-evm/params"
@@ -80,7 +81,7 @@ func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.K
8081
signer := types.LatestSignerForChainID(chainID)
8182

8283
// Generate a sequence of transactions to distribute the required funds.
83-
log.Info("Generating distribution transactions")
84+
log.Info("Generating distribution transactions...")
8485
i := 0
8586
txGenerator := func(key *ecdsa.PrivateKey, nonce uint64) (*types.Transaction, error) {
8687
tx, err := types.SignNewTx(key, signer, &types.DynamicFeeTx{
@@ -99,17 +100,18 @@ func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.K
99100
i++
100101
return tx, nil
101102
}
102-
txs, err := GenerateTxSequence(ctx, txGenerator, client, maxFundsKey.PrivKey, uint64(len(needFundsAddrs)))
103+
104+
numTxs := uint64(len(needFundsAddrs))
105+
txSequence, err := txs.GenerateTxSequence(ctx, txGenerator, client, maxFundsKey.PrivKey, numTxs)
103106
if err != nil {
104107
return nil, fmt.Errorf("failed to generate fund distribution sequence from %s of length %d", maxFundsKey.Address, len(needFundsAddrs))
105108
}
109+
worker := NewSingleAddressTxWorker(ctx, client, maxFundsKey.Address)
110+
txFunderAgent := txs.NewIssueNAgent[*types.Transaction](txSequence, worker, numTxs)
106111

107-
log.Info("Executing distribution transactions...")
108-
worker := NewWorker(client, maxFundsKey.Address, txs)
109-
if err := worker.Execute(ctx); err != nil {
112+
if err := txFunderAgent.Execute(ctx); err != nil {
110113
return nil, err
111114
}
112-
113115
for _, addr := range needFundsAddrs {
114116
balance, err := client.BalanceAt(ctx, addr, nil)
115117
if err != nil {

cmd/simulator/load/loader.go

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,48 @@ import (
1111

1212
"github.com/ava-labs/subnet-evm/cmd/simulator/config"
1313
"github.com/ava-labs/subnet-evm/cmd/simulator/key"
14+
"github.com/ava-labs/subnet-evm/cmd/simulator/txs"
1415
"github.com/ava-labs/subnet-evm/core/types"
1516
"github.com/ava-labs/subnet-evm/ethclient"
1617
"github.com/ava-labs/subnet-evm/params"
1718
"github.com/ethereum/go-ethereum/common"
1819
ethcrypto "github.com/ethereum/go-ethereum/crypto"
1920
"github.com/ethereum/go-ethereum/log"
21+
"golang.org/x/sync/errgroup"
2022
)
2123

22-
// CreateLoader creates a WorkerGroup from [config] to perform the specified simulation.
23-
func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, error) {
24+
// ExecuteLoader creates txSequences from [config] and has txAgents execute the specified simulation.
25+
func ExecuteLoader(ctx context.Context, config config.Config) error {
26+
if config.Timeout > 0 {
27+
var cancel context.CancelFunc
28+
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
29+
defer cancel()
30+
}
31+
2432
// Construct the arguments for the load simulator
2533
clients := make([]ethclient.Client, 0, len(config.Endpoints))
2634
for i := 0; i < config.Workers; i++ {
2735
clientURI := config.Endpoints[i%len(config.Endpoints)]
2836
client, err := ethclient.Dial(clientURI)
2937
if err != nil {
30-
return nil, fmt.Errorf("failed to dial client at %s: %w", clientURI, err)
38+
return fmt.Errorf("failed to dial client at %s: %w", clientURI, err)
3139
}
3240
clients = append(clients, client)
3341
}
3442

3543
keys, err := key.LoadAll(ctx, config.KeyDir)
3644
if err != nil {
37-
return nil, err
45+
return err
3846
}
3947
// Ensure there are at least [config.Workers] keys and save any newly generated ones.
4048
if len(keys) < config.Workers {
4149
for i := 0; len(keys) < config.Workers; i++ {
4250
newKey, err := key.Generate()
4351
if err != nil {
44-
return nil, fmt.Errorf("failed to generate %d new key: %w", i, err)
52+
return fmt.Errorf("failed to generate %d new key: %w", i, err)
4553
}
4654
if err := newKey.Save(config.KeyDir); err != nil {
47-
return nil, fmt.Errorf("failed to save %d new key: %w", i, err)
55+
return fmt.Errorf("failed to save %d new key: %w", i, err)
4856
}
4957
keys = append(keys, newKey)
5058
}
@@ -57,8 +65,9 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
5765
log.Info("Distributing funds", "numTxsPerWorker", config.TxsPerWorker, "minFunds", minFundsPerAddr)
5866
keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr)
5967
if err != nil {
60-
return nil, err
68+
return err
6169
}
70+
log.Info("Distributed funds successfully")
6271

6372
pks := make([]*ecdsa.PrivateKey, 0, len(keys))
6473
senders := make([]common.Address, 0, len(keys))
@@ -73,10 +82,11 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
7382
client := clients[0]
7483
chainID, err := client.ChainID(ctx)
7584
if err != nil {
76-
return nil, fmt.Errorf("failed to fetch chainID: %w", err)
85+
return fmt.Errorf("failed to fetch chainID: %w", err)
7786
}
7887
signer := types.LatestSignerForChainID(chainID)
7988

89+
log.Info("Creating transaction sequences...")
8090
txGenerator := func(key *ecdsa.PrivateKey, nonce uint64) (*types.Transaction, error) {
8191
addr := ethcrypto.PubkeyToAddress(key.PublicKey)
8292
tx, err := types.SignNewTx(key, signer, &types.DynamicFeeTx{
@@ -94,26 +104,30 @@ func CreateLoader(ctx context.Context, config config.Config) (*WorkerGroup, erro
94104
}
95105
return tx, nil
96106
}
97-
txSequences, err := GenerateTxSequences(ctx, txGenerator, clients[0], pks, config.TxsPerWorker)
107+
txSequences, err := txs.GenerateTxSequences(ctx, txGenerator, clients[0], pks, config.TxsPerWorker)
98108
if err != nil {
99-
return nil, err
109+
return err
100110
}
101111

102-
wg := NewWorkerGroup(clients[:config.Workers], senders[:config.Workers], txSequences[:config.Workers])
103-
return wg, nil
104-
}
112+
log.Info("Constructing tx agents...", "numAgents", config.Workers)
113+
agents := make([]txs.Agent[*types.Transaction], 0, config.Workers)
114+
for i := 0; i < config.Workers; i++ {
115+
agents = append(agents, txs.NewIssueNAgent[*types.Transaction](txSequences[i], NewSingleAddressTxWorker(ctx, clients[i], senders[i]), config.BatchSize))
116+
}
105117

106-
// ExecuteLoader runs the load simulation specified by config.
107-
func ExecuteLoader(ctx context.Context, config config.Config) error {
108-
if config.Timeout > 0 {
109-
var cancel context.CancelFunc
110-
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
111-
defer cancel()
118+
log.Info("Starting tx agents...")
119+
eg := errgroup.Group{}
120+
for _, agent := range agents {
121+
agent := agent
122+
eg.Go(func() error {
123+
return agent.Execute(ctx)
124+
})
112125
}
113126

114-
loader, err := CreateLoader(ctx, config)
115-
if err != nil {
127+
log.Info("Waiting for tx agents...")
128+
if err := eg.Wait(); err != nil {
116129
return err
117130
}
118-
return loader.Execute(ctx)
131+
log.Info("Tx agents completed successfully.")
132+
return nil
119133
}

cmd/simulator/load/worker.go

Lines changed: 42 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -10,102 +10,74 @@ import (
1010

1111
"github.com/ava-labs/subnet-evm/core/types"
1212
"github.com/ava-labs/subnet-evm/ethclient"
13+
"github.com/ava-labs/subnet-evm/interfaces"
1314
"github.com/ethereum/go-ethereum/common"
1415
"github.com/ethereum/go-ethereum/log"
1516
)
1617

17-
type Worker struct {
18-
client ethclient.Client
19-
address common.Address
20-
txs []*types.Transaction
21-
}
18+
type singleAddressTxWorker struct {
19+
client ethclient.Client
2220

23-
// NewWorker creates a new worker that will issue the sequence of transactions from the given address
24-
//
25-
// Assumes that all transactions are from the same address, ordered by nonce, and this worker has exclusive access
26-
// to issuance of transactions from the underlying private key.
27-
func NewWorker(client ethclient.Client, address common.Address, txs []*types.Transaction) *Worker {
28-
return &Worker{
29-
client: client,
30-
address: address,
31-
txs: txs,
32-
}
33-
}
21+
acceptedNonce uint64
22+
address common.Address
3423

35-
func (w *Worker) ExecuteTxsFromAddress(ctx context.Context) error {
36-
log.Info("Executing txs", "numTxs", len(w.txs))
37-
for i, tx := range w.txs {
38-
start := time.Now()
39-
err := w.client.SendTransaction(ctx, tx)
40-
if err != nil {
41-
return fmt.Errorf("failed to issue tx %d: %w", i, err)
42-
}
43-
log.Info("execute tx", "tx", tx.Hash(), "nonce", tx.Nonce(), "duration", time.Since(start))
44-
}
45-
return nil
24+
sub interfaces.Subscription
25+
newHeads chan *types.Header
4626
}
4727

48-
// AwaitTxs awaits for the nonce of the last transaction issued by the worker to be confirmed or
49-
// rejected by the network.
50-
//
51-
// Assumes that a non-zero number of transactions were already generated and that they were issued
52-
// by this worker.
53-
func (w *Worker) AwaitTxs(ctx context.Context) error {
54-
nonce := w.txs[len(w.txs)-1].Nonce()
55-
28+
// NewSingleAddressTxWorker creates and returns a singleAddressTxWorker
29+
func NewSingleAddressTxWorker(ctx context.Context, client ethclient.Client, address common.Address) *singleAddressTxWorker {
5630
newHeads := make(chan *types.Header)
57-
defer close(newHeads)
31+
tw := &singleAddressTxWorker{
32+
client: client,
33+
address: address,
34+
newHeads: newHeads,
35+
}
5836

59-
sub, err := w.client.SubscribeNewHead(ctx, newHeads)
37+
sub, err := client.SubscribeNewHead(ctx, newHeads)
6038
if err != nil {
6139
log.Debug("failed to subscribe new heads, falling back to polling", "err", err)
6240
} else {
63-
defer sub.Unsubscribe()
41+
tw.sub = sub
6442
}
6543

44+
return tw
45+
}
46+
47+
func (tw *singleAddressTxWorker) IssueTx(ctx context.Context, tx *types.Transaction) error {
48+
return tw.client.SendTransaction(ctx, tx)
49+
}
50+
51+
func (tw *singleAddressTxWorker) ConfirmTx(ctx context.Context, tx *types.Transaction) error {
52+
txNonce := tx.Nonce()
53+
6654
for {
55+
// If the is less than what has already been accepted, the transaction is confirmed
56+
if txNonce < tw.acceptedNonce {
57+
return nil
58+
}
59+
6760
select {
68-
case <-newHeads:
61+
case <-tw.newHeads:
6962
case <-time.After(time.Second):
7063
case <-ctx.Done():
71-
return fmt.Errorf("failed to await nonce: %w", ctx.Err())
64+
return fmt.Errorf("failed to await tx %s nonce %d: %w", tx.Hash(), txNonce, ctx.Err())
7265
}
7366

74-
currentNonce, err := w.client.NonceAt(ctx, w.address, nil)
67+
// Update the worker's accepted nonce, so we can check on the next iteration
68+
// if the transaction has been accepted.
69+
acceptedNonce, err := tw.client.NonceAt(ctx, tw.address, nil)
7570
if err != nil {
76-
log.Warn("failed to get nonce", "err", err)
77-
}
78-
if currentNonce >= nonce {
79-
return nil
80-
} else {
81-
log.Info("fetched nonce", "awaiting", nonce, "currentNonce", currentNonce)
71+
return fmt.Errorf("failed to await tx %s nonce %d: %w", tx.Hash(), txNonce, err)
8272
}
73+
tw.acceptedNonce = acceptedNonce
8374
}
8475
}
8576

86-
// ConfirmAllTransactions iterates over every transaction of this worker and confirms it
87-
// via eth_getTransactionByHash
88-
func (w *Worker) ConfirmAllTransactions(ctx context.Context) error {
89-
for i, tx := range w.txs {
90-
_, isPending, err := w.client.TransactionByHash(ctx, tx.Hash())
91-
if err != nil {
92-
return fmt.Errorf("failed to confirm tx at index %d: %s", i, tx.Hash())
93-
}
94-
if isPending {
95-
return fmt.Errorf("failed to confirm tx at index %d: pending", i)
96-
}
77+
func (tw *singleAddressTxWorker) Close(ctx context.Context) error {
78+
if tw.sub != nil {
79+
tw.sub.Unsubscribe()
9780
}
98-
log.Info("Confirmed all transactions")
81+
close(tw.newHeads)
9982
return nil
10083
}
101-
102-
// Execute issues and confirms all transactions for the worker.
103-
func (w *Worker) Execute(ctx context.Context) error {
104-
if err := w.ExecuteTxsFromAddress(ctx); err != nil {
105-
return err
106-
}
107-
if err := w.AwaitTxs(ctx); err != nil {
108-
return err
109-
}
110-
return w.ConfirmAllTransactions(ctx)
111-
}

0 commit comments

Comments
 (0)