Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,8 +1189,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
return p.lookup.exists(hash)
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
func (p *BlobPool) getRLP(hash common.Hash) []byte {
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// the pool and the amount of time actually spent on pulling the data from disk.
getStart := time.Now()
Expand All @@ -1212,14 +1211,31 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
return nil
}
return data
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
data := p.getRLP(hash)
if len(data) == 0 {
return nil
}
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err)
if err := rlp.DecodeBytes(data, item); err != nil {
id, _ := p.lookup.storeidOfTx(hash)

log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err)
return nil
}
return item
}

// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (p *BlobPool) GetRLP(hash common.Hash) []byte {
return p.getRLP(hash)
}

// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
Expand Down
15 changes: 15 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

Expand Down Expand Up @@ -1010,6 +1011,20 @@ func (pool *LegacyPool) get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}

// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (pool *LegacyPool) GetRLP(hash common.Hash) []byte {
tx := pool.all.Get(hash)
if tx == nil {
return nil
}
encoded, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encoded transaction in legacy pool", "hash", hash, "err", err)
return nil
}
return encoded
}

// GetBlobs is not supported by the legacy transaction pool, it is just here to
// implement the txpool.SubPool interface.
func (pool *LegacyPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) {
Expand Down
3 changes: 3 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type SubPool interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction

// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
GetRLP(hash common.Hash) []byte

// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
Expand Down
11 changes: 11 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction {
return nil
}

// GetRLP returns a RLP-encoded transaction if it is contained in the pool.
func (p *TxPool) GetRLP(hash common.Hash) []byte {
for _, subpool := range p.subpools {
encoded := subpool.GetRLP(hash)
if len(encoded) != 0 {
return encoded
}
}
return nil
}

// GetBlobs returns a number of blobs are proofs for the given versioned hashes.
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
Expand Down
4 changes: 4 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type txPool interface {
// tx hash.
Get(hash common.Hash) *types.Transaction

// GetRLP retrieves the RLP-encoded transaction from local txpool
// with given tx hash.
GetRLP(hash common.Hash) []byte

// Add should add the given transactions to the pool.
Add(txs []*types.Transaction, sync bool) []error

Expand Down
15 changes: 15 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

Expand Down Expand Up @@ -78,6 +79,20 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
return p.pool[hash]
}

// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) GetRLP(hash common.Hash) []byte {
p.lock.Lock()
defer p.lock.Unlock()

tx := p.pool[hash]
if tx != nil {
blob, _ := rlp.EncodeToBytes(tx)
return blob
}
return nil
}

// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
Expand Down
4 changes: 4 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Backend interface {
type TxPool interface {
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction

// GetRLP retrieves the RLP-encoded transaction from the local txpool with
// the given hash.
GetRLP(hash common.Hash) []byte
}

// MakeProtocols constructs the P2P protocol definitions for `eth`.
Expand Down
116 changes: 107 additions & 9 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package eth

import (
"bytes"
"crypto/sha256"
"math"
"math/big"
"math/rand"
"os"
"testing"
"time"

Expand All @@ -30,15 +32,18 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
)

var (
Expand All @@ -62,12 +67,12 @@ type testBackend struct {

// newTestBackend creates an empty chain and wraps it into a mock backend.
func newTestBackend(blocks int) *testBackend {
return newTestBackendWithGenerator(blocks, false, nil)
return newTestBackendWithGenerator(blocks, false, false, nil)
}

// newTestBackendWithGenerator creates a chain with a number of explicitly defined blocks and
// wraps it into a mock backend.
func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int, *core.BlockGen)) *testBackend {
func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generator func(int, *core.BlockGen)) *testBackend {
var (
// Create a database pre-initialize with a genesis block
db = rawdb.NewMemoryDatabase()
Expand Down Expand Up @@ -99,9 +104,21 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
}
}

if cancun {
config.CancunTime = u64(0)
config.BlobScheduleConfig = &params.BlobScheduleConfig{
Cancun: &params.BlobConfig{
Target: 3,
Max: 6,
UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction,
},
}
}

gspec := &core.Genesis{
Config: config,
Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}},
Config: config,
Alloc: types.GenesisAlloc{testAddr: {Balance: big.NewInt(100_000_000_000_000_000)}},
Difficulty: common.Big0,
}
chain, _ := core.NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil)

Expand All @@ -115,8 +132,12 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
txconfig := legacypool.DefaultConfig
txconfig.Journal = "" // Don't litter the disk with test journals

