Skip to content

Commit 4d79d98

Browse files
lmittmannfjl
authored andcommitted
eth/filters, ethclient/gethclient: add fullTx option to pending tx filter (ethereum#25186)
This PR adds a way to subscribe to the _full_ pending transactions, as opposed to just being notified about hashes. In use cases where client subscribes to newPendingTransactions and gets txhashes only to then request the actual transaction, the caller can now shortcut that flow and obtain the transactions directly. Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent 2738b56 commit 4d79d98

File tree

5 files changed

+91
-43
lines changed

5 files changed

+91
-43
lines changed

eth/filters/api.go

+24-14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type filter struct {
3838
typ Type
3939
deadline *time.Timer // filter is inactive when deadline triggers
4040
hashes []common.Hash
41+
txs []*types.Transaction
4142
crit FilterCriteria
4243
logs []*types.Log
4344
s *Subscription // associated subscription in event system
@@ -96,28 +97,28 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
9697
}
9798
}
9899

99-
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
100+
// NewPendingTransactionFilter creates a filter that fetches pending transactions
100101
// as transactions enter the pending state.
101102
//
102103
// It is part of the filter package because this filter can be used through the
103104
// `eth_getFilterChanges` polling method that is also used for log filters.
104105
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
105106
var (
106-
pendingTxs = make(chan []common.Hash)
107+
pendingTxs = make(chan []*types.Transaction)
107108
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
108109
)
109110

110111
api.filtersMu.Lock()
111-
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
112+
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
112113
api.filtersMu.Unlock()
113114

