Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: update position and status as soon as possible #481

Merged
merged 26 commits into from
May 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ChangeFeedRWriter interface {
GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error)

// GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed
GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, error)
GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error)
// PutAllChangeFeedStatus the changefeed info to storage such as etcd.
PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error
}
Expand Down Expand Up @@ -267,10 +267,12 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri
}
switch lockStatus {
case model.TableNoLock:
log.Debug("no c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID))
delete(c.waitingConfirmTables, tableID)
case model.TablePLock:
log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID))
case model.TablePLockCommited:
log.Debug("delete the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID))
delete(c.waitingConfirmTables, tableID)
}
}
Expand Down Expand Up @@ -395,15 +397,15 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
if len(c.taskStatus) > len(c.taskPositions) {
return nil
}
for cid, pInfo := range c.taskPositions {
if pInfo.CheckPointTs != todoDDLJob.BinlogInfo.FinishedTS {
log.Debug("wait checkpoint ts", zap.String("cid", cid),
zap.Uint64("checkpoint ts", pInfo.CheckPointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS),
zap.String("ddl query", todoDDLJob.Query))
return nil
}

if c.status.CheckpointTs != todoDDLJob.BinlogInfo.FinishedTS {
log.Debug("wait checkpoint ts",
zap.Uint64("checkpoint ts", c.status.CheckpointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS),
zap.String("ddl query", todoDDLJob.Query))
return nil
}

log.Info("apply job", zap.Stringer("job", todoDDLJob),
zap.String("schema", todoDDLJob.SchemaName),
zap.String("query", todoDDLJob.Query),
Expand Down Expand Up @@ -469,12 +471,12 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C

// calcResolvedTs update every changefeed's resolve ts and checkpoint ts.
func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
if c.ddlState != model.ChangeFeedSyncDML {
if c.ddlState != model.ChangeFeedSyncDML && c.ddlState != model.ChangeFeedWaitToExecDDL {
return nil
}

// ProcessorInfos don't contains the whole set table id now.
if len(c.orphanTables) > 0 || len(c.waitingConfirmTables) > 0 {
if len(c.waitingConfirmTables) > 0 {
return nil
}

Expand All @@ -499,6 +501,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
}
}

for _, orphanTable := range c.orphanTables {
if minCheckpointTs > orphanTable.StartTs {
minCheckpointTs = orphanTable.StartTs
}
if minResolvedTs > orphanTable.StartTs {
minResolvedTs = orphanTable.StartTs
}
}

// if minResolvedTs is greater than ddlResolvedTs,
// it means that ddlJobHistory in memory is not intact,
// there are some ddl jobs which finishedTs is smaller than minResolvedTs we don't know.
Expand Down Expand Up @@ -537,17 +548,22 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {

if minCheckpointTs > c.status.CheckpointTs {
c.status.CheckpointTs = minCheckpointTs
err := c.sink.EmitCheckpointTs(ctx, minCheckpointTs)
if err != nil {
return errors.Trace(err)
// when the `c.ddlState` is `model.ChangeFeedWaitToExecDDL`,
// some DDL is waiting to executed, we can't ensure whether the DDL has been executed.
// so we can't emit checkpoint to sink
if c.ddlState != model.ChangeFeedWaitToExecDDL {
err := c.sink.EmitCheckpointTs(ctx, minCheckpointTs)
if err != nil {
return errors.Trace(err)
}
}
tsUpdated = true
}

if tsUpdated {
log.Debug("update changefeed", zap.String("id", c.id),
zap.Uint64("checkpoint ts", minCheckpointTs),
zap.Uint64("resolved ts", minResolvedTs))
zap.Uint64("checkpoint ts", c.status.CheckpointTs),
zap.Uint64("resolved ts", c.status.ResolvedTs))
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,18 @@ func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) erro
}

// GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed
func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, error) {
func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, int64, error) {
key := GetEtcdKeyJob(id)
resp, err := c.Client.Get(ctx, key)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
if resp.Count == 0 {
return nil, errors.Annotatef(model.ErrChangeFeedNotExists, "query status id %s", id)
return nil, 0, errors.Annotatef(model.ErrChangeFeedNotExists, "query status id %s", id)
}
info := &model.ChangeFeedStatus{}
err = info.Unmarshal(resp.Kvs[0].Value)
return info, errors.Trace(err)
return info, resp.Kvs[0].ModRevision, errors.Trace(err)
}

// GetCaptures returns kv revision and CaptureInfo list
Expand Down
55 changes: 46 additions & 9 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
}

// we find a new changefeed, init changefeed info here.
status, err := o.cfRWriter.GetChangeFeedStatus(ctx, changeFeedID)
status, _, err := o.cfRWriter.GetChangeFeedStatus(ctx, changeFeedID)
if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists {
return err
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
return errors.Trace(err)
}
case model.AdminResume:
cfStatus, err := o.etcdClient.GetChangeFeedStatus(ctx, job.CfID)
cfStatus, _, err := o.etcdClient.GetChangeFeedStatus(ctx, job.CfID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -513,6 +513,14 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
if err := o.throne(ctx); err != nil {
return err
}

ctx1, cancel := context.WithCancel(ctx)
defer cancel()
changedFeeds := o.watchFeedChange(ctx1)

ticker := time.NewTicker(tickTime)
defer ticker.Stop()

loop:
for {
select {
Expand All @@ -521,14 +529,16 @@ loop:
break loop
case <-ctx.Done():
return ctx.Err()
case <-time.After(tickTime):
err := o.run(ctx)
if err != nil {
if errors.Cause(err) != context.Canceled {
log.Error("owner exited with error", zap.Error(err))
}
break loop
case <-changedFeeds:
case <-ticker.C:
}

err := o.run(ctx)
if err != nil {
if errors.Cause(err) != context.Canceled {
log.Error("owner exited with error", zap.Error(err))
}
break loop
}
}
if o.stepDown != nil {
Expand All @@ -540,6 +550,33 @@ loop:
return nil
}

func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
output := make(chan struct{}, 1)
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
wch := o.etcdClient.Client.Watch(ctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())

for resp := range wch {
if resp.Err() != nil {
log.Error("position watcher restarted with error", zap.Error(resp.Err()))
break
}

// TODO: because the main loop has many serial steps, it is hard to do a partial update without change
// majority logical. For now just to wakeup the main loop ASAP to reduce latency, the efficiency of etcd
// operations should be resolved in future release.
output <- struct{}{}
}
}
}()
return output
}

func (o *Owner) run(ctx context.Context) error {
o.l.Lock()
defer o.l.Unlock()
Expand Down
Loading