Skip to content

Commit

Permalink
mark processor down for some time not updating resolve ts (pingcap#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 authored Dec 12, 2019
1 parent abf94c6 commit 7f27685
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 21 deletions.
95 changes: 82 additions & 13 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import (
"github.com/pingcap/log"
pmodel "github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/roles"
"github.com/pingcap/ticdc/cdc/roles/storage"
"github.com/pingcap/ticdc/cdc/schema"
"go.uber.org/zap"
)

// markProcessorDownTime is how long a processor's ResolvedTs and CheckPointTs
// may both stay unchanged before the owner marks that processor as down.
var markProcessorDownTime = time.Minute

// OwnerDDLHandler defines the ddl handler for Owner
// which can pull ddl jobs and execute ddl jobs
type OwnerDDLHandler interface {
Expand All @@ -56,10 +59,11 @@ type changeFeedInfo struct {
detail *model.ChangeFeedDetail
*model.ChangeFeedInfo

schema *schema.Storage
Status model.ChangeFeedStatus
TargetTs uint64
ProcessorInfos model.ProcessorsInfos
schema *schema.Storage
Status model.ChangeFeedStatus
TargetTs uint64
ProcessorInfos model.ProcessorsInfos
processorLastUpdateTime map[string]time.Time

client *clientv3.Client
DDLCurrentIndex int
Expand Down Expand Up @@ -296,7 +300,8 @@ func (c *changeFeedInfo) applyJob(job *pmodel.Job) error {
}

type ownerImpl struct {
changeFeedInfos map[model.ChangeFeedID]*changeFeedInfo
changeFeedInfos map[model.ChangeFeedID]*changeFeedInfo
markDownProcessor map[string]struct{}

cfRWriter ChangeFeedInfoRWriter

Expand Down Expand Up @@ -335,6 +340,7 @@ func NewOwner(pdEndpoints []string, cli *clientv3.Client, manager roles.Manager)
owner := &ownerImpl{
pdEndpoints: pdEndpoints,
pdClient: pdClient,
markDownProcessor: make(map[string]struct{}),
changeFeedInfos: make(map[model.ChangeFeedID]*changeFeedInfo),
cfRWriter: storage.NewChangeFeedInfoEtcdRWriter(cli),
etcdClient: cli,
Expand All @@ -348,11 +354,52 @@ func NewOwner(pdEndpoints []string, cli *clientv3.Client, manager roles.Manager)
}

// addCapture stores info in o.captures, keyed by info.ID, while holding o.l.
func (o *ownerImpl) addCapture(info *model.CaptureInfo) {
	o.l.Lock()
	defer o.l.Unlock()

	o.captures[info.ID] = info
}

// handleMarkdownProcessor calls DeleteCaptureInfo for every ID recorded in
// o.markDownProcessor. An ID is dropped from the set only after its delete
// succeeds; on failure a warning naming the ID is logged and the ID stays in
// the set, so it is attempted again on the next call.
//
// The method does not lock o.l itself; the call site visible in run invokes it
// with o.l already held.
func (o *ownerImpl) handleMarkdownProcessor(ctx context.Context) {
	for id := range o.markDownProcessor {
		if err := DeleteCaptureInfo(ctx, id, o.etcdClient); err != nil {
			// Include the ID so a failing delete can be traced to a specific entry.
			log.Warn("failed to delete key", zap.String("id", id), zap.Error(err))
			continue
		}

		// Deleting the entry currently being visited is safe while ranging
		// over a map, so no intermediate slice of IDs is needed.
		delete(o.markDownProcessor, id)
	}
}

// removeCapture drops the capture identified by info.ID from o.captures and,
// for every change feed that has a processor entry under that ID, records each
// of the processor's tables in the feed's orphanTables and deletes the
// processor's sub-change-feed key from etcd.
//
// Orphaned tables get the processor's CheckPointTs as their StartTs. A failed
// etcd delete is only logged; the remaining feeds are still processed.
//
// NOTE(review): the etcd delete runs with context.Background() while o.l is
// held, so it carries no deadline or cancellation — confirm this is intended.
func (o *ownerImpl) removeCapture(info *model.CaptureInfo) {
	o.l.Lock()
	defer o.l.Unlock()

	captureID := info.ID
	delete(o.captures, captureID)

	for _, cf := range o.changeFeedInfos {
		procInfo, found := cf.ProcessorInfos[captureID]
		if !found {
			// This change feed has no processor entry for the removed capture.
			continue
		}

		for _, tbl := range procInfo.TableInfos {
			orphan := model.ProcessTableInfo{
				ID:      tbl.ID,
				StartTs: procInfo.CheckPointTs,
			}
			cf.orphanTables[tbl.ID] = orphan
		}

		subKey := kv.GetEtcdKeySubChangeFeed(cf.ID, captureID)
		_, err := o.etcdClient.Delete(context.Background(), subKey)
		if err != nil {
			log.Warn("failed to delete key", zap.Error(err))
		}
	}
}

func (o *ownerImpl) handleWatchCapture() error {
Expand Down Expand Up @@ -383,7 +430,26 @@ func (o *ownerImpl) loadChangeFeedInfos(ctx context.Context) error {
var exist bool

if cfInfo, exist = o.changeFeedInfos[changeFeedID]; exist {
for cid, pinfo := range etcdChangeFeedInfo {
if _, ok := cfInfo.processorLastUpdateTime[cid]; !ok {
cfInfo.processorLastUpdateTime[cid] = time.Now()
continue
}

oldPinfo, ok := cfInfo.ProcessorInfos[cid]
if !ok || oldPinfo.ResolvedTs != pinfo.ResolvedTs || oldPinfo.CheckPointTs != pinfo.CheckPointTs {
cfInfo.processorLastUpdateTime[cid] = time.Now()
}
}

cfInfo.ProcessorInfos = etcdChangeFeedInfo

for id := range cfInfo.ProcessorInfos {
lastUpdateTime := cfInfo.processorLastUpdateTime[id]
if time.Since(lastUpdateTime) > markProcessorDownTime {
o.markDownProcessor[id] = struct{}{}
}
}
continue
}

Expand Down Expand Up @@ -431,14 +497,15 @@ func (o *ownerImpl) loadChangeFeedInfos(ctx context.Context) error {
}

o.changeFeedInfos[changeFeedID] = &changeFeedInfo{
detail: detail,
ID: changeFeedID,
client: o.etcdClient,
ddlHandler: ddlHandler,
schema: schemaStorage,
tables: tables,
orphanTables: orphanTables,
toCleanTables: make(map[uint64]struct{}),
detail: detail,
ID: changeFeedID,
client: o.etcdClient,
ddlHandler: ddlHandler,
schema: schemaStorage,
tables: tables,
orphanTables: orphanTables,
toCleanTables: make(map[uint64]struct{}),
processorLastUpdateTime: make(map[string]time.Time),
ChangeFeedInfo: &model.ChangeFeedInfo{
SinkURI: changefeed.SinkURI,
ResolvedTs: 0,
Expand Down Expand Up @@ -644,6 +711,8 @@ func (o *ownerImpl) run(ctx context.Context) error {
o.l.Lock()
defer o.l.Unlock()

o.handleMarkdownProcessor(ctx)

err := o.loadChangeFeedInfos(cctx)
if err != nil {
return errors.Trace(err)
Expand Down
18 changes: 10 additions & 8 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ func (s *ownerSuite) TestPureDML(c *check.C) {

changeFeedInfos := map[model.ChangeFeedID]*changeFeedInfo{
"test_change_feed": {
tables: tables,
ChangeFeedInfo: &model.ChangeFeedInfo{},
TargetTs: 100,
Status: model.ChangeFeedSyncDML,
tables: tables,
ChangeFeedInfo: &model.ChangeFeedInfo{},
processorLastUpdateTime: make(map[string]time.Time),
TargetTs: 100,
Status: model.ChangeFeedSyncDML,
ProcessorInfos: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
Expand Down Expand Up @@ -310,10 +311,11 @@ func (s *ownerSuite) TestDDL(c *check.C) {

changeFeedInfos := map[model.ChangeFeedID]*changeFeedInfo{
"test_change_feed": {
tables: tables,
ChangeFeedInfo: &model.ChangeFeedInfo{},
TargetTs: 100,
Status: model.ChangeFeedSyncDML,
tables: tables,
ChangeFeedInfo: &model.ChangeFeedInfo{},
processorLastUpdateTime: make(map[string]time.Time),
TargetTs: 100,
Status: model.ChangeFeedSyncDML,
ProcessorInfos: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
Expand Down

0 comments on commit 7f27685

Please sign in to comment.