Skip to content

Commit be6078a

Browse files
Chain-Foxfjl
andauthored
all: fix a bunch of inconsequential goroutine leaks (ethereum#20667)
The leaks were mostly in unit tests, and could all be resolved by adding suitably-sized channel buffers or by restructuring the test to not send on a channel after an error has occurred. There is an unavoidable goroutine leak in Console.Interactive: when we receive a signal, the line reader cannot be unblocked and will get stuck. This leak is now documented and I've tried to make it slightly less bad by adding a one-element buffer to the output channels of the line-reading loop. Should the reader eventually awake from its blocked state (i.e. when stdin is closed), at least it won't get stuck trying to send to the interpreter loop which has quit long ago. Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent 98eab2d commit be6078a

File tree

6 files changed

+123
-106
lines changed

6 files changed

+123
-106
lines changed

common/prque/lazyqueue_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,22 @@ func TestLazyQueue(t *testing.T) {
7474
q.Push(&items[i])
7575
}
7676

77-
var lock sync.Mutex
78-
stopCh := make(chan chan struct{})
77+
var (
78+
lock sync.Mutex
79+
wg sync.WaitGroup
80+
stopCh = make(chan chan struct{})
81+
)
82+
defer wg.Wait()
83+
wg.Add(1)
7984
go func() {
85+
defer wg.Done()
8086
for {
8187
select {
8288
case <-clock.After(testQueueRefresh):
8389
lock.Lock()
8490
q.Refresh()
8591
lock.Unlock()
86-
case stop := <-stopCh:
87-
close(stop)
92+
case <-stopCh:
8893
return
8994
}
9095
}
@@ -104,6 +109,8 @@ func TestLazyQueue(t *testing.T) {
104109
if rand.Intn(100) == 0 {
105110
p := q.PopItem().(*lazyItem)
106111
if p.p != maxPri {
112+
lock.Unlock()
113+
close(stopCh)
107114
t.Fatalf("incorrect item (best known priority %d, popped %d)", maxPri, p.p)
108115
}
109116
q.Push(p)
@@ -113,7 +120,5 @@ func TestLazyQueue(t *testing.T) {
113120
clock.WaitForTimers(1)
114121
}
115122

116-
stop := make(chan struct{})
117-
stopCh <- stop
118-
<-stop
123+
close(stopCh)
119124
}

console/console.go

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -340,62 +340,61 @@ func (c *Console) Evaluate(statement string) {
340340
// the configured user prompter.
341341
func (c *Console) Interactive() {
342342
var (
343-
prompt = c.prompt // Current prompt line (used for multi-line inputs)
344-
indents = 0 // Current number of input indents (used for multi-line inputs)
345-
input = "" // Current user input
346-
scheduler = make(chan string) // Channel to send the next prompt on and receive the input
343+
prompt = c.prompt // the current prompt line (used for multi-line inputs)
344+
indents = 0 // the current number of input indents (used for multi-line inputs)
345+
input = "" // the current user input
346+
inputLine = make(chan string, 1) // receives user input
347+
inputErr = make(chan error, 1) // receives liner errors
348+
requestLine = make(chan string) // requests a line of input
349+
interrupt = make(chan os.Signal, 1)
347350
)
348-
// Start a goroutine to listen for prompt requests and send back inputs
349-
go func() {
350-
for {
351-
// Read the next user input
352-
line, err := c.prompter.PromptInput(<-scheduler)
353-
if err != nil {
354-
// In case of an error, either clear the prompt or fail
355-
if err == liner.ErrPromptAborted { // ctrl-C
356-
prompt, indents, input = c.prompt, 0, ""
357-
scheduler <- ""
358-
continue
359-
}
360-
close(scheduler)
361-
return
362-
}
363-
// User input retrieved, send for interpretation and loop
364-
scheduler <- line
365-
}
366-
}()
367-
// Monitor Ctrl-C too in case the input is empty and we need to bail
368-
abort := make(chan os.Signal, 1)
369-
signal.Notify(abort, syscall.SIGINT, syscall.SIGTERM)
370351

371-
// Start sending prompts to the user and reading back inputs
352+
// Monitor Ctrl-C. While liner does turn on the relevant terminal mode bits to avoid
353+
// the signal, a signal can still be received for unsupported terminals. Unfortunately
354+
// there is no way to cancel the line reader when this happens. The readLines
355+
// goroutine will be leaked in this case.
356+
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
357+
defer signal.Stop(interrupt)
358+
359+
// The line reader runs in a separate goroutine.
360+
go c.readLines(inputLine, inputErr, requestLine)
361+
defer close(requestLine)
362+
372363
for {
373-
// Send the next prompt, triggering an input read and process the result
374-
scheduler <- prompt
364+
// Send the next prompt, triggering an input read.
365+
requestLine <- prompt
366+
375367
select {
376-
case <-abort:
377-
// User forcefully quite the console
368+
case <-interrupt:
378369
fmt.Fprintln(c.printer, "caught interrupt, exiting")
379370
return
380371

381-
case line, ok := <-scheduler:
382-
// User input was returned by the prompter, handle special cases
383-
if !ok || (indents <= 0 && exit.MatchString(line)) {
372+
case err := <-inputErr:
373+
if err == liner.ErrPromptAborted && indents > 0 {
374+
// When prompting for multi-line input, the first Ctrl-C resets
375+
// the multi-line state.
376+
prompt, indents, input = c.prompt, 0, ""
377+
continue
378+
}
379+
return
380+
381+
case line := <-inputLine:
382+
// User input was returned by the prompter, handle special cases.
383+
if indents <= 0 && exit.MatchString(line) {
384384
return
385385
}
386386
if onlyWhitespace.MatchString(line) {
387387
continue
388388
}
389-
// Append the line to the input and check for multi-line interpretation
389+
// Append the line to the input and check for multi-line interpretation.
390390
input += line + "\n"
391-
392391
indents = countIndents(input)
393392
if indents <= 0 {
394393
prompt = c.prompt
395394
} else {
396395
prompt = strings.Repeat(".", indents*3) + " "
397396
}
398-
// If all the needed lines are present, save the command and run
397+
// If all the needed lines are present, save the command and run it.
399398
if indents <= 0 {
400399
if len(input) > 0 && input[0] != ' ' && !passwordRegexp.MatchString(input) {
401400
if command := strings.TrimSpace(input); len(c.history) == 0 || command != c.history[len(c.history)-1] {
@@ -412,6 +411,18 @@ func (c *Console) Interactive() {
412411
}
413412
}
414413

414+
// readLines runs in its own goroutine, prompting for input.
415+
func (c *Console) readLines(input chan<- string, errc chan<- error, prompt <-chan string) {
416+
for p := range prompt {
417+
line, err := c.prompter.PromptInput(p)
418+
if err != nil {
419+
errc <- err
420+
} else {
421+
input <- line
422+
}
423+
}
424+
}
425+
415426
// countIndents returns the number of identations for the given input.
416427
// In case of invalid input such as var a = } the result can be negative.
417428
func countIndents(input string) int {

event/event_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func BenchmarkPostConcurrent(b *testing.B) {
203203
// for comparison
204204
func BenchmarkChanSend(b *testing.B) {
205205
c := make(chan interface{})
206+
defer close(c)
206207
closed := make(chan struct{})
207208
go func() {
208209
for range c {

miner/worker_test.go

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package miner
1818

1919
import (
20-
"fmt"
2120
"math/big"
2221
"math/rand"
2322
"sync/atomic"
@@ -210,49 +209,37 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
210209
w, b := newTestWorker(t, chainConfig, engine, db, 0)
211210
defer w.close()
212211

212+
// This test chain imports the mined blocks.
213213
db2 := rawdb.NewMemoryDatabase()
214214
b.genesis.MustCommit(db2)
215215
chain, _ := core.NewBlockChain(db2, nil, b.chain.Config(), engine, vm.Config{}, nil)
216216
defer chain.Stop()
217217

218-
var (
219-
loopErr = make(chan error)
220-
newBlock = make(chan struct{})
221-
subscribe = make(chan struct{})
222-
)
223-
listenNewBlock := func() {
224-
sub := w.mux.Subscribe(core.NewMinedBlockEvent{})
225-
defer sub.Unsubscribe()
226-
227-
subscribe <- struct{}{}
228-
for item := range sub.Chan() {
229-
block := item.Data.(core.NewMinedBlockEvent).Block
230-
_, err := chain.InsertChain([]*types.Block{block})
231-
if err != nil {
232-
loopErr <- fmt.Errorf("failed to insert new mined block:%d, error:%v", block.NumberU64(), err)
233-
}
234-
newBlock <- struct{}{}
235-
}
236-
}
237-
// Ignore empty commit here for less noise
218+
// Ignore empty commit here for less noise.
238219
w.skipSealHook = func(task *task) bool {
239220
return len(task.receipts) == 0
240221
}
241-
go listenNewBlock()
242222

243-
<-subscribe // Ensure the subscription is created
244-
w.start() // Start mining!
223+
// Wait for mined blocks.
224+
sub := w.mux.Subscribe(core.NewMinedBlockEvent{})
225+
defer sub.Unsubscribe()
226+
227+
// Start mining!
228+
w.start()
245229

246230
for i := 0; i < 5; i++ {
247231
b.txPool.AddLocal(b.newRandomTx(true))
248232
b.txPool.AddLocal(b.newRandomTx(false))
249233
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
250234
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
235+
251236
select {
252-
case e := <-loopErr:
253-
t.Fatal(e)
254-
case <-newBlock:
255-
case <-time.NewTimer(3 * time.Second).C: // Worker needs 1s to include new changes.
237+
case ev := <-sub.Chan():
238+
block := ev.Data.(core.NewMinedBlockEvent).Block
239+
if _, err := chain.InsertChain([]*types.Block{block}); err != nil {
240+
t.Fatalf("failed to insert new mined block %d: %v", block.NumberU64(), err)
241+
}
242+
case <-time.After(3 * time.Second): // Worker needs 1s to include new changes.
256243
t.Fatalf("timeout")
257244
}
258245
}

rpc/client_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,14 @@ func TestClientHTTP(t *testing.T) {
413413
// Launch concurrent requests.
414414
var (
415415
results = make([]echoResult, 100)
416-
errc = make(chan error)
416+
errc = make(chan error, len(results))
417417
wantResult = echoResult{"a", 1, new(echoArgs)}
418418
)
419419
defer client.Close()
420420
for i := range results {
421421
i := i
422422
go func() {
423-
errc <- client.Call(&results[i], "test_echo",
424-
wantResult.String, wantResult.Int, wantResult.Args)
423+
errc <- client.Call(&results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
425424
}()
426425
}
427426

rpc/subscription_test.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -125,22 +125,25 @@ func TestSubscriptions(t *testing.T) {
125125

126126
// This test checks that unsubscribing works.
127127
func TestServerUnsubscribe(t *testing.T) {
128+
p1, p2 := net.Pipe()
129+
defer p2.Close()
130+
128131
// Start the server.
129132
server := newTestServer()
130-
service := &notificationTestService{unsubscribed: make(chan string)}
133+
service := &notificationTestService{unsubscribed: make(chan string, 1)}
131134
server.RegisterName("nftest2", service)
132-
p1, p2 := net.Pipe()
133135
go server.ServeCodec(NewCodec(p1), 0)
134136

135-
p2.SetDeadline(time.Now().Add(10 * time.Second))
136-
137137
// Subscribe.
138+
p2.SetDeadline(time.Now().Add(10 * time.Second))
138139
p2.Write([]byte(`{"jsonrpc":"2.0","id":1,"method":"nftest2_subscribe","params":["someSubscription",0,10]}`))
139140

140141
// Handle received messages.
141-
resps := make(chan subConfirmation)
142-
notifications := make(chan subscriptionResult)
143-
errors := make(chan error)
142+
var (
143+
resps = make(chan subConfirmation)
144+
notifications = make(chan subscriptionResult)
145+
errors = make(chan error, 1)
146+
)
144147
go waitForMessages(json.NewDecoder(p2), resps, notifications, errors)
145148

146149
// Receive the subscription ID.
@@ -173,34 +176,45 @@ type subConfirmation struct {
173176
subid ID
174177
}
175178

179+
// waitForMessages reads RPC messages from 'in' and dispatches them into the given channels.
180+
// It stops if there is an error.
176181
func waitForMessages(in *json.Decoder, successes chan subConfirmation, notifications chan subscriptionResult, errors chan error) {
177182
for {
178-
var msg jsonrpcMessage
179-
if err := in.Decode(&msg); err != nil {
180-
errors <- fmt.Errorf("decode error: %v", err)
183+
resp, notification, err := readAndValidateMessage(in)
184+
if err != nil {
185+
errors <- err
181186
return
187+
} else if resp != nil {
188+
successes <- *resp
189+
} else {
190+
notifications <- *notification
182191
}
183-
switch {
184-
case msg.isNotification():
185-
var res subscriptionResult
186-
if err := json.Unmarshal(msg.Params, &res); err != nil {
187-
errors <- fmt.Errorf("invalid subscription result: %v", err)
188-
} else {
189-
notifications <- res
190-
}
191-
case msg.isResponse():
192-
var c subConfirmation
193-
if msg.Error != nil {
194-
errors <- msg.Error
195-
} else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
196-
errors <- fmt.Errorf("invalid response: %v", err)
197-
} else {
198-
json.Unmarshal(msg.ID, &c.reqid)
199-
successes <- c
200-
}
201-
default:
202-
errors <- fmt.Errorf("unrecognized message: %v", msg)
203-
return
192+
}
193+
}
194+
195+
func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionResult, error) {
196+
var msg jsonrpcMessage
197+
if err := in.Decode(&msg); err != nil {
198+
return nil, nil, fmt.Errorf("decode error: %v", err)
199+
}
200+
switch {
201+
case msg.isNotification():
202+
var res subscriptionResult
203+
if err := json.Unmarshal(msg.Params, &res); err != nil {
204+
return nil, nil, fmt.Errorf("invalid subscription result: %v", err)
205+
}
206+
return nil, &res, nil
207+
case msg.isResponse():
208+
var c subConfirmation
209+
if msg.Error != nil {
210+
return nil, nil, msg.Error
211+
} else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
212+
return nil, nil, fmt.Errorf("invalid response: %v", err)
213+
} else {
214+
json.Unmarshal(msg.ID, &c.reqid)
215+
return &c, nil, nil
204216
}
217+
default:
218+
return nil, nil, fmt.Errorf("unrecognized message: %v", msg)
205219
}
206220
}

0 commit comments

Comments
 (0)