Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/types: support for optional blob sidecar in BlobTx #27841

Merged
merged 22 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
16dc741
core/types: remove blob methods from TxData interface
fjl Aug 1, 2023
13b7f12
core/types: optional blob sidecar in BlobTx
fjl Aug 2, 2023
d2c86cd
core: verify absence of sidecar when importing blocks
fjl Aug 2, 2023
10b14f4
core/txpool: new blob tx type integration
fjl Aug 3, 2023
bd582b5
core: fix comment
fjl Aug 3, 2023
014d70a
core/types: remove some receiver names in TxData
fjl Aug 3, 2023
394ef8b
all: removing txpool.Transaction
fjl Aug 3, 2023
f756841
miner: strip sidecar in commit
fjl Aug 3, 2023
d6945e9
eth: fix issue in test
fjl Aug 3, 2023
a03f3d1
core/txpool/blobpool: check sidecar is still there after decoding tx
fjl Aug 3, 2023
b84a6ed
core/types: remove unused method blobGasFeeCap
fjl Aug 3, 2023
e9dab44
core/txpool/blobpool: add error message
fjl Aug 7, 2023
67fec27
core: improve validation loop in InsertReceiptChain
fjl Aug 7, 2023
1de750f
core: check absence of blob sidecar in InsertReceiptChain
fjl Aug 7, 2023
f37d471
core/txpool/blobpool: simplify some code in limbo
fjl Aug 7, 2023
9363dd5
eth/downloader: add check for blob absence in DeliverBodies
fjl Aug 8, 2023
d2f7c47
core/types: rename BlobSidecar to BlobTxSidecar
fjl Aug 8, 2023
1995292
core/types: add a method to compute blob hashes from BlobTxSidecar
fjl Aug 9, 2023
3edddbc
core/types: fix crash in copy for BlobTx
fjl Aug 9, 2023
b6e34aa
core/types: add a test to verify hashing of BlobTx ignores the sidecar
fjl Aug 9, 2023
e86e43e
core/types: make Size work correctly for BlobTx with sidecar
fjl Aug 9, 2023
de44d26
core: remove redundant tx type check in block validator
fjl Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
core/txpool: new blob tx type integration
  • Loading branch information
fjl committed Aug 7, 2023
commit 10b14f478df39e65ab0596e0453d86cf4b103383
69 changes: 28 additions & 41 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -83,16 +82,6 @@ const (
limboedTransactionStore = "limbo"
)

// blobTx is a wrapper around types.BlobTx which also contains the literal blob
// data along with all the transaction metadata.
type blobTx struct {
Tx *types.Transaction

Blobs []kzg4844.Blob
Commits []kzg4844.Commitment
Proofs []kzg4844.Proof
}

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// schedule the blob transactions into the following blocks. Only ever add the
// bare minimum needed fields to keep the size down (and thus number of entries
Expand Down Expand Up @@ -455,22 +444,22 @@ func (p *BlobPool) Close() error {
// parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
item := new(blobTx)
if err := rlp.DecodeBytes(blob, item); err != nil {
tx := new(types.Transaction)
if err := rlp.DecodeBytes(blob, tx); err != nil {
// This path is impossible unless the disk data representation changes
// across restarts. For that ever unprobable case, recover gracefully
// by ignoring this data entry.
log.Error("Failed to decode blob pool entry", "id", id, "err", err)
return err
}
meta := newBlobTxMeta(id, size, item.Tx)
meta := newBlobTxMeta(id, size, tx)

sender, err := p.signer.Sender(item.Tx)
sender, err := p.signer.Sender(tx)
if err != nil {
// This path is impossible unless the signature validity changes across
// restarts. For that ever unprobable case, recover gracefully by ignoring
// this data entry.
log.Error("Failed to recover blob tx sender", "id", id, "hash", item.Tx.Hash(), "err", err)
log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
return err
}
if _, ok := p.index[sender]; !ok {
Expand Down Expand Up @@ -718,17 +707,17 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
item := new(blobTx)
if err = rlp.DecodeBytes(data, item); err != nil {
var tx types.Transaction
if err = rlp.DecodeBytes(data, tx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
block, ok := inclusions[item.Tx.Hash()]
block, ok := inclusions[tx.Hash()]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
return
}
if err := p.limbo.push(item.Tx.Hash(), block, item.Blobs, item.Commits, item.Proofs); err != nil {
if err := p.limbo.push(tx.Hash(), block, &tx); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
}
Expand Down Expand Up @@ -760,7 +749,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _, tx := range txs {
p.reinject(addr, tx)
p.reinject(addr, tx.Hash())
}
// Recheck the account's pooled transactions to drop included and
// invalidated one
Expand Down Expand Up @@ -920,16 +909,19 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
blobs, commits, proofs, err := p.limbo.pull(tx.Hash())
tx, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return
}
// Serialize the transaction back into the primary datastore
blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs})
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.

// Serialize the transaction back into the primary datastore.
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return
Expand All @@ -939,9 +931,9 @@ func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return
}

// Update the indixes and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)