pool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{pool})
storage, _ := os.MkdirTemp("", "blobpool-")
defer os.RemoveAll(storage)

blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain)
legacyPool := legacypool.New(txconfig, chain)
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool})

return &testBackend{
db: db,
Expand Down Expand Up @@ -351,7 +372,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}
}

backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen)
defer backend.close()

peer, _ := newTestPeer("peer", protocol, backend)
Expand Down Expand Up @@ -471,7 +492,7 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
}
// Assemble the test environment
backend := newTestBackendWithGenerator(4, false, generator)
backend := newTestBackendWithGenerator(4, false, false, generator)
defer backend.close()

peer, _ := newTestPeer("peer", protocol, backend)
Expand Down Expand Up @@ -548,7 +569,7 @@ func setup() (*testBackend, *testPeer) {
block.SetExtra([]byte("yeehaw"))
}
}
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, gen)
backend := newTestBackendWithGenerator(maxBodiesServe+15, true, false, gen)
peer, _ := newTestPeer("peer", ETH68, backend)
// Discard all messages
go func() {
Expand All @@ -573,3 +594,80 @@ func FuzzEthProtocolHandlers(f *testing.F) {
handler(backend, decoder{msg: msg}, peer.Peer)
})
}

func TestGetPooledTransaction(t *testing.T) {
t.Run("blobTx", func(t *testing.T) {
testGetPooledTransaction(t, true)
})
t.Run("legacyTx", func(t *testing.T) {
testGetPooledTransaction(t, false)
})
}

func testGetPooledTransaction(t *testing.T, blobTx bool) {
var (
emptyBlob = kzg4844.Blob{}
emptyBlobs = []kzg4844.Blob{emptyBlob}
emptyBlobCommit, _ = kzg4844.BlobToCommitment(&emptyBlob)
emptyBlobProof, _ = kzg4844.ComputeBlobProof(&emptyBlob, emptyBlobCommit)
emptyBlobHash = kzg4844.CalcBlobHashV1(sha256.New(), &emptyBlobCommit)
)
backend := newTestBackendWithGenerator(0, true, true, nil)
defer backend.close()

peer, _ := newTestPeer("peer", ETH68, backend)
defer peer.close()

var (
tx *types.Transaction
err error
signer = types.NewCancunSigner(params.TestChainConfig.ChainID)
)
if blobTx {
tx, err = types.SignNewTx(testKey, signer, &types.BlobTx{
ChainID: uint256.MustFromBig(params.TestChainConfig.ChainID),
Nonce: 0,
GasTipCap: uint256.NewInt(20_000_000_000),
GasFeeCap: uint256.NewInt(21_000_000_000),
Gas: 21000,
To: testAddr,
BlobHashes: []common.Hash{emptyBlobHash},
BlobFeeCap: uint256.MustFromBig(common.Big1),
Sidecar: &types.BlobTxSidecar{
Blobs: emptyBlobs,
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
})
if err != nil {
t.Fatal(err)
}
} else {
tx, err = types.SignTx(
types.NewTransaction(0, testAddr, big.NewInt(10_000), params.TxGas, big.NewInt(1_000_000_000), nil),
signer,
testKey,
)
if err != nil {
t.Fatal(err)
}
}
errs := backend.txpool.Add([]*types.Transaction{tx}, true)
for _, err := range errs {
if err != nil {
t.Fatal(err)
}
}

// Send the hash request and verify the response
p2p.Send(peer.app, GetPooledTransactionsMsg, GetPooledTransactionsPacket{
RequestId: 123,
GetPooledTransactionsRequest: []common.Hash{tx.Hash()},
})
if err := p2p.ExpectMsg(peer.app, PooledTransactionsMsg, PooledTransactionsPacket{
RequestId: 123,
PooledTransactionsResponse: []*types.Transaction{tx},
}); err != nil {
t.Errorf("pooled transaction mismatch: %v", err)
}
}
15 changes: 5 additions & 10 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,18 +397,13 @@ func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsReq
break
}
// Retrieve the requested transaction, skipping if unknown to us
tx := backend.TxPool().Get(hash)
if tx == nil {
encoded := backend.TxPool().GetRLP(hash)
if len(encoded) == 0 {
continue
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(tx); err != nil {
log.Error("Failed to encode transaction", "err", err)
} else {
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
return hashes, txs
}
Expand Down