Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les, light: implement ODR transaction lookup by hash #19069

Merged
merged 5 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
Expand Down Expand Up @@ -174,6 +175,11 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
return b.eth.txPool.Get(hash)
}

func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash)
return tx, blockHash, blockNumber, index, nil
}

func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
return b.eth.txPool.State().GetNonce(addr), nil
}
Expand Down
23 changes: 15 additions & 8 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,25 +1152,32 @@ func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, addr
}

// GetTransactionByHash returns the transaction for the given hash
func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) *RPCTransaction {
func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction
if tx, blockHash, blockNumber, index := rawdb.ReadTransaction(s.b.ChainDb(), hash); tx != nil {
return newRPCTransaction(tx, blockHash, blockNumber, index)
tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
if err != nil {
return nil, err
}
if tx != nil {
return newRPCTransaction(tx, blockHash, blockNumber, index), nil
}
// No finalized transaction, try to retrieve it from the pool
if tx := s.b.GetPoolTransaction(hash); tx != nil {
return newRPCPendingTransaction(tx)
return newRPCPendingTransaction(tx), nil
}

// Transaction unknown, return as such
return nil
return nil, nil
}

// GetRawTransactionByHash returns the bytes of the transaction for the given hash.
func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
var tx *types.Transaction

// Retrieve a finalized transaction, or a pooled otherwise
if tx, _, _, _ = rawdb.ReadTransaction(s.b.ChainDb(), hash); tx == nil {
tx, _, _, _, err := s.b.GetTransaction(ctx, hash)
if err != nil {
return nil, err
}
if tx == nil {
if tx = s.b.GetPoolTransaction(hash); tx == nil {
// Transaction not found anywhere, abort
return nil, nil
Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Backend interface {

// TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Expand Down
4 changes: 4 additions & 0 deletions les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transactio
return b.eth.txPool.GetTransaction(txHash)
}

func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
return light.GetTransaction(ctx, b.eth.odr, txHash)
}

func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
return b.eth.txPool.GetNonce(ctx, addr)
}
Expand Down
3 changes: 2 additions & 1 deletion les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if leth.config.ULC != nil {
trustedNodes = leth.config.ULC.TrustedServers
}
leth.relay = NewLesTxRelay(peers, leth.reqDist)
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
leth.relay = NewLesTxRelay(peers, leth.retriever)

leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations)
Expand Down Expand Up @@ -271,6 +271,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
s.odr.Stop()
s.relay.Stop()
s.bloomIndexer.Close()
s.chtIndexer.Close()
s.blockchain.Stop()
Expand Down
17 changes: 12 additions & 5 deletions les/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "")
}
go func() {
stats := make([]txStatus, len(req.Txs))
stats := make([]light.TxStatus, len(req.Txs))
for i, tx := range req.Txs {
if i != 0 && !task.waitOrStop() {
return
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "")
}
go func() {
stats := make([]txStatus, len(req.Hashes))
stats := make([]light.TxStatus, len(req.Hashes))
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
return
Expand All @@ -1032,14 +1032,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.Log().Trace("Received tx status response")
var resp struct {
ReqID, BV uint64
Status []txStatus
Status []light.TxStatus
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}

p.fcServer.ReceivedReply(resp.ReqID, resp.BV)

p.Log().Trace("Received helper trie proof response")
deliverMsg = &Msg{
MsgType: MsgTxStatus,
ReqID: resp.ReqID,
Obj: resp.Status,
}
zsfelfoldi marked this conversation as resolved.
Show resolved Hide resolved

default:
p.Log().Trace("Received unknown message", "code", msg.Code)
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
Expand Down Expand Up @@ -1097,8 +1104,8 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
return nil
}

func (pm *ProtocolManager) txStatus(hash common.Hash) txStatus {
var stat txStatus
func (pm *ProtocolManager) txStatus(hash common.Hash) light.TxStatus {
var stat light.TxStatus
stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
// If the transaction is unknown to the pool, try looking it up locally
if stat.Status == core.TxStatusUnknown {
Expand Down
26 changes: 13 additions & 13 deletions les/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func TestTransactionStatusLes2(t *testing.T) {

var reqID uint64

test := func(tx *types.Transaction, send bool, expStatus txStatus) {
test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) {
reqID++
if send {
cost := peer.GetRequestCost(SendTxV2Msg, 1)
Expand All @@ -452,7 +452,7 @@ func TestTransactionStatusLes2(t *testing.T) {
cost := peer.GetRequestCost(GetTxStatusMsg, 1)
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
}
if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil {
if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
t.Errorf("transaction status mismatch")
}
}
Expand All @@ -461,20 +461,20 @@ func TestTransactionStatusLes2(t *testing.T) {

// test error status by sending an underpriced transaction
tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})
test(tx0, true, light.TxStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})

tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error
test(tx1, false, light.TxStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // adding it again should not return an error

tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
// send transactions in the wrong order, tx3 should be queued
test(tx3, true, txStatus{Status: core.TxStatusQueued})
test(tx2, true, txStatus{Status: core.TxStatusPending})
test(tx3, true, light.TxStatus{Status: core.TxStatusQueued})
test(tx2, true, light.TxStatus{Status: core.TxStatusPending})
// query again, now tx3 should be pending too
test(tx3, false, txStatus{Status: core.TxStatusPending})
test(tx3, false, light.TxStatus{Status: core.TxStatusPending})

// generate and add a block with tx1 and tx2 included
gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) {
Expand All @@ -497,8 +497,8 @@ func TestTransactionStatusLes2(t *testing.T) {

// check if their status is included now
block1hash := rawdb.ReadCanonicalHash(db, 1)
test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
test(tx1, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
test(tx2, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})

// create a reorg that rolls them back
gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {})
Expand All @@ -516,6 +516,6 @@ func TestTransactionStatusLes2(t *testing.T) {
t.Fatalf("pending count mismatch: have %d, want 3", pending)
}
// check if their status is pending again
test(tx1, false, txStatus{Status: core.TxStatusPending})
test(tx2, false, txStatus{Status: core.TxStatusPending})
test(tx1, false, light.TxStatus{Status: core.TxStatusPending})
test(tx2, false, light.TxStatus{Status: core.TxStatusPending})
}
4 changes: 3 additions & 1 deletion les/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
}
genesis = gspec.MustCommit(db)
chain BlockChain
pool txPool
)
if peers == nil {
peers = newPeerSet()
Expand All @@ -162,13 +163,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
panic(err)
}
chain = blockchain
pool = core.NewTxPool(core.DefaultTxPoolConfig, gspec.Config, blockchain)
}

indexConfig := light.TestServerIndexerConfig
if lightSync {
indexConfig = light.TestClientIndexerConfig
}
pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, pool, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions les/odr.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
MsgReceipts
MsgProofsV2
MsgHelperTrieProofs
MsgTxStatus
)

