Skip to content

Commit

Permalink
all: use timer instead of time.After in loops, to avoid memleaks (#97)
Browse files Browse the repository at this point in the history
all: use timer instead of time.After in loops, to avoid memleaks (#91)

Co-authored-by: Ryan He <163962984+ryanmorphl2@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and ryanmorphl2 authored Jun 12, 2024
1 parent f68f84a commit 8fba72b
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 10 deletions.
8 changes: 6 additions & 2 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()

for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
Expand All @@ -604,15 +607,16 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return

case <-time.After(wait):
// Throttling up, fetch whatever's available
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
// Allocate as much as we can handle and request servicing
Expand Down
6 changes: 5 additions & 1 deletion eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
}
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()

for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
Expand Down Expand Up @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh:
return errCanceled
}
Expand Down
12 changes: 10 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error
mode = d.getMode()
timer = time.NewTimer(time.Second)
)
defer func() {
if rollback > 0 {
Expand All @@ -1284,6 +1285,8 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// Wait for batches of headers to process
gotHeaders := false

defer timer.Stop()

for {
select {
case <-d.cancelCh:
Expand Down Expand Up @@ -1439,11 +1442,12 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
Expand Down Expand Up @@ -1599,7 +1603,10 @@ func (d *Downloader) processSnapSyncContent() error {
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
// Wait for the next batch of downloaded data to be available, and if the pivot
// block became stale, move the goalpost
Expand Down Expand Up @@ -1673,6 +1680,7 @@ func (d *Downloader) processSnapSyncContent() error {
oldPivot = P
}
// Wait for completion, occasionally checking for pivot staleness
timer.Reset(time.Second)
select {
case <-sync.done:
if sync.err != nil {
Expand All @@ -1683,7 +1691,7 @@ func (d *Downloader) processSnapSyncContent() error {
}
oldPivot = nil

case <-time.After(time.Second):
case <-timer.C:
oldTail = afterP
continue
}
Expand Down
5 changes: 4 additions & 1 deletion ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,10 +541,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
return err
}
// Wait for the pong request to arrive back
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
case <-timer.C:
// Ping timeout, abort
return errors.New("ping timed out")
}
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
go func() {
waitErr <- n.Cmd.Wait()
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case err := <-waitErr:
return err
case <-time.After(5 * time.Second):
case <-timer.C:
return n.Cmd.Process.Kill()
}
}
Expand Down
10 changes: 8 additions & 2 deletions p2p/simulations/mocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
if err != nil {
panic("Could not startup node network for mocker")
}
tick := time.NewTicker(10 * time.Second)
var (
tick = time.NewTicker(10 * time.Second)
timer = time.NewTimer(3 * time.Second)
)
defer tick.Stop()
defer timer.Stop()

for {
select {
case <-quit:
Expand All @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
return
}

timer.Reset(3 * time.Second)
select {
case <-quit:
log.Info("Terminating simulation loop")
return
case <-time.After(3 * time.Second):
case <-timer.C:
}

log.Debug("starting node", "id", id)
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,11 +1032,14 @@ func (net *Network) Load(snap *Snapshot) error {
}
}

timeout := time.NewTimer(snapshotLoadTimeout)
defer timeout.Stop()

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout):
case <-timeout.C:
return errors.New("snapshot connections not established")
}
return nil
Expand Down

0 comments on commit 8fba72b

Please sign in to comment.