Skip to content

Commit

Permalink
ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965) (#10983)
Browse files Browse the repository at this point in the history
close #10890
  • Loading branch information
ti-chi-bot authored May 13, 2024
1 parent a4962cd commit 3781b9a
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 48 deletions.
36 changes: 23 additions & 13 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,30 @@ func (m *ddlManager) tick(
break
}

if job != nil && job.BinlogInfo != nil {
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}
if job.BinlogInfo == nil {
continue
}

log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}

for _, event := range events {
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}

// Send DDL events to redo log.
if m.redoDDLManager.Enabled() {
for _, event := range events {
// TODO: find a better place to do this check
// check if the ddl event is belong to an ineligible table.
Expand Down
46 changes: 43 additions & 3 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package puller
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,6 +28,7 @@ import (
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -119,8 +121,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
if job != nil {
skip, err := p.handleJob(job)
if err != nil {
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.String(), job.Query, job.StartTS, job.StartTS)
return err
}
log.Info("handle ddl job",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down Expand Up @@ -487,7 +488,46 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
p.setResolvedTs(job.BinlogInfo.FinishedTS)
p.schemaVersion = job.BinlogInfo.SchemaVersion

return false, nil
return p.checkIneligibleTableDDL(snap, job)
}

// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL.
// 1. If it is not a table DDL, we shouldn't check it.
// 2. If the table after the DDL is ineligible:
// a. If the table is not exist before the DDL, we should ignore the DDL.
// b. If the table is ineligible before the DDL, we should ignore the DDL.
// c. If the table is eligible before the DDL, we should return an error.
func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) {
if filter.IsSchemaDDL(job.Type) {
return false, nil
}

ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID)
if !ineligible {
return false, nil
}

// If the table is not in the snapshot before the DDL,
// we should ignore the DDL.
_, exist := snapBefore.PhysicalTableByID(job.TableID)
if !exist {
return true, nil
}

// If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL.
// If so, we should return an error to inform the user that it is a
// dangerous operation and should be handled manually.
isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID)
if isBeforeineligible {
log.Warn("ignore the DDL event of ineligible table",
zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job))
return true, nil
}
return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+
"it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+
"pelase pause the changefeed and update the `force-replicate=true` "+
"in the changefeed configuration, "+
"then resume the changefeed.", job.Query))
}

func findDBByName(dbs []*timodel.DBInfo, name string) (*timodel.DBInfo, error) {
Expand Down
Loading

0 comments on commit 3781b9a

Please sign in to comment.