// Msg encodes a LES message that delivers reply data for a request
Expand Down
40 changes: 40 additions & 0 deletions les/odr_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func LesRequest(req light.OdrRequest) LesOdrRequest {
return (*ChtRequest)(r)
case *light.BloomRequest:
return (*BloomRequest)(r)
case *light.TxStatusRequest:
return (*TxStatusRequest)(r)
default:
return nil
}
Expand Down Expand Up @@ -471,6 +473,44 @@ func (r *BloomRequest) Validate(db ethdb.Database, msg *Msg) error {
return nil
}

// TxStatusRequest is the ODR request type for transaction status
type TxStatusRequest light.TxStatusRequest

// GetCost returns the cost of the given ODR request according to the serving
// peer's cost table (implementation of LesOdrRequest)
func (r *TxStatusRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetTxStatusMsg, len(r.Hashes))
}

// CanSend tells if a certain peer is suitable for serving the given request
func (r *TxStatusRequest) CanSend(peer *peer) bool {
return peer.version >= lpv2
}

// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (r *TxStatusRequest) Request(reqID uint64, peer *peer) error {
peer.Log().Debug("Requesting transaction status", "count", len(r.Hashes))
return peer.RequestTxStatus(reqID, r.GetCost(peer), r.Hashes)
}

// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating transaction status", "count", len(r.Hashes))

// Ensure we have a correct message with a single block body
if msg.MsgType != MsgTxStatus {
return errInvalidMessageType
}
status := msg.Obj.([]light.TxStatus)
if len(status) != len(r.Hashes) {
return errInvalidEntryCount
}
r.Status = status
return nil
}

// readTraceDB stores the keys of database reads. We use this to check that received node
// sets contain only the trie nodes necessary to make proofs pass.
type readTraceDB struct {
Expand Down
45 changes: 35 additions & 10 deletions les/odr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte

func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, odrGetBlock) }
func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetBlock) }

func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
var block *types.Block
Expand All @@ -54,7 +54,7 @@ func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainCon
return rlp
}

func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, odrGetReceipts) }
func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetReceipts) }

func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
var receipts types.Receipts
Expand All @@ -74,7 +74,7 @@ func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.Chain
return rlp
}

func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, odrAccounts) }
func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrAccounts) }

func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678")
Expand Down Expand Up @@ -102,7 +102,7 @@ func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainCon
return res
}

func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, odrContractCall) }
func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, true, odrContractCall) }

type callmsg struct {
types.Message
Expand Down Expand Up @@ -151,8 +151,32 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
return res
}

func TestOdrTxStatusLes2(t *testing.T) { testOdr(t, 2, 1, false, odrTxStatus) }

func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
var txs types.Transactions
if bc != nil {
block := bc.GetBlockByHash(bhash)
txs = block.Transactions()
} else {
if block, _ := lc.GetBlockByHash(ctx, bhash); block != nil {
btxs := block.Transactions()
txs = make(types.Transactions, len(btxs))
for i, tx := range btxs {
var err error
txs[i], _, _, _, err = light.GetTransaction(ctx, lc.Odr(), tx.Hash())
if err != nil {
return nil
}
}
}
}
rlp, _ := rlp.EncodeToBytes(txs)
return rlp
}

// testOdr tests odr requests whose validation guaranteed by block headers.
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) {
// Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
defer tearDown()
Expand Down Expand Up @@ -193,9 +217,10 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
client.peers.lock.Unlock()
test(5)

// still expect all retrievals to pass, now data should be cached locally
client.peers.Unregister(client.rPeer.id)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(5)
if checkCached {
// still expect all retrievals to pass, now data should be cached locally
client.peers.Unregister(client.rPeer.id)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(5)
}
}
2 changes: 1 addition & 1 deletion les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply
}

// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply {
func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply {
data, _ := rlp.EncodeToBytes(stats)
return &reply{p.rw, TxStatusMsg, reqID, data}
}
Expand Down
Loading