-
Notifications
You must be signed in to change notification settings - Fork 20.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
eth/filter: check nil pointer when unsubscribe #16682
Changes from 3 commits
cf133fe
96a79b9
f11ac1e
dec5843
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ import ( | |
"github.com/ethereum/go-ethereum/core" | ||
"github.com/ethereum/go-ethereum/core/types" | ||
"github.com/ethereum/go-ethereum/event" | ||
"github.com/ethereum/go-ethereum/log" | ||
"github.com/ethereum/go-ethereum/rpc" | ||
) | ||
|
||
|
@@ -91,8 +92,21 @@ type EventSystem struct { | |
backend Backend | ||
lightMode bool | ||
lastHead *types.Header | ||
install chan *subscription // install filter for event notification | ||
uninstall chan *subscription // remove filter for event notification | ||
|
||
// Subscriptions | ||
txSub event.Subscription // Subscription for new transaction event | ||
logsSub event.Subscription // Subscription for new log event | ||
rmLogsSub event.Subscription // Subscription for removed log event | ||
chainSub event.Subscription // Subscription for new chain event | ||
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event | ||
|
||
// Channels | ||
install chan *subscription // install filter for event notification | ||
uninstall chan *subscription // remove filter for event notification | ||
txCh chan core.TxPreEvent // Channel to receive new transaction event | ||
logsCh chan []*types.Log // Channel to receive new log event | ||
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event | ||
chainCh chan core.ChainEvent // Channel to receive new chain event | ||
} | ||
|
||
// NewEventSystem creates a new manager that listens for event on the given mux, | ||
|
@@ -108,10 +122,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS | |
lightMode: lightMode, | ||
install: make(chan *subscription), | ||
uninstall: make(chan *subscription), | ||
txCh: make(chan core.TxPreEvent, txChanSize), | ||
logsCh: make(chan []*types.Log, logsChanSize), | ||
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), | ||
chainCh: make(chan core.ChainEvent, chainEvChanSize), | ||
} | ||
|
||
go m.eventLoop() | ||
// Subscribe events | ||
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) | ||
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) | ||
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) | ||
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) | ||
// TODO(rjl493456442): use feed to subscribe pending log event | ||
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) | ||
|
||
// Make sure all the subscriptions are not empty | ||
if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || | ||
m.pendingLogSub.Closed() { | ||
log.Crit("Subscribe for event system failed") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Returning In theory these code paths cannot get called, at least if the initialization code is correct. As such we should either not care about them becoming |
||
|
||
go m.eventLoop() | ||
return m | ||
} | ||
|
||
|
@@ -411,50 +442,36 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. | |
|
||
// eventLoop (un)installs filters and processes mux events. | ||
func (es *EventSystem) eventLoop() { | ||
var ( | ||
index = make(filterIndex) | ||
sub = es.mux.Subscribe(core.PendingLogsEvent{}) | ||
// Subscribe TxPreEvent form txpool | ||
txCh = make(chan core.TxPreEvent, txChanSize) | ||
txSub = es.backend.SubscribeTxPreEvent(txCh) | ||
// Subscribe RemovedLogsEvent | ||
rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) | ||
rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) | ||
// Subscribe []*types.Log | ||
logsCh = make(chan []*types.Log, logsChanSize) | ||
logsSub = es.backend.SubscribeLogsEvent(logsCh) | ||
// Subscribe ChainEvent | ||
chainEvCh = make(chan core.ChainEvent, chainEvChanSize) | ||
chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) | ||
) | ||
|
||
// Unsubscribe all events | ||
defer sub.Unsubscribe() | ||
defer txSub.Unsubscribe() | ||
defer rmLogsSub.Unsubscribe() | ||
defer logsSub.Unsubscribe() | ||
defer chainEvSub.Unsubscribe() | ||
var index = make(filterIndex) | ||
|
||
defer func() { | ||
// Unsubscribe all events | ||
es.pendingLogSub.Unsubscribe() | ||
es.txSub.Unsubscribe() | ||
es.logsSub.Unsubscribe() | ||
es.rmLogsSub.Unsubscribe() | ||
es.chainSub.Unsubscribe() | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor suggestions:
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.pendingLogSub.Unsubscribe()
es.txSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
index := make(filterIndex)
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
} |
||
|
||
for i := UnknownSubscription; i < LastIndexSubscription; i++ { | ||
index[i] = make(map[rpc.ID]*subscription) | ||
} | ||
|
||
for { | ||
select { | ||
case ev, active := <-sub.Chan(): | ||
if !active { // system stopped | ||
return | ||
} | ||
es.broadcast(index, ev) | ||
|
||
// Handle subscribed events | ||
case ev := <-txCh: | ||
case ev := <-es.txCh: | ||
es.broadcast(index, ev) | ||
case ev := <-rmLogsCh: | ||
case ev := <-es.logsCh: | ||
es.broadcast(index, ev) | ||
case ev := <-logsCh: | ||
case ev := <-es.rmLogsCh: | ||
es.broadcast(index, ev) | ||
case ev := <-chainEvCh: | ||
case ev := <-es.chainCh: | ||
es.broadcast(index, ev) | ||
case ev, active := <-es.pendingLogSub.Chan(): | ||
if !active { // system stopped | ||
return | ||
} | ||
es.broadcast(index, ev) | ||
|
||
case f := <-es.install: | ||
|
@@ -466,6 +483,7 @@ func (es *EventSystem) eventLoop() { | |
index[f.typ][f.id] = f | ||
} | ||
close(f.installed) | ||
|
||
case f := <-es.uninstall: | ||
if f.typ == MinedAndPendingLogsSubscription { | ||
// the type are logs and pending logs subscriptions | ||
|
@@ -477,13 +495,13 @@ func (es *EventSystem) eventLoop() { | |
close(f.err) | ||
|
||
// System stopped | ||
case <-txSub.Err(): | ||
case <-es.txSub.Err(): | ||
return | ||
case <-rmLogsSub.Err(): | ||
case <-es.logsSub.Err(): | ||
return | ||
case <-logsSub.Err(): | ||
case <-es.rmLogsSub.Err(): | ||
return | ||
case <-chainEvSub.Err(): | ||
case <-es.chainSub.Err(): | ||
return | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -180,6 +180,10 @@ func (s *TypeMuxSubscription) Unsubscribe() { | |
s.closewait() | ||
} | ||
|
||
func (s *TypeMuxSubscription) Closed() bool { | ||
return s.closed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is racey. You need to add a read lock for |
||
} | ||
|
||
func (s *TypeMuxSubscription) closewait() { | ||
s.closeMu.Lock() | ||
defer s.closeMu.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formulation nitpick,
all the subscriptions are not empty
is not really correct grammatically, please fix tonone of the subscriptions are empty
.