Skip to content

Commit

Permalink
owner(ticdc): fix query healthy 500 since changefeed paused. (#7041)
Browse files Browse the repository at this point in the history
ref #4757, close #7043
  • Loading branch information
3AceShowHand authored Sep 12, 2022
1 parent 52a78bd commit a8f91d0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 17 deletions.
38 changes: 26 additions & 12 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,33 +610,47 @@ func (o *ownerImpl) handleQueries(query *Query) error {
}
query.Data = ret
case QueryHealth:
isHealthy, err := o.isHealthy()
if err != nil {
return errors.Trace(err)
}
query.Data = isHealthy
query.Data = o.isHealthy()
}
return nil
}

func (o *ownerImpl) isHealthy() (bool, error) {
func (o *ownerImpl) isHealthy() bool {
if !o.changefeedTicked {
// The owner has not ticked its changefeeds yet, so some changefeeds may
// not be initialized.
return false, nil
log.Warn("owner is not healthy since changefeeds are not ticked")
return false
}
if !o.clusterVersionConsistent(o.captures) {
return false, nil
return false
}
for _, cfReactor := range o.changefeeds {
provider := cfReactor.GetInfoProvider()
for _, changefeed := range o.changefeeds {
if changefeed.state == nil {
log.Warn("isHealthy: changefeed state is nil",
zap.String("namespace", changefeed.id.Namespace),
zap.String("changefeed", changefeed.id.ID))
continue
}
if changefeed.state.Info.State != model.StateNormal {
log.Warn("isHealthy: changefeed not normal",
zap.String("namespace", changefeed.id.Namespace),
zap.String("changefeed", changefeed.id.ID),
zap.Any("state", changefeed.state.Info.State))
continue
}

provider := changefeed.GetInfoProvider()
if provider == nil || !provider.IsInitialized() {
// The scheduler has not been initialized yet. This is considered
// unhealthy because the owner cannot schedule tables for now.
return false, nil
log.Warn("isHealthy: changefeed is not initialized",
zap.String("namespace", changefeed.id.Namespace),
zap.String("changefeed", changefeed.id.ID))
return false
}
}
return true, nil
return true
}

func (o *ownerImpl) takeOwnerJobs() []*ownerJob {
Expand Down
60 changes: 55 additions & 5 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,16 +773,60 @@ func TestHandleDrainCapturesSchedulerNotReady(t *testing.T) {
require.Nil(t, <-done)
}

type heathScheduler struct {
type healthScheduler struct {
scheduler.Scheduler
scheduler.InfoProvider
init bool
}

func (h *heathScheduler) IsInitialized() bool {
func (h *healthScheduler) IsInitialized() bool {
return h.init
}

// TestIsHealthyWithAbnormalChangefeeds checks that the health query keeps
// reporting healthy while some changefeeds have a nil state or are not in the
// normal state: abnormal changefeeds must not affect the other, normal ones.
func TestIsHealthyWithAbnormalChangefeeds(t *testing.T) {
	t.Parallel()

	impl := &ownerImpl{
		changefeeds:      map[model.ChangeFeedID]*changefeed{},
		changefeedTicked: true,
	}
	healthQuery := &Query{Tp: QueryHealth}

	// requireHealthy runs the health query against the owner and requires
	// that it succeeds and reports true.
	requireHealthy := func() {
		require.NoError(t, impl.handleQueries(healthQuery))
		require.True(t, healthQuery.Data.(bool))
	}

	// No changefeed is registered at first, so the owner is healthy.
	requireHealthy()

	// One changefeed whose state is nil.
	abnormal := new(changefeed)
	impl.changefeeds[model.ChangeFeedID{ID: "1"}] = abnormal
	requireHealthy()

	// The same changefeed, now with a state that is not normal (stopped).
	abnormal.state = &orchestrator.ChangefeedReactorState{
		Info: &model.ChangeFeedInfo{State: model.StateStopped},
	}
	requireHealthy()

	// Two changefeeds: the second one is normal and its scheduler reports
	// itself as initialized.
	normal := &changefeed{
		state: &orchestrator.ChangefeedReactorState{
			Info: &model.ChangeFeedInfo{State: model.StateNormal},
		},
		scheduler: &healthScheduler{init: true},
	}
	impl.changefeeds[model.ChangeFeedID{ID: "2"}] = normal
	requireHealthy()
}

func TestIsHealthy(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -819,8 +863,11 @@ func TestIsHealthy(t *testing.T) {
require.NoError(t, err)
require.True(t, query.Data.(bool))

// Unhealthy, scheduler is not set.
// Unhealthy: the changefeed is in the normal state, but its scheduler is not set.
cf := &changefeed{
state: &orchestrator.ChangefeedReactorState{
Info: &model.ChangeFeedInfo{State: model.StateNormal},
},
scheduler: nil, // scheduler is not set.
}
o.changefeeds[model.ChangeFeedID{ID: "1"}] = cf
Expand All @@ -830,7 +877,7 @@ func TestIsHealthy(t *testing.T) {
require.False(t, query.Data.(bool))

// Healthy, scheduler is set and return true.
cf.scheduler = &heathScheduler{init: true}
cf.scheduler = &healthScheduler{init: true}
o.changefeedTicked = true
err = o.handleQueries(query)
require.NoError(t, err)
Expand All @@ -844,7 +891,10 @@ func TestIsHealthy(t *testing.T) {

// Unhealthy, there is another changefeed that is not initialized.
o.changefeeds[model.ChangeFeedID{ID: "1"}] = &changefeed{
scheduler: &heathScheduler{init: false},
state: &orchestrator.ChangefeedReactorState{
Info: &model.ChangeFeedInfo{State: model.StateNormal},
},
scheduler: &healthScheduler{init: false},
}
o.changefeedTicked = true
err = o.handleQueries(query)
Expand Down

0 comments on commit a8f91d0

Please sign in to comment.