Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log chain shutdown duration #1545

Merged
merged 3 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ type Handler interface {
Start(ctx context.Context, recoverPanic bool)
Push(ctx context.Context, msg Message)
Len() int

Stop(ctx context.Context)
StopWithError(ctx context.Context, err error)
Stopped() chan struct{}
// AwaitStopped returns an error if the call would block and [ctx] is done.
// Even if [ctx] is done when passed into this function, this function will
// return a nil error if it will not block.
AwaitStopped(ctx context.Context) (time.Duration, error)
}

// handler passes incoming messages from the network to the consensus engine.
Expand Down Expand Up @@ -104,6 +108,8 @@ type handler struct {
timeouts chan struct{}

closeOnce sync.Once
startClosingTime time.Time
totalClosingTime time.Duration
closingChan chan struct{}
numDispatchersClosed int
// Closed when this handler and [engine] are done shutting down
Expand Down Expand Up @@ -305,6 +311,8 @@ func (h *handler) RegisterTimeout(d time.Duration) {
// Note: It is possible for Stop to be called before/concurrently with Start.
func (h *handler) Stop(ctx context.Context) {
h.closeOnce.Do(func() {
h.startClosingTime = h.clock.Time()

// Must hold the locks here to ensure there's no race condition in where
// we check the value of [h.closing] after the call to [Signal].
h.syncMessageQueue.Shutdown()
Expand Down Expand Up @@ -340,8 +348,19 @@ func (h *handler) StopWithError(ctx context.Context, err error) {
h.Stop(ctx)
}

func (h *handler) Stopped() chan struct{} {
return h.closed
func (h *handler) AwaitStopped(ctx context.Context) (time.Duration, error) {
select {
case <-h.closed:
return h.totalClosingTime, nil
default:
}

select {
case <-ctx.Done():
return 0, ctx.Err()
case <-h.closed:
return h.totalClosingTime, nil
}
}

func (h *handler) dispatchSync(ctx context.Context) {
Expand Down Expand Up @@ -977,9 +996,17 @@ func (h *handler) shutdown(ctx context.Context) {
if h.onStopped != nil {
go h.onStopped()
}

h.totalClosingTime = h.clock.Time().Sub(h.startClosingTime)
close(h.closed)
}()

// shutdown may be called during Start, so we populate the start closing
// time here in case Stop was never called.
if h.startClosingTime.IsZero() {
h.startClosingTime = h.clock.Time()
}

state := h.ctx.State.Get()
engine, ok := h.engineManager.Get(state.Type).Get(state.State)
if !ok {
Expand Down
29 changes: 15 additions & 14 deletions snow/networking/handler/mock_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 26 additions & 13 deletions snow/networking/router/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,21 @@ func (cr *ChainRouter) Shutdown(ctx context.Context) {
chain.Stop(ctx)
}

ticker := time.NewTicker(cr.closeTimeout)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, cr.closeTimeout)
defer cancel()

for _, chain := range prevChains {
select {
case <-chain.Stopped():
case <-ticker.C:
cr.log.Warn("timed out while shutting down the chains")
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the return isn't needed here anymore because AwaitStopped will correctly report if the shutdown timed out.

shutdownDuration, err := chain.AwaitStopped(ctx)

chainLog := chain.Context().Log
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is the same as cr.log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the same as cr.log (cr.log is the node global logger, whereas chain.Context().Log returns the chain specific logger)

if err != nil {
chainLog.Warn("timed out while shutting down",
zap.Error(err),
)
} else {
chainLog.Info("chain shutdown",
zap.Duration("shutdownTime", shutdownDuration),
)
}
}
}
Expand Down Expand Up @@ -646,12 +652,19 @@ func (cr *ChainRouter) removeChain(ctx context.Context, chainID ids.ID) {

chain.Stop(ctx)

ticker := time.NewTicker(cr.closeTimeout)
defer ticker.Stop()
select {
case <-chain.Stopped():
case <-ticker.C:
chain.Context().Log.Warn("timed out while shutting down")
ctx, cancel := context.WithTimeout(ctx, cr.closeTimeout)
shutdownDuration, err := chain.AwaitStopped(ctx)
cancel()

chainLog := chain.Context().Log
if err != nil {
chainLog.Warn("timed out while shutting down",
zap.Error(err),
)
} else {
chainLog.Info("chain shutdown",
zap.Duration("shutdownTime", shutdownDuration),
)
}

if cr.onFatal != nil && cr.criticalChains.Contains(chainID) {
Expand Down
43 changes: 24 additions & 19 deletions snow/networking/router/chain_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ const (
)

func TestShutdown(t *testing.T) {
require := require.New(t)

vdrs := validators.NewSet()
err := vdrs.Add(ids.GenerateTestNodeID(), nil, ids.Empty, 1)
require.NoError(t, err)
require.NoError(err)

benchlist := benchlist.NewNoBenchlist()
tm, err := timeout.NewManager(
&timer.AdaptiveTimeoutConfig{
Expand All @@ -58,7 +61,7 @@ func TestShutdown(t *testing.T) {
"",
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(err)
go tm.Dispatch()

chainRouter := ChainRouter{}
Expand All @@ -75,29 +78,30 @@ func TestShutdown(t *testing.T) {
"",
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(err)

shutdownCalled := make(chan struct{}, 1)

ctx := snow.DefaultConsensusContextTest()
chainCtx := snow.DefaultConsensusContextTest()
resourceTracker, err := tracker.NewResourceTracker(
prometheus.NewRegistry(),
resource.NoUsage,
meter.ContinuousFactory{},
time.Second,
)
require.NoError(t, err)
require.NoError(err)

h, err := handler.New(
ctx,
chainCtx,
vdrs,
nil,
time.Second,
testThreadPoolSize,
resourceTracker,
validators.UnhandledSubnetConnector,
subnets.New(ctx.NodeID, subnets.Config{}),
subnets.New(chainCtx.NodeID, subnets.Config{}),
)
require.NoError(t, err)
require.NoError(err)

bootstrapper := &common.BootstrapperTest{
BootstrapableTest: common.BootstrapableTest{
Expand All @@ -110,7 +114,7 @@ func TestShutdown(t *testing.T) {
bootstrapper.Default(true)
bootstrapper.CantGossip = false
bootstrapper.ContextF = func() *snow.ConsensusContext {
return ctx
return chainCtx
}
bootstrapper.ShutdownF = func(context.Context) error {
shutdownCalled <- struct{}{}
Expand All @@ -125,7 +129,7 @@ func TestShutdown(t *testing.T) {
engine.Default(true)
engine.CantGossip = false
engine.ContextF = func() *snow.ConsensusContext {
return ctx
return chainCtx
}
engine.ShutdownF = func(context.Context) error {
shutdownCalled <- struct{}{}
Expand All @@ -147,7 +151,7 @@ func TestShutdown(t *testing.T) {
Consensus: engine,
},
})
ctx.State.Set(snow.EngineState{
chainCtx.State.Set(snow.EngineState{
Type: engineType,
State: snow.NormalOp, // assumed bootstrapping is done
})
Expand All @@ -161,18 +165,19 @@ func TestShutdown(t *testing.T) {

chainRouter.Shutdown(context.Background())

ticker := time.NewTicker(250 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()

select {
case <-ticker.C:
t.Fatalf("Handler shutdown was not called or timed out after 250ms during chainRouter shutdown")
case <-ctx.Done():
require.FailNow("Handler shutdown was not called or timed out after 250ms during chainRouter shutdown")
case <-shutdownCalled:
}

select {
case <-h.Stopped():
default:
t.Fatal("handler shutdown but never closed its closing channel")
}
shutdownDuration, err := h.AwaitStopped(ctx)
require.NoError(err)
require.GreaterOrEqual(shutdownDuration, time.Duration(0))
require.Less(shutdownDuration, 250*time.Millisecond)
}

func TestShutdownTimesOut(t *testing.T) {
Expand Down