Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions node/ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type Service struct {

blockReader services.FullBlockReader
txPool txpoolproto.TxpoolClient

ctx context.Context // Cancellable context for graceful shutdown
cancel context.CancelFunc // Cancel function for ctx
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -161,6 +164,14 @@ func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, blockRe

// Start implements node.Lifecycle, starting up the monitoring and reporting daemon.
func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())

// Link quitCh with s.ctx: when quitCh closes, cancel s.ctx
go func() {
<-s.quitCh
s.cancel()
}()

go s.loop()

log.Info("Stats daemon started")
Expand All @@ -169,6 +180,9 @@ func (s *Service) Start() error {

// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon.
func (s *Service) Stop() error {
if s.cancel != nil {
s.cancel()
}
log.Info("Stats daemon stopped")
return nil
}
Expand Down Expand Up @@ -388,7 +402,7 @@ func (s *Service) login(conn *connWrapper) error {
}
nodeName := "Erigon"
if len(s.servers) > 0 {
nodeInfo, err := s.servers[0].NodeInfo(context.TODO(), nil)
nodeInfo, err := s.servers[0].NodeInfo(s.ctx, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -516,7 +530,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {

// reportBlock retrieves the current chain head and reports it to the stats server.
func (s *Service) reportBlock(conn *connWrapper) error {
roTx, err := s.chaindb.BeginRo(context.Background())
roTx, err := s.chaindb.BeginRo(s.ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -583,7 +597,7 @@ func (s *Service) assembleBlockStats(block *types.Block, td *big.Int) *blockStat
// reportHistory retrieves the most recent batch of blocks and reports it to the
// stats server.
func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
roTx, err := s.chaindb.BeginRo(context.Background())
roTx, err := s.chaindb.BeginRo(s.ctx)
if err != nil {
return err
}
Expand All @@ -597,7 +611,7 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
} else {
// No indexes requested, send back the top ones
headHash := rawdb.ReadHeadBlockHash(roTx)
headNumber, err := s.blockReader.HeaderNumber(context.Background(), roTx, headHash)
headNumber, err := s.blockReader.HeaderNumber(s.ctx, roTx, headHash)
if headNumber == nil || err != nil {
return err
}
Expand All @@ -610,7 +624,7 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
history := make([]*blockStats, len(indexes))
for i, number := range indexes {
// Retrieve the next block if it's known to us
block, err := s.blockReader.BlockByNumber(context.Background(), roTx, number)
block, err := s.blockReader.BlockByNumber(s.ctx, roTx, number)
if err != nil {
return err
}
Expand Down Expand Up @@ -652,7 +666,7 @@ type pendStats struct {
// it to the stats server.
func (s *Service) reportPending(conn *connWrapper) error {
in := new(txpoolproto.StatusRequest)
status, err := s.txPool.Status(context.Background(), in)
status, err := s.txPool.Status(s.ctx, in)
if err != nil {
return err
}
Expand Down Expand Up @@ -684,7 +698,7 @@ type nodeStats struct {
// reportStats retrieves various stats about the node at the networking and
// mining layer and reports it to the stats server.
func (s *Service) reportStats(conn *connWrapper) error {
roTx, err := s.chaindb.BeginRo(context.Background())
roTx, err := s.chaindb.BeginRo(s.ctx)
if err != nil {
return err
}
Expand Down