forked from METADIUM/go-metadium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx_sender_resolver.go
108 lines (92 loc) · 2.36 KB
/
tx_sender_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// tx_sender_resolver
package core
import (
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core/types"
)
// job is a unit of work queued on a SenderResolver: a function together with
// the single argument it will be invoked with.
//
// The argument travels with the job instead of being captured by the
// function's closure, because of evaluation order uncertainty between
// closures and channel operations.
type job struct {
	// f is the function to run.
	f func(interface{})
	// param is the value handed to f when the job is executed.
	param interface{}
}
// SenderResolver resolves the sender accounts of transactions concurrently,
// running posted jobs on goroutines whose parallelism is limited by a
// semaphore channel.
type SenderResolver struct {
	// tx2addr caches resolved sender addresses, keyed by transaction hash.
	tx2addr *lru.LruCache
	// jobs is the queue of pending work consumed by Run; a nil entry tells
	// Run to exit.
	jobs chan *job
	// busy is used as a counting semaphore: one element is held for every
	// job that is currently executing.
	busy chan interface{}
}
// NewSenderResolver creates a new sender resolver worker pool.
//
// concurrency is used as the buffer size of both the job queue and the
// semaphore that limits how many jobs execute at once. cacheSize is passed
// to the LRU cache that holds resolved sender addresses.
//
// The returned resolver does not process anything until Run is called.
func NewSenderResolver(concurrency, cacheSize int) *SenderResolver {
	resolver := new(SenderResolver)
	// NOTE(review): the meaning of the second argument (true) is defined by
	// lru.NewLruCache and is not visible from this file.
	resolver.tx2addr = lru.NewLruCache(cacheSize, true)
	resolver.jobs = make(chan *job, concurrency)
	resolver.busy = make(chan interface{}, concurrency)
	return resolver
}
// Run is the sender resolver main loop. It takes jobs off the queue and
// executes each one in its own goroutine until it receives a nil job (the
// sentinel sent by Stop) or the jobs channel is closed. Run blocks for as
// long as the loop is active; it does not wait for jobs that are still
// executing when it returns.
//
// The busy channel is a counting semaphore whose capacity is the concurrency
// passed to NewSenderResolver. A slot is acquired before the goroutine is
// started, so the number of live worker goroutines, and not only the number
// of jobs executing, is bounded by that capacity. When every slot is taken
// the loop stops draining the queue, which in turn makes Post block once the
// queue buffer is full.
func (s *SenderResolver) Run() {
	for j := range s.jobs {
		if j == nil {
			// Stop was requested.
			return
		}

		// Reserve a slot first: spawning the goroutine and acquiring the
		// slot inside it would create one parked goroutine per queued job.
		s.busy <- struct{}{}

		// Pass the job explicitly so the goroutine never depends on how the
		// loop variable is scoped.
		go func(j *job) {
			// Release the slot even if the job function panics.
			defer func() { <-s.busy }()
			j.f(j.param)
		}(j)
	}
}
// Stop stops the sender resolver by queueing a nil job, which Run treats as
// the signal to leave its loop. The send goes through the same queue as
// regular jobs, so it blocks while the queue buffer is full.
func (s *SenderResolver) Stop() {
	var quit *job // nil sentinel recognised by Run
	s.jobs <- quit
}
// Post queues a new sender resolver task: f will be called with p as its
// only argument once Run picks the task up. Post blocks while the job queue
// buffer is full.
func (s *SenderResolver) Post(f func(interface{}), p interface{}) {
	task := new(job)
	task.f = f
	task.param = p
	s.jobs <- task
}
// ResolveSenders resolves sender accounts from the given transactions
// concurrently using the pool's SenderResolver, and returns once every
// posted recovery job has finished.
//
// For each transaction, in order of preference:
//  1. if types.GetSender already yields an address, it is stored in the
//     resolver cache under the transaction hash;
//  2. otherwise, if the resolver cache has an address for the hash, it is
//     attached to the transaction with types.SetSender;
//  3. otherwise a job calling types.Sender is posted to the resolver, and
//     its result is cached on success.
//
// Recovery failures are not reported to the caller.
func (pool *TxPool) ResolveSenders(signer types.Signer, txs []*types.Transaction) {
	var (
		resolver = pool.senderResolver
		pending  sync.WaitGroup
		// NOTE(review): both counters are only incremented and never read
		// within this function.
		recovered, failures int64
	)

	// recoverSender is the job body executed by the resolver. The
	// transaction arrives as the job parameter rather than through the
	// closure.
	recoverSender := func(param interface{}) {
		defer pending.Done()

		t := param.(*types.Transaction)
		from, err := types.Sender(signer, t)
		if err != nil {
			atomic.AddInt64(&failures, 1)
			return
		}
		// NOTE(review): only the resolver cache is updated here; presumably
		// types.Sender records the address on the transaction itself -
		// confirm.
		resolver.tx2addr.Put(t.Hash(), from)
	}

	for _, tx := range txs {
		hash := tx.Hash()

		// Sender already known for this transaction: just remember it.
		if addr := types.GetSender(signer, tx); addr != nil {
			resolver.tx2addr.Put(hash, *addr)
			continue
		}

		// Sender known from an earlier resolution: copy it onto the
		// transaction.
		if cached := resolver.tx2addr.Get(hash); cached != nil {
			types.SetSender(signer, tx, cached.(common.Address))
			continue
		}

		// Unknown sender: hand the recovery to the resolver.
		pending.Add(1)
		atomic.AddInt64(&recovered, 1)
		resolver.Post(recoverSender, tx)
	}

	pending.Wait()
}
// ResolveSender resolves the sender address of a single transaction by
// delegating to ResolveSenders with a one-element slice.
func (pool *TxPool) ResolveSender(signer types.Signer, tx *types.Transaction) {
	pool.ResolveSenders(signer, []*types.Transaction{tx})
}
// EOF