Skip to content

Commit

Permalink
Merge pull request #6515 from planetscale/ss-hc1-shutdown
Browse files Browse the repository at this point in the history
vttablet: Open and Close healthStreamer
  • Loading branch information
deepthi authored Aug 3, 2020
2 parents c4054aa + 0fdae88 commit 5c5bce2
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 43 deletions.
44 changes: 40 additions & 4 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"vitess.io/vitess/go/history"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand All @@ -46,6 +48,8 @@ type healthStreamer struct {
unhealthyThreshold time.Duration

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse

Expand All @@ -72,17 +76,45 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS
}

func (hs *healthStreamer) InitDBConfig(target querypb.Target) {
hs.state.Target = &target
// Weird test failures happen if we don't instantiate
// a separate variable.
inner := target
hs.state.Target = &inner
}

func (hs *healthStreamer) Open() {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel != nil {
return
}
hs.ctx, hs.cancel = context.WithCancel(context.TODO())
}

func (hs *healthStreamer) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel != nil {
hs.cancel()
hs.cancel = nil
}
}

func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
ch := hs.register()
ch, hsCtx := hs.register()
if hsCtx == nil {
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "tabletserver is shutdown")
}
defer hs.unregister(ch)

for {
select {
case <-ctx.Done():
return nil
case <-hsCtx.Done():
return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "tabletserver is shutdown")
case shr := <-ch:
if err := callback(shr); err != nil {
if err == io.EOF {
Expand All @@ -94,16 +126,20 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}
}

func (hs *healthStreamer) register() chan *querypb.StreamHealthResponse {
func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
hs.mu.Lock()
defer hs.mu.Unlock()

if hs.cancel == nil {
return nil, nil
}

ch := make(chan *querypb.StreamHealthResponse, 1)
hs.clients[ch] = struct{}{}

// Send the current state immediately.
ch <- proto.Clone(hs.state).(*querypb.StreamHealthResponse)
return ch
return ch, hs.ctx
}

func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func TestHealthStreamerClosed(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "ReplTrackerTest")
alias := topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
err := hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
return nil
})
assert.Contains(t, err.Error(), "tabletserver is shutdown")
}