if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
Expand Down Expand Up @@ -1023,7 +1015,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
func (p *BlobPool) validateTx(tx *types.Transaction) error {
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
// consensus rules
baseOpts := &txpool.ValidationOptions{
Expand All @@ -1032,7 +1024,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commi
MaxSize: txMaxSize,
MinTip: p.gasTip.ToBig(),
}
if err := txpool.ValidateTransaction(tx, blobs, commits, proofs, p.head, p.signer, baseOpts); err != nil {
if err := txpool.ValidateTransaction(tx, p.head, p.signer, baseOpts); err != nil {
return err
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
Expand Down Expand Up @@ -1117,7 +1109,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Track the amount of time waiting to retrieve a fully resolved blob tx from
// the pool and the amount of time actually spent on pulling the data from disk.
getStart := time.Now()
Expand All @@ -1139,32 +1131,27 @@ func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
return nil
}
item := new(blobTx)
item := new(types.Transaction)
if err = rlp.DecodeBytes(data, item); err != nil {
log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err)
return nil
}
return &txpool.Transaction{
Tx: item.Tx,
BlobTxBlobs: item.Blobs,
BlobTxCommits: item.Commits,
BlobTxProofs: item.Proofs,
}
return item
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
errs[i] = p.add(tx.Tx, tx.BlobTxBlobs, tx.BlobTxCommits, tx.BlobTxProofs)
errs[i] = p.add(tx)
}
return errs
}

// Add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restictions).
func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) (err error) {
func (p *BlobPool) add(tx *types.Transaction) (err error) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled form the network, so this method will act as the overload
// protection for fetches.
Expand All @@ -1178,7 +1165,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
}(time.Now())

// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx, blobs, commits, proofs); err != nil {
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
return err
}
Expand All @@ -1203,7 +1190,7 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
}
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs})
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err
Expand Down
51 changes: 27 additions & 24 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func makeAddressReserver() txpool.AddressReserver {
// with a valid key, only setting the interesting fields from the perspective of
// the blob pool.
func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, key *ecdsa.PrivateKey) *types.Transaction {
tx, _ := types.SignNewTx(key, types.LatestSigner(testChainConfig), makeUnsignedTx(nonce, gasTipCap, gasFeeCap, blobFeeCap))
return tx
blobtx := makeUnsignedTx(nonce, gasTipCap, gasFeeCap, blobFeeCap)
return types.MustSignNewTx(key, types.LatestSigner(testChainConfig), blobtx)
}

// makeUnsignedTx is a utility method to construct a random blob tranasaction
Expand All @@ -209,6 +209,11 @@ func makeUnsignedTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap
BlobFeeCap: uint256.NewInt(blobFeeCap),
BlobHashes: []common.Hash{emptyBlobVHash},
Value: uint256.NewInt(100),
Sidecar: &types.BlobSidecar{
Blobs: []kzg4844.Blob{emptyBlob},
Commitments: []kzg4844.Commitment{emptyBlobCommit},
Proofs: []kzg4844.Proof{emptyBlobProof},
},
}
}

Expand Down Expand Up @@ -341,7 +346,7 @@ func TestOpenDrops(t *testing.T) {
R: new(uint256.Int),
S: new(uint256.Int),
})
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)
badsig, _ := store.Put(blob)