114115
go func() {
115116
for {
116117
select {
117-
case ph := <-pendingTxs:
118+
case pTx := <-pendingTxs:
118119
api.filtersMu.Lock()
119120
if f, found := api.filters[pendingTxSub.ID]; found {
120-
f.hashes = append(f.hashes, ph...)
121+
f.txs = append(f.txs, pTx...)
121122
}
122123
api.filtersMu.Unlock()
123124
case <-pendingTxSub.Err():
@@ -132,9 +133,10 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
132133
return pendingTxSub.ID
133134
}
134135

135-
// NewPendingTransactions creates a subscription that is triggered each time a transaction
136-
// enters the transaction pool and was signed from one of the transactions this nodes manages.
137-
func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
136+
// NewPendingTransactions creates a subscription that is triggered each time a
137+
// transaction enters the transaction pool. If fullTx is true the full tx is
138+
// sent to the client, otherwise the hash is sent.
139+
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
138140
notifier, supported := rpc.NotifierFromContext(ctx)
139141
if !supported {
140142
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
@@ -143,16 +145,20 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscrip
143145
rpcSub := notifier.CreateSubscription()
144146

145147
go func() {
146-
txHashes := make(chan []common.Hash, 128)
147-
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
148+
txs := make(chan []*types.Transaction, 128)
149+
pendingTxSub := api.events.SubscribePendingTxs(txs)
148150

149151
for {
150152
select {
151-
case hashes := <-txHashes:
153+
case txs := <-txs:
152154
// To keep the original behaviour, send a single tx hash in one notification.
153155
// TODO(rjl493456442) Send a batch of tx hashes in one notification
154-
for _, h := range hashes {
155-
notifier.Notify(rpcSub.ID, h)
156+
for _, tx := range txs {
157+
if fullTx != nil && *fullTx {
158+
notifier.Notify(rpcSub.ID, tx)
159+
} else {
160+
notifier.Notify(rpcSub.ID, tx.Hash())
161+
}
156162
}
157163
case <-rpcSub.Err():
158164
pendingTxSub.Unsubscribe()
@@ -411,10 +417,14 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
411417
f.deadline.Reset(api.timeout)
412418

413419
switch f.typ {
414-
case PendingTransactionsSubscription, BlocksSubscription:
420+
case BlocksSubscription:
415421
hashes := f.hashes
416422
f.hashes = nil
417423
return returnHashes(hashes), nil
424+
case PendingTransactionsSubscription:
425+
txs := f.txs
426+
f.txs = nil
427+
return txs, nil
418428
case LogsSubscription, MinedAndPendingLogsSubscription:
419429
logs := f.logs
420430
f.logs = nil

eth/filters/filter_system.go

+12-16
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ const (
124124
PendingLogsSubscription
125125
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
126126
MinedAndPendingLogsSubscription
127-
// PendingTransactionsSubscription queries tx hashes for pending
128-
// transactions entering the pending state
127+
// PendingTransactionsSubscription queries for pending transactions entering
128+
// the pending state
129129
PendingTransactionsSubscription
130130
// BlocksSubscription queries hashes for blocks that are imported
131131
BlocksSubscription
@@ -151,7 +151,7 @@ type subscription struct {
151151
created time.Time
152152
logsCrit ethereum.FilterQuery
153153
logs chan []*types.Log
154-
hashes chan []common.Hash
154+
txs chan []*types.Transaction
155155
headers chan *types.Header
156156
installed chan struct{} // closed when the filter is installed
157157
err chan error // closed when the filter is uninstalled
@@ -244,7 +244,7 @@ func (sub *Subscription) Unsubscribe() {
244244
case sub.es.uninstall <- sub.f:
245245
break uninstallLoop
246246
case <-sub.f.logs:
247-
case <-sub.f.hashes:
247+
case <-sub.f.txs:
248248
case <-sub.f.headers:
249249
}
250250
}
@@ -311,7 +311,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
311311
logsCrit: crit,
312312
created: time.Now(),
313313
logs: logs,
314-
hashes: make(chan []common.Hash),
314+
txs: make(chan []*types.Transaction),
315315
headers: make(chan *types.Header),
316316
installed: make(chan struct{}),
317317
err: make(chan error),
@@ -328,7 +328,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
328328
logsCrit: crit,
329329
created: time.Now(),
330330
logs: logs,
331-
hashes: make(chan []common.Hash),
331+
txs: make(chan []*types.Transaction),
332332
headers: make(chan *types.Header),
333333
installed: make(chan struct{}),
334334
err: make(chan error),
@@ -345,7 +345,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
345345
logsCrit: crit,
346346
created: time.Now(),
347347
logs: logs,
348-
hashes: make(chan []common.Hash),
348+
txs: make(chan []*types.Transaction),
349349
headers: make(chan *types.Header),
350350
installed: make(chan struct{}),
351351
err: make(chan error),
@@ -361,23 +361,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
361361
typ: BlocksSubscription,
362362
created: time.Now(),
363363
logs: make(chan []*types.Log),
364-
hashes: make(chan []common.Hash),
364+
txs: make(chan []*types.Transaction),
365365
headers: headers,
366366
installed: make(chan struct{}),
367367
err: make(chan error),
368368
}
369369
return es.subscribe(sub)
370370
}
371371

372-
// SubscribePendingTxs creates a subscription that writes transaction hashes for
372+
// SubscribePendingTxs creates a subscription that writes transactions for
373373
// transactions that enter the transaction pool.
374-
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
374+
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
375375
sub := &subscription{
376376
id: rpc.NewID(),
377377
typ: PendingTransactionsSubscription,
378378
created: time.Now(),
379379
logs: make(chan []*types.Log),
380-
hashes: hashes,
380+
txs: txs,
381381
headers: make(chan *types.Header),
382382
installed: make(chan struct{}),
383383
err: make(chan error),
@@ -421,12 +421,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
421421
}
422422

423423
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
424-
hashes := make([]common.Hash, 0, len(ev.Txs))
425-
for _, tx := range ev.Txs {
426-
hashes = append(hashes, tx.Hash())
427-
}
428424
for _, f := range filters[PendingTransactionsSubscription] {
429-
f.hashes <- hashes
425+
f.txs <- ev.Txs
430426
}
431427
}
432428

eth/filters/filter_system_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func TestPendingTxFilter(t *testing.T) {
240240
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
241241
}
242242

243-
hashes []common.Hash
243+
txs []*types.Transaction
244244
)
245245

246246
fid0 := api.NewPendingTransactionFilter()
@@ -255,9 +255,9 @@ func TestPendingTxFilter(t *testing.T) {
255255
t.Fatalf("Unable to retrieve logs: %v", err)
256256
}
257257

258-
h := results.([]common.Hash)
259-
hashes = append(hashes, h...)
260-
if len(hashes) >= len(transactions) {
258+
tx := results.([]*types.Transaction)
259+
txs = append(txs, tx...)
260+
if len(txs) >= len(transactions) {
261261
break
262262
}
263263
// check timeout
@@ -268,13 +268,13 @@ func TestPendingTxFilter(t *testing.T) {
268268
time.Sleep(100 * time.Millisecond)
269269
}
270270

271-
if len(hashes) != len(transactions) {
272-
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
271+
if len(txs) != len(transactions) {
272+
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
273273
return
274274
}
275-
for i := range hashes {
276-
if hashes[i] != transactions[i].Hash() {
277-
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
275+
for i := range txs {
276+
if txs[i].Hash() != transactions[i].Hash() {
277+
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
278278
}
279279
}
280280
}
@@ -705,11 +705,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
705705
fids[i] = fid
706706
// Wait for at least one tx to arrive in filter
707707
for {
708-
hashes, err := api.GetFilterChanges(fid)
708+
txs, err := api.GetFilterChanges(fid)
709709
if err != nil {
710710
t.Fatalf("Filter should exist: %v\n", err)
711711
}
712-
if len(hashes.([]common.Hash)) > 0 {
712+
if len(txs.([]*types.Transaction)) > 0 {
713713
break
714714
}
715715
runtime.Gosched()

ethclient/gethclient/gethclient.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
166166
return &result, err
167167
}
168168

169-
// SubscribePendingTransactions subscribes to new pending transactions.
169+
// SubscribeFullPendingTransactions subscribes to new pending transactions.
170+
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
171+
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
172+
}
173+
174+
// SubscribePendingTransactions subscribes to new pending transaction hashes.
170175
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
171176
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
172177
}

ethclient/gethclient/gethclient_test.go

+38-1
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,11 @@ func TestGethClient(t *testing.T) {
122122
"TestSetHead",
123123
func(t *testing.T) { testSetHead(t, client) },
124124
}, {
125-
"TestSubscribePendingTxs",
125+
"TestSubscribePendingTxHashes",
126126
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
127+
}, {
128+
"TestSubscribePendingTxs",
129+
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
127130
}, {
128131
"TestCallContract",
129132
func(t *testing.T) { testCallContract(t, client) },
@@ -303,6 +306,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
303306
}
304307
}
305308

309+
func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
310+
ec := New(client)
311+
ethcl := ethclient.NewClient(client)
312+
// Subscribe to Transactions
313+
ch := make(chan *types.Transaction)
314+
ec.SubscribeFullPendingTransactions(context.Background(), ch)
315+
// Send a transaction
316+
chainID, err := ethcl.ChainID(context.Background())
317+
if err != nil {
318+
t.Fatal(err)
319+
}
320+
// Create transaction
321+
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
322+
signer := types.LatestSignerForChainID(chainID)
323+
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
324+
if err != nil {
325+
t.Fatal(err)
326+
}
327+
signedTx, err := tx.WithSignature(signer, signature)
328+
if err != nil {
329+
t.Fatal(err)
330+
}
331+
// Send transaction
332+
err = ethcl.SendTransaction(context.Background(), signedTx)
333+
if err != nil {
334+
t.Fatal(err)
335+
}
336+
// Check that the transaction was send over the channel
337+
tx = <-ch
338+
if tx.Hash() != signedTx.Hash() {
339+
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
340+
}
341+
}
342+
306343
func testCallContract(t *testing.T, client *rpc.Client) {
307344
ec := New(client)
308345
msg := ethereum.CallMsg{

0 commit comments

Comments
 (0)