func TestHealthStreamerBroadcast(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "ReplTrackerTest")
Expand All @@ -37,6 +52,8 @@ func TestHealthStreamerBroadcast(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
hs.Open()
defer hs.Close()
target := querypb.Target{}
hs.InitDBConfig(target)

Expand Down
13 changes: 5 additions & 8 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type stateManager struct {
// Open must be done in forward order.
// Close must be done in reverse order.
// All Close functions must be called before Open.
hs *healthStreamer
se schemaEngine
rt replTracker
vstreamer subComponent
Expand All @@ -105,10 +106,6 @@ type stateManager struct {
te txEngine
messager subComponent

// notify will be invoked by stateManager on every state change.
// The implementation is provided by healthStreamer.ChangeState.
notify func(topodatapb.TabletType, time.Time, time.Duration, error, bool)

// hcticks starts on initialiazation and runs forever.
hcticks *timer.Timer

Expand Down Expand Up @@ -181,7 +178,7 @@ func (sm *stateManager) Init(env tabletenv.Env, target querypb.Target) {
func (sm *stateManager) SetServingType(tabletType topodatapb.TabletType, terTimestamp time.Time, state servingState, reason string) error {
defer sm.ExitLameduck()

// Start is idempotent.
sm.hs.Open()
sm.hcticks.Start(sm.Broadcast)

if tabletType == topodatapb.TabletType_RESTORE || tabletType == topodatapb.TabletType_BACKUP {
Expand Down Expand Up @@ -313,9 +310,9 @@ func (sm *stateManager) StopService() {
defer close(sm.setTimeBomb())

log.Info("Stopping TabletServer")
// Stop replica tracking because StopService is used by all tests.
sm.hcticks.Stop()
sm.SetServingType(sm.Target().TabletType, time.Time{}, StateNotConnected, "service stopped")
sm.hcticks.Stop()
sm.hs.Close()
}

// StartRequest validates the current state and target and registers
Expand Down Expand Up @@ -569,7 +566,7 @@ func (sm *stateManager) Broadcast() {
defer sm.mu.Unlock()

lag, err := sm.refreshReplHealthLocked()
sm.notify(sm.target.TabletType, sm.terTimestamp, lag, err, sm.isServingLocked())
sm.hs.ChangeState(sm.target.TabletType, sm.terTimestamp, lag, err, sm.isServingLocked())
}

func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) {
Expand Down
83 changes: 54 additions & 29 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestStateManagerStateByName(t *testing.T) {

func TestStateManagerServeMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.EnterLameduck()
err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
require.NoError(t, err)
Expand Down Expand Up @@ -88,6 +91,7 @@ func TestStateManagerServeMaster(t *testing.T) {

func TestStateManagerServeNonMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand All @@ -109,6 +113,7 @@ func TestStateManagerServeNonMaster(t *testing.T) {

func TestStateManagerUnserveMaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateNotServing, "")
require.NoError(t, err)

Expand All @@ -132,6 +137,7 @@ func TestStateManagerUnserveMaster(t *testing.T) {

func TestStateManagerUnserveNonmaster(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotServing, "")
require.NoError(t, err)

Expand All @@ -156,6 +162,7 @@ func TestStateManagerUnserveNonmaster(t *testing.T) {

func TestStateManagerClose(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_RDONLY, testNow, StateNotConnected, "")
require.NoError(t, err)

Expand All @@ -177,6 +184,7 @@ func TestStateManagerClose(t *testing.T) {

func TestStateManagerStopService(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand All @@ -190,6 +198,7 @@ func TestStateManagerStopService(t *testing.T) {

func TestStateManagerGracePeriod(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.transitionGracePeriod = 10 * time.Millisecond

alsoAllow := func() topodatapb.TabletType {
Expand Down Expand Up @@ -240,6 +249,8 @@ func (te *testWatcher) Close() {
}

func TestStateManagerSetServingTypeRace(t *testing.T) {
// We don't call StopService because that in turn
// will call Close again on testWatcher.
sm := newTestStateManager(t)
te := &testWatcher{
t: t,
Expand All @@ -258,7 +269,9 @@ func TestStateManagerSetServingTypeRace(t *testing.T) {
}

func TestStateManagerSetServingTypeNoChange(t *testing.T) {
log.Infof("starting")
sm := newTestStateManager(t)
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

Expand Down Expand Up @@ -286,6 +299,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) {
transitionRetryInterval = 10 * time.Millisecond

sm := newTestStateManager(t)
defer sm.StopService()
sm.se.(*testSchemaEngine).failMySQL = true

err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
Expand Down Expand Up @@ -317,6 +331,7 @@ func TestStateManagerTransitionFailRetry(t *testing.T) {

func TestStateManagerNotConnectedType(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
sm.EnterLameduck()
err := sm.SetServingType(topodatapb.TabletType_RESTORE, testNow, StateNotServing, "")
require.NoError(t, err)
Expand All @@ -336,6 +351,7 @@ func TestStateManagerCheckMySQL(t *testing.T) {
transitionRetryInterval = 10 * time.Millisecond

sm := newTestStateManager(t)
defer sm.StopService()

err := sm.SetServingType(topodatapb.TabletType_MASTER, testNow, StateServing, "")
require.NoError(t, err)
Expand Down Expand Up @@ -442,6 +458,7 @@ func TestStateManagerValidations(t *testing.T) {

func TestStateManagerWaitForRequests(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
target := &querypb.Target{TabletType: topodatapb.TabletType_MASTER}
sm.target = *target
sm.timebombDuration = 10 * time.Second
Expand Down Expand Up @@ -481,40 +498,46 @@ func TestStateManagerWaitForRequests(t *testing.T) {

func TestStateManagerNotify(t *testing.T) {
sm := newTestStateManager(t)
var (
gotType topodatapb.TabletType
gotts time.Time
gotlag time.Duration
goterr error
gotServing bool
)

ch := make(chan struct{})
sm.notify = func(tabletType topodatapb.TabletType, terTimestamp time.Time, lag time.Duration, err error, serving bool) {
gotType = tabletType
gotts = terTimestamp
gotlag = lag
goterr = err
gotServing = serving
ch <- struct{}{}
}
defer sm.StopService()

blpFunc = testBlpFunc

err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)

<-ch
ch := make(chan *querypb.StreamHealthResponse, 5)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
ch <- shr
return nil
})
assert.Contains(t, err.Error(), "tabletserver is shutdown")
}()
defer wg.Wait()

sm.Broadcast()

gotshr := <-ch
// Remove things we don't care about:
gotshr.RealtimeStats = nil
wantshr := &querypb.StreamHealthResponse{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Serving: true,
TabletAlias: &topodatapb.TabletAlias{},
}
sm.hcticks.Stop()
assert.Equal(t, topodatapb.TabletType_REPLICA, gotType)
assert.Equal(t, testNow, gotts)
assert.Equal(t, 1*time.Second, gotlag)
assert.Equal(t, nil, goterr)
assert.True(t, gotServing)
assert.Equal(t, wantshr, gotshr)
sm.StopService()
}

func TestRefreshReplHealthLocked(t *testing.T) {
sm := newTestStateManager(t)
defer sm.StopService()
rt := sm.rt.(*testReplTracker)

sm.target.TabletType = topodatapb.TabletType_MASTER
Expand Down Expand Up @@ -555,7 +578,10 @@ func verifySubcomponent(t *testing.T, order int64, component interface{}, state

func newTestStateManager(t *testing.T) *stateManager {
order.Set(0)
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "StateManagerTest")
sm := &stateManager{
hs: newHealthStreamer(env, topodatapb.TabletAlias{}),
se: &testSchemaEngine{},
rt: &testReplTracker{lag: 1 * time.Second},
vstreamer: &testSubcomponent{},
Expand All @@ -565,11 +591,10 @@ func newTestStateManager(t *testing.T) *stateManager {
txThrottler: &testTxThrottler{},
te: &testTxEngine{},
messager: &testSubcomponent{},
notify: func(topodatapb.TabletType, time.Time, time.Duration, error, bool) {},
}
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, "StateManagerTest")
sm.Init(env, querypb.Target{})
sm.hs.InitDBConfig(querypb.Target{})
log.Infof("returning sm: %p", sm)
return sm
}

Expand Down
Loading

0 comments on commit 5c5bce2

Please sign in to comment.