// Insert a sequence of transactions with a nonce gap in between to verify
Expand All @@ -354,7 +359,7 @@ func TestOpenDrops(t *testing.T) {
)
for _, nonce := range []uint64{0, 1, 3, 4, 6, 7} { // first gap at #2, another at #5
tx := makeTx(nonce, 1, 1, 1, gapper)
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
if nonce < 2 {
Expand All @@ -371,7 +376,7 @@ func TestOpenDrops(t *testing.T) {
)
for _, nonce := range []uint64{1, 2, 3} { // first gap at #0, all set dangling
tx := makeTx(nonce, 1, 1, 1, dangler)
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
dangling[id] = struct{}{}
Expand All @@ -384,7 +389,7 @@ func TestOpenDrops(t *testing.T) {
)
for _, nonce := range []uint64{0, 1, 2} { // account nonce at 3, all set filled
tx := makeTx(nonce, 1, 1, 1, filler)
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
filled[id] = struct{}{}
Expand All @@ -397,7 +402,7 @@ func TestOpenDrops(t *testing.T) {
)
for _, nonce := range []uint64{0, 1, 2, 3} { // account nonce at 2, half filled
tx := makeTx(nonce, 1, 1, 1, overlapper)
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
if nonce >= 2 {
Expand All @@ -419,7 +424,7 @@ func TestOpenDrops(t *testing.T) {
} else {
tx = makeTx(uint64(i), 1, 1, 1, underpayer)
}
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
underpaid[id] = struct{}{}
Expand All @@ -438,7 +443,7 @@ func TestOpenDrops(t *testing.T) {
} else {
tx = makeTx(uint64(i), 1, 1, 1, outpricer)
}
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
if i < 2 {
Expand All @@ -460,7 +465,7 @@ func TestOpenDrops(t *testing.T) {
} else {
tx = makeTx(nonce, 1, 1, 1, exceeder)
}
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
exceeded[id] = struct{}{}
Expand All @@ -478,7 +483,7 @@ func TestOpenDrops(t *testing.T) {
} else {
tx = makeTx(nonce, 1, 1, 1, overdrafter)
}
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)

id, _ := store.Put(blob)
if nonce < 1 {
Expand All @@ -494,7 +499,7 @@ func TestOpenDrops(t *testing.T) {
overcapped = make(map[uint64]struct{})
)
for nonce := uint64(0); nonce < maxTxsPerAccount+3; nonce++ {
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: makeTx(nonce, 1, 1, 1, overcapper)})
blob, _ := rlp.EncodeToBytes(makeTx(nonce, 1, 1, 1, overcapper))

id, _ := store.Put(blob)
if nonce < maxTxsPerAccount {
Expand Down Expand Up @@ -625,7 +630,7 @@ func TestOpenIndex(t *testing.T) {
)
for _, i := range []int{5, 3, 4, 2, 0, 1} { // Randomize the tx insertion order to force sorting on load
tx := makeTx(uint64(i), txExecTipCaps[i], txExecFeeCaps[i], txBlobFeeCaps[i], key)
blob, _ := rlp.EncodeToBytes(&blobTx{Tx: tx})
blob, _ := rlp.EncodeToBytes(tx)
store.Put(blob)
}
store.Close()
Expand Down Expand Up @@ -718,9 +723,9 @@ func TestOpenHeap(t *testing.T) {
tx2 = makeTx(0, 1, 800, 70, key2)
tx3 = makeTx(0, 1, 1500, 110, key3)

blob1, _ = rlp.EncodeToBytes(&blobTx{Tx: tx1})
blob2, _ = rlp.EncodeToBytes(&blobTx{Tx: tx2})
blob3, _ = rlp.EncodeToBytes(&blobTx{Tx: tx3})
blob1, _ = rlp.EncodeToBytes(tx1)
blob2, _ = rlp.EncodeToBytes(tx2)
blob3, _ = rlp.EncodeToBytes(tx3)

heapOrder = []common.Address{addr2, addr1, addr3}
heapIndex = map[common.Address]int{addr2: 0, addr1: 1, addr3: 2}
Expand Down Expand Up @@ -794,9 +799,9 @@ func TestOpenCap(t *testing.T) {
tx2 = makeTx(0, 1, 800, 70, key2)
tx3 = makeTx(0, 1, 1500, 110, key3)

blob1, _ = rlp.EncodeToBytes(&blobTx{Tx: tx1, Blobs: []kzg4844.Blob{emptyBlob}, Commits: []kzg4844.Commitment{emptyBlobCommit}, Proofs: []kzg4844.Proof{emptyBlobProof}})
blob2, _ = rlp.EncodeToBytes(&blobTx{Tx: tx2, Blobs: []kzg4844.Blob{emptyBlob}, Commits: []kzg4844.Commitment{emptyBlobCommit}, Proofs: []kzg4844.Proof{emptyBlobProof}})
blob3, _ = rlp.EncodeToBytes(&blobTx{Tx: tx3, Blobs: []kzg4844.Blob{emptyBlob}, Commits: []kzg4844.Commitment{emptyBlobCommit}, Proofs: []kzg4844.Proof{emptyBlobProof}})
blob1, _ = rlp.EncodeToBytes(tx1)
blob2, _ = rlp.EncodeToBytes(tx2)
blob3, _ = rlp.EncodeToBytes(tx3)

keep = []common.Address{addr1, addr3}
drop = []common.Address{addr2}
Expand Down Expand Up @@ -1210,10 +1215,8 @@ func TestAdd(t *testing.T) {

// Sign the seed transactions and store them in the data store
for _, tx := range seed.txs {
var (
signed, _ = types.SignNewTx(keys[acc], types.LatestSigner(testChainConfig), tx)
blob, _ = rlp.EncodeToBytes(&blobTx{Tx: signed, Blobs: []kzg4844.Blob{emptyBlob}, Commits: []kzg4844.Commitment{emptyBlobCommit}, Proofs: []kzg4844.Proof{emptyBlobProof}})
)
signed := types.MustSignNewTx(keys[acc], types.LatestSigner(testChainConfig), tx)
blob, _ := rlp.EncodeToBytes(signed)
store.Put(blob)
}
}
Expand All @@ -1236,7 +1239,7 @@ func TestAdd(t *testing.T) {
// Add each transaction one by one, verifying the pool internals in between
for j, add := range tt.adds {
signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(testChainConfig), add.tx)
if err := pool.add(signed, []kzg4844.Blob{emptyBlob}, []kzg4844.Commitment{emptyBlobCommit}, []kzg4844.Proof{emptyBlobProof}); !errors.Is(err, add.err) {
if err := pool.add(signed); !errors.Is(err, add.err) {
t.Errorf("test %d, tx %d: adding transaction error mismatch: have %v, want %v", i, j, err, add.err)
}
verifyPoolInternals(t, pool)
Expand Down
Loading